Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentation  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  * */
12
13 #include "charm++.h"
14 #include "envelope.h"
15 #include "register.h"
16
17 #include "ckmulticast.h"
18 #include "spanningTreeStrategy.h"
19 #include "XArraySectionReducer.h"
20
21 #define DEBUGF(x)  // CkPrintf x;
22
23 // turn on or off fragmentation in multicast
24 #if CMK_MESSAGE_LOGGING
25 #define SPLIT_MULTICAST  0
26 #else
27 #define SPLIT_MULTICAST  1
28 #endif
29
30 // maximum number of fragments into which a message can be broken
31 #define MAXFRAGS 100
32
33 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
34 typedef CkVec<CkArrayIndex>   arrayIndexList;
35 typedef CkVec<CkSectionInfo>     sectionIdList;
36 typedef CkVec<CkReductionMsg *>  reductionMsgs;
37 typedef CkQ<int>                 PieceSize;
38 typedef CkVec<LDObjid>          ObjKeyList;
39 typedef unsigned char            byte;
40
41 /** Information about the status of reductions proceeding along a given section
42  *
43  * An instance of this class is stored in every mCastEntry object making it possible
44  * to track redn operations on a per section basis all along the spanning tree.
45  */
46 class reductionInfo {
47     public:
48         /// Number of local array elements which have contributed a given fragment
49         int lcount [MAXFRAGS];
50         /// Number of child vertices (NOT array elements) that have contributed a given fragment
51         int ccount [MAXFRAGS];
52         /// The total number of array elements that have contributed so far to a given fragment
53         int gcount [MAXFRAGS];
54         /// The number of fragments processed so far
55         int npProcessed;
56         /// User callback
57         CkCallback *storedCallback;
58         /// User reduction client function
59         redClientFn storedClient;
60         /// User provided data for the redn client function
61         void *storedClientParam;
62         /// Reduction sequence number
63         int redNo;
64         /// Messages for this reduction
65         reductionMsgs  msgs [MAXFRAGS];
66         /// Messages of future reductions
67         reductionMsgs futureMsgs;
68
69     public:
70         reductionInfo(): storedCallback(NULL),
71                          storedClientParam(NULL),
72                          redNo(0),
73                          npProcessed(0) {
74             for (int i=0; i<MAXFRAGS; i++)
75                 lcount [i] = ccount [i] = gcount [i] = 0;
76         }
77 };
78
79 /// cookie status
80 #define COOKIE_NOTREADY 0
81 #define COOKIE_READY    1
82 #define COOKIE_OLD      2
83
84 class mCastPacket {
85 public:
86   CkSectionInfo cookie;
87   int offset;
88   int n;
89   char *data;
90   int seqno;
91   int count;
92   int totalsize;
93
94   mCastPacket(CkSectionInfo &_cookie, int _offset, int _n, char *_d, int _s, int _c, int _t):
95                 cookie(_cookie), offset(_offset), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
96 };
97
98 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
99
100 class SectionLocation {
101 public:
102   mCastEntry *entry;
103   int         pe;
104 public:
105   SectionLocation(): entry(NULL), pe(-1) {}
106   SectionLocation( mCastEntry *e, int p) { set(e, p); }
107   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
108   inline void clear() { entry = NULL; pe = -1; }
109 };
110
111
112
113
114 /// Cookie for an array section 
115 class mCastEntry 
116 {
117     public:
118         /// Array ID 
119         CkArrayID     aid;
120         /// Spanning tree parent
121         CkSectionInfo parentGrp;
122         /// List of direct children
123         sectionIdList children;
124         /// Number of direct children
125         int numChild;
126         /// List of all tree member array indices (Only useful on the tree root)
127         arrayIndexList allElem;
128         /// Only useful on root for LB
129         ObjKeyList     allObjKeys;
130         /// List of array elements on local PE
131         arrayIndexList localElem;
132         /// Should always be myPE
133         int pe;
134         /// Section ID of the root
135         CkSectionInfo rootSid;
136         multicastGrpMsgBuf msgBuf;
137         /// Buffer storing the pending packets
138         multicastGrpPacketBuf packetBuf;
139         /// For multicast packetization
140         char *asm_msg;
141         int   asm_fill;
142         /// Linked list of entries on the same processor
143         mCastEntry *oldc, *newc;
144         /// Old spanning tree
145         SectionLocation   oldtree;
146         // for reduction
147         reductionInfo red;
148         //
149         char needRebuild;
150     private:
151         char flag;
152     public:
153         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
154                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
155         mCastEntry(mCastEntry *);
156         /// Check if this tree is only a branch and has a parent
157         inline int hasParent() { return parentGrp.get_val()?1:0; }
158         /// Is this tree obsolete?
159         inline int isObsolete() { return (flag == COOKIE_OLD); }
160         /// Make the current tree obsolete
161         inline void setObsolete() { flag=COOKIE_OLD; }
162         /// Check if this (branch of the) tree is ready for use
163         inline int notReady() { return (flag == COOKIE_NOTREADY); }
164         /// Mark this (branch of the) tree as ready for use
165         inline void setReady() { flag=COOKIE_READY; }
166         /// Increment the reduction number across the whole linked list of cookies
167         inline void incReduceNo() {
168             red.redNo ++;
169             for (mCastEntry *next = newc; next; next=next->newc) 
170                 next->red.redNo++;
171         }
172         /// Get a handle on the array ID this tree is a member of
173         inline CkArrayID getAid() { return aid; }
174         inline int hasOldtree() { return oldtree.entry != NULL; }
175         inline void print() {
176             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
177         }
178 };
179
180
181
182
183 class cookieMsg: public CMessage_cookieMsg {
184 public:
185   CkSectionInfo cookie;
186 public:
187   cookieMsg() {};
188   cookieMsg(CkSectionInfo m): cookie(m) {};
189 };
190
191
192
193
194 /// multicast tree setup message
195 class multicastSetupMsg: public CMessage_multicastSetupMsg {
196 public:
197   int  nIdx;
198   CkArrayIndex *arrIdx;
199   int      *lastKnown;
200   CkSectionInfo parent;
201   CkSectionInfo rootSid;
202   int redNo;
203 };
204
205
206
207
208 /// message send in spanning tree
209 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
210 };
211
212
213 extern void CkPackMessage(envelope **pEnv);
214 extern void CkUnpackMessage(envelope **pEnv);
215
216
217
218 void _ckMulticastInit(void)
219 {
220 /*
221   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
222   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
223 */
224 }
225
226
227 mCastEntry::mCastEntry (mCastEntry *old): 
228   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
229 {
230   int i;
231   aid = old->aid;
232   parentGrp = old->parentGrp;
233   for (i=0; i<old->allElem.length(); i++)
234     allElem.push_back(old->allElem[i]);
235 #if CMK_LBDB_ON
236   CmiAssert(old->allElem.length() == old->allObjKeys.length());
237   for (i=0; i<old->allObjKeys.length(); i++)
238     allObjKeys.push_back(old->allObjKeys[i]);
239 #endif
240   pe = old->pe;
241   red.storedCallback = old->red.storedCallback;
242   red.storedClient = old->red.storedClient;
243   red.storedClientParam = old->red.storedClientParam;
244   red.redNo = old->red.redNo;
245   needRebuild = 0;
246   asm_msg = NULL;
247   asm_fill = 0;
248 }
249
250 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
251
252
253
254
255 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndex *al, int n)
256 {
257     // Create a multicast entry
258     mCastEntry *entry = new mCastEntry(aid);
259     // Push all the section member indices into the entry
260     for (int i=0; i<n; i++) {
261         entry->allElem.push_back(al[i]);
262 #if CMK_LBDB_ON
263         const LDObjid key = idx2LDObjid(al[i]);
264         entry->allObjKeys.push_back(key);
265 #endif
266     }
267     //  entry->aid = aid;
268     _id.get_aid() = aid;
269     _id.get_val() = entry;              // allocate table for this section
270     // 
271     initCookie(_id);
272 }
273
274
275
276
277 void CkMulticastMgr::setSection(CkSectionInfo &id)
278 {
279   initCookie(id);
280 }
281
282
283
284
285 /// @warning: This is deprecated
286 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
287 {
288   CkArrayID aid = proxy.ckGetArrayID();
289   CkSectionInfo &_id = proxy.ckGetSectionInfo();
290
291   mCastEntry *entry = new mCastEntry(aid);
292
293   const CkArrayIndex *al = proxy.ckGetArrayElements();
294   for (int i=0; i<proxy.ckGetNumElements(); i++) {
295     entry->allElem.push_back(al[i]);
296 #if CMK_LBDB_ON
297     const LDObjid key = idx2LDObjid(al[i]);
298     entry->allObjKeys.push_back(key);
299 #endif
300   }
301   _id.get_type() = MulticastMsg;
302   _id.get_aid() = aid;
303   _id.get_val() = entry;                // allocate table for this section
304   initCookie(_id);
305 }
306
307
308
309
310 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
311 {
312   CkSectionInfo &info = proxy.ckGetSectionInfo();
313
314   int oldpe = info.get_pe();
315   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
316
317   CkArrayID aid = proxy.ckGetArrayID();
318   CkSectionID *sid = proxy.ckGetSectionIDs();
319   mCastEntry *entry = new mCastEntry(aid);
320
321   mCastEntry *oldentry = (mCastEntry *)info.get_val();
322   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
323
324   const CkArrayIndex *al = sid->_elems;
325   CmiAssert(info.get_aid() == aid);
326   prepareCookie(entry, *sid, al, sid->_nElems, aid);
327
328   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
329
330     // store old tree info
331   entry->oldtree.set(oldentry, oldpe);
332
333     // obsolete old tree
334   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
335
336   // find reduction number
337   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
338 }
339
340
341
342
343 /// Build a mCastEntry object with relevant section info and set the section cookie to point to this object
344 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndex *al, int count, CkArrayID aid)
345 {
346   for (int i=0; i<count; i++) {
347     entry->allElem.push_back(al[i]);
348 #if CMK_LBDB_ON
349     const LDObjid key = idx2LDObjid(al[i]);
350     entry->allObjKeys.push_back(key);
351 #endif
352   }
353   sid._cookie.get_type() = MulticastMsg;
354   sid._cookie.get_aid() = aid;
355   sid._cookie.get_val() = entry;        // allocate table for this section
356   sid._cookie.get_pe() = CkMyPe();
357 }
358
359
360
361
362 // this is used
363 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
364 {
365   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
366   int numSubSections = proxy->ckGetNumSubSections();
367   for (int i=0; i<numSubSections; i++)
368   {
369       CkArrayID aid = proxy->ckGetArrayIDn(i);
370       mCastEntry *entry = new mCastEntry(aid);
371       CkSectionID *sid = &( proxy->ckGetSectionID(i) );
372       const CkArrayIndex *al = proxy->ckGetArrayElements(i);
373       prepareCookie(entry, *sid, al, proxy->ckGetNumElements(i), aid);
374       initCookie(sid->_cookie);
375   }
376 }
377
378
379
380
381 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
382 {
383   mCastEntry *entry = (mCastEntry *)s.get_val();
384   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
385   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
386 }
387
388 // now that we get reduction number from the old cookie,
389 // we continue to build the spanning tree
390 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
391 {
392   mCastEntry *entry = (mCastEntry *)s.get_val();
393   entry->red.redNo = red;
394
395   initCookie(s);
396
397   // TODO delete old tree
398 }
399
400
401
402
403 void CkMulticastMgr::initCookie(CkSectionInfo s)
404 {
405     mCastEntry *entry = (mCastEntry *)s.get_val();
406     int n = entry->allElem.length();
407     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
408     // Create and initialize a setup message
409     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
410     msg->nIdx = n;
411     msg->parent = CkSectionInfo(entry->getAid());
412     msg->rootSid = s;
413     msg->redNo = entry->red.redNo;
414     // Fill the message with the section member indices and their last known locations
415     CkArray *array = CProxy_ArrayBase(s.get_aid()).ckLocalBranch();
416     for (int i=0; i<n; i++) {
417       msg->arrIdx[i] = entry->allElem[i];
418       int ape = array->lastKnown(entry->allElem[i]);
419       CmiAssert(ape >=0 && ape < CkNumPes());
420       msg->lastKnown[i] = ape;
421     }
422     // Trigger the spanning tree build
423     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
424     mCastGrp[CkMyPe()].setup(msg);
425 }
426
427
428
429
430 void CkMulticastMgr::teardown(CkSectionInfo cookie)
431 {
432     int i;
433     mCastEntry *sect = (mCastEntry *)cookie.get_val();
434     // Mark this section as obsolete
435     sect->setObsolete();
436     // Release the buffered messages 
437     releaseBufferedReduceMsgs(sect);
438     // Propagate the teardown to each of your children
439     CProxy_CkMulticastMgr mp(thisgroup);
440     for (i=0; i<sect->children.length(); i++)
441         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
442 }
443
444
445
446
447 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
448 {
449     int i;
450     mCastEntry *sect = (mCastEntry *)cookie.get_val();
451     // Reset the root section info
452     sect->rootSid = newroot;
453     // Mark this section as obsolete
454     sect->setObsolete();
455     // Release the buffered messages 
456     releaseBufferedReduceMsgs(sect);
457     // Propagate the teardown to each of your children
458     CProxy_CkMulticastMgr mp(thisgroup);
459     for (i=0; i<sect->children.length(); i++)
460         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
461 }
462
463
464
465
466 void CkMulticastMgr::freeup(CkSectionInfo cookie)
467 {
468   mCastEntry *sect = (mCastEntry *)cookie.get_val();
469   CProxy_CkMulticastMgr mp(thisgroup);
470   // Parse through all the section members on this PE and...
471   while (sect) 
472   {
473       // Free their children
474       for (int i=0; i<sect->children.length(); i++)
475           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
476       // Free the cookie itself
477       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
478       mCastEntry *oldc= sect->oldc;
479       delete sect;
480       sect = oldc;
481   }
482 }
483
484
485
486
487 void CkMulticastMgr::setup(multicastSetupMsg *msg)
488 {
489     int i,j;
490     mCastEntry *entry;
491     CkArrayID aid = msg->rootSid.get_aid();
492     if (msg->parent.get_pe() == CkMyPe()) 
493       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
494     else 
495       entry = new mCastEntry(aid);
496     entry->aid = aid;
497     entry->pe = CkMyPe();
498     entry->rootSid = msg->rootSid;
499     entry->parentGrp = msg->parent;
500
501     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
502     entry->red.redNo = msg->redNo;
503
504     // Create a numPE sized array of vectors to hold the array elements in each PE
505     int numpes = CkNumPes();
506     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
507     // Sort each array index in the setup message based on last known location
508     for (i=0; i<msg->nIdx; i++) 
509     {
510       int lastKnown = msg->lastKnown[i];
511       // If msg->arrIdx[i] local, add it to a special local element list
512       if (lastKnown == CkMyPe())
513           entry->localElem.insertAtEnd(msg->arrIdx[i]);
514       // else, add it to the list corresponding to its PE
515       else
516           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
517     }
518
519     CkVec<int> mySubTreePEs;
520     mySubTreePEs.reserve(numpes);
521     // The first PE in my subtree should be me, the tree root (as required by the spanning tree builder)
522     mySubTreePEs.push_back(CkMyPe());
523     // Identify the child PEs in the tree, ie the PEs with section members on them
524     for (i=0; i<numpes; i++) 
525     {
526       if (i==CkMyPe()) continue;
527       if (lists[i].size()) 
528           mySubTreePEs.push_back(i);
529     }
530     // The number of multicast children can be limited by the spanning tree factor 
531     int num = mySubTreePEs.size() - 1, numchild = 0;
532     if (factor <= 0) numchild = num;
533     else numchild = num<factor?num:factor;
534   
535     entry->numChild = numchild;
536
537     // If there are any children, go about building a spanning tree
538     if (numchild) 
539     {
540         // Build the next generation of the spanning tree rooted at my PE
541         int *peListPtr = mySubTreePEs.getVec();
542         topo::SpanningTreeVertex *nextGenInfo;
543         nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
544         //CkAssert(nextGenInfo->childIndex.size() == numchild);
545         numchild = nextGenInfo->childIndex.size();
546         entry->numChild = numchild;
547
548         // Distribute the section members across the number of direct children (branches)
549         // Direct children are simply the first section member in each of the branch lists
550         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
551
552         // For each direct child, collate indices of all section members on the PEs in that branch
553         for (i=0; i < numchild; i++)
554         {
555             // Determine the indices of the first and last PEs in this branch of my sub-tree
556             int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
557             if (i < numchild-1)
558                 childEndIndex = nextGenInfo->childIndex[i+1];
559             else
560                 childEndIndex = mySubTreePEs.size();
561             // For each PE in this branch, add the section members on that PE to a list
562             for (j = childStartIndex; j < childEndIndex; j++)
563             {
564                 int pe = mySubTreePEs[j];
565                 for (int k=0; k<lists[pe].size(); k++)
566                     slots[i].push_back(lists[pe][k]);
567             }
568         }
569
570         // Ask each of your direct children to setup their branches
571         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
572         for (i=0; i<numchild; i++) 
573         {
574             // Give each child info about the number, indices and location of its children
575             int n = slots[i].length();
576             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
577             m->parent = CkSectionInfo(aid, entry);
578             m->nIdx = slots[i].length();
579             m->rootSid = msg->rootSid;
580             m->redNo = msg->redNo;
581             for (j=0; j<slots[i].length(); j++) 
582             {
583                 m->arrIdx[j] = slots[i][j].idx;
584                 m->lastKnown[j] = slots[i][j].pe;
585             }
586             int childroot = slots[i][0].pe;
587             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
588             // Send the message to the child
589             mCastGrp[childroot].setup(m);
590         }
591         delete [] slots;
592         delete nextGenInfo;
593     }
594     // else, tell yourself that your children are ready
595     else 
596     {
597         childrenReady(entry);
598     }
599     delete [] lists;
600     delete msg;
601 }
602
603
604
605
606 void CkMulticastMgr::childrenReady(mCastEntry *entry)
607 {
608     // Mark this entry as ready
609     entry->setReady();
610     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
611     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
612     if (entry->hasParent()) 
613         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
614 #if SPLIT_MULTICAST
615     // clear packet buffer
616     while (!entry->packetBuf.isEmpty()) 
617     {
618         mCastPacket *packet = entry->packetBuf.deq();
619         packet->cookie.get_val() = entry;
620         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->offset, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
621         delete [] packet->data;
622         delete packet;
623     }
624 #endif
625     // clear msg buffer
626     while (!entry->msgBuf.isEmpty()) 
627     {
628         multicastGrpMsg *newmsg = entry->msgBuf.deq();
629         DEBUGF(("[%d] release buffer %p ep:%d\n", CkMyPe(), newmsg, newmsg->ep));
630         newmsg->_cookie.get_val() = entry;
631         mCastGrp[CkMyPe()].recvMsg(newmsg);
632     }
633     // release reduction msgs
634     releaseFutureReduceMsgs(entry);
635 }
636
637
638
639
640 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
641 {
642   mCastEntry *entry = (mCastEntry *)sid.get_val();
643   entry->children.push_back(child);
644   if (entry->children.length() == entry->numChild) {
645     childrenReady(entry);
646   }
647 }
648
649
650
651
652 // rebuild is called when root not migrated
653 // when rebuilding, all multicast msgs will be buffered.
654 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
655 {
656   // tear down old tree
657   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
658   CkAssert(curCookie->pe == CkMyPe());
659   // make sure I am the newest one
660   while (curCookie->newc) curCookie = curCookie->newc;
661   if (curCookie->isObsolete()) return;
662
663   //CmiPrintf("tree rebuild\n");
664   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
665
666   // build a chain
667   newCookie->oldc = curCookie;
668   curCookie->newc = newCookie;
669
670   sectId.get_val() = newCookie;
671
672   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
673
674   curCookie->setObsolete();
675
676   resetCookie(sectId);
677 }
678
679 void CkMulticastMgr::resetCookie(CkSectionInfo s)
680 {
681   mCastEntry *newCookie = (mCastEntry*)s.get_val();
682   mCastEntry *oldCookie = newCookie->oldc;
683
684   // get rid of old one
685   DEBUGF(("reset: oldc: %p\n", oldCookie));
686   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
687   int mype = CkMyPe();
688   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
689
690   // build a new one
691   initCookie(s);
692 }
693
694 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
695 {
696   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
697     // set an invalid cookie since we don't have it
698   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
699   for (int i=0; i< sid._nElems-1; i++) {
700      CProxyElement_ArrayBase ap(a, sid._elems[i]);
701      void *newMsg=CkCopyMsg((void **)&m);
702      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
703   }
704   if (sid._nElems > 0) {
705      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
706      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
707   }
708 }
709
710 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
711 {
712 #if CMK_MESSAGE_LOGGING
713         envelope *env = UsrToEnv(m);
714         env->flags = env->flags | CK_MULTICAST_MSG_MLOG;
715 #endif
716
717     for (int snum = 0; snum < nsid; snum++) {
718         void *msgCopy = m;
719         if (nsid - snum > 1)
720             msgCopy = CkCopyMsg(&m);
721         sendToSection(pd, ep, msgCopy, &(sid[snum]), opts);
722     }
723 }
724
725
726
727 void CkMulticastMgr::sendToSection(CkDelegateData *pd,int ep,void *m, CkSectionID *sid, int opts)
728 {
729   DEBUGF(("ArraySectionSend\n"));
730   multicastGrpMsg *msg = (multicastGrpMsg *)m;
731   msg->ep = ep;
732   CkSectionInfo &s = sid->_cookie;
733   mCastEntry *entry;
734
735   // If this section is rooted at this PE
736   if (s.get_pe() == CkMyPe()) {
737     entry = (mCastEntry *)s.get_val();   
738     if (NULL == entry)
739       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
740
741     // update entry pointer in case there is a newer one.
742     if (entry->newc) {
743       do { entry=entry->newc; } while (entry->newc);
744       s.get_val() = entry;
745     }
746
747 #if CMK_LBDB_ON
748     // fixme: running obj?
749     envelope *env = UsrToEnv(msg);
750     const LDOMHandle &om = CProxy_ArrayBase(s.get_aid()).ckLocMgr()->getOMHandle();
751     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
752 #endif
753
754     // The first time we need to rebuild the spanning tree, we do p2p sends to refresh lastKnown
755     if (entry->needRebuild == 1) {
756       msg->_cookie = s;
757       SimpleSend(ep, msg, s.get_aid(), *sid, opts);
758       entry->needRebuild = 2;
759       return;
760     }
761     // else the second time, we just rebuild cos now we'll have all the lastKnown PEs
762     else if (entry->needRebuild == 2) rebuild(s);
763   }
764   // else, if the root has migrated, we have a sub-optimal mcast
765   else {
766     // fixme - in this case, not recorded in LB
767     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
768   }
769
770   // update cookie
771   msg->_cookie = s;
772
773 #if SPLIT_MULTICAST
774   // split multicast msg into SPLIT_NUM copies
775   register envelope *env = UsrToEnv(m);
776   CkPackMessage(&env);
777   int totalsize = env->getTotalsize();
778   int packetSize = 0;
779   int totalcount = 0;
780   if(totalsize < split_threshold){
781     packetSize = totalsize;
782     totalcount = 1;
783   }else{
784     packetSize = split_size;
785     totalcount = totalsize/split_size;
786     if(totalsize%split_size) totalcount++; 
787     //packetSize = totalsize/SPLIT_NUM;
788     //if (totalsize%SPLIT_NUM) packetSize ++;
789     //totalcount = SPLIT_NUM;
790   }
791   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
792   int sizesofar = 0;
793   char *data = (char*) env;
794   if (totalcount == 1) {
795     // If the root of this section's tree is on this PE, then just propagate msg
796     if (s.get_pe() == CkMyPe()) {
797       CkUnpackMessage(&env);
798       msg = (multicastGrpMsg *)EnvToUsr(env);
799       recvMsg(msg);
800     }
801     // else send msg to root of section's spanning tree
802     else {
803       CProxy_CkMulticastMgr  mCastGrp(thisgroup);
804       msg = (multicastGrpMsg *)EnvToUsr(env);
805       mCastGrp[s.get_pe()].recvMsg(msg);
806     }
807     return;
808   }
809   for (int i=0; i<totalcount; i++) {
810     int mysize = packetSize;
811     if (mysize + sizesofar > totalsize) {
812       mysize = totalsize-sizesofar;
813     }
814     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
815     mCastGrp[s.get_pe()].recvPacket(s, sizesofar, mysize, data, i, totalcount, totalsize, 0);
816     sizesofar += mysize;
817     data += mysize;
818   }
819   CmiFree(env);
820 #else
821   if (s.get_pe() == CkMyPe()) {
822     recvMsg(msg);
823   }
824   else {
825     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
826     mCastGrp[s.get_pe()].recvMsg(msg);
827   }
828 #endif
829 }
830
831 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int offset, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
832 {
833   int i;
834   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
835
836
837   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
838     char *newdata = new char[n];
839     memcpy(newdata, data, n);
840     entry->packetBuf.enq(new mCastPacket(_cookie, offset, n, newdata, seqno, count, totalsize));
841 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
842     return;
843   }
844
845 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
846
847   // send to spanning tree children
848   // can not optimize using list send because the difference in cookie
849   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
850   for (i=0; i<entry->children.length(); i++) {
851     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], offset, n, data, seqno, count, totalsize, 0);
852   }
853
854   if (entry->asm_msg == NULL) {
855     CmiAssert(entry->asm_fill == 0);
856     entry->asm_msg = (char *)CmiAlloc(totalsize);
857   }
858   memcpy(entry->asm_msg+offset, data, n);
859   entry->asm_fill += n;
860   if (entry->asm_fill == totalsize) {
861     CkUnpackMessage((envelope **)&entry->asm_msg);
862     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
863     msg->_cookie = _cookie;
864 //    mCastGrp[CkMyPe()].recvMsg(msg);
865     sendToLocal(msg);
866     entry->asm_msg = NULL;
867     entry->asm_fill = 0;
868   }
869 //  if (fromBuffer) delete [] data;
870 }
871
872 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
873 {
874   int i;
875   CkSectionInfo &sectionInfo = msg->_cookie;
876   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
877   CmiAssert(entry->getAid() == sectionInfo.get_aid());
878
879   if (entry->notReady()) {
880     DEBUGF(("entry not ready, enq buffer %p\n", msg));
881     entry->msgBuf.enq(msg);
882     return;
883   }
884
885   // send to spanning tree children
886   // can not optimize using list send because the difference in cookie
887   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
888   for (i=0; i<entry->children.length(); i++) {
889     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
890     newmsg->_cookie = entry->children[i];
891     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
892   }
893
894   sendToLocal(msg);
895 }
896
897 void CkMulticastMgr::sendToLocal(multicastGrpMsg *msg)
898 {
899   int i;
900   CkSectionInfo &sectionInfo = msg->_cookie;
901   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
902   CmiAssert(entry->getAid() == sectionInfo.get_aid());
903   CkGroupID aid = sectionInfo.get_aid();
904
905   // send to local
906   int nLocal = entry->localElem.length();
907   DEBUGF(("[%d] send to local %d elems\n", CkMyPe(), nLocal));
908   for (i=0; i<nLocal-1; i++) {
909     CProxyElement_ArrayBase ap(aid, entry->localElem[i]);
910     if (_entryTable[msg->ep]->noKeep) {
911       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.get_aid(), entry->localElem[i], CK_MSG_KEEP);
912     }
913     else {
914       // send through scheduler queue
915       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
916       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
917     }
918     // use CK_MSG_DONTFREE so that the message can be reused
919     // the drawback of this scheme bypassing queue is that 
920     // if # of local element is huge, this leads to a long time occupying CPU
921     // also load balancer seems not be able to correctly instrument load
922 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
923     //CmiNetworkProgressAfter(3);
924   }
925   if (nLocal) {
926     CProxyElement_ArrayBase ap(aid, entry->localElem[nLocal-1]);
927     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
928 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
929   }
930   else {
931     CkAssert (entry->rootSid.get_pe() == CkMyPe());
932     delete msg;
933   }
934 }
935
936
937
938 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
939 {
940   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
941   if (CkMcastBaseMsg::checkMagic(m) == 0) {
942     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
943     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
944   }
945   // ignore invalid cookie sent by SimpleSend
946   if (m->gpe() != -1) {
947     id.get_type() = MulticastMsg;
948     id.get_pe() = m->gpe();
949     id.get_val() = m->cookie();
950   }
951   // note: retain old redNo
952 }
953
954 // Reduction
955
956 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
957 {
958   CkCallback *sectionCB;
959   int numSubSections = proxy.ckGetNumSubSections();
960   // If its a cross-array section,
961   if (numSubSections > 1)
962   {
963       /** @warning: Each instantiation is a mem leak! :o
964        * The class is trivially small, but there's one instantiation for each
965        * section delegated to CkMulticast. The objects need to live as long as
966        * their section exists and is used. The idea of 'destroying' an array
967        * section is still academic, and hence no effort has been made to charge
968        * some 'owner' entity with the task of deleting this object.
969        *
970        * Reimplementing delegated x-array reductions will make this consideration moot
971        */
972       // Configure the final cross-section reducer
973       ck::impl::XArraySectionReducer *red =
974           new ck::impl::XArraySectionReducer(numSubSections, cb);
975       // Configure the subsection callback to deposit with the final reducer
976       sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
977   }
978   // else, just direct the reduction to the actual client cb
979   else
980       sectionCB = cb;
981   // Wire the sections together by storing the subsection cb in each sectionID
982   for (int i=0; i<numSubSections; i++)
983   {
984       CkSectionInfo &sInfo = proxy.ckGetSectionID(i)._cookie;
985       mCastEntry *entry = (mCastEntry *)sInfo.get_val();
986       entry->red.storedCallback = sectionCB;
987   }
988 }
989
990 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
991 {
992   CkSectionInfo &id = proxy.ckGetSectionInfo();
993   mCastEntry *entry = (mCastEntry *)id.get_val();
994   entry->red.storedClient = fn;
995   entry->red.storedClientParam = param;
996 }
997
998 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
999 {
1000   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
1001   msg->reducer = type;
1002   msg->sid = id;
1003   msg->sourceFlag = 1;   // from array element
1004   msg->redNo = id.get_redNo();
1005   msg->gcount = 1;
1006   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
1007   msg->callback = cb;
1008   msg->userFlag=userFlag;
1009   return msg;
1010 }
1011
1012
1013
1014 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
1015 {
1016   CkCallback cb;
1017   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
1018 }
1019
1020
1021 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
1022 {
1023   if (id.get_val() == NULL || id.get_redNo() == -1) 
1024     CmiAbort("contribute: SectionID is not initialized\n");
1025
1026   int nFrags;
1027   if (-1 == fragSize) {         // no frag
1028     nFrags = 1;
1029     fragSize = dataSize;
1030   }
1031   else {
1032     CmiAssert (dataSize >= fragSize);
1033     nFrags = dataSize/fragSize;
1034     if (dataSize%fragSize) nFrags++;
1035   }
1036
1037   if (MAXFRAGS < nFrags) {
1038     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
1039     CmiAbort ("frag size too small\n");
1040   }
1041
1042   int mpe = id.get_pe();
1043   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1044
1045   // break the message into k-piece fragments
1046   int fSize = fragSize;
1047   for (int i=0; i<nFrags; i++) {
1048     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
1049       fSize = dataSize%fragSize;
1050     }
1051
1052     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
1053
1054     // initialize the new msg
1055     msg->reducer            = type;
1056     msg->sid                = id;
1057     msg->nFrags             = nFrags;
1058     msg->fragNo             = i;
1059     msg->sourceFlag         = 1;
1060     msg->redNo              = id.get_redNo();
1061     msg->gcount             = 1;
1062     msg->rebuilt            = (mpe == CkMyPe())?0:1;
1063     msg->callback           = cb;
1064     msg->userFlag           = userFlag;
1065
1066 #if CMK_MESSAGE_LOGGING
1067         envelope *env = UsrToEnv(msg);
1068         env->flags = env->flags | CK_REDUCTION_MSG_MLOG;
1069 #endif
1070
1071     mCastGrp[mpe].recvRedMsg(msg);
1072
1073     data = (void*)(((char*)data) + fSize);
1074   }
1075
1076   id.get_redNo()++;
1077   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
1078 }
1079
1080 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
1081                                               mCastEntry* entry,
1082                                               reductionInfo& redInfo) {
1083   int i;
1084   int dataSize = 0;
1085   int nFrags   = redInfo.msgs[0][0]->nFrags;
1086
1087   // to avoid memcpy and allocation cost for non-pipelined reductions
1088   if (1 == nFrags) {
1089     CkReductionMsg* msg = redInfo.msgs[0][0];
1090
1091     // free up the msg slot
1092     redInfo.msgs[0].length() = 0;
1093
1094     return msg;
1095   }
1096
1097   for (i=0; i<nFrags; i++) {
1098     dataSize += redInfo.msgs[i][0]->dataSize;
1099   }
1100
1101   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1102
1103   // initialize msg header
1104   msg->redNo      = redInfo.msgs[0][0]->redNo;
1105   msg->reducer    = redInfo.msgs[0][0]->reducer;
1106   msg->sid        = id;
1107   msg->nFrags     = nFrags;
1108
1109   // I guess following fields need not be initialized
1110   msg->sourceFlag = 2;
1111   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1112   msg->callback   = redInfo.msgs[0][0]->callback;
1113   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1114
1115   byte* data = (byte*)msg->getData ();
1116   for (i=0; i<nFrags; i++) {
1117     // copy data from fragments to msg
1118     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1119     data += redInfo.msgs[i][0]->dataSize;
1120
1121     // free fragments
1122     delete redInfo.msgs[i][0];
1123     redInfo.msgs[i].length() = 0;    
1124   }
1125
1126   return msg;
1127 }
1128
1129
1130
1131 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1132                                      mCastEntry* entry, reductionInfo& redInfo,
1133                                      int currentTreeUp) {
1134
1135     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1136     reductionMsgs& rmsgs = redInfo.msgs[index];
1137     int dataSize         = rmsgs[0]->dataSize;
1138     int i;
1139     int oldRedNo = redInfo.redNo;
1140     int nFrags   = rmsgs[0]->nFrags;
1141     int fragNo   = rmsgs[0]->fragNo;
1142     int userFlag = rmsgs[0]->userFlag;
1143
1144     // Figure out (from one of the msg fragments) which reducer function to use
1145     CkReduction::reducerType reducer = rmsgs[0]->reducer;
1146     CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1147     CkAssert(NULL != f);
1148
1149     // Check if migration occurred in any of the subtrees, and pick one valid callback
1150     CkCallback msg_cb;
1151     int rebuilt = 0;
1152     for (i=0; i<rmsgs.length(); i++) {
1153         if (rmsgs[i]->rebuilt) rebuilt = 1;
1154         if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1155     }
1156
1157     // Perform the actual reduction
1158     CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec());
1159     newmsg->redNo  = redInfo.redNo;
1160     newmsg->nFrags = nFrags;
1161     newmsg->fragNo = fragNo;
1162     newmsg->userFlag = userFlag;
1163     newmsg->reducer = reducer;
1164
1165     // Increment the number of fragments processed
1166     redInfo.npProcessed ++;
1167
1168     // Delete all the fragments which are no longer needed
1169     for (i=0; i<rmsgs.length(); i++)
1170         if (rmsgs[i]!=newmsg) delete rmsgs[i];
1171     rmsgs.length() = 0;
1172
1173     // If I am not the tree root
1174     if (entry->hasParent()) {
1175         // send up to parent
1176         newmsg->sid        = entry->parentGrp;
1177         newmsg->sourceFlag = 2;
1178         newmsg->redNo      = oldRedNo; ///< @todo: redundant, duplicate assignment?
1179         newmsg->gcount     = redInfo.gcount [index];
1180         newmsg->rebuilt    = rebuilt;
1181         newmsg->callback   = msg_cb;
1182         DEBUGF(("[%d] ckmulticast: send %p to parent %d\n", CkMyPe(), entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1183         mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1184     } else {
1185         newmsg->sid = id;
1186         // Buffer the reduced fragment
1187         rmsgs.push_back (newmsg);
1188         // If all the fragments have been reduced
1189         if (redInfo.npProcessed == nFrags) {
1190             // Combine the fragments
1191             newmsg = combineFrags (id, entry, redInfo);
1192             // Set the reference number based on the user flag at the contribute call
1193             CkSetRefNum(newmsg, userFlag);
1194             // Trigger the appropriate reduction client
1195             if ( !msg_cb.isInvalid() )
1196                 msg_cb.send(newmsg);
1197             else if (redInfo.storedCallback != NULL)
1198                 redInfo.storedCallback->send(newmsg);
1199             else if (redInfo.storedClient != NULL) {
1200                 redInfo.storedClient(id, redInfo.storedClientParam, dataSize, newmsg->data);
1201                 delete newmsg;
1202             }
1203             else
1204                 CmiAbort("Did you forget to register a reduction client?");
1205
1206             DEBUGF(("ckmulticast: redn client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1207             //
1208             if (currentTreeUp) {
1209                 if (entry->oldc) {
1210                     // free old tree on same processor;
1211                     mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1212                     entry->oldc = NULL;
1213                 }
1214                 if (entry->hasOldtree()) {
1215                     // free old tree on old processor
1216                     int oldpe = entry->oldtree.pe;
1217                     mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1218                     entry->oldtree.clear();
1219                 }
1220             }
1221             // Indicate if a tree rebuild is required
1222             if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1223         }
1224     }
1225 }
1226
1227
1228
1229 /**
1230  * Called from:
1231  *   - contribute(): calls PE specified in the cookie
1232  *   - reduceFragment(): calls parent PE
1233  *   - recvRedMsg(): calls root PE (if tree is obsolete)
1234  *   - releaseFutureRedMsgs: calls this PE
1235  *   - releaseBufferedRedMsgs: calls root PE
1236  */
1237 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1238 {
1239     int i;
1240     /// Grab the section info embedded in the redn msg
1241     CkSectionInfo id = msg->sid;
1242     /// ... and get at the ptr which shows me which cookie to use
1243     mCastEntry *entry = (mCastEntry *)id.get_val();
1244     CmiAssert(entry!=NULL);
1245
1246     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1247
1248     int updateReduceNo = 0;
1249
1250     //-------------------------------------------------------------------------
1251     /// If this cookie is obsolete
1252     if (entry->isObsolete()) {
1253         // Send up to root
1254         DEBUGF(("[%d] ckmulticast: section cookie obsolete. Will send to root %d\n", CkMyPe(), entry->rootSid.get_pe()));
1255
1256         /// If I am the root, traverse the linked list of cookies to get the latest
1257         if (!entry->hasParent()) {
1258             mCastEntry *newentry = entry->newc;
1259             while (newentry && newentry->newc) newentry=newentry->newc;
1260             if (newentry) entry = newentry;
1261             CmiAssert(entry!=NULL);
1262         }
1263
1264         ///
1265         if (!entry->hasParent() && !entry->isObsolete()) {
1266             /// Indicate it is not on old spanning tree
1267             msg->sourceFlag = 0;
1268             /// Flag the redn as coming from an old tree and that the new entry cookie needs to know the new redn num.
1269             updateReduceNo  = 1;
1270         }
1271         /// If I am not the root or this latest cookie is also obsolete
1272         else {
1273             // Ensure that you're here with reason
1274             CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1275             // Edit the msg so that the recipient knows where to find its cookie
1276             msg->sid = entry->rootSid;
1277             msg->sourceFlag = 0;
1278             // Send the msg directly to the root of the redn tree
1279             mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1280             return;
1281         }
1282     }
1283
1284     /// Grab the locally stored redn info
1285     reductionInfo &redInfo = entry->red;
1286
1287     //-------------------------------------------------------------------------
1288     /// If you've received a msg from a previous redn, something has gone horribly wrong somewhere!
1289     if (msg->redNo < redInfo.redNo) {
1290         CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1291         CmiAbort("Could never happen! \n");
1292     }
1293
1294     //-------------------------------------------------------------------------
1295     /// If the current tree is not yet ready or if you've received a msg for a future redn, buffer the msg
1296     if (entry->notReady() || msg->redNo > redInfo.redNo) {
1297         DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1298         redInfo.futureMsgs.push_back(msg);
1299         return;
1300     }
1301
1302     //-------------------------------------------------------------------------
1303     const int index = msg->fragNo;
1304     // New contribution from an ArrayElement
1305     if (msg->sourceFlag == 1) {
1306         redInfo.lcount [index] ++;
1307     }
1308     // Redn from a child
1309     if (msg->sourceFlag == 2) {
1310         redInfo.ccount [index] ++;
1311     }
1312     // Total elems that have contributed the indexth fragment
1313     redInfo.gcount [index] += msg->gcount;
1314
1315     // Check if message is of proper size
1316     if ((0 != redInfo.msgs[index].length()) && (msg->dataSize != (redInfo.msgs [index][0]->dataSize)))
1317     CmiAbort("Reduction data are not of same length!");
1318
1319     //-------------------------------------------------------------------------
1320     // Buffer the msg
1321     redInfo.msgs [index].push_back(msg);
1322
1323     //-------------------------------------------------------------------------
1324     /// Flag if this fragment can be reduced (if all local elements and children have contributed this fragment)
1325     int currentTreeUp = 0;
1326     if (redInfo.lcount [index] == entry->localElem.length() && redInfo.ccount [index] == entry->children.length())
1327         currentTreeUp = 1;
1328
1329     /// Flag (only at the redn root) if all array elements contributed all their fragments
1330     int mixTreeUp = 0;
1331     if (!entry->hasParent()) {
1332         mixTreeUp = 1;
1333         for (int i=0; i<msg->nFrags; i++)
1334             if (entry->allElem.length() != redInfo.gcount [i])
1335                 mixTreeUp = 0;
1336     }
1337
1338     //-------------------------------------------------------------------------
1339     /// If this fragment can be reduced, or if I am the root and have received all fragments from all elements
1340     if (currentTreeUp || mixTreeUp)
1341     {
1342         const int nFrags = msg->nFrags;
1343         /// Reduce this fragment
1344         reduceFragment (index, id, entry, redInfo, currentTreeUp);
1345
1346         // If migration happened, and my sub-tree reconstructed itself,
1347         // share the current reduction number with myself and all my children
1348         if (updateReduceNo)
1349             mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1350
1351         /// If all the fragments for the current reduction have been processed
1352         if (redInfo.npProcessed == nFrags) {
1353
1354             /// Increment the reduction number in all of this section's cookies
1355             entry->incReduceNo();
1356
1357             /// Reset bookkeeping counters
1358             for (i=0; i<nFrags; i++) {
1359                 redInfo.lcount [i] = 0;
1360                 redInfo.ccount [i] = 0;
1361                 redInfo.gcount [i] = 0;
1362             }
1363             redInfo.npProcessed = 0;
1364             /// Now that, the current redn is done, release any pending msgs from future redns
1365             releaseFutureReduceMsgs(entry);
1366         }
1367     }
1368 }
1369
1370
1371
1372 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1373 {
1374   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1375
1376   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1377     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1378     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1379   }
1380   entry->red.futureMsgs.length() = 0;
1381 }
1382
1383
1384
1385 // these messages have to be sent to root
1386 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1387 {
1388   int i;
1389   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1390
1391   for (int j=0; j<MAXFRAGS; j++) {
1392     for (i=0; i<entry->red.msgs[j].length(); i++) {
1393       CkReductionMsg *msg = entry->red.msgs[j][i];
1394       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1395       msg->sid = entry->rootSid;
1396       msg->sourceFlag = 0;
1397       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1398     }
1399     entry->red.msgs[j].length() = 0;
1400   }
1401
1402
1403   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1404     CkReductionMsg *msg = entry->red.futureMsgs[i];
1405     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1406     msg->sid = entry->rootSid;
1407     msg->sourceFlag = 0;
1408     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1409   }
1410   entry->red.futureMsgs.length() = 0;
1411 }
1412
1413
1414
1415 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1416 {
1417   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1418   if (entry->red.redNo < red)
1419     entry->red.redNo = red;
1420
1421   CProxy_CkMulticastMgr mp(thisgroup);
1422   for (int i=0; i<entry->children.length(); i++) {
1423     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1424   }
1425
1426   releaseFutureReduceMsgs(entry);
1427 }
1428
1429 #include "CkMulticast.def.h"
1430