ckmulticast: more docs for sendToSection
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentation  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  * */
12
13 #include "charm++.h"
14 #include "envelope.h"
15 #include "register.h"
16
17 #include "ckmulticast.h"
18 #include "spanningTreeStrategy.h"
19 #include "XArraySectionReducer.h"
20
21 #define DEBUGF(x)  // CkPrintf x;
22
23 // turn on or off fragmentation in multicast
24 #define SPLIT_MULTICAST  1
25
26 // maximum number of fragments into which a message can be broken
27 #define MAXFRAGS 100
28
29 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
30 typedef CkVec<CkArrayIndex>   arrayIndexList;
31 typedef CkVec<CkSectionInfo>     sectionIdList;
32 typedef CkVec<CkReductionMsg *>  reductionMsgs;
33 typedef CkQ<int>                 PieceSize;
34 typedef CkVec<LDObjid>          ObjKeyList;
35 typedef unsigned char            byte;
36
37 /** Information about the status of reductions proceeding along a given section
38  *
39  * An instance of this class is stored in every mCastEntry object making it possible
40  * to track redn operations on a per section basis all along the spanning tree.
41  */
42 class reductionInfo {
43     public:
44         /// Number of local array elements which have contributed a given fragment
45         int lcount [MAXFRAGS];
46         /// Number of child vertices (NOT array elements) that have contributed a given fragment
47         int ccount [MAXFRAGS];
48         /// The total number of array elements that have contributed so far to a given fragment
49         int gcount [MAXFRAGS];
50         /// The number of fragments processed so far
51         int npProcessed;
52         /// User callback
53         CkCallback *storedCallback;
54         /// User reduction client function
55         redClientFn storedClient;
56         /// User provided data for the redn client function
57         void *storedClientParam;
58         /// Reduction sequence number
59         int redNo;
60         /// Messages for this reduction
61         reductionMsgs  msgs [MAXFRAGS];
62         /// Messages of future reductions
63         reductionMsgs futureMsgs;
64
65     public:
66         reductionInfo(): storedCallback(NULL),
67                          storedClientParam(NULL),
68                          redNo(0),
69                          npProcessed(0) {
70             for (int i=0; i<MAXFRAGS; i++)
71                 lcount [i] = ccount [i] = gcount [i] = 0;
72         }
73 };
74
75 /// cookie status
76 #define COOKIE_NOTREADY 0
77 #define COOKIE_READY    1
78 #define COOKIE_OLD      2
79
80 class mCastPacket {
81 public:
82   CkSectionInfo cookie;
83   int n;
84   char *data;
85   int seqno;
86   int count;
87   int totalsize;
88
89   mCastPacket(CkSectionInfo &_cookie, int _n, char *_d, int _s, int _c, int _t):
90                 cookie(_cookie), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
91 };
92
93 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
94
95 class SectionLocation {
96 public:
97   mCastEntry *entry;
98   int         pe;
99 public:
100   SectionLocation(): entry(NULL), pe(-1) {}
101   SectionLocation( mCastEntry *e, int p) { set(e, p); }
102   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
103   inline void clear() { entry = NULL; pe = -1; }
104 };
105
106
107
108
109 /// Cookie for an array section 
110 class mCastEntry 
111 {
112     public:
113         /// Array ID 
114         CkArrayID     aid;
115         /// Spanning tree parent
116         CkSectionInfo parentGrp;
117         /// List of direct children
118         sectionIdList children;
119         /// Number of direct children
120         int numChild;
121         /// List of all tree member array indices (Only useful on the tree root)
122         arrayIndexList allElem;
123         /// Only useful on root for LB
124         ObjKeyList     allObjKeys;
125         /// List of array elements on local PE
126         arrayIndexList localElem;
127         /// Should always be myPE
128         int pe;
129         /// Section ID of the root
130         CkSectionInfo rootSid;
131         multicastGrpMsgBuf msgBuf;
132         /// Buffer storing the pending packets
133         multicastGrpPacketBuf packetBuf;
134         /// For multicast packetization
135         char *asm_msg;
136         int   asm_fill;
137         /// Linked list of entries on the same processor
138         mCastEntry *oldc, *newc;
139         /// Old spanning tree
140         SectionLocation   oldtree;
141         // for reduction
142         reductionInfo red;
143         //
144         char needRebuild;
145     private:
146         char flag;
147     public:
148         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
149                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
150         mCastEntry(mCastEntry *);
151         /// Check if this tree is only a branch and has a parent
152         inline int hasParent() { return parentGrp.get_val()?1:0; }
153         /// Is this tree obsolete?
154         inline int isObsolete() { return (flag == COOKIE_OLD); }
155         /// Make the current tree obsolete
156         inline void setObsolete() { flag=COOKIE_OLD; }
157         /// Check if this (branch of the) tree is ready for use
158         inline int notReady() { return (flag == COOKIE_NOTREADY); }
159         /// Mark this (branch of the) tree as ready for use
160         inline void setReady() { flag=COOKIE_READY; }
161         /// Increment the reduction number across the whole linked list of cookies
162         inline void incReduceNo() {
163             red.redNo ++;
164             for (mCastEntry *next = newc; next; next=next->newc) 
165                 next->red.redNo++;
166         }
167         /// Get a handle on the array ID this tree is a member of
168         inline CkArrayID getAid() { return aid; }
169         inline int hasOldtree() { return oldtree.entry != NULL; }
170         inline void print() {
171             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
172         }
173 };
174
175
176
177
178 class cookieMsg: public CMessage_cookieMsg {
179 public:
180   CkSectionInfo cookie;
181 public:
182   cookieMsg() {};
183   cookieMsg(CkSectionInfo m): cookie(m) {};
184 };
185
186
187
188
189 /// multicast tree setup message
190 class multicastSetupMsg: public CMessage_multicastSetupMsg {
191 public:
192   int  nIdx;
193   CkArrayIndex *arrIdx;
194   int      *lastKnown;
195   CkSectionInfo parent;
196   CkSectionInfo rootSid;
197   int redNo;
198 };
199
200
201
202
203 /// message send in spanning tree
204 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
205 };
206
207
208 extern void CkPackMessage(envelope **pEnv);
209 extern void CkUnpackMessage(envelope **pEnv);
210
211
212
213 void _ckMulticastInit(void)
214 {
215 /*
216   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
217   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
218 */
219 }
220
221
222 mCastEntry::mCastEntry (mCastEntry *old): 
223   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
224 {
225   int i;
226   aid = old->aid;
227   parentGrp = old->parentGrp;
228   for (i=0; i<old->allElem.length(); i++)
229     allElem.push_back(old->allElem[i]);
230 #if CMK_LBDB_ON
231   CmiAssert(old->allElem.length() == old->allObjKeys.length());
232   for (i=0; i<old->allObjKeys.length(); i++)
233     allObjKeys.push_back(old->allObjKeys[i]);
234 #endif
235   pe = old->pe;
236   red.storedCallback = old->red.storedCallback;
237   red.storedClient = old->red.storedClient;
238   red.storedClientParam = old->red.storedClientParam;
239   red.redNo = old->red.redNo;
240   needRebuild = 0;
241   asm_msg = NULL;
242   asm_fill = 0;
243 }
244
245 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
246
247
248
249
250 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndex *al, int n)
251 {
252     // Create a multicast entry
253     mCastEntry *entry = new mCastEntry(aid);
254     // Push all the section member indices into the entry
255     for (int i=0; i<n; i++) {
256         entry->allElem.push_back(al[i]);
257 #if CMK_LBDB_ON
258         const LDObjid key = idx2LDObjid(al[i]);
259         entry->allObjKeys.push_back(key);
260 #endif
261     }
262     //  entry->aid = aid;
263     _id.get_aid() = aid;
264     _id.get_val() = entry;              // allocate table for this section
265     // 
266     initCookie(_id);
267 }
268
269
270
271
272 void CkMulticastMgr::setSection(CkSectionInfo &id)
273 {
274   initCookie(id);
275 }
276
277
278
279
280 /// @warning: This is deprecated
281 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
282 {
283   CkArrayID aid = proxy.ckGetArrayID();
284   CkSectionInfo &_id = proxy.ckGetSectionInfo();
285
286   mCastEntry *entry = new mCastEntry(aid);
287
288   const CkArrayIndex *al = proxy.ckGetArrayElements();
289   for (int i=0; i<proxy.ckGetNumElements(); i++) {
290     entry->allElem.push_back(al[i]);
291 #if CMK_LBDB_ON
292     const LDObjid key = idx2LDObjid(al[i]);
293     entry->allObjKeys.push_back(key);
294 #endif
295   }
296   _id.get_type() = MulticastMsg;
297   _id.get_aid() = aid;
298   _id.get_val() = entry;                // allocate table for this section
299   initCookie(_id);
300 }
301
302
303
304
305 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
306 {
307   CkSectionInfo &info = proxy.ckGetSectionInfo();
308
309   int oldpe = info.get_pe();
310   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
311
312   CkArrayID aid = proxy.ckGetArrayID();
313   CkSectionID *sid = proxy.ckGetSectionIDs();
314   mCastEntry *entry = new mCastEntry(aid);
315
316   mCastEntry *oldentry = (mCastEntry *)info.get_val();
317   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
318
319   const CkArrayIndex *al = sid->_elems;
320   CmiAssert(info.get_aid() == aid);
321   prepareCookie(entry, *sid, al, sid->_nElems, aid);
322
323   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
324
325     // store old tree info
326   entry->oldtree.set(oldentry, oldpe);
327
328     // obsolete old tree
329   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
330
331   // find reduction number
332   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
333 }
334
335
336
337
338 /// Build a mCastEntry object with relevant section info and set the section cookie to point to this object
339 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndex *al, int count, CkArrayID aid)
340 {
341   for (int i=0; i<count; i++) {
342     entry->allElem.push_back(al[i]);
343 #if CMK_LBDB_ON
344     const LDObjid key = idx2LDObjid(al[i]);
345     entry->allObjKeys.push_back(key);
346 #endif
347   }
348   sid._cookie.get_type() = MulticastMsg;
349   sid._cookie.get_aid() = aid;
350   sid._cookie.get_val() = entry;        // allocate table for this section
351   sid._cookie.get_pe() = CkMyPe();
352 }
353
354
355
356
357 // this is used
358 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
359 {
360   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
361   int numSubSections = proxy->ckGetNumSubSections();
362   for (int i=0; i<numSubSections; i++)
363   {
364       CkArrayID aid = proxy->ckGetArrayIDn(i);
365       mCastEntry *entry = new mCastEntry(aid);
366       CkSectionID *sid = &( proxy->ckGetSectionID(i) );
367       const CkArrayIndex *al = proxy->ckGetArrayElements(i);
368       prepareCookie(entry, *sid, al, proxy->ckGetNumElements(i), aid);
369       initCookie(sid->_cookie);
370   }
371 }
372
373
374
375
376 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
377 {
378   mCastEntry *entry = (mCastEntry *)s.get_val();
379   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
380   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
381 }
382
383 // now that we get reduction number from the old cookie,
384 // we continue to build the spanning tree
385 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
386 {
387   mCastEntry *entry = (mCastEntry *)s.get_val();
388   entry->red.redNo = red;
389
390   initCookie(s);
391
392   // TODO delete old tree
393 }
394
395
396
397
398 void CkMulticastMgr::initCookie(CkSectionInfo s)
399 {
400     mCastEntry *entry = (mCastEntry *)s.get_val();
401     int n = entry->allElem.length();
402     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
403     // Create and initialize a setup message
404     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
405     msg->nIdx = n;
406     msg->parent = CkSectionInfo(entry->getAid());
407     msg->rootSid = s;
408     msg->redNo = entry->red.redNo;
409     // Fill the message with the section member indices and their last known locations
410     CkArray *array = CProxy_ArrayBase(s.get_aid()).ckLocalBranch();
411     for (int i=0; i<n; i++) {
412       msg->arrIdx[i] = entry->allElem[i];
413       int ape = array->lastKnown(entry->allElem[i]);
414       CmiAssert(ape >=0 && ape < CkNumPes());
415       msg->lastKnown[i] = ape;
416     }
417     // Trigger the spanning tree build
418     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
419     mCastGrp[CkMyPe()].setup(msg);
420 }
421
422
423
424
425 void CkMulticastMgr::teardown(CkSectionInfo cookie)
426 {
427     int i;
428     mCastEntry *sect = (mCastEntry *)cookie.get_val();
429     // Mark this section as obsolete
430     sect->setObsolete();
431     // Release the buffered messages 
432     releaseBufferedReduceMsgs(sect);
433     // Propagate the teardown to each of your children
434     CProxy_CkMulticastMgr mp(thisgroup);
435     for (i=0; i<sect->children.length(); i++)
436         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
437 }
438
439
440
441
442 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
443 {
444     int i;
445     mCastEntry *sect = (mCastEntry *)cookie.get_val();
446     // Reset the root section info
447     sect->rootSid = newroot;
448     // Mark this section as obsolete
449     sect->setObsolete();
450     // Release the buffered messages 
451     releaseBufferedReduceMsgs(sect);
452     // Propagate the teardown to each of your children
453     CProxy_CkMulticastMgr mp(thisgroup);
454     for (i=0; i<sect->children.length(); i++)
455         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
456 }
457
458
459
460
461 void CkMulticastMgr::freeup(CkSectionInfo cookie)
462 {
463   mCastEntry *sect = (mCastEntry *)cookie.get_val();
464   CProxy_CkMulticastMgr mp(thisgroup);
465   // Parse through all the section members on this PE and...
466   while (sect) 
467   {
468       // Free their children
469       for (int i=0; i<sect->children.length(); i++)
470           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
471       // Free the cookie itself
472       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
473       mCastEntry *oldc= sect->oldc;
474       delete sect;
475       sect = oldc;
476   }
477 }
478
479
480
481
482 void CkMulticastMgr::setup(multicastSetupMsg *msg)
483 {
484     int i,j;
485     mCastEntry *entry;
486     CkArrayID aid = msg->rootSid.get_aid();
487     if (msg->parent.get_pe() == CkMyPe()) 
488       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
489     else 
490       entry = new mCastEntry(aid);
491     entry->aid = aid;
492     entry->pe = CkMyPe();
493     entry->rootSid = msg->rootSid;
494     entry->parentGrp = msg->parent;
495
496     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
497     entry->red.redNo = msg->redNo;
498
499     // Create a numPE sized array of vectors to hold the array elements in each PE
500     int numpes = CkNumPes();
501     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
502     // Sort each array index in the setup message based on last known location
503     for (i=0; i<msg->nIdx; i++) 
504     {
505       int lastKnown = msg->lastKnown[i];
506       // If msg->arrIdx[i] local, add it to a special local element list
507       if (lastKnown == CkMyPe())
508           entry->localElem.insertAtEnd(msg->arrIdx[i]);
509       // else, add it to the list corresponding to its PE
510       else
511           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
512     }
513
514     CkVec<int> mySubTreePEs;
515     mySubTreePEs.reserve(numpes);
516     // The first PE in my subtree should be me, the tree root (as required by the spanning tree builder)
517     mySubTreePEs.push_back(CkMyPe());
518     // Identify the child PEs in the tree, ie the PEs with section members on them
519     for (i=0; i<numpes; i++) 
520     {
521       if (i==CkMyPe()) continue;
522       if (lists[i].size()) 
523           mySubTreePEs.push_back(i);
524     }
525     // The number of multicast children can be limited by the spanning tree factor 
526     int num = mySubTreePEs.size() - 1, numchild = 0;
527     if (factor <= 0) numchild = num;
528     else numchild = num<factor?num:factor;
529   
530     entry->numChild = numchild;
531
532     // If there are any children, go about building a spanning tree
533     if (numchild) 
534     {
535         // Build the next generation of the spanning tree rooted at my PE
536         int *peListPtr = mySubTreePEs.getVec();
537         topo::SpanningTreeVertex *nextGenInfo;
538         nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
539         //CkAssert(nextGenInfo->childIndex.size() == numchild);
540         numchild = nextGenInfo->childIndex.size();
541         entry->numChild = numchild;
542
543         // Distribute the section members across the number of direct children (branches)
544         // Direct children are simply the first section member in each of the branch lists
545         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
546
547         // For each direct child, collate indices of all section members on the PEs in that branch
548         for (i=0; i < numchild; i++)
549         {
550             // Determine the indices of the first and last PEs in this branch of my sub-tree
551             int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
552             if (i < numchild-1)
553                 childEndIndex = nextGenInfo->childIndex[i+1];
554             else
555                 childEndIndex = mySubTreePEs.size();
556             // For each PE in this branch, add the section members on that PE to a list
557             for (j = childStartIndex; j < childEndIndex; j++)
558             {
559                 int pe = mySubTreePEs[j];
560                 for (int k=0; k<lists[pe].size(); k++)
561                     slots[i].push_back(lists[pe][k]);
562             }
563         }
564
565         // Ask each of your direct children to setup their branches
566         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
567         for (i=0; i<numchild; i++) 
568         {
569             // Give each child info about the number, indices and location of its children
570             int n = slots[i].length();
571             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
572             m->parent = CkSectionInfo(aid, entry);
573             m->nIdx = slots[i].length();
574             m->rootSid = msg->rootSid;
575             m->redNo = msg->redNo;
576             for (j=0; j<slots[i].length(); j++) 
577             {
578                 m->arrIdx[j] = slots[i][j].idx;
579                 m->lastKnown[j] = slots[i][j].pe;
580             }
581             int childroot = slots[i][0].pe;
582             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
583             // Send the message to the child
584             mCastGrp[childroot].setup(m);
585         }
586         delete [] slots;
587         delete nextGenInfo;
588     }
589     // else, tell yourself that your children are ready
590     else 
591     {
592         childrenReady(entry);
593     }
594     delete [] lists;
595     delete msg;
596 }
597
598
599
600
601 void CkMulticastMgr::childrenReady(mCastEntry *entry)
602 {
603     // Mark this entry as ready
604     entry->setReady();
605     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
606     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
607     if (entry->hasParent()) 
608         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
609 #if SPLIT_MULTICAST
610     // clear packet buffer
611     while (!entry->packetBuf.isEmpty()) 
612     {
613         mCastPacket *packet = entry->packetBuf.deq();
614         packet->cookie.get_val() = entry;
615         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
616         delete [] packet->data;
617         delete packet;
618     }
619 #endif
620     // clear msg buffer
621     while (!entry->msgBuf.isEmpty()) 
622     {
623         multicastGrpMsg *newmsg = entry->msgBuf.deq();
624         DEBUGF(("[%d] release buffer %p %d\n", CkMyPe(), newmsg, newmsg->ep));
625         newmsg->_cookie.get_val() = entry;
626         mCastGrp[CkMyPe()].recvMsg(newmsg);
627     }
628     // release reduction msgs
629     releaseFutureReduceMsgs(entry);
630 }
631
632
633
634
635 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
636 {
637   mCastEntry *entry = (mCastEntry *)sid.get_val();
638   entry->children.push_back(child);
639   if (entry->children.length() == entry->numChild) {
640     childrenReady(entry);
641   }
642 }
643
644
645
646
647 // rebuild is called when root not migrated
648 // when rebuilding, all multicast msgs will be buffered.
649 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
650 {
651   // tear down old tree
652   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
653   CkAssert(curCookie->pe == CkMyPe());
654   // make sure I am the newest one
655   while (curCookie->newc) curCookie = curCookie->newc;
656   if (curCookie->isObsolete()) return;
657
658   //CmiPrintf("tree rebuild\n");
659   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
660
661   // build a chain
662   newCookie->oldc = curCookie;
663   curCookie->newc = newCookie;
664
665   sectId.get_val() = newCookie;
666
667   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
668
669   curCookie->setObsolete();
670
671   resetCookie(sectId);
672 }
673
674 void CkMulticastMgr::resetCookie(CkSectionInfo s)
675 {
676   mCastEntry *newCookie = (mCastEntry*)s.get_val();
677   mCastEntry *oldCookie = newCookie->oldc;
678
679   // get rid of old one
680   DEBUGF(("reset: oldc: %p\n", oldCookie));
681   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
682   int mype = CkMyPe();
683   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
684
685   // build a new one
686   initCookie(s);
687 }
688
689 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
690 {
691   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
692     // set an invalid cookie since we don't have it
693   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
694   for (int i=0; i< sid._nElems-1; i++) {
695      CProxyElement_ArrayBase ap(a, sid._elems[i]);
696      void *newMsg=CkCopyMsg((void **)&m);
697      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
698   }
699   if (sid._nElems > 0) {
700      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
701      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
702   }
703 }
704
705 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
706 {
707     for (int snum = 0; snum < nsid; snum++) {
708         void *msgCopy = m;
709         if (nsid - snum > 1)
710             msgCopy = CkCopyMsg(&m);
711         sendToSection(pd, ep, msgCopy, &(sid[snum]), opts);
712     }
713 }
714
715
716
717 void CkMulticastMgr::sendToSection(CkDelegateData *pd,int ep,void *m, CkSectionID *sid, int opts)
718 {
719   DEBUGF(("ArraySectionSend\n"));
720   multicastGrpMsg *msg = (multicastGrpMsg *)m;
721   msg->ep = ep;
722   CkSectionInfo &s = sid->_cookie;
723   mCastEntry *entry;
724
725   // If this section is rooted at this PE
726   if (s.get_pe() == CkMyPe()) {
727     entry = (mCastEntry *)s.get_val();   
728     if (NULL == entry)
729       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
730
731     // update entry pointer in case there is a newer one.
732     if (entry->newc) {
733       do { entry=entry->newc; } while (entry->newc);
734       s.get_val() = entry;
735     }
736
737 #if CMK_LBDB_ON
738     // fixme: running obj?
739     envelope *env = UsrToEnv(msg);
740     const LDOMHandle &om = CProxy_ArrayBase(s.get_aid()).ckLocMgr()->getOMHandle();
741     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
742 #endif
743
744     // The first time we need to rebuild the spanning tree, we do p2p sends to refresh lastKnown
745     if (entry->needRebuild == 1) {
746       msg->_cookie = s;
747       SimpleSend(ep, msg, s.get_aid(), *sid, opts);
748       entry->needRebuild = 2;
749       return;
750     }
751     // else the second time, we just rebuild cos now we'll have all the lastKnown PEs
752     else if (entry->needRebuild == 2) rebuild(s);
753   }
754   // else, if the root has migrated, we have a sub-optimal mcast
755   else {
756     // fixme - in this case, not recorded in LB
757     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
758   }
759
760   // update cookie
761   msg->_cookie = s;
762
763 #if SPLIT_MULTICAST
764   // split multicast msg into SPLIT_NUM copies
765   register envelope *env = UsrToEnv(m);
766   CkPackMessage(&env);
767   int totalsize = env->getTotalsize();
768   int packetSize = 0;
769   int totalcount = 0;
770   if(totalsize < split_threshold){
771     packetSize = totalsize;
772     totalcount = 1;
773   }else{
774     packetSize = split_size;
775     totalcount = totalsize/split_size;
776     if(totalsize%split_size) totalcount++; 
777     //packetSize = totalsize/SPLIT_NUM;
778     //if (totalsize%SPLIT_NUM) packetSize ++;
779     //totalcount = SPLIT_NUM;
780   }
781   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
782   int sizesofar = 0;
783   char *data = (char*) env;
784   if (totalcount == 1) {
785     // If the root of this section's tree is on this PE, then just propagate msg
786     if (s.get_pe() == CkMyPe()) {
787       CkUnpackMessage(&env);
788       msg = (multicastGrpMsg *)EnvToUsr(env);
789       recvMsg(msg);
790     }
791     // else send msg to root of section's spanning tree
792     else {
793       CProxy_CkMulticastMgr  mCastGrp(thisgroup);
794       msg = (multicastGrpMsg *)EnvToUsr(env);
795       mCastGrp[s.get_pe()].recvMsg(msg);
796     }
797     return;
798   }
799   for (int i=0; i<totalcount; i++) {
800     int mysize = packetSize;
801     if (mysize + sizesofar > totalsize) {
802       mysize = totalsize-sizesofar;
803     }
804     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
805     mCastGrp[s.get_pe()].recvPacket(s, mysize, data, i, totalcount, totalsize, 0);
806     sizesofar += mysize;
807     data += mysize;
808   }
809   CmiFree(env);
810 #else
811   if (s.get_pe() == CkMyPe()) {
812     recvMsg(msg);
813   }
814   else {
815     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
816     mCastGrp[s.get_pe()].recvMsg(msg);
817   }
818 #endif
819 }
820
821 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
822 {
823   int i;
824   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
825
826
827   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
828     char *newdata = new char[n];
829     memcpy(newdata, data, n);
830     entry->packetBuf.enq(new mCastPacket(_cookie, n, newdata, seqno, count, totalsize));
831 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
832     return;
833   }
834
835 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
836
837   // send to spanning tree children
838   // can not optimize using list send because the difference in cookie
839   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
840   for (i=0; i<entry->children.length(); i++) {
841     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], n, data, seqno, count, totalsize, 0);
842   }
843
844   if (seqno == 0) {
845     if (entry->asm_msg != NULL || entry->asm_fill != 0) {
846       entry->print();
847       CmiAssert(entry->asm_msg == NULL && entry->asm_fill==0);
848     }
849     entry->asm_msg = (char *)CmiAlloc(totalsize);
850   }
851   memcpy(entry->asm_msg+entry->asm_fill, data, n);
852   entry->asm_fill += n;
853   if (seqno + 1 == count) {
854     CmiAssert(entry->asm_fill == totalsize);
855     CkUnpackMessage((envelope **)&entry->asm_msg);
856     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
857     msg->_cookie = _cookie;
858 //    mCastGrp[CkMyPe()].recvMsg(msg);
859     sendToLocal(msg);
860     entry->asm_msg = NULL;
861     entry->asm_fill = 0;
862   }
863 //  if (fromBuffer) delete [] data;
864 }
865
866 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
867 {
868   int i;
869   CkSectionInfo &sectionInfo = msg->_cookie;
870   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
871   CmiAssert(entry->getAid() == sectionInfo.get_aid());
872
873   if (entry->notReady()) {
874     DEBUGF(("entry not ready, enq buffer %p\n", msg));
875     entry->msgBuf.enq(msg);
876     return;
877   }
878
879   // send to spanning tree children
880   // can not optimize using list send because the difference in cookie
881   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
882   for (i=0; i<entry->children.length(); i++) {
883     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
884     newmsg->_cookie = entry->children[i];
885     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
886   }
887
888   sendToLocal(msg);
889 }
890
891 void CkMulticastMgr::sendToLocal(multicastGrpMsg *msg)
892 {
893   int i;
894   CkSectionInfo &sectionInfo = msg->_cookie;
895   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
896   CmiAssert(entry->getAid() == sectionInfo.get_aid());
897
898   // send to local
899   int nLocal = entry->localElem.length();
900   DEBUGF(("send to local %d\n", nLocal));
901   for (i=0; i<nLocal-1; i++) {
902     CProxyElement_ArrayBase ap(sectionInfo.get_aid(), entry->localElem[i]);
903     if (_entryTable[msg->ep]->noKeep) {
904       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.get_aid(), entry->localElem[i], CK_MSG_KEEP);
905     }
906     else {
907       // send through scheduler queue
908       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
909       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
910     }
911     // use CK_MSG_DONTFREE so that the message can be reused
912     // the drawback of this scheme bypassing queue is that 
913     // if # of local element is huge, this leads to a long time occupying CPU
914     // also load balancer seems not be able to correctly instrument load
915 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
916     //CmiNetworkProgressAfter(3);
917   }
918   if (nLocal) {
919     CProxyElement_ArrayBase ap(sectionInfo.get_aid(), entry->localElem[nLocal-1]);
920     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
921 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
922   }
923   else {
924     CkAssert (entry->rootSid.get_pe() == CkMyPe());
925     delete msg;
926   }
927 }
928
929
930
931 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
932 {
933   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
934   if (CkMcastBaseMsg::checkMagic(m) == 0) {
935     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
936     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
937   }
938   // ignore invalid cookie sent by SimpleSend
939   if (m->gpe() != -1) {
940     id.get_type() = MulticastMsg;
941     id.get_pe() = m->gpe();
942     id.get_val() = m->cookie();
943   }
944   // note: retain old redNo
945 }
946
947 // Reduction
948
949 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
950 {
951   CkCallback *sectionCB;
952   int numSubSections = proxy.ckGetNumSubSections();
953   // If its a cross-array section,
954   if (numSubSections > 1)
955   {
956       /** @warning: Each instantiation is a mem leak! :o
957        * The class is trivially small, but there's one instantiation for each
958        * section delegated to CkMulticast. The objects need to live as long as
959        * their section exists and is used. The idea of 'destroying' an array
960        * section is still academic, and hence no effort has been made to charge
961        * some 'owner' entity with the task of deleting this object.
962        *
963        * Reimplementing delegated x-array reductions will make this consideration moot
964        */
965       // Configure the final cross-section reducer
966       ck::impl::XArraySectionReducer *red =
967           new ck::impl::XArraySectionReducer(numSubSections, cb);
968       // Configure the subsection callback to deposit with the final reducer
969       sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
970   }
971   // else, just direct the reduction to the actual client cb
972   else
973       sectionCB = cb;
974   // Wire the sections together by storing the subsection cb in each sectionID
975   for (int i=0; i<numSubSections; i++)
976   {
977       CkSectionInfo &sInfo = proxy.ckGetSectionID(i)._cookie;
978       mCastEntry *entry = (mCastEntry *)sInfo.get_val();
979       entry->red.storedCallback = sectionCB;
980   }
981 }
982
983 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
984 {
985   CkSectionInfo &id = proxy.ckGetSectionInfo();
986   mCastEntry *entry = (mCastEntry *)id.get_val();
987   entry->red.storedClient = fn;
988   entry->red.storedClientParam = param;
989 }
990
991 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
992 {
993   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
994   msg->reducer = type;
995   msg->sid = id;
996   msg->sourceFlag = 1;   // from array element
997   msg->redNo = id.get_redNo();
998   msg->gcount = 1;
999   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
1000   msg->callback = cb;
1001   msg->userFlag=userFlag;
1002   return msg;
1003 }
1004
1005
1006
1007 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
1008 {
1009   CkCallback cb;
1010   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
1011 }
1012
1013
1014 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
1015 {
1016   if (id.get_val() == NULL || id.get_redNo() == -1) 
1017     CmiAbort("contribute: SectionID is not initialized\n");
1018
1019   int nFrags;
1020   if (-1 == fragSize) {         // no frag
1021     nFrags = 1;
1022     fragSize = dataSize;
1023   }
1024   else {
1025     CmiAssert (dataSize >= fragSize);
1026     nFrags = dataSize/fragSize;
1027     if (dataSize%fragSize) nFrags++;
1028   }
1029
1030   if (MAXFRAGS < nFrags) {
1031     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
1032     CmiAbort ("frag size too small\n");
1033   }
1034
1035   int mpe = id.get_pe();
1036   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1037
1038   // break the message into k-piece fragments
1039   int fSize = fragSize;
1040   for (int i=0; i<nFrags; i++) {
1041     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
1042       fSize = dataSize%fragSize;
1043     }
1044
1045     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
1046
1047     // initialize the new msg
1048     msg->reducer            = type;
1049     msg->sid                = id;
1050     msg->nFrags             = nFrags;
1051     msg->fragNo             = i;
1052     msg->sourceFlag         = 1;
1053     msg->redNo              = id.get_redNo();
1054     msg->gcount             = 1;
1055     msg->rebuilt            = (mpe == CkMyPe())?0:1;
1056     msg->callback           = cb;
1057     msg->userFlag           = userFlag;
1058
1059     mCastGrp[mpe].recvRedMsg(msg);
1060
1061     data = (void*)(((char*)data) + fSize);
1062   }
1063
1064   id.get_redNo()++;
1065   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
1066 }
1067
1068 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
1069                                               mCastEntry* entry,
1070                                               reductionInfo& redInfo) {
1071   int i;
1072   int dataSize = 0;
1073   int nFrags   = redInfo.msgs[0][0]->nFrags;
1074
1075   // to avoid memcpy and allocation cost for non-pipelined reductions
1076   if (1 == nFrags) {
1077     CkReductionMsg* msg = redInfo.msgs[0][0];
1078
1079     // free up the msg slot
1080     redInfo.msgs[0].length() = 0;
1081
1082     return msg;
1083   }
1084
1085   for (i=0; i<nFrags; i++) {
1086     dataSize += redInfo.msgs[i][0]->dataSize;
1087   }
1088
1089   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1090
1091   // initialize msg header
1092   msg->redNo      = redInfo.msgs[0][0]->redNo;
1093   msg->reducer    = redInfo.msgs[0][0]->reducer;
1094   msg->sid        = id;
1095   msg->nFrags     = nFrags;
1096
1097   // I guess following fields need not be initialized
1098   msg->sourceFlag = 2;
1099   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1100   msg->callback   = redInfo.msgs[0][0]->callback;
1101   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1102
1103   byte* data = (byte*)msg->getData ();
1104   for (i=0; i<nFrags; i++) {
1105     // copy data from fragments to msg
1106     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1107     data += redInfo.msgs[i][0]->dataSize;
1108
1109     // free fragments
1110     delete redInfo.msgs[i][0];
1111     redInfo.msgs[i].length() = 0;    
1112   }
1113
1114   return msg;
1115 }
1116
1117
1118
1119 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1120                                      mCastEntry* entry, reductionInfo& redInfo,
1121                                      int currentTreeUp) {
1122
1123     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1124     reductionMsgs& rmsgs = redInfo.msgs[index];
1125     int dataSize         = rmsgs[0]->dataSize;
1126     int i;
1127     int oldRedNo = redInfo.redNo;
1128     int nFrags   = rmsgs[0]->nFrags;
1129     int fragNo   = rmsgs[0]->fragNo;
1130     int userFlag = rmsgs[0]->userFlag;
1131
1132     // Figure out (from one of the msg fragments) which reducer function to use
1133     CkReduction::reducerType reducer = rmsgs[0]->reducer;
1134     CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1135     CkAssert(NULL != f);
1136
1137     // Check if migration occurred in any of the subtrees, and pick one valid callback
1138     CkCallback msg_cb;
1139     int rebuilt = 0;
1140     for (i=0; i<rmsgs.length(); i++) {
1141         if (rmsgs[i]->rebuilt) rebuilt = 1;
1142         if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1143     }
1144
1145     // Perform the actual reduction
1146     CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec());
1147     newmsg->redNo  = redInfo.redNo;
1148     newmsg->nFrags = nFrags;
1149     newmsg->fragNo = fragNo;
1150     newmsg->userFlag = userFlag;
1151     newmsg->reducer = reducer;
1152
1153     // Increment the number of fragments processed
1154     redInfo.npProcessed ++;
1155
1156     // Delete all the fragments which are no longer needed
1157     for (i=0; i<rmsgs.length(); i++)
1158         if (rmsgs[i]!=newmsg) delete rmsgs[i];
1159     rmsgs.length() = 0;
1160
1161     // If I am not the tree root
1162     if (entry->hasParent()) {
1163         // send up to parent
1164         newmsg->sid        = entry->parentGrp;
1165         newmsg->sourceFlag = 2;
1166         newmsg->redNo      = oldRedNo; ///< @todo: redundant, duplicate assignment?
1167         newmsg->gcount     = redInfo.gcount [index];
1168         newmsg->rebuilt    = rebuilt;
1169         newmsg->callback   = msg_cb;
1170         DEBUGF(("[%d] ckmulticast: send %p to parent %d\n", CkMyPe(), entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1171         mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1172     } else {
1173         newmsg->sid = id;
1174         // Buffer the reduced fragment
1175         rmsgs.push_back (newmsg);
1176         // If all the fragments have been reduced
1177         if (redInfo.npProcessed == nFrags) {
1178             // Combine the fragments
1179             newmsg = combineFrags (id, entry, redInfo);
1180             // Set the reference number based on the user flag at the contribute call
1181             CkSetRefNum(newmsg, userFlag);
1182             // Trigger the appropriate reduction client
1183             if ( !msg_cb.isInvalid() )
1184                 msg_cb.send(newmsg);
1185             else if (redInfo.storedCallback != NULL)
1186                 redInfo.storedCallback->send(newmsg);
1187             else if (redInfo.storedClient != NULL) {
1188                 redInfo.storedClient(id, redInfo.storedClientParam, dataSize, newmsg->data);
1189                 delete newmsg;
1190             }
1191             else
1192                 CmiAbort("Did you forget to register a reduction client?");
1193
1194             DEBUGF(("ckmulticast: redn client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1195             //
1196             if (currentTreeUp) {
1197                 if (entry->oldc) {
1198                     // free old tree on same processor;
1199                     mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1200                     entry->oldc = NULL;
1201                 }
1202                 if (entry->hasOldtree()) {
1203                     // free old tree on old processor
1204                     int oldpe = entry->oldtree.pe;
1205                     mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1206                     entry->oldtree.clear();
1207                 }
1208             }
1209             // Indicate if a tree rebuild is required
1210             if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1211         }
1212     }
1213 }
1214
1215
1216
1217 /**
1218  * Called from:
1219  *   - contribute(): calls PE specified in the cookie
1220  *   - reduceFragment(): calls parent PE
1221  *   - recvRedMsg(): calls root PE (if tree is obsolete)
1222  *   - releaseFutureRedMsgs: calls this PE
1223  *   - releaseBufferedRedMsgs: calls root PE
1224  */
1225 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1226 {
1227     int i;
1228     /// Grab the section info embedded in the redn msg
1229     CkSectionInfo id = msg->sid;
1230     /// ... and get at the ptr which shows me which cookie to use
1231     mCastEntry *entry = (mCastEntry *)id.get_val();
1232     CmiAssert(entry!=NULL);
1233
1234     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1235
1236     int updateReduceNo = 0;
1237
1238     //-------------------------------------------------------------------------
1239     /// If this cookie is obsolete
1240     if (entry->isObsolete()) {
1241         // Send up to root
1242         DEBUGF(("[%d] ckmulticast: section cookie obsolete. Will send to root %d\n", CkMyPe(), entry->rootSid.pe));
1243
1244         /// If I am the root, traverse the linked list of cookies to get the latest
1245         if (!entry->hasParent()) {
1246             mCastEntry *newentry = entry->newc;
1247             while (newentry && newentry->newc) newentry=newentry->newc;
1248             if (newentry) entry = newentry;
1249             CmiAssert(entry!=NULL);
1250         }
1251
1252         ///
1253         if (!entry->hasParent() && !entry->isObsolete()) {
1254             /// Indicate it is not on old spanning tree
1255             msg->sourceFlag = 0;
1256             /// Flag the redn as coming from an old tree and that the new entry cookie needs to know the new redn num.
1257             updateReduceNo  = 1;
1258         }
1259         /// If I am not the root or this latest cookie is also obsolete
1260         else {
1261             // Ensure that you're here with reason
1262             CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1263             // Edit the msg so that the recipient knows where to find its cookie
1264             msg->sid = entry->rootSid;
1265             msg->sourceFlag = 0;
1266             // Send the msg directly to the root of the redn tree
1267             mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1268             return;
1269         }
1270     }
1271
1272     /// Grab the locally stored redn info
1273     reductionInfo &redInfo = entry->red;
1274
1275     //-------------------------------------------------------------------------
1276     /// If you've received a msg from a previous redn, something has gone horribly wrong somewhere!
1277     if (msg->redNo < redInfo.redNo) {
1278         CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1279         CmiAbort("Could never happen! \n");
1280     }
1281
1282     //-------------------------------------------------------------------------
1283     /// If the current tree is not yet ready or if you've received a msg for a future redn, buffer the msg
1284     if (entry->notReady() || msg->redNo > redInfo.redNo) {
1285         DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1286         redInfo.futureMsgs.push_back(msg);
1287         return;
1288     }
1289
1290     //-------------------------------------------------------------------------
1291     const int index = msg->fragNo;
1292     // New contribution from an ArrayElement
1293     if (msg->sourceFlag == 1) {
1294         redInfo.lcount [index] ++;
1295     }
1296     // Redn from a child
1297     if (msg->sourceFlag == 2) {
1298         redInfo.ccount [index] ++;
1299     }
1300     // Total elems that have contributed the indexth fragment
1301     redInfo.gcount [index] += msg->gcount;
1302
1303     // Check if message is of proper size
1304     if ((0 != redInfo.msgs[index].length()) && (msg->dataSize != (redInfo.msgs [index][0]->dataSize)))
1305     CmiAbort("Reduction data are not of same length!");
1306
1307     //-------------------------------------------------------------------------
1308     // Buffer the msg
1309     redInfo.msgs [index].push_back(msg);
1310
1311     //-------------------------------------------------------------------------
1312     /// Flag if this fragment can be reduced (if all local elements and children have contributed this fragment)
1313     int currentTreeUp = 0;
1314     if (redInfo.lcount [index] == entry->localElem.length() && redInfo.ccount [index] == entry->children.length())
1315         currentTreeUp = 1;
1316
1317     /// Flag (only at the redn root) if all array elements contributed all their fragments
1318     int mixTreeUp = 0;
1319     if (!entry->hasParent()) {
1320         mixTreeUp = 1;
1321         for (int i=0; i<msg->nFrags; i++)
1322             if (entry->allElem.length() != redInfo.gcount [i])
1323                 mixTreeUp = 0;
1324     }
1325
1326     //-------------------------------------------------------------------------
1327     /// If this fragment can be reduced, or if I am the root and have received all fragments from all elements
1328     if (currentTreeUp || mixTreeUp)
1329     {
1330         const int nFrags = msg->nFrags;
1331         /// Reduce this fragment
1332         reduceFragment (index, id, entry, redInfo, currentTreeUp);
1333
1334         // If migration happened, and my sub-tree reconstructed itself,
1335         // share the current reduction number with myself and all my children
1336         if (updateReduceNo)
1337             mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1338
1339         /// If all the fragments for the current reduction have been processed
1340         if (redInfo.npProcessed == nFrags) {
1341
1342             /// Increment the reduction number in all of this section's cookies
1343             entry->incReduceNo();
1344
1345             /// Reset bookkeeping counters
1346             for (i=0; i<nFrags; i++) {
1347                 redInfo.lcount [i] = 0;
1348                 redInfo.ccount [i] = 0;
1349                 redInfo.gcount [i] = 0;
1350             }
1351             redInfo.npProcessed = 0;
1352             /// Now that, the current redn is done, release any pending msgs from future redns
1353             releaseFutureReduceMsgs(entry);
1354         }
1355     }
1356 }
1357
1358
1359
1360 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1361 {
1362   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1363
1364   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1365     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1366     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1367   }
1368   entry->red.futureMsgs.length() = 0;
1369 }
1370
1371
1372
1373 // these messages have to be sent to root
1374 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1375 {
1376   int i;
1377   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1378
1379   for (int j=0; j<MAXFRAGS; j++) {
1380     for (i=0; i<entry->red.msgs[j].length(); i++) {
1381       CkReductionMsg *msg = entry->red.msgs[j][i];
1382       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1383       msg->sid = entry->rootSid;
1384       msg->sourceFlag = 0;
1385       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1386     }
1387     entry->red.msgs[j].length() = 0;
1388   }
1389
1390
1391   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1392     CkReductionMsg *msg = entry->red.futureMsgs[i];
1393     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1394     msg->sid = entry->rootSid;
1395     msg->sourceFlag = 0;
1396     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1397   }
1398   entry->red.futureMsgs.length() = 0;
1399 }
1400
1401
1402
1403 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1404 {
1405   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1406   if (entry->red.redNo < red)
1407     entry->red.redNo = red;
1408
1409   CProxy_CkMulticastMgr mp(thisgroup);
1410   for (int i=0; i<entry->children.length(); i++) {
1411     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1412   }
1413
1414   releaseFutureReduceMsgs(entry);
1415 }
1416
1417 #include "CkMulticast.def.h"
1418