minor correction
[charm.git] / src / ck-com / MsgPacker.h
1 #ifndef MESSAGE_PACKER_H
2 #define MESSAGE_PACKER_H
3
4 #include "charm++.h"
5 #include "envelope.h"
6 #include "ComlibManager.h"
7 #include "register.h"
8 #include "pup_cmialloc.h"
9
10 #define MAX_MESSAGE_SIZE 32768
11
12 class short_envelope {
13  public:
14     UShort epIdx;
15     UShort size;  //Can only send messages up to 64KB :)    
16     
17     CkArrayIndexMax idx;
18     char *data;
19
20     short_envelope();
21     ~short_envelope();
22     inline short_envelope(CkMigrateMessage *){}
23     
24     void pup(PUP::er &p);
25 };
26
27 inline short_envelope::short_envelope(){
28     epIdx = 0;
29     data = NULL;
30 }
31
32 inline short_envelope::~short_envelope(){
33     /*
34       if(data) 
35       CmiFree(data);        
36       data = NULL;
37     */
38 }
39
40 inline void short_envelope::pup(PUP::er &p){
41     p | epIdx;
42     p | size;    
43     p | idx;
44     
45     p.pupCmiAllocBuf((void **)&data, size);
46 }
47
48 struct CombinedMessage{
49
50     char header[CmiReservedHeaderSize];
51     CkArrayID aid;
52     int srcPE;
53     int nmsgs;
54 };
55
56 PUPbytes(CombinedMessage);
57
58 class MsgPacker {        
59     CkArrayID aid;
60     short_envelope * msgList;
61     int nShortMsgs;   
62
63  public:
64     MsgPacker();
65     ~MsgPacker();    
66     MsgPacker(CkQ<CharmMessageHolder*> &cmsg_list, int n_msgs);
67     
68     void getMessage(CombinedMessage *&msg, int &size);
69     static void deliver(CombinedMessage *cmb_msg);
70 };
71
72 inline void MsgPacker::deliver(CombinedMessage *cmb_msg){
73
74     CombinedMessage cmb_hdr;
75
76     PUP_fromCmiAllocMem fp(cmb_msg);
77     fp | cmb_hdr;
78
79     int nmsgs = cmb_hdr.nmsgs;
80
81     ComlibPrintf("In MsgPacker::deliver\n");
82     CkArrayID aid = cmb_hdr.aid;
83     int src_pe = cmb_hdr.srcPE;
84
85     for(int count = 0; count < nmsgs; count ++){
86         short_envelope senv;
87         fp | senv;
88         
89         int ep = senv.epIdx;
90         CkArrayIndexMax idx = senv.idx;
91         int size = senv.size;
92
93         CProxyElement_ArrayBase ap(aid, idx);
94         ArrayElement *a_elem = ap.ckLocal();
95         CkArray *a=(CkArray *)_localBranch(aid);
96
97         int msgIdx = _entryTable[ep]->msgIdx;
98         if(_entryTable[ep]->noKeep && a_elem != NULL) {
99             //Unpack the message
100             senv.data = (char *)_msgTable[msgIdx]->unpack(senv.data); 
101             CkDeliverMessageReadonly(ep, senv.data, a_elem);            
102             CmiFree(senv.data);
103         }
104         else {
105             //envelope *env = (envelope *)CmiAlloc(sizeof(envelope) + size);
106             envelope *env = _allocEnv(ForArrayEltMsg, 
107                                       sizeof(envelope) + size);
108
109             void *data = EnvToUsr(env);
110             memcpy(data, senv.data, size);
111             
112             //Unpack the message
113             data = (char *)_msgTable[msgIdx]->unpack(data); 
114             
115             env->getsetArrayMgr() = aid;
116             env->getsetArrayIndex() = idx;
117             env->getsetArrayEp() = ep;
118             env->setPacked(0); 
119             env->getsetArraySrcPe()=src_pe;  
120             env->getsetArrayHops()=1;  
121             env->setQueueing(CK_QUEUEING_FIFO);            
122             env->setUsed(0);
123             env->setMsgIdx(msgIdx);
124
125             env->setTotalsize(sizeof(envelope) + size);
126
127             //if(a_elem)
128             //  CkDeliverMessageFree(ep, data, a_elem);                     
129             //else
130             //ap.ckSend((CkArrayMessage *)data, ep);
131             
132             a->deliver((CkArrayMessage *)data, CkDeliver_queue);
133
134             CmiFree(senv.data);
135         }        
136     }      
137         
138     CmiFree(cmb_msg);
139 }
140
141
142 #endif