adding cacheing of the array element lookup. Might break the learning frameowrk....
[charm.git] / src / ck-com / MsgPacker.h
1 #ifndef MESSAGE_PACKER_H
2 #define MESSAGE_PACKER_H
3
4 #include "charm++.h"
5 #include "envelope.h"
6 #include "ComlibManager.h"
7 #include "register.h"
8 #include "pup_cmialloc.h"
9
10 #define MAX_MESSAGE_SIZE 32768
11
12 class short_envelope {
13  public:
14     UShort epIdx;
15     UShort size;  //Can only send messages up to 64KB :)    
16     
17     CkArrayIndexMax idx;
18     char *data;
19
20     short_envelope();
21     ~short_envelope();
22     inline short_envelope(CkMigrateMessage *){}
23     
24     void pup(PUP::er &p);
25 };
26
27 inline short_envelope::short_envelope(){
28     epIdx = 0;
29     data = NULL;
30 }
31
32 inline short_envelope::~short_envelope(){
33     /*
34       if(data) 
35       CmiFree(data);        
36       data = NULL;
37     */
38 }
39
40 inline void short_envelope::pup(PUP::er &p){    
41
42     p | epIdx;
43     p | size;        
44     //p | idx;
45     
46     if(p.isUnpacking())
47         idx.nInts = 0;
48
49     p((char *)&(idx.nInts), 1);
50     p((int *)(idx.data()), idx.nInts);
51
52     p.pupCmiAllocBuf((void **)&data, size);
53 }
54
55 struct CombinedMessage{
56
57     char header[CmiReservedHeaderSize];
58     CkArrayID aid;
59     int srcPE;
60     int nmsgs;
61 };
62
63 PUPbytes(CombinedMessage);
64
65 class MsgPacker {        
66     CkArrayID aid;
67     short_envelope * msgList;
68     int nShortMsgs;   
69
70  public:
71     MsgPacker();
72     ~MsgPacker();    
73     
74     //Makes a message out of a queue of CharmMessageHolders
75     MsgPacker(CkQ<CharmMessageHolder*> &cmsg_list, int n_msgs);
76     
77     //Takes a queue of envelopes as char* ptrs and not charm message holders
78     //Used by mesh streaming strategy
79     MsgPacker::MsgPacker(CkQ<char *> &msgq, int n_msgs);
80     
81     void getMessage(CombinedMessage *&msg, int &size);
82     static void deliver(CombinedMessage *cmb_msg);
83 };
84
85 inline void MsgPacker::deliver(CombinedMessage *cmb_msg){
86
87     CombinedMessage cmb_hdr;
88
89     PUP_fromCmiAllocMem fp(cmb_msg);
90     fp | cmb_hdr;
91
92     int nmsgs = cmb_hdr.nmsgs;
93
94     ComlibPrintf("In MsgPacker::deliver\n");
95     CkArrayID aid = cmb_hdr.aid;
96     int src_pe = cmb_hdr.srcPE;
97     CkArray *a=(CkArray *)_localBranch(aid);
98
99     ArrayElement *a_elem=NULL, *prev_elem=NULL;
100     CkArrayIndexMax prev_idx;
101     prev_idx.nInts = -1;
102
103     for(int count = 0; count < nmsgs; count ++){
104         short_envelope senv;
105         fp | senv;
106         
107         int ep = senv.epIdx;
108         int size = senv.size;
109
110         if(senv.idx == prev_idx) {
111             a_elem = prev_elem;
112         }
113         else {
114             CProxyElement_ArrayBase ap(aid, senv.idx);
115             a_elem = ap.ckLocal();
116         }
117
118         int msgIdx = _entryTable[ep]->msgIdx;
119         if(_entryTable[ep]->noKeep && a_elem != NULL) {
120             //Unpack the message
121             senv.data = (char *)_msgTable[msgIdx]->unpack(senv.data); 
122             CkDeliverMessageReadonly(ep, senv.data, a_elem);            
123
124             prev_elem = a_elem;
125             prev_idx = senv.idx;
126             CmiFree(senv.data);
127         }
128         else {
129             //envelope *env = (envelope *)CmiAlloc(sizeof(envelope) + size);
130             envelope *env = _allocEnv(ForArrayEltMsg, 
131                                       sizeof(envelope) + size);
132
133             void *data = EnvToUsr(env);
134             memcpy(data, senv.data, size);
135             
136             //Unpack the message
137             data = (char *)_msgTable[msgIdx]->unpack(data); 
138             
139             env->getsetArrayMgr() = aid;
140             env->getsetArrayIndex() = senv.idx;
141             env->getsetArrayEp() = ep;
142             env->setPacked(0); 
143             env->getsetArraySrcPe()=src_pe;  
144             env->getsetArrayHops()=1;  
145             env->setQueueing(CK_QUEUEING_FIFO);            
146             env->setUsed(0);
147             env->setMsgIdx(msgIdx);
148
149             env->setTotalsize(sizeof(envelope) + size);
150
151             //if(a_elem)
152             //  CkDeliverMessageFree(ep, data, a_elem);                     
153             //else
154             //ap.ckSend((CkArrayMessage *)data, ep);
155             
156             a->deliver((CkArrayMessage *)data, CkDeliver_queue);
157
158             prev_elem = a_elem;
159             prev_idx = senv.idx;
160             CmiFree(senv.data);
161         }   
162     }      
163         
164     CmiFree(cmb_msg);
165 }
166
167
168 #endif