Adding communication library in src/ck-com and src/conv-com
[charm.git] / src / ck-com / PipeBroadcastStrategy.C
1 #include "PipeBroadcastStrategy.h"
2
3 //PipeBcastHashKey CODE
4 int PipeBcastHashKey::staticCompare(const void *k1,const void *k2,size_t ){
5     return ((const PipeBcastHashKey *)k1)->
6                 compare(*(const PipeBcastHashKey *)k2);
7 }
8
9 CkHashCode PipeBcastHashKey::staticHash(const void *v,size_t){
10     return ((const PipeBcastHashKey *)v)->hash();
11 }
12
13 CkpvExtern(CkGroupID, cmgrID);
14
15 void propagate_handler(void *message) {
16   // call the appropriate function PipeBroadcastStrategy::propagate
17   //int instid = ((envelope *)message)->getEpIdx();
18   //int instid = ((CkMcastBaseMsg*)(EnvToUsr((envelope*)message)))->_cookie.sInfo.cInfo.instId;
19   int instid = CmiGetXHandler(message);
20   PipeBroadcastStrategy *myStrategy = (PipeBroadcastStrategy *)CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
21   myStrategy->propagate((envelope *)message, false);
22 }
23
24 void propagate_handler_frag(void *message) {
25   int instid = CmiGetXHandler(message);
26   PipeBroadcastStrategy *myStrategy = (PipeBroadcastStrategy *)CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
27   myStrategy->propagate((envelope *)message, true);
28 }
29
30
31 void PipeBroadcastStrategy::propagate(envelope *env, int isFragmented){
32   // find destination processors and send
33   int destination, tmp, k;
34   int num_pes, *dest_pes;
35   PipeBcastInfo *info = (PipeBcastInfo*)(((char*)env)+CmiReservedHeaderSize);
36   int srcPeNumber = isFragmented ? info->srcPe : env->getSrcPe();
37   int totalSendingSize = isFragmented ? info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo) : env->getTotalsize();
38
39   switch (topology) {
40   case USE_LINEAR:
41     if (srcPeNumber == (CkMyPe()+1)%CkNumPes()) break;
42     destination = (CkMyPe()+1) % CkNumPes();
43     ComlibPrintf("[%d] Pipebroadcast sending to %d\n",CkMyPe(), destination);
44     CmiSyncSend(destination, totalSendingSize, (char *)env);
45     break;
46   case USE_HYPERCUBE:
47     tmp = srcPeNumber ^ CkMyPe();
48     k = int(log((double)CkNumPes()) * log_of_2_inv + 2);
49     if (tmp) {
50       do {--k;} while (!(tmp>>k));
51     }
52     ComlibPrintf("[%d] tmp=%d, k=%d\n",CkMyPe(),tmp,k);
53     // now 'k' is the last dimension in the hypercube used for exchange
54     if (isFragmented) info->srcPe = CkMyPe();
55     else env->setSrcPe(CkMyPe());  // where the message is coming from
56     dest_pes = (int *)malloc(k*sizeof(int));
57     --k;  // next dimension in the cube to be used
58     num_pes = HypercubeGetBcastDestinations(k, dest_pes);
59
60     /*
61     for ( ; k>=0; --k) {
62       // add the processor destination at level k if it exist
63       dest_pes[num_pes] = CkMyPe() ^ (1<<k);
64       if (dest_pes[num_pes] >= CkNumPes()) {
65         dest_pes[num_pes] &= (-1)<<k;
66         if (CkNumPes()>dest_pes[num_pes]) dest_pes[num_pes] += (CkMyPe() - (CkMyPe() & ((-1)<<k))) % (CkNumPes() - dest_pes[num_pes]);
67       }
68       if (dest_pes[num_pes] < CkNumPes()) {
69         ComlibPrintf("[%d] PipeBroadcast sending to %d\n",CkMyPe(), dest_pes[num_pes]);
70         ++num_pes;
71       }
72     }
73     */
74
75     //CmiSyncListSend(num_pes, dest_pes, env->getTotalsize(), (char *)env);
76     for (k=0; k<num_pes; ++k) CmiSyncSend(dest_pes[k], totalSendingSize, (char *)env);
77     free(dest_pes);
78     break;
79
80     // for other strategies
81
82   default:
83     // should NEVER reach here!
84     CmiPrintf("Error, topology %d not known\n",topology);
85     CkExit();
86   }
87
88   // deliver messages to local objects (i.e. send it to ComlibManager)
89   deliverer(env, isFragmented);
90   //CmiSetHandler(env, CmiGetXHandler(env));
91   //CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
92
93 }
94
95 void PipeBroadcastStrategy::deliverer(envelope *env_frag, int isFragmented) {
96   envelope *env;
97   int isFinished=0;
98   ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
99
100   // check if the message is fragmented
101   if (isFragmented) {
102     // store the fragment in the hash table until completed
103     ComlibPrintf("[%d] deliverer: received fragmented message, storing\n",CkMyPe());
104     PipeBcastInfo *info = (PipeBcastInfo*)(((char*)env_frag)+CmiReservedHeaderSize);
105
106     PipeBcastHashKey key (info->bcastPe, info->seqNumber);
107     PipeBcastHashObj *position = fragments.get(key);
108
109     char *incomingMsg;
110     if (position) {
111       // the message already exist, add to it
112       ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CkMyPe(),info->bcastPe,info->seqNumber,position->remaining-1);
113       incomingMsg = position->message;
114       memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), ((char*)env_frag)+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
115
116       if (--position->remaining == 0) {  // message completely received
117         isFinished = 1;
118         env = (envelope*)incomingMsg;
119         // delete from the hash table
120         fragments.remove(key);
121       }
122
123     } else {
124       // the message doesn't exist, create it
125       ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CkMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize);
126       incomingMsg = (char*)CmiAlloc(info->messageSize);
127       memcpy (incomingMsg, env_frag, CmiReservedHeaderSize);
128       memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), ((char*)env_frag)+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
129       int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo)))-1;
130       if (remaining) {  // more than one chunk (it was not forced to be splitted)
131         PipeBcastHashObj *object = new PipeBcastHashObj(info->messageSize, remaining, incomingMsg);
132         fragments.put(key) = object;
133       } else {  // only one chunk, it was forces to be splitted
134         isFinished = 1;
135         env = (envelope*)incomingMsg;
136         // nothing to delete from fragments since nothing has been added
137       }
138     }
139     CmiFree(env_frag);
140
141   } else {  // message not fragmented
142     ComlibPrintf("[%d] deliverer: received message in single chunk\n",CkMyPe());
143     isFinished = 1;
144     env = env_frag;
145   }
146
147   if (isFinished) {
148     if (getType() == ARRAY_STRATEGY) {
149       CkArray *dest_array = CkArrayID::CkLocalBranch(aid);
150       localDest = new CkVec<CkArrayIndexMax>;
151       dest_array->getComlibArrayListener()->getLocalIndices(*localDest);
152       void *msg = EnvToUsr(env);
153       CkArrayIndexMax idx;
154       ArrayElement *elem;
155       int ep = env->getsetArrayEp();
156       CkUnpackMessage(&env);
157
158       ComlibPrintf("[%d] deliverer: delivering a finished message\n",CkMyPe());
159       for (int count = 0; count < localDest->size(); ++count) {
160         idx = (*localDest)[count];
161         ComlibPrintf("[%d] Sending message to ",CkMyPe());
162         if (comm_debug) idx.print();
163
164         CProxyElement_ArrayBase ap(aid, idx);
165         elem = ap.ckLocal();
166         CkDeliverMessageReadonly (ep, msg, elem);
167       }
168       delete localDest;
169       // the envelope env should be deleted only if the message is delivered
170       CmiFree(env);
171     }
172
173     if (getType() == GROUP_STRATEGY) {
174       // deliver the message to the predifined group "gid"
175       CkSendMsgBranchInline(env->getEpIdx(), EnvToUsr(env), CkMyPe(), gid);
176     }
177   }
178 }
179
180 PipeBroadcastStrategy::PipeBroadcastStrategy()
181   :topology(USE_HYPERCUBE), pipeSize(DEFAULT_PIPE), CharmStrategy() {
182
183     //isArray = 0;
184     commonInit();
185 }
186
187 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology)
188   :topology(_topology), pipeSize(DEFAULT_PIPE), CharmStrategy() {
189     //isArray = 0;
190   commonInit();
191 }
192
193 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, int _pipeSize)
194   :topology(_topology), pipeSize(_pipeSize), CharmStrategy() {
195     //isArray = 0;
196   commonInit();
197 }
198
199 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, CkArrayID _aid)
200   :topology(_topology), pipeSize(DEFAULT_PIPE), CharmStrategy() {
201     //isArray = 1;
202   setType(ARRAY_STRATEGY);
203   aid = _aid;
204   CmiPrintf("init: %d %d\n",topology, pipeSize);
205   commonInit();
206 }
207
208 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, CkArrayID _aid, int _pipeSize)
209   :topology(_topology), pipeSize(_pipeSize), CharmStrategy() {
210     setType(ARRAY_STRATEGY);
211   aid = _aid;
212   commonInit();
213 }
214
215 PipeBroadcastStrategy::PipeBroadcastStrategy(CkGroupID _gid, int _topology, int _pipeSize)
216   :topology(_topology), pipeSize(_pipeSize), CharmStrategy() {
217     setType(GROUP_STRATEGY);
218   gid = _gid;
219   commonInit();
220 }
221
222 void PipeBroadcastStrategy::commonInit(){
223   log_of_2_inv = 1/log((double)2);
224   seqNumber = 0;
225 }
226
227 void PipeBroadcastStrategy::insertMessage(CharmMessageHolder *cmsg){
228   ComlibPrintf("[%d] Pipelined Broadcast with strategy %d\n",CkMyPe(),topology);
229   messageBuf->enq(cmsg);
230   doneInserting();
231 }
232
233 // routine for interfacing with converse.
234 // Require only the converse reserved header if forceSplit is true
235 void PipeBroadcastStrategy::conversePipeBcast(envelope *env, int totalSize, int forceSplit) {
236   // set the instance ID to be used by the receiver using the XHandler variable
237   CmiSetXHandler(env, myInstanceID);
238
239   if (totalSize > pipeSize || forceSplit) {
240     ++seqNumber;
241     // message doesn't fit into the pipe: split it into chunks and propagate them individually
242     ComlibPrintf("[%d] Propagating message in multiple chunks\n",CkMyPe());
243
244     char *sendingMsg;
245     char *nextChunk = ((char*)env)+CmiReservedHeaderSize;
246     int remaining = totalSize-CmiReservedHeaderSize;
247     int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo);
248     ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipeBcastInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipeBcastInfo));
249     ComlibPrintf("sending %d chunks of size %d, total=%d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining);
250     for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) {
251       sendingMsg = (char*)CmiAlloc(pipeSize);
252       CmiSetHandler(env, propagateHandle_frag);
253       memcpy (sendingMsg, env, CmiReservedHeaderSize);
254       PipeBcastInfo *info = (PipeBcastInfo*)(sendingMsg+CmiReservedHeaderSize);
255       info->srcPe = CkMyPe();
256       info->bcastPe = CkMyPe();
257       info->seqNumber = seqNumber;
258       info->chunkNumber = i;
259       info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
260       info->messageSize = totalSize;
261       memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipeBcastInfo), nextChunk, reducedPipe);
262
263       remaining -= reducedPipe;
264       nextChunk += reducedPipe;
265
266       propagate((envelope*)sendingMsg, true);
267     }
268
269   } else {
270     // the message fit into the pipe, so send it in a single chunk
271     ComlibPrintf("[%d] Propagating message in one single chunk\n",CkMyPe());
272     CmiSetHandler(env, propagateHandle);
273     env->setSrcPe(CkMyPe());
274     //env->setEpIdx(myInstanceID);
275     propagate(env, false);
276   }
277 }
278
279 void PipeBroadcastStrategy::doneInserting(){
280   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
281   while (!messageBuf->isEmpty()) {
282     CharmMessageHolder *cmsg = messageBuf->deq();
283     // modify the Handler to deliver the message to the propagator
284     envelope *env = UsrToEnv(cmsg->getCharmMessage());
285
286     conversePipeBcast(env, env->getTotalsize(), false);
287   }
288 }
289
290 void PipeBroadcastStrategy::pup(PUP::er &p){
291   ComlibPrintf("[%d] PipeBroadcast pupping %s\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
292   CharmStrategy::pup(p);
293
294   p | aid;
295   p | gid;
296   p | pipeSize;
297   p | topology;
298   p | seqNumber;
299
300   if (p.isUnpacking()) {
301     log_of_2_inv = 1/log((double)2);
302     messageBuf = new CkQ<CharmMessageHolder *>;
303     propagateHandle = CkRegisterHandler((CmiHandler)propagate_handler);
304     propagateHandle_frag = CkRegisterHandler((CmiHandler)propagate_handler_frag);
305   }
306   //p|(*messageBuf);
307   //p|fragments;
308
309 }
310
311 //PUPable_def(PipeBroadcastStrategy);