fb9797e3ec0b31b0ac37ebaa86619e0733e71986
[charm.git] / src / ck-com / ChunkMulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6 */
7
8
9 #include "ChunkMulticastStrategy.h"
10 #include <string>
11 #include <set>
12 #include <vector>
13 #include "queueing.h"
14 #include "ck.h"
15 #include "spanningTreeStrategy.h"
16
17 #define DEBUG 0
18 #define CHUNK_LL 2048   //minimum chunk size
19
20 CkpvExtern(CkGroupID, cmgrID);
21
22 ChunkMulticastStrategy::ChunkMulticastStrategy()
23   : Strategy(), CharmStrategy() {
24   //  ComlibPrintf("ChunkMulticastStrategy constructor\n");
25   setType(ARRAY_STRATEGY);
26   //numChunks = 0;
27   //nrecv = 0;
28   sentCount = 0;
29 }
30
31 ChunkMulticastStrategy::~ChunkMulticastStrategy() {
32 }
33
34 void ChunkMulticastStrategy::pup(PUP::er &p){
35   Strategy::pup(p);
36   CharmStrategy::pup(p);
37 }
38
39
40 /** Called when the user invokes the entry method on the delegated proxy. */
41 void ChunkMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
42 #if DEBUG
43   CkPrintf("[%d] ChunkMulticastStrategy::insertMessage\n", CkMyPe());
44   fflush(stdout);
45 #endif 
46
47   if(cmsg->dest_proc != IS_SECTION_MULTICAST && cmsg->sec_id == NULL) { 
48     CkAbort("ChunkMulticastStrategy can only be used with an array section proxy");
49   }
50     
51
52   envelope *env = UsrToEnv(cmsg->getCharmMessage());
53   int npes = 1;
54   int pes[1] = {0};
55
56
57
58   //THIS IS A TEMPORARY HACK, WILL WORK ONLY FOR RING
59   const CkArrayID destArrayID(env->getsetArrayMgr());
60   int nRemotePes=-1, nRemoteIndices=-1;
61   ComlibMulticastIndexCount *indicesCount;
62   int *belongingList;
63   sinfo.getPeCount(cmsg->sec_id->_nElems, cmsg->sec_id->_elems, destArrayID, nRemotePes, nRemoteIndices, indicesCount, belongingList);
64 //  int numChunks = nRemotePes/2;
65   
66   delete [] belongingList;
67   delete [] indicesCount;
68
69 #if DEBUG
70   CkPrintf("[%d] after TRACE_CREATION_MULTICAST menv->event=%d\n", CkMyPe(), (int)env->getEvent());  
71 #endif
72
73   //message needs to be unpacked to correctly access envelope information
74   CkUnpackMessage(&env);
75   //char* msg = EnvToUsr(env);
76   int totalSize = cmsg->getSize() -sizeof(envelope) - env->getPrioBytes(); //totalsize = envelope size + usermsg size + priobits
77   int numChunks;
78   if (totalSize/CHUNK_LL < nRemotePes) numChunks = totalSize/CHUNK_LL;
79   else numChunks = nRemotePes;
80   if (numChunks == 0) numChunks = 1;
81   int chunkSize, sendingSize;
82   char **sendingMsgArr = new char*[numChunks];
83   char *sendingMsg;
84   envelope *envchunk;
85   CharmMessageHolder *holder;
86   ComlibMulticastMsg *commsg;
87   ChunkInfo *info;
88
89   //increment send counter;
90   sentCount++;
91
92   //set up message chunks and define chunk information
93   for(int i = 0; i < numChunks; i++){
94     chunkSize = totalSize / numChunks;
95     if (i < totalSize % numChunks)
96       chunkSize++;
97     chunkSize = CkMsgAlignLength(chunkSize);
98     sendingSize = chunkSize+CkMsgAlignLength(sizeof(ChunkInfo));
99     sendingMsg = (char*)CkAllocBuffer(EnvToUsr(env), sendingSize);
100     info = (ChunkInfo*)(sendingMsg);
101     info->srcPe = CkMyPe();
102     info->chunkNumber = i;
103     info->numChunks = numChunks;
104     info->chunkSize = chunkSize;
105     //info->messageSize = sendingSize;
106     info->idx = sentCount;
107     sendingMsgArr[i] = sendingMsg;
108   }
109   
110   //pack message before copying data for correct varsize packing
111   CkPackMessage(&env);
112   char *nextChunk = (char*)EnvToUsr(env);
113   for(int i = 0; i < numChunks; i++){
114     sendingMsg = sendingMsgArr[i];
115     info = (ChunkInfo*)(sendingMsg);
116     CmiMemcpy(sendingMsg+CkMsgAlignLength(sizeof(ChunkInfo)), nextChunk, info->chunkSize);
117     envchunk = UsrToEnv(sendingMsg);
118     
119     nextChunk += info->chunkSize;
120
121     CkPackMessage(&envchunk);
122     envchunk->setPacked(1);
123    
124     holder = cmsg;
125     holder->data = (char*)envchunk;
126     holder->size = envchunk->getTotalsize();
127     // Create a multicast message containing all information about remote destination objects 
128     _TRACE_CREATION_MULTICAST(envchunk, npes, pes);
129     
130     commsg = sinfo.getNewMulticastMessage(holder, 0, getInstance()); 
131
132     envchunk = UsrToEnv(commsg);
133
134     // The remote multicast method will send the message to the remote PEs, as specified in multMsg
135     remoteMulticast(commsg, true, i, numChunks);
136   //  delete holder->data;
137   }
138
139
140 #if DEBUG
141 //    CkPrintf("[%d] after TRACE_CREATION_MULTICAST multMsg->event=%d\n", CkMyPe(), (int)( UsrToEnv(commsg)->getEvent() ) );  
142 #endif
143
144   // local multicast will re-extract a list of local destination objects (FIXME to make this more efficient)
145   cmsg->data =  (char*)env;
146   cmsg->size = env->getTotalsize();
147   localMulticast(cmsg);
148
149   for (int i = 0; i < numChunks; i++){
150     CmiFree(UsrToEnv(sendingMsgArr[i]));
151   }
152   delete [] sendingMsgArr;
153   delete cmsg;    
154 }
155
156
157
158 /** Deliver the message to the local elements. */
159 void ChunkMulticastStrategy::localMulticast(CharmMessageHolder *cmsg) {
160   double start = CmiWallTimer();
161   CkSectionID *sec_id = cmsg->sec_id;
162   CkVec< CkArrayIndex > localIndices;
163   CkArrayID aid(sec_id->_cookie.get_aid());
164   sinfo.getLocalIndices(sec_id->_nElems, sec_id->_elems, aid, localIndices);
165   deliverToIndices(cmsg->getCharmMessage(), localIndices.size(), localIndices.getVec() );
166   //if (deliverToIndices(cmsg->getCharmMessage(), localIndices.size(), localIndices.getVec() ) == 0) 
167     //CkFreeMsg(cmsg->getCharmMessage());
168   traceUserBracketEvent(10000, start, CmiWallTimer());
169 }
170
171
172
173
174
175 /** 
176     Forward multicast message to our successor processors in the spanning tree. 
177     Uses CmiSyncListSendAndFree for delivery to this strategy's ChunkMulticastStrategy::handleMessage method.
178 */
179 void ChunkMulticastStrategy::remoteMulticast(ComlibMulticastMsg * multMsg, bool rootPE, int chunkNumber, int numChunks) {
180   double start = CmiWallTimer();
181
182   envelope *env = UsrToEnv(multMsg);
183     
184   
185   /// The index into the PE list in the message
186   int myIndex = -10000; 
187   const int totalDestPEs = multMsg->nPes;
188   const int myPe = CkMyPe();
189   
190   // Find my index in the list of all destination PEs
191   if(rootPE){
192     myIndex = -1;
193   } else {
194     for (int i=0; i<totalDestPEs; ++i) {
195       if(multMsg->indicesCount[i].pe == myPe){
196         myIndex = i;
197         break;
198       }
199     }
200   }
201   
202   if(myIndex == -10000)
203     CkAbort("My PE was not found in the list of destination PEs in the ComlibMulticastMsg");
204   
205   int npes;
206   int *pelist = NULL;
207   //CkPrintf("totalDestPEs = %d\n",totalDestPEs);
208   if(totalDestPEs > 0)
209     determineNextHopPEs(totalDestPEs, multMsg->indicesCount, myIndex, pelist, npes, chunkNumber, numChunks );
210   else {
211     npes = 0;
212   }
213
214   if(npes == 0) {
215 #if DEBUG
216     CkPrintf("[%d] ChunkMulticastStrategy::remoteMulticast is not forwarding to any other PEs\n", CkMyPe());
217 #endif
218     traceUserBracketEvent(10001, start, CmiWallTimer());
219     CmiFree(env);
220     return;
221   }
222   
223   //Collect Multicast Statistics
224   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
225   
226
227   CmiSetHandler(env, CkpvAccess(comlib_handler));
228   ((CmiMsgHeaderBasic *) env)->stratid = getInstance();  
229   CkPackMessage(&env);
230   double middle = CmiWallTimer();
231
232   
233   // CkPrintf("[%d] before CmiSyncListSendAndFree env->event=%d\n", CkMyPe(), (int)env->getEvent());
234
235 #if DEBUG
236   CkPrintf("[%d] remoteMulticast Sending to %d PEs: numChunks = %d\n", CkMyPe(), npes, numChunks);
237   for(int i=0;i<npes;i++){
238     CkPrintf("[%d]    %d\n", CkMyPe(), pelist[i]);
239   } 
240 #endif
241
242   CkAssert(npes > 0);
243   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
244   
245   delete[] pelist;
246
247   double end = CmiWallTimer();
248   traceUserBracketEvent(10001, start, middle);
249   traceUserBracketEvent(10002, middle, end);
250   
251 }
252
253 /** 
254     Receive an incoming multicast message(sent from ChunkMulticastStrategy::remoteMulticast).
255     Deliver the message to all local elements 
256 */
257 void ChunkMulticastStrategy::handleMessage(void *msg){
258 #if DEBUG
259   //  CkPrintf("[%d] ChunkMulticastStrategy::handleMessage\n", CkMyPe());
260 #endif
261   envelope *env = (envelope *)msg;
262   // CkPrintf("[%d] in ChunkMulticastStrategy::handleMessage env->event=%d\n", CkMyPe(), (int)env->getEvent());
263   
264   CkUnpackMessage(&env);
265   
266   // CkPrintf("[%d] in ChunkMulticastStrategy::handleMessage after CkUnpackMessage env->event=%d\n", CkMyPe(), (int)env->getEvent());
267   
268
269
270   ComlibMulticastMsg* multMsg = (ComlibMulticastMsg*)EnvToUsr(env);
271   
272   // Don't use msg after this point. Instead use the unpacked env
273   
274   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe()); // DOESN'T DO ANYTHING IN NEW COMLIB
275   
276   // Deliver to objects marked as local in the message
277   int localElems;
278   envelope *newenv;
279   CkArrayIndex *local_idx_list;  
280   sinfo.unpack(env, localElems, local_idx_list, newenv);
281   ComlibMulticastMsg *newmsg = (ComlibMulticastMsg *)EnvToUsr(newenv);  
282
283   ChunkInfo *inf = (ChunkInfo*)newmsg;
284   std::list< recvBuffer* >::iterator iter;
285   recvBuffer* buf;
286   int nrecv = -1;
287   int cnumber = inf->chunkNumber;
288   int numChunks;
289   envelope** recvChunks;
290   for (iter=recvList.begin(); iter != recvList.end(); iter++){
291     buf = *iter;
292     if (inf->srcPe == buf->srcPe && inf->idx == buf->idx){
293       buf->nrecv++;
294       nrecv = buf->nrecv;
295       numChunks = buf->numChunks;
296       recvChunks = buf->recvChunks;
297       if (nrecv == numChunks){
298         delete buf;
299         recvList.erase(iter);
300       }
301       break;
302     }
303   }
304   if ( nrecv == -1){
305     numChunks = inf->numChunks;
306     nrecv = 1;
307     recvChunks = new envelope*[inf->numChunks];
308     if (numChunks > 1){
309       buf = new recvBuffer();
310       buf->numChunks = inf->numChunks;
311       buf->srcPe = inf->srcPe;
312       buf->idx = inf->idx;
313       buf->nrecv = 1;
314       buf->recvChunks = recvChunks;
315       recvList.push_back(buf);
316     }
317   }
318 #if DEBUG
319   CkPrintf("proc %d received %d chunks out of %d for message idx %d src %d chunk # = %d\n", CkMyPe(), nrecv, numChunks, inf->idx, inf->srcPe, inf->chunkNumber);
320 #endif
321   recvChunks[inf->chunkNumber] = newenv;
322   if (nrecv == numChunks){
323     void *wholemsg;
324     int totalSize = 0;
325     ChunkInfo* cinfo;
326     for (int i = 0; i < numChunks; i++){
327       cinfo = (ChunkInfo*)(EnvToUsr(recvChunks[i]));
328       totalSize += cinfo->chunkSize;
329     }
330     wholemsg = CkAllocBuffer(newmsg, totalSize);
331     cinfo = (ChunkInfo*)(EnvToUsr(recvChunks[0]));
332     int offset = 0;
333     for (int i = 0; i < numChunks; i++){
334       cinfo = (ChunkInfo*)(EnvToUsr(recvChunks[i]));
335       CmiMemcpy(((char*)wholemsg)+offset, ((char*)cinfo)+CkMsgAlignLength(sizeof(ChunkInfo)), cinfo->chunkSize);
336       offset += cinfo->chunkSize;
337     }
338     envelope *envc = UsrToEnv(wholemsg);
339     envc->setPacked(1);
340     CkUnpackMessage(&envc);
341     ComlibMulticastMsg *cmmsg = (ComlibMulticastMsg *)EnvToUsr(envc);
342     deliverToIndices(cmmsg, localElems, local_idx_list );
343     for (int i = 0; i < numChunks; i++){
344       CmiFree(recvChunks[i]);
345     }
346     delete [] recvChunks;
347  //   CmiFree(newenv);
348   }
349   // Forward on to other processors if necessary
350   remoteMulticast(multMsg, false, cnumber, numChunks);
351   if (nrecv == numChunks) {
352     nrecv = 0;
353     numChunks = 0;
354   }
355 }
356
357
358     
359
360
361 void ChunkMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
362   //numChunks = totalDestPEs;
363   if(myIndex==-1){
364     // We are at a root node of the spanning tree. 
365     // We will forward the message to all other PEs in the destination list.
366     npes = totalDestPEs;
367     
368     pelist = new int[npes];
369     for (int i=0; i<npes; ++i) {
370       pelist[i] = destPEs[i].pe;
371     }
372   } else {
373     // We are at a leaf node of the spanning tree. 
374     npes = 0;
375   }
376   
377 }
378
379 void ChunkRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
380   if (myIndex == -1){
381     npes = 1;
382     pelist = new int[1];
383     pelist[0] = destPEs[chunkNumber*(totalDestPEs/numChunks)].pe;
384   }
385   else if (chunkNumber*(totalDestPEs/numChunks) != (myIndex+1) % totalDestPEs){
386     // All non-final PEs will send to next PE in list
387     npes = 1;
388     pelist = new int[1];
389     pelist[0] = destPEs[(myIndex+1) % totalDestPEs].pe;
390   }
391   else {
392     npes = 0;
393   }
394 }
395
396 /* Chunk Tree multicast strategy
397  * 1. Send chunks from source to unique places in array, with chunk i of c going to destination i*(d/c) of d.
398  * 2. Destination processor i*(d/c) forwards chunk to (i+1)*(d/c), unless last chunk
399  * 3. Destination processor i*(d/c) also forwards chunk to children defined recursively within (i*(d/c), (i+1)*(d/c)), with hopsize = numChunks
400  *
401  * Note that the communication is persistent. (Any intermediate processor sends to the same processors regardless of chunk number.)
402  */
403
404 //FIX: remainer pes are handled inefficiently. i.e. if destPes = 15 and numchunks = 8, pe 7 will send to 7 pes, while pe 0-6 send to no one.
405 //FIX: is there a way to calculate children statically instead of recalculating them each iteration?
406 void ChunkTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
407   int hop;
408   //if i am source
409   if (myIndex == -1){
410     npes = 1;
411     pelist = new int[1];
412     hop = totalDestPEs/numChunks;
413     pelist[0] = destPEs[chunkNumber*(totalDestPEs/numChunks)].pe;
414   }
415   else {
416     int depth = 1;
417     int idx = myIndex;
418     //ipes keeps track of how many pes we are sending to
419     int ipes = totalDestPEs;
420     while (1){
421       hop = ipes/numChunks;
422       if (hop == 0) hop = 1;
423       //if i am in remainder (at end)
424       if (idx >= hop*(numChunks-1)){
425         idx = idx - hop*(numChunks-1);
426         ipes = ipes - hop*(numChunks-1) - 1;
427       }
428       else {
429         idx = idx % hop;
430         ipes = hop - 1;
431       }
432       depth++;
433       if (idx == 0) break;
434       else idx--;
435     }
436     //if i receive the chunk first and if i need to pass it on (the chunk is not the last one)
437     if ( depth == 2 && ((chunkNumber-1+numChunks)%numChunks)*(totalDestPEs/numChunks) != myIndex){
438       if (numChunks < ipes) npes = numChunks + 1;
439       else npes = ipes + 1;
440       pelist = new int[npes];
441       //send chunk to next 2nd depth node along with all my children
442       if (myIndex == (totalDestPEs/numChunks)*(numChunks-1))
443         pelist[0] = destPEs[0].pe;
444       else
445         pelist[0] = destPEs[(myIndex+(totalDestPEs/numChunks))].pe;
446       hop = ipes/npes;
447       if (hop == 0) hop = 1;
448       for ( int i = 1; i < npes; i++ ){
449         pelist[i] = destPEs[(i-1)*hop + myIndex + 1].pe;
450       }
451     }
452     //pass chunk onto children
453     else {
454       //if no children
455       if (ipes <= 0){
456         npes = 0;
457         return;
458       }
459       if (numChunks < ipes) npes = numChunks;
460       else npes = ipes ;
461
462       pelist = new int[npes];
463       hop = ipes/npes;
464       if (hop == 0) hop = 1;
465       for ( int i = 0; i < npes; i++ ){
466         pelist[i] = destPEs[i*hop + myIndex + 1].pe;
467       }
468     }
469   }
470 }
471
472 //like tree except with the mother node sending chunks instead of a ring between depth 1 processors
473 void ChunkPipeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
474   /*  int hop;*/
475   int *allpelist;
476   CkPrintf("myindex = %d\n", myIndex);
477   if (myIndex == -1) {
478     allpelist = new int[totalDestPEs+1];
479     allpelist[0] = CkMyPe();
480     for (int i = 1; i < totalDestPEs; i++){
481       allpelist[i] = destPEs[i-1].pe;
482     }
483   } else {
484     allpelist = new int[totalDestPEs];
485     for (int i = myIndex; i < totalDestPEs + myIndex; i++){
486       allpelist[i-myIndex] = destPEs[i%totalDestPEs].pe;
487     }
488   }
489   topo::SpanningTreeVertex *nextGenInfo;
490   nextGenInfo = topo::buildSpanningTreeGeneration(allpelist, allpelist + totalDestPEs, degree);
491   npes = nextGenInfo->childIndex.size();
492   pelist = new int[npes];
493   for (int i = 0; i < npes; i++){
494     pelist[i] = nextGenInfo->childIndex[i];
495   }
496
497
498   //if i am source
499   /*if (myIndex == -1){
500     npes = degree;
501     if (degree > totalDestPEs) npes = totalDestPEs;
502     pelist = new int[npes];
503     hop = totalDestPEs/npes;
504     if (hop == 0) hop = 1;
505     for (int i = 0; i < npes; i++){
506       pelist[i] = destPEs[i*hop].pe;
507     }
508   }
509   else {
510     int depth = 1;
511     int idx = myIndex;
512     //ipes keeps track of how many pes we are sending to
513     int ipes = totalDestPEs;
514     while (1){
515       hop = ipes/degree;
516       if (hop == 0) hop = 1;
517       //if i am in remainder (at end)
518       if (idx >= hop*(degree-1)){
519         idx = idx - hop*(degree-1);
520         ipes = ipes - hop*(degree-1) - 1;
521       }
522       else {
523         idx = idx % hop;
524         ipes = hop - 1;
525       }
526       depth++;
527       if (idx == 0) break;
528       else idx--;
529     }
530     //pass chunk onto children
531       //if no children
532     if (ipes <= 0){
533       npes = 0;
534       return;
535     }
536     if (degree < ipes) npes = degree;
537     else npes = ipes;
538
539     pelist = new int[npes];
540     hop = ipes/npes;
541     if (hop == 0) hop = 1;
542     for ( int i = 0; i < npes; i++ ){
543       pelist[i] = destPEs[i*hop + myIndex + 1].pe;
544     }
545   }*/
546 }
547
548 /*@}*/