6866c97cfdf4f836641d5a4a3b04ef0239fc7795
[charm.git] / src / ck-com / MsgPacker.C
1 #include "MsgPacker.h"
2
3 CkpvExtern(int, RecvCombinedShortMsgHdlrIdx);
4
5 MsgPacker::MsgPacker(){
6     nShortMsgs = 0;
7     msgList = 0;    
8 }
9
10 MsgPacker::MsgPacker(CkQ<CharmMessageHolder *> &msgq, int n_msgs){
11
12     CkAssert(n_msgs < 65536);  //16 bit field for num messages
13
14     nShortMsgs = n_msgs;
15     msgList = new short_envelope[n_msgs];    
16
17     for(int count = 0; count < n_msgs; count ++){
18         CharmMessageHolder *cmsg = msgq.deq();
19         char *msg = cmsg->getCharmMessage();
20         envelope *env = (envelope *)UsrToEnv(msg);
21         CkPackMessage(&env);
22
23         if(count == 0) {
24             aid = env->getsetArrayMgr();
25             if(aid.isZero()) 
26                 CkAbort("Array packing set and ArrayID is zero");
27         }        
28         
29         msgList[count].epIdx = env->getsetArrayEp();
30         msgList[count].size = env->getTotalsize() - sizeof(envelope);
31         msgList[count].idx = env->getsetArrayIndex();
32         msgList[count].data = msg;
33
34         CkAssert(msgList[count].size < MAX_MESSAGE_SIZE);
35         delete cmsg;
36     }
37 }
38
39 //Takes a queue of envelopes as char* ptrs and not charm message holders
40 //Used by mesh streaming strategy
41 MsgPacker::MsgPacker(CkQ<char *> &msgq, int n_msgs){
42     
43     CkAssert(n_msgs < 65536);  //16 bit field for num messages
44     
45     nShortMsgs = n_msgs;
46     msgList = new short_envelope[n_msgs];    
47     
48     for(int count = 0; count < n_msgs; count ++){
49         envelope *env = (envelope *)msgq.deq();
50         char *msg = (char *)EnvToUsr(env);
51         CkPackMessage(&env);
52
53         if(count == 0) {
54             aid = env->getsetArrayMgr();
55             if(aid.isZero()) 
56                 CkAbort("Array packing set and ArrayID is zero");
57         }        
58         
59         msgList[count].epIdx = env->getsetArrayEp();
60         msgList[count].size = env->getTotalsize() - sizeof(envelope);
61         msgList[count].idx = env->getsetArrayIndex();
62         msgList[count].data = msg;
63         
64         CkAssert(msgList[count].size < MAX_MESSAGE_SIZE);
65     }
66 }
67
68 MsgPacker::~MsgPacker(){
69     if(nShortMsgs > 0 && msgList != NULL) {
70         for(int count = 0; count < nShortMsgs; count ++)
71             CkFreeMsg(msgList[count].data);        
72         
73         delete [] msgList;
74     }
75 }
76
77 void MsgPacker::getMessage(CombinedMessage *&cmb_msg, int &total_size){
78     int count;
79     PUP_cmiAllocSizer sp;
80
81     CombinedMessage cmb_hdr;
82     cmb_hdr.aid = aid;
83     cmb_hdr.srcPE = CkMyPe();
84     cmb_hdr.nmsgs = nShortMsgs;
85
86     sp | cmb_hdr;
87     for(count = 0; count < nShortMsgs; count ++)
88         sp | msgList[count];
89     
90     total_size = sp.size();
91     ComlibPrintf("In MsgPacker with %d bytes and %d messages\n", total_size, 
92                  nShortMsgs);
93
94     cmb_msg = (CombinedMessage *)CmiAlloc(sp.size());
95
96     PUP_toCmiAllocMem mp(cmb_msg);
97     mp | cmb_hdr;
98
99     for(count = 0; count < nShortMsgs; count ++)
100         mp | msgList[count];
101
102     CmiSetHandler(cmb_msg, CkpvAccess(RecvCombinedShortMsgHdlrIdx));
103 }