ckmulticast: Simplify reduceFragment() signature
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentation  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  * */
12
13 #include "charm++.h"
14 #include "envelope.h"
15 #include "register.h"
16
17 #include "ckmulticast.h"
18 #include "spanningTreeStrategy.h"
19 #include "XArraySectionReducer.h"
20
21 #define DEBUGF(x)  // CkPrintf x;
22
23 // turn on or off fragmentation in multicast
24 #define SPLIT_MULTICAST  0 
25 // each multicast message is split into SPLIT_NUM fragments
26 #define SPLIT_NUM 20
27 #define SPLIT_SIZE (250000)
28
29 #define SPLIT_THRESHOLD (1000000)
30 // maximum number of fragments into which a message can be broken
31 #define MAXFRAGS 20
32
33 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
34 typedef CkVec<CkArrayIndexMax>   arrayIndexList;
35 typedef CkVec<CkSectionInfo>     sectionIdList;
36 typedef CkVec<CkReductionMsg *>  reductionMsgs;
37 typedef CkQ<int>                 PieceSize;
38 typedef CkVec<LDObjid>          ObjKeyList;
39 typedef unsigned char            byte;
40
41 /** Information about the status of reductions proceeding along a given section
42  *
43  * An instance of this class is stored in every mCastEntry object making it possible
44  * to track redn operations on a per section basis all along the spanning tree.
45  */
46 class reductionInfo {
47     public:
48         /// Number of local array elements which have contributed a given fragment
49         int lcount [MAXFRAGS];
50         /// Number of child vertices (NOT array elements) that have contributed a given fragment
51         int ccount [MAXFRAGS];
52         /// The total number of array elements that have contributed so far to a given fragment
53         int gcount [MAXFRAGS];
54         /// The number of fragments processed so far
55         int npProcessed;
56         /// User callback
57         CkCallback *storedCallback;
58         /// User reduction client function
59         redClientFn storedClient;
60         /// User provided data for the redn client function
61         void *storedClientParam;
62         /// Reduction sequence number
63         int redNo;
64         /// Messages for this reduction
65         reductionMsgs  msgs [MAXFRAGS];
66         /// Messages of future reductions
67         reductionMsgs futureMsgs;
68
69     public:
70         reductionInfo(): storedCallback(NULL),
71                          storedClientParam(NULL),
72                          redNo(0),
73                          npProcessed(0) {
74             for (int i=0; i<MAXFRAGS; i++)
75                 lcount [i] = ccount [i] = gcount [i] = 0;
76         }
77 };
78
79 /// cookie status
80 #define COOKIE_NOTREADY 0
81 #define COOKIE_READY    1
82 #define COOKIE_OLD      2
83
84 class mCastPacket {
85 public:
86   CkSectionInfo cookie;
87   int n;
88   char *data;
89   int seqno;
90   int count;
91   int totalsize;
92
93   mCastPacket(CkSectionInfo &_cookie, int _n, char *_d, int _s, int _c, int _t):
94                 cookie(_cookie), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
95 };
96
97 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
98
99 class SectionLocation {
100 public:
101   mCastEntry *entry;
102   int         pe;
103 public:
104   SectionLocation(): entry(NULL), pe(-1) {}
105   SectionLocation( mCastEntry *e, int p) { set(e, p); }
106   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
107   inline void clear() { entry = NULL; pe = -1; }
108 };
109
110
111
112
113 /// Cookie for an array section 
114 class mCastEntry 
115 {
116     public:
117         /// Array ID 
118         CkArrayID     aid;
119         /// Spanning tree parent
120         CkSectionInfo parentGrp;
121         /// List of direct children
122         sectionIdList children;
123         /// Number of direct children
124         int numChild;
125         /// List of all tree member array indices (Only useful on the tree root)
126         arrayIndexList allElem;
127         /// Only useful on root for LB
128         ObjKeyList     allObjKeys;
129         /// List of array elements on local PE
130         arrayIndexList localElem;
131         /// Should always be myPE
132         int pe;
133         /// Section ID of the root
134         CkSectionInfo rootSid;
135         multicastGrpMsgBuf msgBuf;
136         /// Buffer storing the pending packets
137         multicastGrpPacketBuf packetBuf;
138         /// For multicast packetization
139         char *asm_msg;
140         int   asm_fill;
141         /// Linked list of entries on the same processor
142         mCastEntry *oldc, *newc;
143         /// Old spanning tree
144         SectionLocation   oldtree;
145         // for reduction
146         reductionInfo red;
147         //
148         char needRebuild;
149     private:
150         char flag;
151     public:
152         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
153                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
154         mCastEntry(mCastEntry *);
155         /// Check if this tree is only a branch and has a parent
156         inline int hasParent() { return parentGrp.get_val()?1:0; }
157         /// Is this tree obsolete?
158         inline int isObsolete() { return (flag == COOKIE_OLD); }
159         /// Make the current tree obsolete
160         inline void setObsolete() { flag=COOKIE_OLD; }
161         /// Check if this (branch of the) tree is ready for use
162         inline int notReady() { return (flag == COOKIE_NOTREADY); }
163         /// Mark this (branch of the) tree as ready for use
164         inline void setReady() { flag=COOKIE_READY; }
165         /// Increment the reduction number across the whole linked list of cookies
166         inline void incReduceNo() {
167             red.redNo ++;
168             for (mCastEntry *next = newc; next; next=next->newc) 
169                 next->red.redNo++;
170         }
171         /// Get a handle on the array ID this tree is a member of
172         inline CkArrayID getAid() { return aid; }
173         inline int hasOldtree() { return oldtree.entry != NULL; }
174         inline void print() {
175             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
176         }
177 };
178
179
180
181
182 class cookieMsg: public CMessage_cookieMsg {
183 public:
184   CkSectionInfo cookie;
185 public:
186   cookieMsg() {};
187   cookieMsg(CkSectionInfo m): cookie(m) {};
188 };
189
190
191
192
193 /// multicast tree setup message
194 class multicastSetupMsg: public CMessage_multicastSetupMsg {
195 public:
196   int  nIdx;
197   CkArrayIndexMax *arrIdx;
198   int      *lastKnown;
199   CkSectionInfo parent;
200   CkSectionInfo rootSid;
201   int redNo;
202 };
203
204
205
206
207 /// message send in spanning tree
208 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
209 };
210
211
212 extern void CkPackMessage(envelope **pEnv);
213 extern void CkUnpackMessage(envelope **pEnv);
214
215
216
217 void _ckMulticastInit(void)
218 {
219 /*
220   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
221   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
222 */
223 }
224
225
226 mCastEntry::mCastEntry (mCastEntry *old): 
227   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
228 {
229   int i;
230   aid = old->aid;
231   parentGrp = old->parentGrp;
232   for (i=0; i<old->allElem.length(); i++)
233     allElem.push_back(old->allElem[i]);
234 #if CMK_LBDB_ON
235   CmiAssert(old->allElem.length() == old->allObjKeys.length());
236   for (i=0; i<old->allObjKeys.length(); i++)
237     allObjKeys.push_back(old->allObjKeys[i]);
238 #endif
239   pe = old->pe;
240   red.storedCallback = old->red.storedCallback;
241   red.storedClient = old->red.storedClient;
242   red.storedClientParam = old->red.storedClientParam;
243   red.redNo = old->red.redNo;
244   needRebuild = 0;
245   asm_msg = NULL;
246   asm_fill = 0;
247 }
248
249 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
250
251
252
253
254 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndexMax *al, int n)
255 {
256     // Create a multicast entry
257     mCastEntry *entry = new mCastEntry(aid);
258     // Push all the section member indices into the entry
259     for (int i=0; i<n; i++) {
260         entry->allElem.push_back(al[i]);
261 #if CMK_LBDB_ON
262         const LDObjid key = idx2LDObjid(al[i]);
263         entry->allObjKeys.push_back(key);
264 #endif
265     }
266     //  entry->aid = aid;
267     _id.aid = aid;
268     _id.get_val() = entry;              // allocate table for this section
269     // 
270     initCookie(_id);
271 }
272
273
274
275
276 void CkMulticastMgr::setSection(CkSectionInfo &id)
277 {
278   initCookie(id);
279 }
280
281
282
283
284 /// @warning: This is deprecated
285 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
286 {
287   CkArrayID aid = proxy.ckGetArrayID();
288   CkSectionInfo &_id = proxy.ckGetSectionInfo();
289
290   mCastEntry *entry = new mCastEntry(aid);
291
292   const CkArrayIndexMax *al = proxy.ckGetArrayElements();
293   for (int i=0; i<proxy.ckGetNumElements(); i++) {
294     entry->allElem.push_back(al[i]);
295 #if CMK_LBDB_ON
296     const LDObjid key = idx2LDObjid(al[i]);
297     entry->allObjKeys.push_back(key);
298 #endif
299   }
300   _id.type = MulticastMsg;
301   _id.aid = aid;
302   _id.get_val() = entry;                // allocate table for this section
303   initCookie(_id);
304 }
305
306
307
308
309 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
310 {
311   CkSectionInfo &info = proxy.ckGetSectionInfo();
312
313   int oldpe = info.get_pe();
314   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
315
316   CkArrayID aid = proxy.ckGetArrayID();
317   CkSectionID *sid = proxy.ckGetSectionIDs();
318   mCastEntry *entry = new mCastEntry(aid);
319
320   mCastEntry *oldentry = (mCastEntry *)info.get_val();
321   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
322
323   const CkArrayIndexMax *al = sid->_elems;
324   CmiAssert(info.aid == aid);
325   prepareCookie(entry, *sid, al, sid->_nElems, aid);
326
327   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
328
329     // store old tree info
330   entry->oldtree.set(oldentry, oldpe);
331
332     // obsolete old tree
333   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
334
335   // find reduction number
336   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
337 }
338
339
340
341
342 /// Build a mCastEntry object with relevant section info and set the section cookie to point to this object
343 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndexMax *al, int count, CkArrayID aid)
344 {
345   for (int i=0; i<count; i++) {
346     entry->allElem.push_back(al[i]);
347 #if CMK_LBDB_ON
348     const LDObjid key = idx2LDObjid(al[i]);
349     entry->allObjKeys.push_back(key);
350 #endif
351   }
352   sid._cookie.type = MulticastMsg;
353   sid._cookie.aid = aid;
354   sid._cookie.get_val() = entry;        // allocate table for this section
355   sid._cookie.get_pe() = CkMyPe();
356 }
357
358
359
360
361 // this is used
362 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
363 {
364   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
365   int numSubSections = proxy->ckGetNumSubSections();
366   for (int i=0; i<numSubSections; i++)
367   {
368       CkArrayID aid = proxy->ckGetArrayIDn(i);
369       mCastEntry *entry = new mCastEntry(aid);
370       CkSectionID *sid = &( proxy->ckGetSectionID(i) );
371       const CkArrayIndexMax *al = proxy->ckGetArrayElements(i);
372       prepareCookie(entry, *sid, al, proxy->ckGetNumElements(i), aid);
373       initCookie(sid->_cookie);
374   }
375 }
376
377
378
379
380 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
381 {
382   mCastEntry *entry = (mCastEntry *)s.get_val();
383   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
384   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
385 }
386
387 // now that we get reduction number from the old cookie,
388 // we continue to build the spanning tree
389 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
390 {
391   mCastEntry *entry = (mCastEntry *)s.get_val();
392   entry->red.redNo = red;
393
394   initCookie(s);
395
396   // TODO delete old tree
397 }
398
399
400
401
402 void CkMulticastMgr::initCookie(CkSectionInfo s)
403 {
404     mCastEntry *entry = (mCastEntry *)s.get_val();
405     int n = entry->allElem.length();
406     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
407     // Create and initialize a setup message
408     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
409     msg->nIdx = n;
410     msg->parent = CkSectionInfo(entry->getAid());
411     msg->rootSid = s;
412     msg->redNo = entry->red.redNo;
413     // Fill the message with the section member indices and their last known locations
414     CkArray *array = CProxy_ArrayBase(s.aid).ckLocalBranch();
415     for (int i=0; i<n; i++) {
416       msg->arrIdx[i] = entry->allElem[i];
417       int ape = array->lastKnown(entry->allElem[i]);
418       CmiAssert(ape >=0 && ape < CkNumPes());
419       msg->lastKnown[i] = ape;
420     }
421     // Trigger the spanning tree build
422     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
423     mCastGrp[CkMyPe()].setup(msg);
424 }
425
426
427
428
429 void CkMulticastMgr::teardown(CkSectionInfo cookie)
430 {
431     int i;
432     mCastEntry *sect = (mCastEntry *)cookie.get_val();
433     // Mark this section as obsolete
434     sect->setObsolete();
435     // Release the buffered messages 
436     releaseBufferedReduceMsgs(sect);
437     // Propagate the teardown to each of your children
438     CProxy_CkMulticastMgr mp(thisgroup);
439     for (i=0; i<sect->children.length(); i++)
440         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
441 }
442
443
444
445
446 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
447 {
448     int i;
449     mCastEntry *sect = (mCastEntry *)cookie.get_val();
450     // Reset the root section info
451     sect->rootSid = newroot;
452     // Mark this section as obsolete
453     sect->setObsolete();
454     // Release the buffered messages 
455     releaseBufferedReduceMsgs(sect);
456     // Propagate the teardown to each of your children
457     CProxy_CkMulticastMgr mp(thisgroup);
458     for (i=0; i<sect->children.length(); i++)
459         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
460 }
461
462
463
464
465 void CkMulticastMgr::freeup(CkSectionInfo cookie)
466 {
467   mCastEntry *sect = (mCastEntry *)cookie.get_val();
468   CProxy_CkMulticastMgr mp(thisgroup);
469   // Parse through all the section members on this PE and...
470   while (sect) 
471   {
472       // Free their children
473       for (int i=0; i<sect->children.length(); i++)
474           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
475       // Free the cookie itself
476       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
477       mCastEntry *oldc= sect->oldc;
478       delete sect;
479       sect = oldc;
480   }
481 }
482
483
484
485
486 void CkMulticastMgr::setup(multicastSetupMsg *msg)
487 {
488     int i,j;
489     mCastEntry *entry;
490     CkArrayID aid = msg->rootSid.aid;
491     if (msg->parent.get_pe() == CkMyPe()) 
492       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
493     else 
494       entry = new mCastEntry(aid);
495     entry->aid = aid;
496     entry->pe = CkMyPe();
497     entry->rootSid = msg->rootSid;
498     entry->parentGrp = msg->parent;
499
500     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
501     entry->red.redNo = msg->redNo;
502
503     // Create a numPE sized array of vectors to hold the array elements in each PE
504     int numpes = CkNumPes();
505     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
506     // Sort each array index in the setup message based on last known location
507     for (i=0; i<msg->nIdx; i++) 
508     {
509       int lastKnown = msg->lastKnown[i];
510       // If msg->arrIdx[i] local, add it to a special local element list
511       if (lastKnown == CkMyPe())
512           entry->localElem.insertAtEnd(msg->arrIdx[i]);
513       // else, add it to the list corresponding to its PE
514       else
515           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
516     }
517
518     CkVec<int> mySubTreePEs;
519     mySubTreePEs.reserve(numpes);
520     // The first PE in my subtree should be me, the tree root (as required by the spanning tree builder)
521     mySubTreePEs.push_back(CkMyPe());
522     // Identify the child PEs in the tree, ie the PEs with section members on them
523     for (i=0; i<numpes; i++) 
524     {
525       if (i==CkMyPe()) continue;
526       if (lists[i].size()) 
527           mySubTreePEs.push_back(i);
528     }
529     // The number of multicast children can be limited by the spanning tree factor 
530     int num = mySubTreePEs.size() - 1, numchild = 0;
531     if (factor <= 0) numchild = num;
532     else numchild = num<factor?num:factor;
533   
534     entry->numChild = numchild;
535
536     // If there are any children, go about building a spanning tree
537     if (numchild) 
538     {
539         // Build the next generation of the spanning tree rooted at my PE
540         int *peListPtr = mySubTreePEs.getVec();
541         topo::SpanningTreeVertex *nextGenInfo;
542         nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
543         //CkAssert(nextGenInfo->childIndex.size() == numchild);
544         numchild = nextGenInfo->childIndex.size();
545         entry->numChild = numchild;
546
547         // Distribute the section members across the number of direct children (branches)
548         // Direct children are simply the first section member in each of the branch lists
549         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
550
551         // For each direct child, collate indices of all section members on the PEs in that branch
552         for (i=0; i < numchild; i++)
553         {
554             // Determine the indices of the first and last PEs in this branch of my sub-tree
555             int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
556             if (i < numchild-1)
557                 childEndIndex = nextGenInfo->childIndex[i+1];
558             else
559                 childEndIndex = mySubTreePEs.size();
560             // For each PE in this branch, add the section members on that PE to a list
561             for (j = childStartIndex; j < childEndIndex; j++)
562             {
563                 int pe = mySubTreePEs[j];
564                 for (int k=0; k<lists[pe].size(); k++)
565                     slots[i].push_back(lists[pe][k]);
566             }
567         }
568
569         // Ask each of your direct children to setup their branches
570         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
571         for (i=0; i<numchild; i++) 
572         {
573             // Give each child info about the number, indices and location of its children
574             int n = slots[i].length();
575             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
576             m->parent = CkSectionInfo(aid, entry);
577             m->nIdx = slots[i].length();
578             m->rootSid = msg->rootSid;
579             m->redNo = msg->redNo;
580             for (j=0; j<slots[i].length(); j++) 
581             {
582                 m->arrIdx[j] = slots[i][j].idx;
583                 m->lastKnown[j] = slots[i][j].pe;
584             }
585             int childroot = slots[i][0].pe;
586             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
587             // Send the message to the child
588             mCastGrp[childroot].setup(m);
589         }
590         delete [] slots;
591         delete nextGenInfo;
592     }
593     // else, tell yourself that your children are ready
594     else 
595     {
596         childrenReady(entry);
597     }
598     delete [] lists;
599     delete msg;
600 }
601
602
603
604
605 void CkMulticastMgr::childrenReady(mCastEntry *entry)
606 {
607     // Mark this entry as ready
608     entry->setReady();
609     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
610     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
611     if (entry->hasParent()) 
612         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
613 #if SPLIT_MULTICAST
614     // clear packet buffer
615     while (!entry->packetBuf.isEmpty()) 
616     {
617         mCastPacket *packet = entry->packetBuf.deq();
618         packet->cookie.get_val() = entry;
619         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
620         delete [] packet->data;
621         delete packet;
622     }
623 #else
624     // clear msg buffer
625     while (!entry->msgBuf.isEmpty()) 
626     {
627         multicastGrpMsg *newmsg = entry->msgBuf.deq();
628         DEBUGF(("[%d] release buffer %p %d\n", CkMyPe(), newmsg, newmsg->ep));
629         newmsg->_cookie.get_val() = entry;
630         mCastGrp[CkMyPe()].recvMsg(newmsg);
631     }
632 #endif
633     // release reduction msgs
634     releaseFutureReduceMsgs(entry);
635 }
636
637
638
639
640 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
641 {
642   mCastEntry *entry = (mCastEntry *)sid.get_val();
643   entry->children.push_back(child);
644   if (entry->children.length() == entry->numChild) {
645     childrenReady(entry);
646   }
647 }
648
649
650
651
652 // rebuild is called when root not migrated
653 // when rebuilding, all multicast msgs will be buffered.
654 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
655 {
656   // tear down old tree
657   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
658   CkAssert(curCookie->pe == CkMyPe());
659   // make sure I am the newest one
660   while (curCookie->newc) curCookie = curCookie->newc;
661   if (curCookie->isObsolete()) return;
662
663   //CmiPrintf("tree rebuild\n");
664   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
665
666   // build a chain
667   newCookie->oldc = curCookie;
668   curCookie->newc = newCookie;
669
670   sectId.get_val() = newCookie;
671
672   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
673
674   curCookie->setObsolete();
675
676   resetCookie(sectId);
677 }
678
679 void CkMulticastMgr::resetCookie(CkSectionInfo s)
680 {
681   mCastEntry *newCookie = (mCastEntry*)s.get_val();
682   mCastEntry *oldCookie = newCookie->oldc;
683
684   // get rid of old one
685   DEBUGF(("reset: oldc: %p\n", oldCookie));
686   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
687   int mype = CkMyPe();
688   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
689
690   // build a new one
691   initCookie(s);
692 }
693
694 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
695 {
696   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
697     // set an invalid cookie since we don't have it
698   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
699   for (int i=0; i< sid._nElems-1; i++) {
700      CProxyElement_ArrayBase ap(a, sid._elems[i]);
701      void *newMsg=CkCopyMsg((void **)&m);
702      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
703   }
704   if (sid._nElems > 0) {
705      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
706      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
707   }
708 }
709
710 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
711 {
712     for (int snum = 0; snum < nsid; snum++) {
713         void *msgCopy = m;
714         if (nsid - snum > 1)
715             msgCopy = CkCopyMsg(&m);
716         sendToSection(pd, ep, msgCopy, &(sid[snum]), opts);
717     }
718 }
719
720
721
722 void CkMulticastMgr::sendToSection(CkDelegateData *pd,int ep,void *m, CkSectionID *sid, int opts)
723 {
724             DEBUGF(("ArraySectionSend\n"));
725
726   multicastGrpMsg *msg = (multicastGrpMsg *)m;
727 //  msg->aid = a;
728   msg->ep = ep;
729
730   CkSectionInfo &s = sid->_cookie;
731
732   mCastEntry *entry;
733   if (s.get_pe() == CkMyPe()) {
734     entry = (mCastEntry *)s.get_val();   
735     if (entry == NULL) {
736       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
737     }
738
739     // update entry pointer in case there is a newer one.
740     if (entry->newc) {
741       do { entry=entry->newc; } while (entry->newc);
742       s.get_val() = entry;
743     }
744
745 #if CMK_LBDB_ON
746     // fixme: running obj?
747     envelope *env = UsrToEnv(msg);
748     const LDOMHandle &om = CProxy_ArrayBase(s.aid).ckLocMgr()->getOMHandle();
749     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
750 #endif
751
752     // first time need to rebuild, we do simple send to refresh lastKnown
753     if (entry->needRebuild == 1) {
754       msg->_cookie = s;
755       SimpleSend(ep, msg, s.aid, *sid, opts);
756       entry->needRebuild = 2;
757       return;
758     }
759     else if (entry->needRebuild == 2) rebuild(s);
760   }
761   else {
762     // fixme - in this case, not recorded in LB
763     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
764   }
765
766   // don't need packing here
767 /*
768   register envelope *env = UsrToEnv(m);
769   CkPackMessage(&env);
770   m = EnvToUsr(env);
771 */
772
773   // update cookie
774   msg->_cookie = s;
775
776 #if SPLIT_MULTICAST
777   // split multicast msg into SPLIT_NUM copies
778   register envelope *env = UsrToEnv(m);
779   CkPackMessage(&env);
780   int totalsize = env->getTotalsize();
781   int packetSize = 0;
782   int totalcount = 0;
783   if(totalsize < SPLIT_THRESHOLD){
784     packetSize = totalsize;
785     totalcount = 1;
786   }else{
787     packetSize = SPLIT_SIZE;
788     totalcount = totalsize/SPLIT_SIZE;
789     if(totalcount%SPLIT_SIZE) totalcount++; 
790     //packetSize = totalsize/SPLIT_NUM;
791     //if (totalsize%SPLIT_NUM) packetSize ++;
792     //totalcount = SPLIT_NUM;
793   }
794   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
795   int sizesofar = 0;
796   char *data = (char*) env;
797   for (int i=0; i<totalcount; i++) {
798     int mysize = packetSize;
799     if (mysize + sizesofar > totalsize) {
800       mysize = totalsize-sizesofar;
801     }
802     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
803     mCastGrp[s.get_pe()].recvPacket(s, mysize, data, i, totalcount, totalsize, 0);
804     sizesofar += mysize;
805     data += mysize;
806   }
807   CmiFree(env);
808 #else
809   if (s.get_pe() == CkMyPe()) {
810     recvMsg(msg);
811   }
812   else {
813     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
814     mCastGrp[s.get_pe()].recvMsg(msg);
815   }
816 #endif
817 }
818
819 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
820 {
821   int i;
822   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
823
824
825   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
826     char *newdata = new char[n];
827     memcpy(newdata, data, n);
828     entry->packetBuf.enq(new mCastPacket(_cookie, n, newdata, seqno, count, totalsize));
829 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
830     return;
831   }
832
833 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
834
835   // send to spanning tree children
836   // can not optimize using list send because the difference in cookie
837   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
838   for (i=0; i<entry->children.length(); i++) {
839     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], n, data, seqno, count, totalsize, 0);
840   }
841
842   if (seqno == 0) {
843     if (entry->asm_msg != NULL || entry->asm_fill != 0) {
844       entry->print();
845       CmiAssert(entry->asm_msg == NULL && entry->asm_fill==0);
846     }
847     entry->asm_msg = (char *)CmiAlloc(totalsize);
848   }
849   memcpy(entry->asm_msg+entry->asm_fill, data, n);
850   entry->asm_fill += n;
851   if (seqno + 1 == count) {
852     CmiAssert(entry->asm_fill == totalsize);
853     CkUnpackMessage((envelope **)&entry->asm_msg);
854     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
855     msg->_cookie = _cookie;
856 //    mCastGrp[CkMyPe()].recvMsg(msg);
857     recvMsg(msg);
858     entry->asm_msg = NULL;
859     entry->asm_fill = 0;
860   }
861 //  if (fromBuffer) delete [] data;
862 }
863
864 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
865 {
866   int i;
867   CkSectionInfo &sectionInfo = msg->_cookie;
868   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
869   CmiAssert(entry->getAid() == sectionInfo.aid);
870
871 #if ! SPLIT_MULTICAST
872   if (entry->notReady()) {
873     DEBUGF(("entry not ready, enq buffer %p\n", msg));
874     entry->msgBuf.enq(msg);
875     return;
876   }
877
878   // send to spanning tree children
879   // can not optimize using list send because the difference in cookie
880   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
881   for (i=0; i<entry->children.length(); i++) {
882     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
883     newmsg->_cookie = entry->children[i];
884     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
885   }
886 #endif
887
888   // send to local
889   int nLocal = entry->localElem.length();
890   DEBUGF(("send to local %d\n", nLocal));
891   for (i=0; i<nLocal-1; i++) {
892     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[i]);
893     if (_entryTable[msg->ep]->noKeep) {
894       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.aid, entry->localElem[i], CK_MSG_KEEP);
895     }
896     else {
897       // send through scheduler queue
898       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
899       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
900     }
901     // use CK_MSG_DONTFREE so that the message can be reused
902     // the drawback of this scheme bypassing queue is that 
903     // if # of local element is huge, this leads to a long time occupying CPU
904     // also load balancer seems not be able to correctly instrument load
905 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
906     //CmiNetworkProgressAfter(3);
907   }
908   if (nLocal) {
909     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[nLocal-1]);
910     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
911 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
912   }
913   else {
914     CkAssert (entry->rootSid.get_pe() == CkMyPe());
915     delete msg;
916   }
917 }
918
919
920
921 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
922 {
923   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
924   if (CkMcastBaseMsg::checkMagic(m) == 0) {
925     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
926     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
927   }
928   // ignore invalid cookie sent by SimpleSend
929   if (m->gpe() != -1) {
930     id.type = MulticastMsg;
931     id.get_pe() = m->gpe();
932     id.get_val() = m->cookie();
933   }
934   // note: retain old redNo
935 }
936
937 // Reduction
938
939 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
940 {
941   CkCallback *sectionCB;
942   int numSubSections = proxy.ckGetNumSubSections();
943   // If its a cross-array section,
944   if (numSubSections > 1)
945   {
946       /** @warning: Each instantiation is a mem leak! :o
947        * The class is trivially small, but there's one instantiation for each
948        * section delegated to CkMulticast. The objects need to live as long as
949        * their section exists and is used. The idea of 'destroying' an array
950        * section is still academic, and hence no effort has been made to charge
951        * some 'owner' entity with the task of deleting this object.
952        *
953        * Reimplementing delegated x-array reductions will make this consideration moot
954        */
955       // Configure the final cross-section reducer
956       ck::impl::XArraySectionReducer *red =
957           new ck::impl::XArraySectionReducer(numSubSections, cb);
958       // Configure the subsection callback to deposit with the final reducer
959       sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
960   }
961   // else, just direct the reduction to the actual client cb
962   else
963       sectionCB = cb;
964   // Wire the sections together by storing the subsection cb in each sectionID
965   for (int i=0; i<numSubSections; i++)
966   {
967       CkSectionInfo &sInfo = proxy.ckGetSectionID(i)._cookie;
968       mCastEntry *entry = (mCastEntry *)sInfo.get_val();
969       entry->red.storedCallback = sectionCB;
970   }
971 }
972
973 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
974 {
975   CkSectionInfo &id = proxy.ckGetSectionInfo();
976   mCastEntry *entry = (mCastEntry *)id.get_val();
977   entry->red.storedClient = fn;
978   entry->red.storedClientParam = param;
979 }
980
981 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
982 {
983   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
984   msg->reducer = type;
985   msg->sid = id;
986   msg->sourceFlag = 1;   // from array element
987   msg->redNo = id.get_redNo();
988   msg->gcount = 1;
989   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
990   msg->callback = cb;
991   msg->userFlag=userFlag;
992   return msg;
993 }
994
995
996
997 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
998 {
999   CkCallback cb;
1000   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
1001 }
1002
1003
1004 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
1005 {
1006   if (id.get_val() == NULL || id.get_redNo() == -1) 
1007     CmiAbort("contribute: SectionID is not initialized\n");
1008
1009   int nFrags;
1010   if (-1 == fragSize) {         // no frag
1011     nFrags = 1;
1012     fragSize = dataSize;
1013   }
1014   else {
1015     CmiAssert (dataSize >= fragSize);
1016     nFrags = dataSize/fragSize;
1017     if (dataSize%fragSize) nFrags++;
1018   }
1019
1020   if (MAXFRAGS < nFrags) {
1021     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
1022     CmiAbort ("frag size too small\n");
1023   }
1024
1025   int mpe = id.get_pe();
1026   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1027
1028   // break the message into k-piece fragments
1029   int fSize = fragSize;
1030   for (int i=0; i<nFrags; i++) {
1031     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
1032       fSize = dataSize%fragSize;
1033     }
1034
1035     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
1036
1037     // initialize the new msg
1038     msg->reducer            = type;
1039     msg->sid                = id;
1040     msg->nFrags             = nFrags;
1041     msg->fragNo             = i;
1042     msg->sourceFlag         = 1;
1043     msg->redNo              = id.get_redNo();
1044     msg->gcount             = 1;
1045     msg->rebuilt            = (mpe == CkMyPe())?0:1;
1046     msg->callback           = cb;
1047     msg->userFlag           = userFlag;
1048
1049     mCastGrp[mpe].recvRedMsg(msg);
1050
1051     data = (void*)(((char*)data) + fSize);
1052   }
1053
1054   id.get_redNo()++;
1055   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
1056 }
1057
1058 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
1059                                               mCastEntry* entry,
1060                                               reductionInfo& redInfo) {
1061   int i;
1062   int dataSize = 0;
1063   int nFrags   = redInfo.msgs[0][0]->nFrags;
1064
1065   // to avoid memcpy and allocation cost for non-pipelined reductions
1066   if (1 == nFrags) {
1067     CkReductionMsg* msg = redInfo.msgs[0][0];
1068
1069     // free up the msg slot
1070     redInfo.msgs[0].length() = 0;
1071
1072     return msg;
1073   }
1074
1075   for (i=0; i<nFrags; i++) {
1076     dataSize += redInfo.msgs[i][0]->dataSize;
1077   }
1078
1079   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1080
1081   // initialize msg header
1082   msg->redNo      = redInfo.msgs[0][0]->redNo;
1083   msg->reducer    = redInfo.msgs[0][0]->reducer;
1084   msg->sid        = id;
1085   msg->nFrags     = nFrags;
1086
1087   // I guess following fields need not be initialized
1088   msg->sourceFlag = 2;
1089   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1090   msg->callback   = redInfo.msgs[0][0]->callback;
1091   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1092
1093   byte* data = (byte*)msg->getData ();
1094   for (i=0; i<nFrags; i++) {
1095     // copy data from fragments to msg
1096     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1097     data += redInfo.msgs[i][0]->dataSize;
1098
1099     // free fragments
1100     delete redInfo.msgs[i][0];
1101     redInfo.msgs[i].length() = 0;    
1102   }
1103
1104   return msg;
1105 }
1106
1107
1108
1109 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1110                                      mCastEntry* entry, reductionInfo& redInfo,
1111                                      int currentTreeUp) {
1112
1113     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1114     reductionMsgs& rmsgs = redInfo.msgs[index];
1115     int dataSize         = rmsgs[0]->dataSize;
1116     int i;
1117     int oldRedNo = redInfo.redNo;
1118     int nFrags   = rmsgs[0]->nFrags;
1119     int fragNo   = rmsgs[0]->fragNo;
1120     int userFlag = rmsgs[0]->userFlag;
1121
1122     // Figure out (from one of the msg fragments) which reducer function to use
1123     CkReduction::reducerType reducer = rmsgs[0]->reducer;
1124     CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1125     CkAssert(NULL != f);
1126
1127     // Check if migration occurred in any of the subtrees, and pick one valid callback
1128     CkCallback msg_cb;
1129     int rebuilt = 0;
1130     for (i=0; i<rmsgs.length(); i++) {
1131         if (rmsgs[i]->rebuilt) rebuilt = 1;
1132         if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1133     }
1134
1135     // Perform the actual reduction
1136     CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec());
1137     newmsg->redNo  = redInfo.redNo;
1138     newmsg->nFrags = nFrags;
1139     newmsg->fragNo = fragNo;
1140     newmsg->userFlag = userFlag;
1141     newmsg->reducer = reducer;
1142
1143     // Increment the number of fragments processed
1144     redInfo.npProcessed ++;
1145
1146     // Delete all the fragments which are no longer needed
1147     for (i=0; i<rmsgs.length(); i++)
1148         if (rmsgs[i]!=newmsg) delete rmsgs[i];
1149     rmsgs.length() = 0;
1150
1151     // If I am not the tree root
1152     if (entry->hasParent()) {
1153         // send up to parent
1154         newmsg->sid        = entry->parentGrp;
1155         newmsg->sourceFlag = 2;
1156         newmsg->redNo      = oldRedNo; ///< @todo: redundant, duplicate assignment?
1157         newmsg->gcount     = redInfo.gcount [index];
1158         newmsg->rebuilt    = rebuilt;
1159         newmsg->callback   = msg_cb;
1160         DEBUGF(("[%d] ckmulticast: send %p to parent %d\n", CkMyPe(), entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1161         mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1162     } else {
1163         newmsg->sid = id;
1164         // Buffer the reduced fragment
1165         rmsgs.push_back (newmsg);
1166         // If all the fragments have been reduced
1167         if (redInfo.npProcessed == nFrags) {
1168             // Combine the fragments
1169             newmsg = combineFrags (id, entry, redInfo);
1170             // Set the reference number based on the user flag at the contribute call
1171             CkSetRefNum(newmsg, userFlag);
1172             // Trigger the appropriate reduction client
1173             if ( !msg_cb.isInvalid() )
1174                 msg_cb.send(newmsg);
1175             else if (redInfo.storedCallback != NULL)
1176                 redInfo.storedCallback->send(newmsg);
1177             else if (redInfo.storedClient != NULL) {
1178                 redInfo.storedClient(id, redInfo.storedClientParam, dataSize, newmsg->data);
1179                 delete newmsg;
1180             }
1181             else
1182                 CmiAbort("Did you forget to register a reduction client?");
1183
1184             DEBUGF(("ckmulticast: redn client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1185             //
1186             if (currentTreeUp) {
1187                 if (entry->oldc) {
1188                     // free old tree on same processor;
1189                     mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1190                     entry->oldc = NULL;
1191                 }
1192                 if (entry->hasOldtree()) {
1193                     // free old tree on old processor
1194                     int oldpe = entry->oldtree.pe;
1195                     mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1196                     entry->oldtree.clear();
1197                 }
1198             }
1199             // Indicate if a tree rebuild is required
1200             if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1201         }
1202     }
1203 }
1204
1205
1206
1207 /**
1208  * Called from:
1209  *   - contribute(): calls PE specified in the cookie
1210  *   - reduceFragment(): calls parent PE
1211  *   - recvRedMsg(): calls root PE (if tree is obsolete)
1212  *   - releaseFutureRedMsgs: calls this PE
1213  *   - releaseBufferedRedMsgs: calls root PE
1214  */
1215 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1216 {
1217     int i;
1218     /// Grab the section info embedded in the redn msg
1219     CkSectionInfo id = msg->sid;
1220     /// ... and get at the ptr which shows me which cookie to use
1221     mCastEntry *entry = (mCastEntry *)id.get_val();
1222     CmiAssert(entry!=NULL);
1223
1224     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1225
1226     int updateReduceNo = 0;
1227
1228     //-------------------------------------------------------------------------
1229     /// If this cookie is obsolete
1230     if (entry->isObsolete()) {
1231         // Send up to root
1232         DEBUGF(("[%d] ckmulticast: section cookie obsolete. Will send to root %d\n", CkMyPe(), entry->rootSid.pe));
1233
1234         /// If I am the root, traverse the linked list of cookies to get the latest
1235         if (!entry->hasParent()) {
1236             mCastEntry *newentry = entry->newc;
1237             while (newentry && newentry->newc) newentry=newentry->newc;
1238             if (newentry) entry = newentry;
1239             CmiAssert(entry!=NULL);
1240         }
1241
1242         ///
1243         if (!entry->hasParent() && !entry->isObsolete()) {
1244             /// Indicate it is not on old spanning tree
1245             msg->sourceFlag = 0;
1246             /// Flag the redn as coming from an old tree and that the new entry cookie needs to know the new redn num.
1247             updateReduceNo  = 1;
1248         }
1249         /// If I am not the root or this latest cookie is also obsolete
1250         else {
1251             // Ensure that you're here with reason
1252             CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1253             // Edit the msg so that the recipient knows where to find its cookie
1254             msg->sid = entry->rootSid;
1255             msg->sourceFlag = 0;
1256             // Send the msg directly to the root of the redn tree
1257             mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1258             return;
1259         }
1260     }
1261
1262     /// Grab the locally stored redn info
1263     reductionInfo &redInfo = entry->red;
1264
1265     //-------------------------------------------------------------------------
1266     /// If you've received a msg from a previous redn, something has gone horribly wrong somewhere!
1267     if (msg->redNo < redInfo.redNo) {
1268         CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1269         CmiAbort("Could never happen! \n");
1270     }
1271
1272     //-------------------------------------------------------------------------
1273     /// If the current tree is not yet ready or if you've received a msg for a future redn, buffer the msg
1274     if (entry->notReady() || msg->redNo > redInfo.redNo) {
1275         DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1276         redInfo.futureMsgs.push_back(msg);
1277         return;
1278     }
1279
1280     //-------------------------------------------------------------------------
1281     const int index = msg->fragNo;
1282     // New contribution from an ArrayElement
1283     if (msg->sourceFlag == 1) {
1284         redInfo.lcount [index] ++;
1285     }
1286     // Redn from a child
1287     if (msg->sourceFlag == 2) {
1288         redInfo.ccount [index] ++;
1289     }
1290     // Total elems that have contributed the indexth fragment
1291     redInfo.gcount [index] += msg->gcount;
1292
1293     // Check if message is of proper size
1294     if ((0 != redInfo.msgs[index].length()) && (msg->dataSize != (redInfo.msgs [index][0]->dataSize)))
1295     CmiAbort("Reduction data are not of same length!");
1296
1297     //-------------------------------------------------------------------------
1298     // Buffer the msg
1299     redInfo.msgs [index].push_back(msg);
1300
1301     //-------------------------------------------------------------------------
1302     /// Flag if this fragment can be reduced (if all local elements and children have contributed this fragment)
1303     int currentTreeUp = 0;
1304     if (redInfo.lcount [index] == entry->localElem.length() && redInfo.ccount [index] == entry->children.length())
1305         currentTreeUp = 1;
1306
1307     /// Flag (only at the redn root) if all array elements contributed all their fragments
1308     int mixTreeUp = 0;
1309     if (!entry->hasParent()) {
1310         mixTreeUp = 1;
1311         for (int i=0; i<msg->nFrags; i++)
1312             if (entry->allElem.length() != redInfo.gcount [i])
1313                 mixTreeUp = 0;
1314     }
1315
1316     //-------------------------------------------------------------------------
1317     /// If this fragment can be reduced, or if I am the root and have received all fragments from all elements
1318     if (currentTreeUp || mixTreeUp)
1319     {
1320         const int nFrags = msg->nFrags;
1321         /// Reduce this fragment
1322         reduceFragment (index, id, entry, redInfo, currentTreeUp);
1323
1324         // If migration happened, and my sub-tree reconstructed itself,
1325         // share the current reduction number with myself and all my children
1326         if (updateReduceNo)
1327             mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1328
1329         /// If all the fragments for the current reduction have been processed
1330         if (redInfo.npProcessed == nFrags) {
1331
1332             /// Increment the reduction number in all of this section's cookies
1333             entry->incReduceNo();
1334
1335             /// Reset bookkeeping counters
1336             for (i=0; i<nFrags; i++) {
1337                 redInfo.lcount [i] = 0;
1338                 redInfo.ccount [i] = 0;
1339                 redInfo.gcount [i] = 0;
1340             }
1341             redInfo.npProcessed = 0;
1342             /// Now that, the current redn is done, release any pending msgs from future redns
1343             releaseFutureReduceMsgs(entry);
1344         }
1345     }
1346 }
1347
1348
1349
1350 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1351 {
1352   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1353
1354   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1355     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1356     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1357   }
1358   entry->red.futureMsgs.length() = 0;
1359 }
1360
1361
1362
1363 // these messages have to be sent to root
1364 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1365 {
1366   int i;
1367   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1368
1369   for (int j=0; j<MAXFRAGS; j++) {
1370     for (i=0; i<entry->red.msgs[j].length(); i++) {
1371       CkReductionMsg *msg = entry->red.msgs[j][i];
1372       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1373       msg->sid = entry->rootSid;
1374       msg->sourceFlag = 0;
1375       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1376     }
1377     entry->red.msgs[j].length() = 0;
1378   }
1379
1380
1381   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1382     CkReductionMsg *msg = entry->red.futureMsgs[i];
1383     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1384     msg->sid = entry->rootSid;
1385     msg->sourceFlag = 0;
1386     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1387   }
1388   entry->red.futureMsgs.length() = 0;
1389 }
1390
1391
1392
1393 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1394 {
1395   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1396   if (entry->red.redNo < red)
1397     entry->red.redNo = red;
1398
1399   CProxy_CkMulticastMgr mp(thisgroup);
1400   for (int i=0; i<entry->children.length(); i++) {
1401     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1402   }
1403
1404   releaseFutureReduceMsgs(entry);
1405 }
1406
1407 #include "CkMulticast.def.h"
1408