New version of communication library with learning capabilities. Learning and dynamic...
[charm.git] / src / ck-com / MsgPacker.C
1 #include "ComlibManager.h"
2 #include "MsgPacker.h"
3 #include "register.h"
4 #include "pup_cmialloc.h"
5
6 CkpvExtern(int, RecvCombinedShortMsgHdlrIdx);
7
8 void short_envelope::pup(PUP::er &p){
9     p | idx;
10     p | epIdx;
11     p | size;
12     
13     //if(p.isUnpacking()) 
14     //  data = new char[size];
15
16     p.pupCmiAllocBuf((void **)&data, size);
17 }
18
19 short_envelope::short_envelope(){
20     epIdx = 0;
21     data = NULL;
22 }
23
24 short_envelope::~short_envelope(){
25     /*
26       if(data) 
27       CmiFree(data);        
28       data = NULL;
29     */
30 }
31
32
33 MsgPacker::MsgPacker(){
34     nShortMsgs = 0;
35     msgList = 0;    
36 }
37
38 MsgPacker::MsgPacker(CkQ<CharmMessageHolder *> &msgq, int n_msgs){
39
40     CkAssert(n_msgs < 65536);  //16 bit field for num messages
41
42     nShortMsgs = n_msgs;
43     msgList = new short_envelope[n_msgs];    
44
45     for(int count = 0; count < n_msgs; count ++){
46         CharmMessageHolder *cmsg = msgq.deq();
47         envelope *env = (envelope *)UsrToEnv(cmsg->getCharmMessage());
48         CkPackMessage(&env);
49
50         if(count == 0) {
51             aid = env->getsetArrayMgr();
52             if(aid.isZero()) 
53                 CkAbort("Array packing set and ArrayID is zero");
54         }        
55         
56         msgList[count].epIdx = env->getsetArrayEp();
57         msgList[count].size = env->getTotalsize() - sizeof(envelope);
58         msgList[count].idx = env->getsetArrayIndex();
59         msgList[count].data = cmsg->getCharmMessage();
60
61         if(msgList[count].size >= MAX_MESSAGE_SIZE-1)
62             CkAbort("Can't send messges larger than 64KB\n");
63
64         delete cmsg;
65     }
66 }
67
68 MsgPacker::~MsgPacker(){
69     if(nShortMsgs > 0 && msgList != NULL) {
70         for(int count = 0; count < nShortMsgs; count ++)
71             CkFreeMsg(msgList[count].data);        
72         
73         delete [] msgList;
74     }
75 }
76
77 void MsgPacker::getMessage(CombinedMessage *&cmb_msg, int &total_size){
78     int count;
79     PUP_cmiAllocSizer sp;
80
81     CombinedMessage cmb_hdr;
82     cmb_hdr.aid = aid;
83     cmb_hdr.srcPE = CkMyPe();
84     cmb_hdr.nmsgs = nShortMsgs;
85
86     sp | cmb_hdr;
87     for(count = 0; count < nShortMsgs; count ++)
88         sp | msgList[count];
89     
90     total_size = sp.size();
91     ComlibPrintf("In MsgPacker with %d bytes and %d messages\n", total_size, 
92                  nShortMsgs);
93
94     cmb_msg = (CombinedMessage *)CmiAlloc(sp.size());
95
96     PUP_toCmiAllocMem mp(cmb_msg);
97     mp | cmb_hdr;
98
99     for(count = 0; count < nShortMsgs; count ++)
100         mp | msgList[count];
101
102     CmiSetHandler(cmb_msg, CkpvAccess(RecvCombinedShortMsgHdlrIdx));
103 }
104
105
106 void MsgPacker::deliver(CombinedMessage *cmb_msg){
107
108     CombinedMessage cmb_hdr;
109
110     PUP_fromCmiAllocMem fp(cmb_msg);
111     fp | cmb_hdr;
112
113     int nmsgs = cmb_hdr.nmsgs;
114
115     ComlibPrintf("In MsgPacker::deliver\n");
116     CkArrayID aid = cmb_hdr.aid;
117     int src_pe = cmb_hdr.srcPE;
118
119     for(int count = 0; count < nmsgs; count ++){
120         short_envelope senv;
121         fp | senv;
122         
123         int ep = senv.epIdx;
124         CkArrayIndexMax idx = senv.idx;
125         int size = senv.size;
126
127         CProxyElement_ArrayBase ap(aid, idx);
128         ArrayElement *a_elem = ap.ckLocal();
129         CkArray *a=(CkArray *)_localBranch(aid);
130
131         int msgIdx = _entryTable[ep]->msgIdx;
132         if(_entryTable[ep]->noKeep && a_elem != NULL) {
133             //Unpack the message
134             senv.data = (char *)_msgTable[msgIdx]->unpack(senv.data); 
135             CkDeliverMessageReadonly(ep, senv.data, a_elem);            
136             CmiFree(senv.data);
137         }
138         else {
139             //envelope *env = (envelope *)CmiAlloc(sizeof(envelope) + size);
140             envelope *env = _allocEnv(ForArrayEltMsg, 
141                                       sizeof(envelope) + size);
142
143             void *data = EnvToUsr(env);
144             memcpy(data, senv.data, size);
145             
146             //Unpack the message
147             data = (char *)_msgTable[msgIdx]->unpack(data); 
148             
149             env->getsetArrayMgr() = aid;
150             env->getsetArrayIndex() = idx;
151             env->getsetArrayEp() = ep;
152             env->setPacked(0); 
153             env->getsetArraySrcPe()=src_pe;  
154             env->getsetArrayHops()=1;  
155             env->setQueueing(CK_QUEUEING_FIFO);            
156             env->setUsed(0);
157             env->setMsgIdx(msgIdx);
158
159             env->setTotalsize(sizeof(envelope) + size);
160
161             //if(a_elem)
162             //  CkDeliverMessageFree(ep, data, a_elem);                     
163             //else
164             //ap.ckSend((CkArrayMessage *)data, ep);
165             
166             a->deliver((CkArrayMessage *)data, CkDeliver_queue);
167
168             CmiFree(senv.data);
169         }        
170     }      
171         
172     CmiFree(cmb_msg);
173 }
174
175
176
177
178