Making msgpacker faster by reducing short envelope size and making fewer calls to...
[charm.git] / src / ck-com / MsgPacker.h
1 #ifndef MESSAGE_PACKER_H
2 #define MESSAGE_PACKER_H
3
4 #include "charm++.h"
5 #include "envelope.h"
6 #include "ComlibManager.h"
7 #include "register.h"
8 #include "pup_cmialloc.h"
9
10 #define MAX_MESSAGE_SIZE 32768
11
12
13
14 class short_envelope {
15  public:
16     UShort epIdx;
17     UShort size;  //Can only send messages up to 64KB :)    
18     
19     CkArrayIndexMax idx;
20     char *data;
21
22     short_envelope();
23     ~short_envelope();
24     inline short_envelope(CkMigrateMessage *){}
25     
26     void pup(PUP::er &p);
27 };
28
29 inline short_envelope::short_envelope(){
30     epIdx = 0;
31     data = NULL;
32 }
33
34 inline short_envelope::~short_envelope(){
35     /*
36       if(data) 
37       CmiFree(data);        
38       data = NULL;
39     */
40 }
41
42 inline void short_envelope::pup(PUP::er &p){
43     p.pupCmiAllocBuf((void **)&data, size);
44     p | idx;
45     
46     int *data = (int *)this;
47     //p | epIdx;
48     //p | size;    
49     p(data, 2);
50 }
51
52 struct CombinedMessage{
53
54     char header[CmiReservedHeaderSize];
55     CkArrayID aid;
56     int srcPE;
57     int nmsgs;
58 };
59
60 PUPbytes(CombinedMessage);
61
62 class MsgPacker {        
63     CkArrayID aid;
64     short_envelope * msgList;
65     int nShortMsgs;   
66
67  public:
68     MsgPacker();
69     ~MsgPacker();    
70     MsgPacker(CkQ<CharmMessageHolder*> &cmsg_list, int n_msgs);
71     
72     void getMessage(CombinedMessage *&msg, int &size);
73     static void deliver(CombinedMessage *cmb_msg);
74 };
75
76 inline void MsgPacker::deliver(CombinedMessage *cmb_msg){
77
78     CombinedMessage cmb_hdr;
79
80     PUP_fromCmiAllocMem fp(cmb_msg);
81     fp | cmb_hdr;
82
83     int nmsgs = cmb_hdr.nmsgs;
84
85     ComlibPrintf("In MsgPacker::deliver\n");
86     CkArrayID aid = cmb_hdr.aid;
87     int src_pe = cmb_hdr.srcPE;
88
89     for(int count = 0; count < nmsgs; count ++){
90         short_envelope senv;
91         fp | senv;
92         
93         int ep = senv.epIdx;
94         CkArrayIndexMax idx = senv.idx;
95         int size = senv.size;
96
97         CProxyElement_ArrayBase ap(aid, idx);
98         ArrayElement *a_elem = ap.ckLocal();
99         CkArray *a=(CkArray *)_localBranch(aid);
100
101         int msgIdx = _entryTable[ep]->msgIdx;
102         if(_entryTable[ep]->noKeep && a_elem != NULL) {
103             //Unpack the message
104             senv.data = (char *)_msgTable[msgIdx]->unpack(senv.data); 
105             CkDeliverMessageReadonly(ep, senv.data, a_elem);            
106             CmiFree(senv.data);
107         }
108         else {
109             //envelope *env = (envelope *)CmiAlloc(sizeof(envelope) + size);
110             envelope *env = _allocEnv(ForArrayEltMsg, 
111                                       sizeof(envelope) + size);
112
113             void *data = EnvToUsr(env);
114             memcpy(data, senv.data, size);
115             
116             //Unpack the message
117             data = (char *)_msgTable[msgIdx]->unpack(data); 
118             
119             env->getsetArrayMgr() = aid;
120             env->getsetArrayIndex() = idx;
121             env->getsetArrayEp() = ep;
122             env->setPacked(0); 
123             env->getsetArraySrcPe()=src_pe;  
124             env->getsetArrayHops()=1;  
125             env->setQueueing(CK_QUEUEING_FIFO);            
126             env->setUsed(0);
127             env->setMsgIdx(msgIdx);
128
129             env->setTotalsize(sizeof(envelope) + size);
130
131             //if(a_elem)
132             //  CkDeliverMessageFree(ep, data, a_elem);                     
133             //else
134             //ap.ckSend((CkArrayMessage *)data, ep);
135             
136             a->deliver((CkArrayMessage *)data, CkDeliver_queue);
137
138             CmiFree(senv.data);
139         }        
140     }      
141         
142     CmiFree(cmb_msg);
143 }
144
145
146 #endif