fix a subtle bug in ckmulticast when using packing functions.
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentation  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  * */
12
13 #include "charm++.h"
14 #include "envelope.h"
15 #include "register.h"
16
17 #include "ckmulticast.h"
18 #include "spanningTreeStrategy.h"
19 #include "XArraySectionReducer.h"
20
21 #define DEBUGF(x)  // CkPrintf x;
22
23 // turn on or off fragmentation in multicast
24 #define SPLIT_MULTICAST  1
25
26 // maximum number of fragments into which a message can be broken
27 #define MAXFRAGS 100
28
29 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
30 typedef CkVec<CkArrayIndex>   arrayIndexList;
31 typedef CkVec<CkSectionInfo>     sectionIdList;
32 typedef CkVec<CkReductionMsg *>  reductionMsgs;
33 typedef CkQ<int>                 PieceSize;
34 typedef CkVec<LDObjid>          ObjKeyList;
35 typedef unsigned char            byte;
36
37 /** Information about the status of reductions proceeding along a given section
38  *
39  * An instance of this class is stored in every mCastEntry object making it possible
40  * to track redn operations on a per section basis all along the spanning tree.
41  */
42 class reductionInfo {
43     public:
44         /// Number of local array elements which have contributed a given fragment
45         int lcount [MAXFRAGS];
46         /// Number of child vertices (NOT array elements) that have contributed a given fragment
47         int ccount [MAXFRAGS];
48         /// The total number of array elements that have contributed so far to a given fragment
49         int gcount [MAXFRAGS];
50         /// The number of fragments processed so far
51         int npProcessed;
52         /// User callback
53         CkCallback *storedCallback;
54         /// User reduction client function
55         redClientFn storedClient;
56         /// User provided data for the redn client function
57         void *storedClientParam;
58         /// Reduction sequence number
59         int redNo;
60         /// Messages for this reduction
61         reductionMsgs  msgs [MAXFRAGS];
62         /// Messages of future reductions
63         reductionMsgs futureMsgs;
64
65     public:
66         reductionInfo(): storedCallback(NULL),
67                          storedClientParam(NULL),
68                          redNo(0),
69                          npProcessed(0) {
70             for (int i=0; i<MAXFRAGS; i++)
71                 lcount [i] = ccount [i] = gcount [i] = 0;
72         }
73 };
74
75 /// cookie status
76 #define COOKIE_NOTREADY 0
77 #define COOKIE_READY    1
78 #define COOKIE_OLD      2
79
80 class mCastPacket {
81 public:
82   CkSectionInfo cookie;
83   int offset;
84   int n;
85   char *data;
86   int seqno;
87   int count;
88   int totalsize;
89
90   mCastPacket(CkSectionInfo &_cookie, int _offset, int _n, char *_d, int _s, int _c, int _t):
91                 cookie(_cookie), offset(_offset), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
92 };
93
94 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
95
96 class SectionLocation {
97 public:
98   mCastEntry *entry;
99   int         pe;
100 public:
101   SectionLocation(): entry(NULL), pe(-1) {}
102   SectionLocation( mCastEntry *e, int p) { set(e, p); }
103   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
104   inline void clear() { entry = NULL; pe = -1; }
105 };
106
107
108
109
110 /// Cookie for an array section 
111 class mCastEntry 
112 {
113     public:
114         /// Array ID 
115         CkArrayID     aid;
116         /// Spanning tree parent
117         CkSectionInfo parentGrp;
118         /// List of direct children
119         sectionIdList children;
120         /// Number of direct children
121         int numChild;
122         /// List of all tree member array indices (Only useful on the tree root)
123         arrayIndexList allElem;
124         /// Only useful on root for LB
125         ObjKeyList     allObjKeys;
126         /// List of array elements on local PE
127         arrayIndexList localElem;
128         /// Should always be myPE
129         int pe;
130         /// Section ID of the root
131         CkSectionInfo rootSid;
132         multicastGrpMsgBuf msgBuf;
133         /// Buffer storing the pending packets
134         multicastGrpPacketBuf packetBuf;
135         /// For multicast packetization
136         char *asm_msg;
137         int   asm_fill;
138         /// Linked list of entries on the same processor
139         mCastEntry *oldc, *newc;
140         /// Old spanning tree
141         SectionLocation   oldtree;
142         // for reduction
143         reductionInfo red;
144         //
145         char needRebuild;
146     private:
147         char flag;
148     public:
149         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
150                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
151         mCastEntry(mCastEntry *);
152         /// Check if this tree is only a branch and has a parent
153         inline int hasParent() { return parentGrp.get_val()?1:0; }
154         /// Is this tree obsolete?
155         inline int isObsolete() { return (flag == COOKIE_OLD); }
156         /// Make the current tree obsolete
157         inline void setObsolete() { flag=COOKIE_OLD; }
158         /// Check if this (branch of the) tree is ready for use
159         inline int notReady() { return (flag == COOKIE_NOTREADY); }
160         /// Mark this (branch of the) tree as ready for use
161         inline void setReady() { flag=COOKIE_READY; }
162         /// Increment the reduction number across the whole linked list of cookies
163         inline void incReduceNo() {
164             red.redNo ++;
165             for (mCastEntry *next = newc; next; next=next->newc) 
166                 next->red.redNo++;
167         }
168         /// Get a handle on the array ID this tree is a member of
169         inline CkArrayID getAid() { return aid; }
170         inline int hasOldtree() { return oldtree.entry != NULL; }
171         inline void print() {
172             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
173         }
174 };
175
176
177
178
179 class cookieMsg: public CMessage_cookieMsg {
180 public:
181   CkSectionInfo cookie;
182 public:
183   cookieMsg() {};
184   cookieMsg(CkSectionInfo m): cookie(m) {};
185 };
186
187
188
189
190 /// multicast tree setup message
191 class multicastSetupMsg: public CMessage_multicastSetupMsg {
192 public:
193   int  nIdx;
194   CkArrayIndex *arrIdx;
195   int      *lastKnown;
196   CkSectionInfo parent;
197   CkSectionInfo rootSid;
198   int redNo;
199 };
200
201
202
203
204 /// message send in spanning tree
205 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
206 };
207
208
209 extern void CkPackMessage(envelope **pEnv);
210 extern void CkUnpackMessage(envelope **pEnv);
211
212
213
214 void _ckMulticastInit(void)
215 {
216 /*
217   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
218   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
219 */
220 }
221
222
223 mCastEntry::mCastEntry (mCastEntry *old): 
224   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
225 {
226   int i;
227   aid = old->aid;
228   parentGrp = old->parentGrp;
229   for (i=0; i<old->allElem.length(); i++)
230     allElem.push_back(old->allElem[i]);
231 #if CMK_LBDB_ON
232   CmiAssert(old->allElem.length() == old->allObjKeys.length());
233   for (i=0; i<old->allObjKeys.length(); i++)
234     allObjKeys.push_back(old->allObjKeys[i]);
235 #endif
236   pe = old->pe;
237   red.storedCallback = old->red.storedCallback;
238   red.storedClient = old->red.storedClient;
239   red.storedClientParam = old->red.storedClientParam;
240   red.redNo = old->red.redNo;
241   needRebuild = 0;
242   asm_msg = NULL;
243   asm_fill = 0;
244 }
245
246 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
247
248
249
250
251 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndex *al, int n)
252 {
253     // Create a multicast entry
254     mCastEntry *entry = new mCastEntry(aid);
255     // Push all the section member indices into the entry
256     for (int i=0; i<n; i++) {
257         entry->allElem.push_back(al[i]);
258 #if CMK_LBDB_ON
259         const LDObjid key = idx2LDObjid(al[i]);
260         entry->allObjKeys.push_back(key);
261 #endif
262     }
263     //  entry->aid = aid;
264     _id.get_aid() = aid;
265     _id.get_val() = entry;              // allocate table for this section
266     // 
267     initCookie(_id);
268 }
269
270
271
272
273 void CkMulticastMgr::setSection(CkSectionInfo &id)
274 {
275   initCookie(id);
276 }
277
278
279
280
281 /// @warning: This is deprecated
282 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
283 {
284   CkArrayID aid = proxy.ckGetArrayID();
285   CkSectionInfo &_id = proxy.ckGetSectionInfo();
286
287   mCastEntry *entry = new mCastEntry(aid);
288
289   const CkArrayIndex *al = proxy.ckGetArrayElements();
290   for (int i=0; i<proxy.ckGetNumElements(); i++) {
291     entry->allElem.push_back(al[i]);
292 #if CMK_LBDB_ON
293     const LDObjid key = idx2LDObjid(al[i]);
294     entry->allObjKeys.push_back(key);
295 #endif
296   }
297   _id.get_type() = MulticastMsg;
298   _id.get_aid() = aid;
299   _id.get_val() = entry;                // allocate table for this section
300   initCookie(_id);
301 }
302
303
304
305
306 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
307 {
308   CkSectionInfo &info = proxy.ckGetSectionInfo();
309
310   int oldpe = info.get_pe();
311   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
312
313   CkArrayID aid = proxy.ckGetArrayID();
314   CkSectionID *sid = proxy.ckGetSectionIDs();
315   mCastEntry *entry = new mCastEntry(aid);
316
317   mCastEntry *oldentry = (mCastEntry *)info.get_val();
318   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
319
320   const CkArrayIndex *al = sid->_elems;
321   CmiAssert(info.get_aid() == aid);
322   prepareCookie(entry, *sid, al, sid->_nElems, aid);
323
324   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
325
326     // store old tree info
327   entry->oldtree.set(oldentry, oldpe);
328
329     // obsolete old tree
330   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
331
332   // find reduction number
333   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
334 }
335
336
337
338
339 /// Build a mCastEntry object with relevant section info and set the section cookie to point to this object
340 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndex *al, int count, CkArrayID aid)
341 {
342   for (int i=0; i<count; i++) {
343     entry->allElem.push_back(al[i]);
344 #if CMK_LBDB_ON
345     const LDObjid key = idx2LDObjid(al[i]);
346     entry->allObjKeys.push_back(key);
347 #endif
348   }
349   sid._cookie.get_type() = MulticastMsg;
350   sid._cookie.get_aid() = aid;
351   sid._cookie.get_val() = entry;        // allocate table for this section
352   sid._cookie.get_pe() = CkMyPe();
353 }
354
355
356
357
358 // this is used
359 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
360 {
361   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
362   int numSubSections = proxy->ckGetNumSubSections();
363   for (int i=0; i<numSubSections; i++)
364   {
365       CkArrayID aid = proxy->ckGetArrayIDn(i);
366       mCastEntry *entry = new mCastEntry(aid);
367       CkSectionID *sid = &( proxy->ckGetSectionID(i) );
368       const CkArrayIndex *al = proxy->ckGetArrayElements(i);
369       prepareCookie(entry, *sid, al, proxy->ckGetNumElements(i), aid);
370       initCookie(sid->_cookie);
371   }
372 }
373
374
375
376
377 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
378 {
379   mCastEntry *entry = (mCastEntry *)s.get_val();
380   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
381   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
382 }
383
384 // now that we get reduction number from the old cookie,
385 // we continue to build the spanning tree
386 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
387 {
388   mCastEntry *entry = (mCastEntry *)s.get_val();
389   entry->red.redNo = red;
390
391   initCookie(s);
392
393   // TODO delete old tree
394 }
395
396
397
398
399 void CkMulticastMgr::initCookie(CkSectionInfo s)
400 {
401     mCastEntry *entry = (mCastEntry *)s.get_val();
402     int n = entry->allElem.length();
403     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
404     // Create and initialize a setup message
405     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
406     msg->nIdx = n;
407     msg->parent = CkSectionInfo(entry->getAid());
408     msg->rootSid = s;
409     msg->redNo = entry->red.redNo;
410     // Fill the message with the section member indices and their last known locations
411     CkArray *array = CProxy_ArrayBase(s.get_aid()).ckLocalBranch();
412     for (int i=0; i<n; i++) {
413       msg->arrIdx[i] = entry->allElem[i];
414       int ape = array->lastKnown(entry->allElem[i]);
415       CmiAssert(ape >=0 && ape < CkNumPes());
416       msg->lastKnown[i] = ape;
417     }
418     // Trigger the spanning tree build
419     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
420     mCastGrp[CkMyPe()].setup(msg);
421 }
422
423
424
425
426 void CkMulticastMgr::teardown(CkSectionInfo cookie)
427 {
428     int i;
429     mCastEntry *sect = (mCastEntry *)cookie.get_val();
430     // Mark this section as obsolete
431     sect->setObsolete();
432     // Release the buffered messages 
433     releaseBufferedReduceMsgs(sect);
434     // Propagate the teardown to each of your children
435     CProxy_CkMulticastMgr mp(thisgroup);
436     for (i=0; i<sect->children.length(); i++)
437         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
438 }
439
440
441
442
443 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
444 {
445     int i;
446     mCastEntry *sect = (mCastEntry *)cookie.get_val();
447     // Reset the root section info
448     sect->rootSid = newroot;
449     // Mark this section as obsolete
450     sect->setObsolete();
451     // Release the buffered messages 
452     releaseBufferedReduceMsgs(sect);
453     // Propagate the teardown to each of your children
454     CProxy_CkMulticastMgr mp(thisgroup);
455     for (i=0; i<sect->children.length(); i++)
456         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
457 }
458
459
460
461
462 void CkMulticastMgr::freeup(CkSectionInfo cookie)
463 {
464   mCastEntry *sect = (mCastEntry *)cookie.get_val();
465   CProxy_CkMulticastMgr mp(thisgroup);
466   // Parse through all the section members on this PE and...
467   while (sect) 
468   {
469       // Free their children
470       for (int i=0; i<sect->children.length(); i++)
471           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
472       // Free the cookie itself
473       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
474       mCastEntry *oldc= sect->oldc;
475       delete sect;
476       sect = oldc;
477   }
478 }
479
480
481
482
483 void CkMulticastMgr::setup(multicastSetupMsg *msg)
484 {
485     int i,j;
486     mCastEntry *entry;
487     CkArrayID aid = msg->rootSid.get_aid();
488     if (msg->parent.get_pe() == CkMyPe()) 
489       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
490     else 
491       entry = new mCastEntry(aid);
492     entry->aid = aid;
493     entry->pe = CkMyPe();
494     entry->rootSid = msg->rootSid;
495     entry->parentGrp = msg->parent;
496
497     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
498     entry->red.redNo = msg->redNo;
499
500     // Create a numPE sized array of vectors to hold the array elements in each PE
501     int numpes = CkNumPes();
502     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
503     // Sort each array index in the setup message based on last known location
504     for (i=0; i<msg->nIdx; i++) 
505     {
506       int lastKnown = msg->lastKnown[i];
507       // If msg->arrIdx[i] local, add it to a special local element list
508       if (lastKnown == CkMyPe())
509           entry->localElem.insertAtEnd(msg->arrIdx[i]);
510       // else, add it to the list corresponding to its PE
511       else
512           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
513     }
514
515     CkVec<int> mySubTreePEs;
516     mySubTreePEs.reserve(numpes);
517     // The first PE in my subtree should be me, the tree root (as required by the spanning tree builder)
518     mySubTreePEs.push_back(CkMyPe());
519     // Identify the child PEs in the tree, ie the PEs with section members on them
520     for (i=0; i<numpes; i++) 
521     {
522       if (i==CkMyPe()) continue;
523       if (lists[i].size()) 
524           mySubTreePEs.push_back(i);
525     }
526     // The number of multicast children can be limited by the spanning tree factor 
527     int num = mySubTreePEs.size() - 1, numchild = 0;
528     if (factor <= 0) numchild = num;
529     else numchild = num<factor?num:factor;
530   
531     entry->numChild = numchild;
532
533     // If there are any children, go about building a spanning tree
534     if (numchild) 
535     {
536         // Build the next generation of the spanning tree rooted at my PE
537         int *peListPtr = mySubTreePEs.getVec();
538         topo::SpanningTreeVertex *nextGenInfo;
539         nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
540         //CkAssert(nextGenInfo->childIndex.size() == numchild);
541         numchild = nextGenInfo->childIndex.size();
542         entry->numChild = numchild;
543
544         // Distribute the section members across the number of direct children (branches)
545         // Direct children are simply the first section member in each of the branch lists
546         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
547
548         // For each direct child, collate indices of all section members on the PEs in that branch
549         for (i=0; i < numchild; i++)
550         {
551             // Determine the indices of the first and last PEs in this branch of my sub-tree
552             int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
553             if (i < numchild-1)
554                 childEndIndex = nextGenInfo->childIndex[i+1];
555             else
556                 childEndIndex = mySubTreePEs.size();
557             // For each PE in this branch, add the section members on that PE to a list
558             for (j = childStartIndex; j < childEndIndex; j++)
559             {
560                 int pe = mySubTreePEs[j];
561                 for (int k=0; k<lists[pe].size(); k++)
562                     slots[i].push_back(lists[pe][k]);
563             }
564         }
565
566         // Ask each of your direct children to setup their branches
567         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
568         for (i=0; i<numchild; i++) 
569         {
570             // Give each child info about the number, indices and location of its children
571             int n = slots[i].length();
572             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
573             m->parent = CkSectionInfo(aid, entry);
574             m->nIdx = slots[i].length();
575             m->rootSid = msg->rootSid;
576             m->redNo = msg->redNo;
577             for (j=0; j<slots[i].length(); j++) 
578             {
579                 m->arrIdx[j] = slots[i][j].idx;
580                 m->lastKnown[j] = slots[i][j].pe;
581             }
582             int childroot = slots[i][0].pe;
583             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
584             // Send the message to the child
585             mCastGrp[childroot].setup(m);
586         }
587         delete [] slots;
588         delete nextGenInfo;
589     }
590     // else, tell yourself that your children are ready
591     else 
592     {
593         childrenReady(entry);
594     }
595     delete [] lists;
596     delete msg;
597 }
598
599
600
601
602 void CkMulticastMgr::childrenReady(mCastEntry *entry)
603 {
604     // Mark this entry as ready
605     entry->setReady();
606     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
607     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
608     if (entry->hasParent()) 
609         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
610 #if SPLIT_MULTICAST
611     // clear packet buffer
612     while (!entry->packetBuf.isEmpty()) 
613     {
614         mCastPacket *packet = entry->packetBuf.deq();
615         packet->cookie.get_val() = entry;
616         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->offset, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
617         delete [] packet->data;
618         delete packet;
619     }
620 #endif
621     // clear msg buffer
622     while (!entry->msgBuf.isEmpty()) 
623     {
624         multicastGrpMsg *newmsg = entry->msgBuf.deq();
625         DEBUGF(("[%d] release buffer %p ep:%d\n", CkMyPe(), newmsg, newmsg->ep));
626         newmsg->_cookie.get_val() = entry;
627         mCastGrp[CkMyPe()].recvMsg(newmsg);
628     }
629     // release reduction msgs
630     releaseFutureReduceMsgs(entry);
631 }
632
633
634
635
636 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
637 {
638   mCastEntry *entry = (mCastEntry *)sid.get_val();
639   entry->children.push_back(child);
640   if (entry->children.length() == entry->numChild) {
641     childrenReady(entry);
642   }
643 }
644
645
646
647
648 // rebuild is called when root not migrated
649 // when rebuilding, all multicast msgs will be buffered.
650 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
651 {
652   // tear down old tree
653   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
654   CkAssert(curCookie->pe == CkMyPe());
655   // make sure I am the newest one
656   while (curCookie->newc) curCookie = curCookie->newc;
657   if (curCookie->isObsolete()) return;
658
659   //CmiPrintf("tree rebuild\n");
660   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
661
662   // build a chain
663   newCookie->oldc = curCookie;
664   curCookie->newc = newCookie;
665
666   sectId.get_val() = newCookie;
667
668   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
669
670   curCookie->setObsolete();
671
672   resetCookie(sectId);
673 }
674
675 void CkMulticastMgr::resetCookie(CkSectionInfo s)
676 {
677   mCastEntry *newCookie = (mCastEntry*)s.get_val();
678   mCastEntry *oldCookie = newCookie->oldc;
679
680   // get rid of old one
681   DEBUGF(("reset: oldc: %p\n", oldCookie));
682   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
683   int mype = CkMyPe();
684   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
685
686   // build a new one
687   initCookie(s);
688 }
689
690 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
691 {
692   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
693     // set an invalid cookie since we don't have it
694   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
695   for (int i=0; i< sid._nElems-1; i++) {
696      CProxyElement_ArrayBase ap(a, sid._elems[i]);
697      void *newMsg=CkCopyMsg((void **)&m);
698      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
699   }
700   if (sid._nElems > 0) {
701      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
702      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
703   }
704 }
705
706 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
707 {
708     for (int snum = 0; snum < nsid; snum++) {
709         void *msgCopy = m;
710         if (nsid - snum > 1)
711             msgCopy = CkCopyMsg(&m);
712         sendToSection(pd, ep, msgCopy, &(sid[snum]), opts);
713     }
714 }
715
716
717
718 void CkMulticastMgr::sendToSection(CkDelegateData *pd,int ep,void *m, CkSectionID *sid, int opts)
719 {
720   DEBUGF(("ArraySectionSend\n"));
721   multicastGrpMsg *msg = (multicastGrpMsg *)m;
722   msg->ep = ep;
723   CkSectionInfo &s = sid->_cookie;
724   mCastEntry *entry;
725
726   // If this section is rooted at this PE
727   if (s.get_pe() == CkMyPe()) {
728     entry = (mCastEntry *)s.get_val();   
729     if (NULL == entry)
730       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
731
732     // update entry pointer in case there is a newer one.
733     if (entry->newc) {
734       do { entry=entry->newc; } while (entry->newc);
735       s.get_val() = entry;
736     }
737
738 #if CMK_LBDB_ON
739     // fixme: running obj?
740     envelope *env = UsrToEnv(msg);
741     const LDOMHandle &om = CProxy_ArrayBase(s.get_aid()).ckLocMgr()->getOMHandle();
742     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
743 #endif
744
745     // The first time we need to rebuild the spanning tree, we do p2p sends to refresh lastKnown
746     if (entry->needRebuild == 1) {
747       msg->_cookie = s;
748       SimpleSend(ep, msg, s.get_aid(), *sid, opts);
749       entry->needRebuild = 2;
750       return;
751     }
752     // else the second time, we just rebuild cos now we'll have all the lastKnown PEs
753     else if (entry->needRebuild == 2) rebuild(s);
754   }
755   // else, if the root has migrated, we have a sub-optimal mcast
756   else {
757     // fixme - in this case, not recorded in LB
758     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
759   }
760
761   // update cookie
762   msg->_cookie = s;
763
764 #if SPLIT_MULTICAST
765   // split multicast msg into SPLIT_NUM copies
766   register envelope *env = UsrToEnv(m);
767   CkPackMessage(&env);
768   int totalsize = env->getTotalsize();
769   int packetSize = 0;
770   int totalcount = 0;
771   if(totalsize < split_threshold){
772     packetSize = totalsize;
773     totalcount = 1;
774   }else{
775     packetSize = split_size;
776     totalcount = totalsize/split_size;
777     if(totalsize%split_size) totalcount++; 
778     //packetSize = totalsize/SPLIT_NUM;
779     //if (totalsize%SPLIT_NUM) packetSize ++;
780     //totalcount = SPLIT_NUM;
781   }
782   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
783   int sizesofar = 0;
784   char *data = (char*) env;
785   if (totalcount == 1) {
786     // If the root of this section's tree is on this PE, then just propagate msg
787     if (s.get_pe() == CkMyPe()) {
788       CkUnpackMessage(&env);
789       msg = (multicastGrpMsg *)EnvToUsr(env);
790       recvMsg(msg);
791     }
792     // else send msg to root of section's spanning tree
793     else {
794       CProxy_CkMulticastMgr  mCastGrp(thisgroup);
795       msg = (multicastGrpMsg *)EnvToUsr(env);
796       mCastGrp[s.get_pe()].recvMsg(msg);
797     }
798     return;
799   }
800   for (int i=0; i<totalcount; i++) {
801     int mysize = packetSize;
802     if (mysize + sizesofar > totalsize) {
803       mysize = totalsize-sizesofar;
804     }
805     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
806     mCastGrp[s.get_pe()].recvPacket(s, sizesofar, mysize, data, i, totalcount, totalsize, 0);
807     sizesofar += mysize;
808     data += mysize;
809   }
810   CmiFree(env);
811 #else
812   if (s.get_pe() == CkMyPe()) {
813     recvMsg(msg);
814   }
815   else {
816     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
817     mCastGrp[s.get_pe()].recvMsg(msg);
818   }
819 #endif
820 }
821
822 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int offset, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
823 {
824   int i;
825   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
826
827
828   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
829     char *newdata = new char[n];
830     memcpy(newdata, data, n);
831     entry->packetBuf.enq(new mCastPacket(_cookie, offset, n, newdata, seqno, count, totalsize));
832 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
833     return;
834   }
835
836 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
837
838   // send to spanning tree children
839   // can not optimize using list send because the difference in cookie
840   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
841   for (i=0; i<entry->children.length(); i++) {
842     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], offset, n, data, seqno, count, totalsize, 0);
843   }
844
845   if (entry->asm_msg == NULL) {
846     CmiAssert(entry->asm_fill == 0);
847     entry->asm_msg = (char *)CmiAlloc(totalsize);
848   }
849   memcpy(entry->asm_msg+offset, data, n);
850   entry->asm_fill += n;
851   if (entry->asm_fill == totalsize) {
852     CkUnpackMessage((envelope **)&entry->asm_msg);
853     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
854     msg->_cookie = _cookie;
855 //    mCastGrp[CkMyPe()].recvMsg(msg);
856     sendToLocal(msg);
857     entry->asm_msg = NULL;
858     entry->asm_fill = 0;
859   }
860 //  if (fromBuffer) delete [] data;
861 }
862
863 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
864 {
865   int i;
866   CkSectionInfo &sectionInfo = msg->_cookie;
867   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
868   CmiAssert(entry->getAid() == sectionInfo.get_aid());
869
870   if (entry->notReady()) {
871     DEBUGF(("entry not ready, enq buffer %p\n", msg));
872     entry->msgBuf.enq(msg);
873     return;
874   }
875
876   // send to spanning tree children
877   // can not optimize using list send because the difference in cookie
878   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
879   for (i=0; i<entry->children.length(); i++) {
880     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
881     newmsg->_cookie = entry->children[i];
882     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
883   }
884
885   sendToLocal(msg);
886 }
887
888 void CkMulticastMgr::sendToLocal(multicastGrpMsg *msg)
889 {
890   int i;
891   CkSectionInfo &sectionInfo = msg->_cookie;
892   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
893   CmiAssert(entry->getAid() == sectionInfo.get_aid());
894   CkGroupID aid = sectionInfo.get_aid();
895
896   // send to local
897   int nLocal = entry->localElem.length();
898   DEBUGF(("[%d] send to local %d elems\n", CkMyPe(), nLocal));
899   for (i=0; i<nLocal-1; i++) {
900     CProxyElement_ArrayBase ap(aid, entry->localElem[i]);
901     if (_entryTable[msg->ep]->noKeep) {
902       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.get_aid(), entry->localElem[i], CK_MSG_KEEP);
903     }
904     else {
905       // send through scheduler queue
906       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
907       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
908     }
909     // use CK_MSG_DONTFREE so that the message can be reused
910     // the drawback of this scheme bypassing queue is that 
911     // if # of local element is huge, this leads to a long time occupying CPU
912     // also load balancer seems not be able to correctly instrument load
913 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
914     //CmiNetworkProgressAfter(3);
915   }
916   if (nLocal) {
917     CProxyElement_ArrayBase ap(aid, entry->localElem[nLocal-1]);
918     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
919 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
920   }
921   else {
922     CkAssert (entry->rootSid.get_pe() == CkMyPe());
923     delete msg;
924   }
925 }
926
927
928
929 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
930 {
931   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
932   if (CkMcastBaseMsg::checkMagic(m) == 0) {
933     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
934     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
935   }
936   // ignore invalid cookie sent by SimpleSend
937   if (m->gpe() != -1) {
938     id.get_type() = MulticastMsg;
939     id.get_pe() = m->gpe();
940     id.get_val() = m->cookie();
941   }
942   // note: retain old redNo
943 }
944
945 // Reduction
946
947 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
948 {
949   CkCallback *sectionCB;
950   int numSubSections = proxy.ckGetNumSubSections();
951   // If its a cross-array section,
952   if (numSubSections > 1)
953   {
954       /** @warning: Each instantiation is a mem leak! :o
955        * The class is trivially small, but there's one instantiation for each
956        * section delegated to CkMulticast. The objects need to live as long as
957        * their section exists and is used. The idea of 'destroying' an array
958        * section is still academic, and hence no effort has been made to charge
959        * some 'owner' entity with the task of deleting this object.
960        *
961        * Reimplementing delegated x-array reductions will make this consideration moot
962        */
963       // Configure the final cross-section reducer
964       ck::impl::XArraySectionReducer *red =
965           new ck::impl::XArraySectionReducer(numSubSections, cb);
966       // Configure the subsection callback to deposit with the final reducer
967       sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
968   }
969   // else, just direct the reduction to the actual client cb
970   else
971       sectionCB = cb;
972   // Wire the sections together by storing the subsection cb in each sectionID
973   for (int i=0; i<numSubSections; i++)
974   {
975       CkSectionInfo &sInfo = proxy.ckGetSectionID(i)._cookie;
976       mCastEntry *entry = (mCastEntry *)sInfo.get_val();
977       entry->red.storedCallback = sectionCB;
978   }
979 }
980
981 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
982 {
983   CkSectionInfo &id = proxy.ckGetSectionInfo();
984   mCastEntry *entry = (mCastEntry *)id.get_val();
985   entry->red.storedClient = fn;
986   entry->red.storedClientParam = param;
987 }
988
989 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
990 {
991   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
992   msg->reducer = type;
993   msg->sid = id;
994   msg->sourceFlag = 1;   // from array element
995   msg->redNo = id.get_redNo();
996   msg->gcount = 1;
997   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
998   msg->callback = cb;
999   msg->userFlag=userFlag;
1000   return msg;
1001 }
1002
1003
1004
1005 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
1006 {
1007   CkCallback cb;
1008   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
1009 }
1010
1011
1012 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
1013 {
1014   if (id.get_val() == NULL || id.get_redNo() == -1) 
1015     CmiAbort("contribute: SectionID is not initialized\n");
1016
1017   int nFrags;
1018   if (-1 == fragSize) {         // no frag
1019     nFrags = 1;
1020     fragSize = dataSize;
1021   }
1022   else {
1023     CmiAssert (dataSize >= fragSize);
1024     nFrags = dataSize/fragSize;
1025     if (dataSize%fragSize) nFrags++;
1026   }
1027
1028   if (MAXFRAGS < nFrags) {
1029     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
1030     CmiAbort ("frag size too small\n");
1031   }
1032
1033   int mpe = id.get_pe();
1034   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1035
1036   // break the message into k-piece fragments
1037   int fSize = fragSize;
1038   for (int i=0; i<nFrags; i++) {
1039     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
1040       fSize = dataSize%fragSize;
1041     }
1042
1043     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
1044
1045     // initialize the new msg
1046     msg->reducer            = type;
1047     msg->sid                = id;
1048     msg->nFrags             = nFrags;
1049     msg->fragNo             = i;
1050     msg->sourceFlag         = 1;
1051     msg->redNo              = id.get_redNo();
1052     msg->gcount             = 1;
1053     msg->rebuilt            = (mpe == CkMyPe())?0:1;
1054     msg->callback           = cb;
1055     msg->userFlag           = userFlag;
1056
1057     mCastGrp[mpe].recvRedMsg(msg);
1058
1059     data = (void*)(((char*)data) + fSize);
1060   }
1061
1062   id.get_redNo()++;
1063   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
1064 }
1065
1066 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
1067                                               mCastEntry* entry,
1068                                               reductionInfo& redInfo) {
1069   int i;
1070   int dataSize = 0;
1071   int nFrags   = redInfo.msgs[0][0]->nFrags;
1072
1073   // to avoid memcpy and allocation cost for non-pipelined reductions
1074   if (1 == nFrags) {
1075     CkReductionMsg* msg = redInfo.msgs[0][0];
1076
1077     // free up the msg slot
1078     redInfo.msgs[0].length() = 0;
1079
1080     return msg;
1081   }
1082
1083   for (i=0; i<nFrags; i++) {
1084     dataSize += redInfo.msgs[i][0]->dataSize;
1085   }
1086
1087   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1088
1089   // initialize msg header
1090   msg->redNo      = redInfo.msgs[0][0]->redNo;
1091   msg->reducer    = redInfo.msgs[0][0]->reducer;
1092   msg->sid        = id;
1093   msg->nFrags     = nFrags;
1094
1095   // I guess following fields need not be initialized
1096   msg->sourceFlag = 2;
1097   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1098   msg->callback   = redInfo.msgs[0][0]->callback;
1099   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1100
1101   byte* data = (byte*)msg->getData ();
1102   for (i=0; i<nFrags; i++) {
1103     // copy data from fragments to msg
1104     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1105     data += redInfo.msgs[i][0]->dataSize;
1106
1107     // free fragments
1108     delete redInfo.msgs[i][0];
1109     redInfo.msgs[i].length() = 0;    
1110   }
1111
1112   return msg;
1113 }
1114
1115
1116
1117 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1118                                      mCastEntry* entry, reductionInfo& redInfo,
1119                                      int currentTreeUp) {
1120
1121     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1122     reductionMsgs& rmsgs = redInfo.msgs[index];
1123     int dataSize         = rmsgs[0]->dataSize;
1124     int i;
1125     int oldRedNo = redInfo.redNo;
1126     int nFrags   = rmsgs[0]->nFrags;
1127     int fragNo   = rmsgs[0]->fragNo;
1128     int userFlag = rmsgs[0]->userFlag;
1129
1130     // Figure out (from one of the msg fragments) which reducer function to use
1131     CkReduction::reducerType reducer = rmsgs[0]->reducer;
1132     CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1133     CkAssert(NULL != f);
1134
1135     // Check if migration occurred in any of the subtrees, and pick one valid callback
1136     CkCallback msg_cb;
1137     int rebuilt = 0;
1138     for (i=0; i<rmsgs.length(); i++) {
1139         if (rmsgs[i]->rebuilt) rebuilt = 1;
1140         if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1141     }
1142
1143     // Perform the actual reduction
1144     CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec());
1145     newmsg->redNo  = redInfo.redNo;
1146     newmsg->nFrags = nFrags;
1147     newmsg->fragNo = fragNo;
1148     newmsg->userFlag = userFlag;
1149     newmsg->reducer = reducer;
1150
1151     // Increment the number of fragments processed
1152     redInfo.npProcessed ++;
1153
1154     // Delete all the fragments which are no longer needed
1155     for (i=0; i<rmsgs.length(); i++)
1156         if (rmsgs[i]!=newmsg) delete rmsgs[i];
1157     rmsgs.length() = 0;
1158
1159     // If I am not the tree root
1160     if (entry->hasParent()) {
1161         // send up to parent
1162         newmsg->sid        = entry->parentGrp;
1163         newmsg->sourceFlag = 2;
1164         newmsg->redNo      = oldRedNo; ///< @todo: redundant, duplicate assignment?
1165         newmsg->gcount     = redInfo.gcount [index];
1166         newmsg->rebuilt    = rebuilt;
1167         newmsg->callback   = msg_cb;
1168         DEBUGF(("[%d] ckmulticast: send %p to parent %d\n", CkMyPe(), entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1169         mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1170     } else {
1171         newmsg->sid = id;
1172         // Buffer the reduced fragment
1173         rmsgs.push_back (newmsg);
1174         // If all the fragments have been reduced
1175         if (redInfo.npProcessed == nFrags) {
1176             // Combine the fragments
1177             newmsg = combineFrags (id, entry, redInfo);
1178             // Set the reference number based on the user flag at the contribute call
1179             CkSetRefNum(newmsg, userFlag);
1180             // Trigger the appropriate reduction client
1181             if ( !msg_cb.isInvalid() )
1182                 msg_cb.send(newmsg);
1183             else if (redInfo.storedCallback != NULL)
1184                 redInfo.storedCallback->send(newmsg);
1185             else if (redInfo.storedClient != NULL) {
1186                 redInfo.storedClient(id, redInfo.storedClientParam, dataSize, newmsg->data);
1187                 delete newmsg;
1188             }
1189             else
1190                 CmiAbort("Did you forget to register a reduction client?");
1191
1192             DEBUGF(("ckmulticast: redn client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1193             //
1194             if (currentTreeUp) {
1195                 if (entry->oldc) {
1196                     // free old tree on same processor;
1197                     mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1198                     entry->oldc = NULL;
1199                 }
1200                 if (entry->hasOldtree()) {
1201                     // free old tree on old processor
1202                     int oldpe = entry->oldtree.pe;
1203                     mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1204                     entry->oldtree.clear();
1205                 }
1206             }
1207             // Indicate if a tree rebuild is required
1208             if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1209         }
1210     }
1211 }
1212
1213
1214
1215 /**
1216  * Called from:
1217  *   - contribute(): calls PE specified in the cookie
1218  *   - reduceFragment(): calls parent PE
1219  *   - recvRedMsg(): calls root PE (if tree is obsolete)
1220  *   - releaseFutureRedMsgs: calls this PE
1221  *   - releaseBufferedRedMsgs: calls root PE
1222  */
1223 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1224 {
1225     int i;
1226     /// Grab the section info embedded in the redn msg
1227     CkSectionInfo id = msg->sid;
1228     /// ... and get at the ptr which shows me which cookie to use
1229     mCastEntry *entry = (mCastEntry *)id.get_val();
1230     CmiAssert(entry!=NULL);
1231
1232     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1233
1234     int updateReduceNo = 0;
1235
1236     //-------------------------------------------------------------------------
1237     /// If this cookie is obsolete
1238     if (entry->isObsolete()) {
1239         // Send up to root
1240         DEBUGF(("[%d] ckmulticast: section cookie obsolete. Will send to root %d\n", CkMyPe(), entry->rootSid.get_pe()));
1241
1242         /// If I am the root, traverse the linked list of cookies to get the latest
1243         if (!entry->hasParent()) {
1244             mCastEntry *newentry = entry->newc;
1245             while (newentry && newentry->newc) newentry=newentry->newc;
1246             if (newentry) entry = newentry;
1247             CmiAssert(entry!=NULL);
1248         }
1249
1250         ///
1251         if (!entry->hasParent() && !entry->isObsolete()) {
1252             /// Indicate it is not on old spanning tree
1253             msg->sourceFlag = 0;
1254             /// Flag the redn as coming from an old tree and that the new entry cookie needs to know the new redn num.
1255             updateReduceNo  = 1;
1256         }
1257         /// If I am not the root or this latest cookie is also obsolete
1258         else {
1259             // Ensure that you're here with reason
1260             CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1261             // Edit the msg so that the recipient knows where to find its cookie
1262             msg->sid = entry->rootSid;
1263             msg->sourceFlag = 0;
1264             // Send the msg directly to the root of the redn tree
1265             mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1266             return;
1267         }
1268     }
1269
1270     /// Grab the locally stored redn info
1271     reductionInfo &redInfo = entry->red;
1272
1273     //-------------------------------------------------------------------------
1274     /// If you've received a msg from a previous redn, something has gone horribly wrong somewhere!
1275     if (msg->redNo < redInfo.redNo) {
1276         CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1277         CmiAbort("Could never happen! \n");
1278     }
1279
1280     //-------------------------------------------------------------------------
1281     /// If the current tree is not yet ready or if you've received a msg for a future redn, buffer the msg
1282     if (entry->notReady() || msg->redNo > redInfo.redNo) {
1283         DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1284         redInfo.futureMsgs.push_back(msg);
1285         return;
1286     }
1287
1288     //-------------------------------------------------------------------------
1289     const int index = msg->fragNo;
1290     // New contribution from an ArrayElement
1291     if (msg->sourceFlag == 1) {
1292         redInfo.lcount [index] ++;
1293     }
1294     // Redn from a child
1295     if (msg->sourceFlag == 2) {
1296         redInfo.ccount [index] ++;
1297     }
1298     // Total elems that have contributed the indexth fragment
1299     redInfo.gcount [index] += msg->gcount;
1300
1301     // Check if message is of proper size
1302     if ((0 != redInfo.msgs[index].length()) && (msg->dataSize != (redInfo.msgs [index][0]->dataSize)))
1303     CmiAbort("Reduction data are not of same length!");
1304
1305     //-------------------------------------------------------------------------
1306     // Buffer the msg
1307     redInfo.msgs [index].push_back(msg);
1308
1309     //-------------------------------------------------------------------------
1310     /// Flag if this fragment can be reduced (if all local elements and children have contributed this fragment)
1311     int currentTreeUp = 0;
1312     if (redInfo.lcount [index] == entry->localElem.length() && redInfo.ccount [index] == entry->children.length())
1313         currentTreeUp = 1;
1314
1315     /// Flag (only at the redn root) if all array elements contributed all their fragments
1316     int mixTreeUp = 0;
1317     if (!entry->hasParent()) {
1318         mixTreeUp = 1;
1319         for (int i=0; i<msg->nFrags; i++)
1320             if (entry->allElem.length() != redInfo.gcount [i])
1321                 mixTreeUp = 0;
1322     }
1323
1324     //-------------------------------------------------------------------------
1325     /// If this fragment can be reduced, or if I am the root and have received all fragments from all elements
1326     if (currentTreeUp || mixTreeUp)
1327     {
1328         const int nFrags = msg->nFrags;
1329         /// Reduce this fragment
1330         reduceFragment (index, id, entry, redInfo, currentTreeUp);
1331
1332         // If migration happened, and my sub-tree reconstructed itself,
1333         // share the current reduction number with myself and all my children
1334         if (updateReduceNo)
1335             mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1336
1337         /// If all the fragments for the current reduction have been processed
1338         if (redInfo.npProcessed == nFrags) {
1339
1340             /// Increment the reduction number in all of this section's cookies
1341             entry->incReduceNo();
1342
1343             /// Reset bookkeeping counters
1344             for (i=0; i<nFrags; i++) {
1345                 redInfo.lcount [i] = 0;
1346                 redInfo.ccount [i] = 0;
1347                 redInfo.gcount [i] = 0;
1348             }
1349             redInfo.npProcessed = 0;
1350             /// Now that, the current redn is done, release any pending msgs from future redns
1351             releaseFutureReduceMsgs(entry);
1352         }
1353     }
1354 }
1355
1356
1357
1358 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1359 {
1360   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1361
1362   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1363     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1364     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1365   }
1366   entry->red.futureMsgs.length() = 0;
1367 }
1368
1369
1370
1371 // these messages have to be sent to root
1372 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1373 {
1374   int i;
1375   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1376
1377   for (int j=0; j<MAXFRAGS; j++) {
1378     for (i=0; i<entry->red.msgs[j].length(); i++) {
1379       CkReductionMsg *msg = entry->red.msgs[j][i];
1380       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1381       msg->sid = entry->rootSid;
1382       msg->sourceFlag = 0;
1383       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1384     }
1385     entry->red.msgs[j].length() = 0;
1386   }
1387
1388
1389   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1390     CkReductionMsg *msg = entry->red.futureMsgs[i];
1391     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1392     msg->sid = entry->rootSid;
1393     msg->sourceFlag = 0;
1394     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1395   }
1396   entry->red.futureMsgs.length() = 0;
1397 }
1398
1399
1400
1401 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1402 {
1403   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1404   if (entry->red.redNo < red)
1405     entry->red.redNo = red;
1406
1407   CProxy_CkMulticastMgr mp(thisgroup);
1408   for (int i=0; i<entry->children.length(); i++) {
1409     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1410   }
1411
1412   releaseFutureReduceMsgs(entry);
1413 }
1414
1415 #include "CkMulticast.def.h"
1416