Fixed memory leak inside ComlibStrategy/OneTimeMulticastStrategy. The message would
[charm.git] / src / ck-com / MsgPacker.h
1 #ifndef MESSAGE_PACKER_H
2 #define MESSAGE_PACKER_H
3
4 /**
5    @addtogroup CharmComlib
6    *@{
7
8    @file
9    
10    @brief An envelope for packing multiple messages into a single message.
11 */
12
13
14 #include "charm++.h"
15 #include "envelope.h"
16 #include "ComlibManager.h"
17 #include "register.h"
18 #include "pup_cmialloc.h"
19
20 #define MAX_MESSAGE_SIZE 32768
21
22 class short_envelope {
23  public:
24     UShort epIdx;
25     UShort size;  //Can only send messages up to 64KB :)    
26     
27     CkArrayIndexMax idx;
28     char *data;
29
30     short_envelope();
31     ~short_envelope();
32     inline short_envelope(CkMigrateMessage *){}
33     
34     void pup(PUP::er &p);
35 };
36
37 inline short_envelope::short_envelope(){
38     epIdx = 0;
39     data = NULL;
40 }
41
42 inline short_envelope::~short_envelope(){
43     /*
44       if(data) 
45       CmiFree(data);        
46       data = NULL;
47     */
48 }
49
50 inline void short_envelope::pup(PUP::er &p){    
51   char nints = 0;
52
53   p | epIdx;
54   //  p | size;        
55   //p | idx;
56   
57   //Complex pup of arrays, even want to save 3 bytes, GREEDY, GREEDY :)
58   if(!p.isUnpacking()) 
59     nints = idx.nInts;
60
61   p | nints;
62   idx.nInts = nints;
63   p((int *)(idx.data()), idx.nInts);
64   
65   if(p.isUnpacking()) {
66       p.pupCmiAllocBuf((void **)&data);
67       size = SIZEFIELD(data);
68   }
69   else 
70       p.pupCmiAllocBuf((void **)&data, size);
71 }
72
73 struct CombinedMessage{
74     char header[CmiReservedHeaderSize];
75     CkArrayID aid;
76     unsigned short srcPE;  //Will not work on a very large bluegene machine!
77     unsigned short nmsgs;
78 };
79
80 PUPbytes(CombinedMessage)
81
82 class MsgPacker {        
83     CkArrayID aid;
84     short_envelope * msgList;
85     int nShortMsgs;   
86
87  public:
88     MsgPacker();
89     ~MsgPacker();    
90     
91     //Makes a message out of a queue of CharmMessageHolders
92     MsgPacker(CkQ<CharmMessageHolder*> &cmsg_list, int n_msgs);
93     
94     //Takes a queue of envelopes as char* ptrs and not charm message holders
95     //Used by mesh streaming strategy
96     MsgPacker(CkQ<char *> &msgq, int n_msgs);
97     
98     void getMessage(CombinedMessage *&msg, int &size);
99     static void deliver(CombinedMessage *cmb_msg);
100 };
101
102 inline void MsgPacker::deliver(CombinedMessage *cmb_msg){
103
104     CombinedMessage cmb_hdr;
105
106     PUP_fromCmiAllocMem fp(cmb_msg);
107     fp | cmb_hdr;
108
109     int nmsgs = cmb_hdr.nmsgs;
110
111     ComlibPrintf("In MsgPacker::deliver\n");
112     CkArrayID aid = cmb_hdr.aid;
113     int src_pe = cmb_hdr.srcPE;
114     CkArray *a=(CkArray *)_localBranch(aid);
115
116     ArrayElement *a_elem=NULL, *prev_elem=NULL;
117     CkArrayIndexMax prev_idx;
118     prev_idx.nInts = -1;
119
120     for(int count = 0; count < nmsgs; count ++){
121         short_envelope senv;
122         fp | senv;
123         
124         int ep = senv.epIdx;
125         int size = senv.size;
126
127         if(senv.idx == prev_idx) {
128             a_elem = prev_elem;
129         }
130         else {
131             CProxyElement_ArrayBase ap(aid, senv.idx);
132             a_elem = ap.ckLocal();
133         }
134
135         int msgIdx = _entryTable[ep]->msgIdx;
136         if(_entryTable[ep]->noKeep && a_elem != NULL) {
137             //Unpack the message
138             senv.data = (char *)_msgTable[msgIdx]->unpack(senv.data); 
139             CkDeliverMessageReadonly(ep, senv.data, a_elem);            
140
141             prev_elem = a_elem;
142             prev_idx = senv.idx;
143             CmiFree(senv.data);
144         }
145         else {
146             //envelope *env = (envelope *)CmiAlloc(sizeof(envelope) + size);
147             envelope *env = _allocEnv(ForArrayEltMsg, 
148                                       sizeof(envelope) + size);
149
150             void *data = EnvToUsr(env);
151             memcpy(data, senv.data, size);
152             
153             //Unpack the message
154             data = (char *)_msgTable[msgIdx]->unpack(data); 
155             
156             env->getsetArrayMgr() = aid;
157             env->getsetArrayIndex() = senv.idx;
158             env->getsetArrayEp() = ep;
159             env->setPacked(0); 
160             env->getsetArraySrcPe()=src_pe;  
161             env->getsetArrayHops()=1;  
162             env->setQueueing(CK_QUEUEING_FIFO);            
163             env->setUsed(0);
164             env->setMsgIdx(msgIdx);
165
166             env->setTotalsize(sizeof(envelope) + size);
167
168             //if(a_elem)
169             //  CkDeliverMessageFree(ep, data, a_elem);                     
170             //else
171             //ap.ckSend((CkArrayMessage *)data, ep);
172             
173             a->deliver((CkArrayMessage *)data, CkDeliver_queue);
174
175             prev_elem = a_elem;
176             prev_idx = senv.idx;
177             CmiFree(senv.data);
178         }   
179     }      
180         
181     CmiFree(cmb_msg);
182 }
183
184 /*@}*/
185 #endif
186