8d938a466f00e5185151878d60cf7586a5f18b21
[charm.git] / src / ck-com / MsgPacker.C
1 #include "ComlibManager.h"
2 #include "MsgPacker.h"
3 #include "register.h"
4
5 CkpvExtern(int, RecvCombinedShortMsgHdlrIdx);
6
7 void short_envelope::pup(PUP::er &p){
8     p | idx;
9     p | epIdx;
10     p | size;
11     
12     if(p.isUnpacking()) 
13         data = new char[size];    
14     p(data, size);
15 }
16
17 short_envelope::short_envelope(){
18     epIdx = 0;
19     data = NULL;
20 }
21
22 short_envelope::~short_envelope(){
23     /*
24       if(data) 
25       delete [] data;        
26       data = NULL;
27     */
28 }
29
30
31 MsgPacker::MsgPacker(){
32     nShortMsgs = 0;
33     msgList = 0;    
34 }
35
36 MsgPacker::MsgPacker(CkQ<CharmMessageHolder *> &msgq, int n_msgs){
37
38     CkAssert(n_msgs < 65536);  //16 bit field for num messages
39
40     nShortMsgs = n_msgs;
41     msgList = new short_envelope[n_msgs];    
42
43     for(int count = 0; count < n_msgs; count ++){
44         CharmMessageHolder *cmsg = msgq.deq();
45         envelope *env = (envelope *)UsrToEnv(cmsg->getCharmMessage());
46         CkPackMessage(&env);
47
48         if(count == 0) {
49             aid = env->getsetArrayMgr();
50             if(aid.isZero()) CkAbort("Array packing set and ArrayID is zero");
51         }        
52         
53         msgList[count].epIdx = env->getsetArrayEp();
54         msgList[count].size = env->getTotalsize() - sizeof(envelope);
55         msgList[count].idx = env->getsetArrayIndex();
56         msgList[count].data = cmsg->getCharmMessage();
57
58         if(msgList[count].size > MAX_MESSAGE_SIZE)
59             CkAbort("Can't send messges larger than 64KB\n");
60
61         delete cmsg;
62     }
63 }
64
65 MsgPacker::~MsgPacker(){
66     if(nShortMsgs > 0 && msgList != NULL) {
67         for(int count = 0; count < nShortMsgs; count ++)
68             CkFreeMsg(msgList[count].data);        
69         
70         delete [] msgList;
71     }
72 }
73
74 void MsgPacker::getMessage(CombinedMessage *&cmb_msg, int &total_size){
75     int count;
76     PUP::sizer sp;
77     for(count = 0; count < nShortMsgs; count ++)
78         sp | msgList[count];
79
80     int size = sp.size();  
81     total_size = ALIGN8(sizeof(CombinedMessage)) + size;
82     
83     //CkPrintf("In MsgPacker with %d bytes and %d messages\n", total_size, 
84     //           nShortMsgs);
85
86     cmb_msg = (CombinedMessage *)CmiAlloc(total_size);
87
88     PUP::toMem mp((char *)cmb_msg + ALIGN8(sizeof(CombinedMessage)));
89     for(count = 0; count < nShortMsgs; count ++)
90         mp | msgList[count];
91
92     cmb_msg->aid = aid;
93     cmb_msg->srcPE = CkMyPe();
94     cmb_msg->nmsgs = nShortMsgs;
95
96     CmiSetHandler(cmb_msg, CkpvAccess(RecvCombinedShortMsgHdlrIdx));
97 }
98
99
100 void MsgPacker::deliver(CombinedMessage *cmb_msg){
101     int nmsgs = cmb_msg->nmsgs;
102
103     ComlibPrintf("In MsgPacker::deliver\n");
104
105     char *from_addr = (char *)cmb_msg + ALIGN8(sizeof(CombinedMessage));
106     PUP::fromMem fp(from_addr);
107     CkArrayID aid = cmb_msg->aid;
108
109     int src_pe = cmb_msg->srcPE;
110
111     for(int count = 0; count < nmsgs; count ++){
112         short_envelope senv;
113         fp | senv;
114         
115         int ep = senv.epIdx;
116         CkArrayIndexMax idx = senv.idx;
117         int size = senv.size;
118
119         CProxyElement_ArrayBase ap(aid, idx);
120         ArrayElement *a_elem = ap.ckLocal();
121         CkArray *a=(CkArray *)_localBranch(aid);
122
123         int msgIdx = _entryTable[ep]->msgIdx;
124         if(_entryTable[ep]->noKeep && a_elem != NULL) {
125             //Unpack the message
126             senv.data = (char *)_msgTable[msgIdx]->unpack(senv.data); 
127             CkDeliverMessageReadonly(ep, senv.data, a_elem);            
128             delete[] senv.data;
129         }
130         else {
131             //envelope *env = (envelope *)CmiAlloc(sizeof(envelope) + size);
132             envelope *env = _allocEnv(ForArrayEltMsg, sizeof(envelope) + size);
133
134             void *data = EnvToUsr(env);
135             memcpy(data, senv.data, size);
136             
137             //Unpack the message
138             data = (char *)_msgTable[msgIdx]->unpack(data); 
139             
140             env->getsetArrayMgr() = aid;
141             env->getsetArrayIndex() = idx;
142             env->getsetArrayEp() = ep;
143             env->setPacked(0); 
144             env->getsetArraySrcPe()=src_pe;  
145             env->getsetArrayHops()=1;  
146             env->setQueueing(CK_QUEUEING_FIFO);            
147             env->setUsed(0);
148             env->setMsgIdx(msgIdx);
149
150             env->setTotalsize(sizeof(envelope) + size);
151
152             //if(a_elem)
153             //  CkDeliverMessageFree(ep, data, a_elem);                     
154             //else
155             //ap.ckSend((CkArrayMessage *)data, ep);
156             
157             a->deliver((CkArrayMessage *)data, CkDeliver_queue, CmiTrue);
158
159             delete[] senv.data;
160         }        
161     }      
162         
163     CmiFree(cmb_msg);
164 }
165
166
167
168
169