Merge branch 'ramv/cleanup-arridx-hierarchy' into charm
[charm.git] / src / ck-com / ChunkMulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6 */
7
8
9 #include "ChunkMulticastStrategy.h"
10 #include <string>
11 #include <set>
12 #include <vector>
13 #include "queueing.h"
14 #include "ck.h"
15 #include "spanningTreeStrategy.h"
16
17 #define DEBUG 0
18 #define CHUNK_LL 2048   //minimum chunk size
19
20 CkpvExtern(CkGroupID, cmgrID);
21
22 ChunkMulticastStrategy::ChunkMulticastStrategy()
23   : Strategy(), CharmStrategy() {
24   //  ComlibPrintf("ChunkMulticastStrategy constructor\n");
25   setType(ARRAY_STRATEGY);
26   //numChunks = 0;
27   //nrecv = 0;
28   sentCount = 0;
29 }
30
31 ChunkMulticastStrategy::~ChunkMulticastStrategy() {
32 }
33
34 void ChunkMulticastStrategy::pup(PUP::er &p){
35   Strategy::pup(p);
36   CharmStrategy::pup(p);
37 }
38
39
40 /** Called when the user invokes the entry method on the delegated proxy. */
41 void ChunkMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
42 #if DEBUG
43   CkPrintf("[%d] ChunkMulticastStrategy::insertMessage\n", CkMyPe());
44   fflush(stdout);
45 #endif 
46
47   if(cmsg->dest_proc != IS_SECTION_MULTICAST && cmsg->sec_id == NULL) { 
48     CkAbort("ChunkMulticastStrategy can only be used with an array section proxy");
49   }
50     
51
52   envelope *env = UsrToEnv(cmsg->getCharmMessage());
53   int npes = 1;
54   int pes[1] = {0};
55
56
57
58   //THIS IS A TEMPORARY HACK, WILL WORK ONLY FOR RING
59   const CkArrayID destArrayID(env->getsetArrayMgr());
60   int nRemotePes=-1, nRemoteIndices=-1;
61   ComlibMulticastIndexCount *indicesCount;
62   int *belongingList;
63   sinfo.getPeCount(cmsg->sec_id->_nElems, cmsg->sec_id->_elems, destArrayID, nRemotePes, nRemoteIndices, indicesCount, belongingList);
64 //  int numChunks = nRemotePes/2;
65   
66   delete [] belongingList;
67   delete [] indicesCount;
68
69 #if DEBUG
70   CkPrintf("[%d] after TRACE_CREATION_MULTICAST menv->event=%d\n", CkMyPe(), (int)env->getEvent());  
71 #endif
72
73   //message needs to be unpacked to correctly access envelope information
74   CkUnpackMessage(&env);
75   //char* msg = EnvToUsr(env);
76   int totalSize = cmsg->getSize() -sizeof(envelope) - env->getPrioBytes(); //totalsize = envelope size + usermsg size + priobits
77   int numChunks;
78   if (totalSize/CHUNK_LL < nRemotePes) numChunks = totalSize/CHUNK_LL;
79   else numChunks = nRemotePes;
80   if (numChunks == 0) numChunks = 1;
81   int chunkSize, sendingSize;
82   char **sendingMsgArr = new char*[numChunks];
83   char *sendingMsg;
84   envelope *envchunk;
85   CharmMessageHolder *holder;
86   ComlibMulticastMsg *commsg;
87   ChunkInfo *info;
88
89   //increment send counter;
90   sentCount++;
91
92   //set up message chunks and define chunk information
93   for(int i = 0; i < numChunks; i++){
94     chunkSize = totalSize / numChunks;
95     if (i < totalSize % numChunks)
96       chunkSize++;
97     chunkSize = CkMsgAlignLength(chunkSize);
98     sendingSize = chunkSize+CkMsgAlignLength(sizeof(ChunkInfo));
99     sendingMsg = (char*)CkAllocBuffer(EnvToUsr(env), sendingSize);
100     info = (ChunkInfo*)(sendingMsg);
101     info->srcPe = CkMyPe();
102     info->chunkNumber = i;
103     info->numChunks = numChunks;
104     info->chunkSize = chunkSize;
105     //info->messageSize = sendingSize;
106     info->idx = sentCount;
107     sendingMsgArr[i] = sendingMsg;
108   }
109   
110   //pack message before copying data for correct varsize packing
111   CkPackMessage(&env);
112   char *nextChunk = (char*)EnvToUsr(env);
113   for(int i = 0; i < numChunks; i++){
114     sendingMsg = sendingMsgArr[i];
115     info = (ChunkInfo*)(sendingMsg);
116     CmiMemcpy(sendingMsg+CkMsgAlignLength(sizeof(ChunkInfo)), nextChunk, info->chunkSize);
117     envchunk = UsrToEnv(sendingMsg);
118     
119     nextChunk += info->chunkSize;
120
121     CkPackMessage(&envchunk);
122     envchunk->setPacked(1);
123    
124     holder = cmsg;
125     holder->data = (char*)envchunk;
126     holder->size = envchunk->getTotalsize();
127     // Create a multicast message containing all information about remote destination objects 
128     _TRACE_CREATION_MULTICAST(envchunk, npes, pes);
129     
130     commsg = sinfo.getNewMulticastMessage(holder, 0, getInstance()); 
131
132     envchunk = UsrToEnv(commsg);
133
134     // The remote multicast method will send the message to the remote PEs, as specified in multMsg
135     remoteMulticast(commsg, true, i, numChunks);
136   //  delete holder->data;
137   }
138
139
140 #if DEBUG
141 //    CkPrintf("[%d] after TRACE_CREATION_MULTICAST multMsg->event=%d\n", CkMyPe(), (int)( UsrToEnv(commsg)->getEvent() ) );  
142 #endif
143
144   // local multicast will re-extract a list of local destination objects (FIXME to make this more efficient)
145   cmsg->data =  (char*)env;
146   cmsg->size = env->getTotalsize();
147   localMulticast(cmsg);
148
149   for (int i = 0; i < numChunks; i++){
150     CmiFree(UsrToEnv(sendingMsgArr[i]));
151   }
152   delete [] sendingMsgArr;
153   delete cmsg;    
154 }
155
156
157
158 /** Deliver the message to the local elements. */
159 void ChunkMulticastStrategy::localMulticast(CharmMessageHolder *cmsg) {
160   double start = CmiWallTimer();
161   CkSectionID *sec_id = cmsg->sec_id;
162   CkVec< CkArrayIndex > localIndices;
163   sinfo.getLocalIndices(sec_id->_nElems, sec_id->_elems, sec_id->_cookie.aid, localIndices);
164   deliverToIndices(cmsg->getCharmMessage(), localIndices.size(), localIndices.getVec() );
165   //if (deliverToIndices(cmsg->getCharmMessage(), localIndices.size(), localIndices.getVec() ) == 0) 
166     //CkFreeMsg(cmsg->getCharmMessage());
167   traceUserBracketEvent(10000, start, CmiWallTimer());
168 }
169
170
171
172
173
174 /** 
175     Forward multicast message to our successor processors in the spanning tree. 
176     Uses CmiSyncListSendAndFree for delivery to this strategy's ChunkMulticastStrategy::handleMessage method.
177 */
178 void ChunkMulticastStrategy::remoteMulticast(ComlibMulticastMsg * multMsg, bool rootPE, int chunkNumber, int numChunks) {
179   double start = CmiWallTimer();
180
181   envelope *env = UsrToEnv(multMsg);
182     
183   
184   /// The index into the PE list in the message
185   int myIndex = -10000; 
186   const int totalDestPEs = multMsg->nPes;
187   const int myPe = CkMyPe();
188   
189   // Find my index in the list of all destination PEs
190   if(rootPE){
191     myIndex = -1;
192   } else {
193     for (int i=0; i<totalDestPEs; ++i) {
194       if(multMsg->indicesCount[i].pe == myPe){
195         myIndex = i;
196         break;
197       }
198     }
199   }
200   
201   if(myIndex == -10000)
202     CkAbort("My PE was not found in the list of destination PEs in the ComlibMulticastMsg");
203   
204   int npes;
205   int *pelist = NULL;
206   //CkPrintf("totalDestPEs = %d\n",totalDestPEs);
207   if(totalDestPEs > 0)
208     determineNextHopPEs(totalDestPEs, multMsg->indicesCount, myIndex, pelist, npes, chunkNumber, numChunks );
209   else {
210     npes = 0;
211   }
212
213   if(npes == 0) {
214 #if DEBUG
215     CkPrintf("[%d] ChunkMulticastStrategy::remoteMulticast is not forwarding to any other PEs\n", CkMyPe());
216 #endif
217     traceUserBracketEvent(10001, start, CmiWallTimer());
218     CmiFree(env);
219     return;
220   }
221   
222   //Collect Multicast Statistics
223   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
224   
225
226   CmiSetHandler(env, CkpvAccess(comlib_handler));
227   ((CmiMsgHeaderBasic *) env)->stratid = getInstance();  
228   CkPackMessage(&env);
229   double middle = CmiWallTimer();
230
231   
232   // CkPrintf("[%d] before CmiSyncListSendAndFree env->event=%d\n", CkMyPe(), (int)env->getEvent());
233
234 #if DEBUG
235   CkPrintf("[%d] remoteMulticast Sending to %d PEs: numChunks = %d\n", CkMyPe(), npes, numChunks);
236   for(int i=0;i<npes;i++){
237     CkPrintf("[%d]    %d\n", CkMyPe(), pelist[i]);
238   } 
239 #endif
240
241   CkAssert(npes > 0);
242   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
243   
244   delete[] pelist;
245
246   double end = CmiWallTimer();
247   traceUserBracketEvent(10001, start, middle);
248   traceUserBracketEvent(10002, middle, end);
249   
250 }
251
252 /** 
253     Receive an incoming multicast message(sent from ChunkMulticastStrategy::remoteMulticast).
254     Deliver the message to all local elements 
255 */
256 void ChunkMulticastStrategy::handleMessage(void *msg){
257 #if DEBUG
258   //  CkPrintf("[%d] ChunkMulticastStrategy::handleMessage\n", CkMyPe());
259 #endif
260   envelope *env = (envelope *)msg;
261   // CkPrintf("[%d] in ChunkMulticastStrategy::handleMessage env->event=%d\n", CkMyPe(), (int)env->getEvent());
262   
263   CkUnpackMessage(&env);
264   
265   // CkPrintf("[%d] in ChunkMulticastStrategy::handleMessage after CkUnpackMessage env->event=%d\n", CkMyPe(), (int)env->getEvent());
266   
267
268
269   ComlibMulticastMsg* multMsg = (ComlibMulticastMsg*)EnvToUsr(env);
270   
271   // Don't use msg after this point. Instead use the unpacked env
272   
273   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe()); // DOESN'T DO ANYTHING IN NEW COMLIB
274   
275   // Deliver to objects marked as local in the message
276   int localElems;
277   envelope *newenv;
278   CkArrayIndex *local_idx_list;  
279   sinfo.unpack(env, localElems, local_idx_list, newenv);
280   ComlibMulticastMsg *newmsg = (ComlibMulticastMsg *)EnvToUsr(newenv);  
281
282   ChunkInfo *inf = (ChunkInfo*)newmsg;
283   std::list< recvBuffer* >::iterator iter;
284   recvBuffer* buf;
285   int nrecv = -1;
286   int cnumber = inf->chunkNumber;
287   int numChunks;
288   envelope** recvChunks;
289   for (iter=recvList.begin(); iter != recvList.end(); iter++){
290     buf = *iter;
291     if (inf->srcPe == buf->srcPe && inf->idx == buf->idx){
292       buf->nrecv++;
293       nrecv = buf->nrecv;
294       numChunks = buf->numChunks;
295       recvChunks = buf->recvChunks;
296       if (nrecv == numChunks){
297         delete buf;
298         recvList.erase(iter);
299       }
300       break;
301     }
302   }
303   if ( nrecv == -1){
304     numChunks = inf->numChunks;
305     nrecv = 1;
306     recvChunks = new envelope*[inf->numChunks];
307     if (numChunks > 1){
308       buf = new recvBuffer();
309       buf->numChunks = inf->numChunks;
310       buf->srcPe = inf->srcPe;
311       buf->idx = inf->idx;
312       buf->nrecv = 1;
313       buf->recvChunks = recvChunks;
314       recvList.push_back(buf);
315     }
316   }
317 #if DEBUG
318   CkPrintf("proc %d received %d chunks out of %d for message idx %d src %d chunk # = %d\n", CkMyPe(), nrecv, numChunks, inf->idx, inf->srcPe, inf->chunkNumber);
319 #endif
320   recvChunks[inf->chunkNumber] = newenv;
321   if (nrecv == numChunks){
322     void *wholemsg;
323     int totalSize = 0;
324     ChunkInfo* cinfo;
325     for (int i = 0; i < numChunks; i++){
326       cinfo = (ChunkInfo*)(EnvToUsr(recvChunks[i]));
327       totalSize += cinfo->chunkSize;
328     }
329     wholemsg = CkAllocBuffer(newmsg, totalSize);
330     cinfo = (ChunkInfo*)(EnvToUsr(recvChunks[0]));
331     int offset = 0;
332     for (int i = 0; i < numChunks; i++){
333       cinfo = (ChunkInfo*)(EnvToUsr(recvChunks[i]));
334       CmiMemcpy(((char*)wholemsg)+offset, ((char*)cinfo)+CkMsgAlignLength(sizeof(ChunkInfo)), cinfo->chunkSize);
335       offset += cinfo->chunkSize;
336     }
337     envelope *envc = UsrToEnv(wholemsg);
338     envc->setPacked(1);
339     CkUnpackMessage(&envc);
340     ComlibMulticastMsg *cmmsg = (ComlibMulticastMsg *)EnvToUsr(envc);
341     deliverToIndices(cmmsg, localElems, local_idx_list );
342     for (int i = 0; i < numChunks; i++){
343       CmiFree(recvChunks[i]);
344     }
345     delete [] recvChunks;
346  //   CmiFree(newenv);
347   }
348   // Forward on to other processors if necessary
349   remoteMulticast(multMsg, false, cnumber, numChunks);
350   if (nrecv == numChunks) {
351     nrecv = 0;
352     numChunks = 0;
353   }
354 }
355
356
357     
358
359
360 void ChunkMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
361   //numChunks = totalDestPEs;
362   if(myIndex==-1){
363     // We are at a root node of the spanning tree. 
364     // We will forward the message to all other PEs in the destination list.
365     npes = totalDestPEs;
366     
367     pelist = new int[npes];
368     for (int i=0; i<npes; ++i) {
369       pelist[i] = destPEs[i].pe;
370     }
371   } else {
372     // We are at a leaf node of the spanning tree. 
373     npes = 0;
374   }
375   
376 }
377
378 void ChunkRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
379   if (myIndex == -1){
380     npes = 1;
381     pelist = new int[1];
382     pelist[0] = destPEs[chunkNumber*(totalDestPEs/numChunks)].pe;
383   }
384   else if (chunkNumber*(totalDestPEs/numChunks) != (myIndex+1) % totalDestPEs){
385     // All non-final PEs will send to next PE in list
386     npes = 1;
387     pelist = new int[1];
388     pelist[0] = destPEs[(myIndex+1) % totalDestPEs].pe;
389   }
390   else {
391     npes = 0;
392   }
393 }
394
395 /* Chunk Tree multicast strategy
396  * 1. Send chunks from source to unique places in array, with chunk i of c going to destination i*(d/c) of d.
397  * 2. Destination processor i*(d/c) forwards chunk to (i+1)*(d/c), unless last chunk
398  * 3. Destination processor i*(d/c) also forwards chunk to children defined recursively within (i*(d/c), (i+1)*(d/c)), with hopsize = numChunks
399  *
400  * Note that the communication is persistent. (Any intermediate processor sends to the same processors regardless of chunk number.)
401  */
402
403 //FIX: remainer pes are handled inefficiently. i.e. if destPes = 15 and numchunks = 8, pe 7 will send to 7 pes, while pe 0-6 send to no one.
404 //FIX: is there a way to calculate children statically instead of recalculating them each iteration?
405 void ChunkTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
406   int hop;
407   //if i am source
408   if (myIndex == -1){
409     npes = 1;
410     pelist = new int[1];
411     hop = totalDestPEs/numChunks;
412     pelist[0] = destPEs[chunkNumber*(totalDestPEs/numChunks)].pe;
413   }
414   else {
415     int depth = 1;
416     int idx = myIndex;
417     //ipes keeps track of how many pes we are sending to
418     int ipes = totalDestPEs;
419     while (1){
420       hop = ipes/numChunks;
421       if (hop == 0) hop = 1;
422       //if i am in remainder (at end)
423       if (idx >= hop*(numChunks-1)){
424         idx = idx - hop*(numChunks-1);
425         ipes = ipes - hop*(numChunks-1) - 1;
426       }
427       else {
428         idx = idx % hop;
429         ipes = hop - 1;
430       }
431       depth++;
432       if (idx == 0) break;
433       else idx--;
434     }
435     //if i receive the chunk first and if i need to pass it on (the chunk is not the last one)
436     if ( depth == 2 && ((chunkNumber-1+numChunks)%numChunks)*(totalDestPEs/numChunks) != myIndex){
437       if (numChunks < ipes) npes = numChunks + 1;
438       else npes = ipes + 1;
439       pelist = new int[npes];
440       //send chunk to next 2nd depth node along with all my children
441       if (myIndex == (totalDestPEs/numChunks)*(numChunks-1))
442         pelist[0] = destPEs[0].pe;
443       else
444         pelist[0] = destPEs[(myIndex+(totalDestPEs/numChunks))].pe;
445       hop = ipes/npes;
446       if (hop == 0) hop = 1;
447       for ( int i = 1; i < npes; i++ ){
448         pelist[i] = destPEs[(i-1)*hop + myIndex + 1].pe;
449       }
450     }
451     //pass chunk onto children
452     else {
453       //if no children
454       if (ipes <= 0){
455         npes = 0;
456         return;
457       }
458       if (numChunks < ipes) npes = numChunks;
459       else npes = ipes ;
460
461       pelist = new int[npes];
462       hop = ipes/npes;
463       if (hop == 0) hop = 1;
464       for ( int i = 0; i < npes; i++ ){
465         pelist[i] = destPEs[i*hop + myIndex + 1].pe;
466       }
467     }
468   }
469 }
470
471 //like tree except with the mother node sending chunks instead of a ring between depth 1 processors
472 void ChunkPipeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
473   int hop;
474   int *allpelist;
475   CkPrintf("myindex = %d\n", myIndex);
476   if (myIndex == -1) {
477     allpelist = new int[totalDestPEs+1];
478     allpelist[0] = CkMyPe();
479     for (int i = 1; i < totalDestPEs; i++){
480       allpelist[i] = destPEs[i-1].pe;
481     }
482   } else {
483     allpelist = new int[totalDestPEs];
484     for (int i = myIndex; i < totalDestPEs + myIndex; i++){
485       allpelist[i-myIndex] = destPEs[i%totalDestPEs].pe;
486     }
487   }
488   topo::SpanningTreeVertex *nextGenInfo;
489   nextGenInfo = topo::buildSpanningTreeGeneration(allpelist, allpelist + totalDestPEs, degree);
490   npes = nextGenInfo->childIndex.size();
491   pelist = new int[npes];
492   for (int i = 0; i < npes; i++){
493     pelist[i] = nextGenInfo->childIndex[i];
494   }
495
496
497   //if i am source
498   /*if (myIndex == -1){
499     npes = degree;
500     if (degree > totalDestPEs) npes = totalDestPEs;
501     pelist = new int[npes];
502     hop = totalDestPEs/npes;
503     if (hop == 0) hop = 1;
504     for (int i = 0; i < npes; i++){
505       pelist[i] = destPEs[i*hop].pe;
506     }
507   }
508   else {
509     int depth = 1;
510     int idx = myIndex;
511     //ipes keeps track of how many pes we are sending to
512     int ipes = totalDestPEs;
513     while (1){
514       hop = ipes/degree;
515       if (hop == 0) hop = 1;
516       //if i am in remainder (at end)
517       if (idx >= hop*(degree-1)){
518         idx = idx - hop*(degree-1);
519         ipes = ipes - hop*(degree-1) - 1;
520       }
521       else {
522         idx = idx % hop;
523         ipes = hop - 1;
524       }
525       depth++;
526       if (idx == 0) break;
527       else idx--;
528     }
529     //pass chunk onto children
530       //if no children
531     if (ipes <= 0){
532       npes = 0;
533       return;
534     }
535     if (degree < ipes) npes = degree;
536     else npes = ipes;
537
538     pelist = new int[npes];
539     hop = ipes/npes;
540     if (hop == 0) hop = 1;
541     for ( int i = 0; i < npes; i++ ){
542       pelist[i] = destPEs[i*hop + myIndex + 1].pe;
543     }
544   }*/
545 }
546
547 /*@}*/