Code cleanup. Shorter header for streaming and enable array caching again.
[charm.git] / src / ck-com / MsgPacker.h
1 #ifndef MESSAGE_PACKER_H
2 #define MESSAGE_PACKER_H
3
4 #include "charm++.h"
5 #include "envelope.h"
6 #include "ComlibManager.h"
7 #include "register.h"
8 #include "pup_cmialloc.h"
9
10 #define MAX_MESSAGE_SIZE 32768
11
12 class short_envelope {
13  public:
14     UShort epIdx;
15     UShort size;  //Can only send messages up to 64KB :)    
16     
17     CkArrayIndexMax idx;
18     char *data;
19
20     short_envelope();
21     ~short_envelope();
22     inline short_envelope(CkMigrateMessage *){}
23     
24     void pup(PUP::er &p);
25 };
26
27 inline short_envelope::short_envelope(){
28     epIdx = 0;
29     data = NULL;
30 }
31
32 inline short_envelope::~short_envelope(){
33     /*
34       if(data) 
35       CmiFree(data);        
36       data = NULL;
37     */
38 }
39
40 inline void short_envelope::pup(PUP::er &p){    
41   char nints = 0;
42
43   p | epIdx;
44   //  p | size;        
45   //p | idx;
46   
47   //Complex pup of arrays, even want to save 3 bytes, GREEDY, GREEDY :)
48   if(!p.isUnpacking()) 
49     nints = idx.nInts;
50
51   p | nints;
52   idx.nInts = nints;
53   p((int *)(idx.data()), idx.nInts);
54   
55   if(p.isUnpacking()) {
56       p.pupCmiAllocBuf((void **)&data);
57       size = SIZEFIELD(data);
58   }
59   else 
60       p.pupCmiAllocBuf((void **)&data, size);
61 }
62
63 struct CombinedMessage{
64     char header[CmiReservedHeaderSize];
65     CkArrayID aid;
66     unsigned short srcPE;  //Will not work on a very large bluegene machine!
67     unsigned short nmsgs;
68 };
69
70 PUPbytes(CombinedMessage);
71
72 class MsgPacker {        
73     CkArrayID aid;
74     short_envelope * msgList;
75     int nShortMsgs;   
76
77  public:
78     MsgPacker();
79     ~MsgPacker();    
80     
81     //Makes a message out of a queue of CharmMessageHolders
82     MsgPacker(CkQ<CharmMessageHolder*> &cmsg_list, int n_msgs);
83     
84     //Takes a queue of envelopes as char* ptrs and not charm message holders
85     //Used by mesh streaming strategy
86     MsgPacker::MsgPacker(CkQ<char *> &msgq, int n_msgs);
87     
88     void getMessage(CombinedMessage *&msg, int &size);
89     static void deliver(CombinedMessage *cmb_msg);
90 };
91
92 inline void MsgPacker::deliver(CombinedMessage *cmb_msg){
93
94     CombinedMessage cmb_hdr;
95
96     PUP_fromCmiAllocMem fp(cmb_msg);
97     fp | cmb_hdr;
98
99     int nmsgs = cmb_hdr.nmsgs;
100
101     ComlibPrintf("In MsgPacker::deliver\n");
102     CkArrayID aid = cmb_hdr.aid;
103     int src_pe = cmb_hdr.srcPE;
104     CkArray *a=(CkArray *)_localBranch(aid);
105
106     ArrayElement *a_elem=NULL, *prev_elem=NULL;
107     CkArrayIndexMax prev_idx;
108     prev_idx.nInts = -1;
109
110     for(int count = 0; count < nmsgs; count ++){
111         short_envelope senv;
112         fp | senv;
113         
114         int ep = senv.epIdx;
115         int size = senv.size;
116
117         if(senv.idx == prev_idx) {
118             a_elem = prev_elem;
119         }
120         else {
121             CProxyElement_ArrayBase ap(aid, senv.idx);
122             a_elem = ap.ckLocal();
123         }
124
125         int msgIdx = _entryTable[ep]->msgIdx;
126         if(_entryTable[ep]->noKeep && a_elem != NULL) {
127             //Unpack the message
128             senv.data = (char *)_msgTable[msgIdx]->unpack(senv.data); 
129             CkDeliverMessageReadonly(ep, senv.data, a_elem);            
130
131             prev_elem = a_elem;
132             prev_idx = senv.idx;
133             CmiFree(senv.data);
134         }
135         else {
136             //envelope *env = (envelope *)CmiAlloc(sizeof(envelope) + size);
137             envelope *env = _allocEnv(ForArrayEltMsg, 
138                                       sizeof(envelope) + size);
139
140             void *data = EnvToUsr(env);
141             memcpy(data, senv.data, size);
142             
143             //Unpack the message
144             data = (char *)_msgTable[msgIdx]->unpack(data); 
145             
146             env->getsetArrayMgr() = aid;
147             env->getsetArrayIndex() = senv.idx;
148             env->getsetArrayEp() = ep;
149             env->setPacked(0); 
150             env->getsetArraySrcPe()=src_pe;  
151             env->getsetArrayHops()=1;  
152             env->setQueueing(CK_QUEUEING_FIFO);            
153             env->setUsed(0);
154             env->setMsgIdx(msgIdx);
155
156             env->setTotalsize(sizeof(envelope) + size);
157
158             //if(a_elem)
159             //  CkDeliverMessageFree(ep, data, a_elem);                     
160             //else
161             //ap.ckSend((CkArrayMessage *)data, ep);
162             
163             a->deliver((CkArrayMessage *)data, CkDeliver_queue);
164
165             prev_elem = a_elem;
166             prev_idx = senv.idx;
167             CmiFree(senv.data);
168         }   
169     }      
170         
171     CmiFree(cmb_msg);
172 }
173
174
175 #endif