Merge branch 'pami-bgq-commthr-ready' of charmgit:charm into charm
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentation  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  * */
12
13 #include "charm++.h"
14 #include "envelope.h"
15 #include "register.h"
16
17 #include "ckmulticast.h"
18 #include "spanningTreeStrategy.h"
19 #include "XArraySectionReducer.h"
20
21 #define DEBUGF(x)  // CkPrintf x;
22
23 // turn on or off fragmentation in multicast
24 #if CMK_MESSAGE_LOGGING
25 #define SPLIT_MULTICAST  0
26 #else
27 #define SPLIT_MULTICAST  1
28 #endif
29
30 // maximum number of fragments into which a message can be broken
31 #define MAXFRAGS 100
32
33 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
34 typedef CkVec<CkArrayIndex>   arrayIndexList;
35 typedef CkVec<CkSectionInfo>     sectionIdList;
36 typedef CkVec<CkReductionMsg *>  reductionMsgs;
37 typedef CkQ<int>                 PieceSize;
38 typedef CkVec<LDObjid>          ObjKeyList;
39 typedef unsigned char            byte;
40
41 /** Information about the status of reductions proceeding along a given section
42  *
43  * An instance of this class is stored in every mCastEntry object making it possible
44  * to track redn operations on a per section basis all along the spanning tree.
45  */
46 class reductionInfo {
47     public:
48         /// Number of local array elements which have contributed a given fragment
49         int lcount [MAXFRAGS];
50         /// Number of child vertices (NOT array elements) that have contributed a given fragment
51         int ccount [MAXFRAGS];
52         /// The total number of array elements that have contributed so far to a given fragment
53         int gcount [MAXFRAGS];
54         /// The number of fragments processed so far
55         int npProcessed;
56         /// User callback
57         CkCallback *storedCallback;
58         /// User reduction client function
59         redClientFn storedClient;
60         /// User provided data for the redn client function
61         void *storedClientParam;
62         /// Reduction sequence number
63         int redNo;
64         /// Messages for this reduction
65         reductionMsgs  msgs [MAXFRAGS];
66         /// Messages of future reductions
67         reductionMsgs futureMsgs;
68
69     public:
70         reductionInfo(): storedCallback(NULL),
71                          storedClientParam(NULL),
72                          redNo(0),
73                          npProcessed(0) {
74             for (int i=0; i<MAXFRAGS; i++)
75                 lcount [i] = ccount [i] = gcount [i] = 0;
76         }
77 };
78
79 /// cookie status
80 #define COOKIE_NOTREADY 0
81 #define COOKIE_READY    1
82 #define COOKIE_OLD      2
83
84 class mCastPacket {
85 public:
86   CkSectionInfo cookie;
87   int offset;
88   int n;
89   char *data;
90   int seqno;
91   int count;
92   int totalsize;
93
94   mCastPacket(CkSectionInfo &_cookie, int _offset, int _n, char *_d, int _s, int _c, int _t):
95                 cookie(_cookie), offset(_offset), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
96 };
97
98 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
99
100 class SectionLocation {
101 public:
102   mCastEntry *entry;
103   int         pe;
104 public:
105   SectionLocation(): entry(NULL), pe(-1) {}
106   SectionLocation( mCastEntry *e, int p) { set(e, p); }
107   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
108   inline void clear() { entry = NULL; pe = -1; }
109 };
110
111
112
113
114 /// Cookie for an array section 
115 class mCastEntry 
116 {
117     public:
118         /// Array ID 
119         CkArrayID     aid;
120         /// Spanning tree parent
121         CkSectionInfo parentGrp;
122         /// List of direct children
123         sectionIdList children;
124         /// Number of direct children
125         int numChild;
126         /// List of all tree member array indices (Only useful on the tree root)
127         arrayIndexList allElem;
128         /// Only useful on root for LB
129         ObjKeyList     allObjKeys;
130         /// List of array elements on local PE
131         arrayIndexList localElem;
132         /// Should always be myPE
133         int pe;
134         /// Section ID of the root
135         CkSectionInfo rootSid;
136         multicastGrpMsgBuf msgBuf;
137         /// Buffer storing the pending packets
138         multicastGrpPacketBuf packetBuf;
139         /// For multicast packetization
140         char *asm_msg;
141         int   asm_fill;
142         /// Linked list of entries on the same processor
143         mCastEntry *oldc, *newc;
144         /// Old spanning tree
145         SectionLocation   oldtree;
146         // for reduction
147         reductionInfo red;
148         //
149         char needRebuild;
150     private:
151         char flag;
152     public:
153         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
154                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
155         mCastEntry(mCastEntry *);
156         /// Check if this tree is only a branch and has a parent
157         inline int hasParent() { return parentGrp.get_val()?1:0; }
158         /// Is this tree obsolete?
159         inline int isObsolete() { return (flag == COOKIE_OLD); }
160         /// Make the current tree obsolete
161         inline void setObsolete() { flag=COOKIE_OLD; }
162         /// Check if this (branch of the) tree is ready for use
163         inline int notReady() { return (flag == COOKIE_NOTREADY); }
164         /// Mark this (branch of the) tree as ready for use
165         inline void setReady() { flag=COOKIE_READY; }
166         /// Increment the reduction number across the whole linked list of cookies
167         inline void incReduceNo() {
168             red.redNo ++;
169             for (mCastEntry *next = newc; next; next=next->newc) 
170                 next->red.redNo++;
171         }
172         /// Get a handle on the array ID this tree is a member of
173         inline CkArrayID getAid() { return aid; }
174         inline int hasOldtree() { return oldtree.entry != NULL; }
175         inline void print() {
176             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
177         }
178 };
179
180
181
182
183 class cookieMsg: public CMessage_cookieMsg {
184 public:
185   CkSectionInfo cookie;
186 public:
187   cookieMsg() {};
188   cookieMsg(CkSectionInfo m): cookie(m) {};
189 };
190
191
192
193
194 /// multicast tree setup message
195 class multicastSetupMsg: public CMessage_multicastSetupMsg {
196 public:
197   int  nIdx;
198   CkArrayIndex *arrIdx;
199   int      *lastKnown;
200   CkSectionInfo parent;
201   CkSectionInfo rootSid;
202   int redNo;
203 };
204
205
206
207
208 /// message send in spanning tree
209 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
210 };
211
212
213 extern void CkPackMessage(envelope **pEnv);
214 extern void CkUnpackMessage(envelope **pEnv);
215
216
217
218 void _ckMulticastInit(void)
219 {
220 /*
221   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
222   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
223 */
224 }
225
226
227 mCastEntry::mCastEntry (mCastEntry *old): 
228   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
229 {
230   int i;
231   aid = old->aid;
232   parentGrp = old->parentGrp;
233   for (i=0; i<old->allElem.length(); i++)
234     allElem.push_back(old->allElem[i]);
235 #if CMK_LBDB_ON
236   CmiAssert(old->allElem.length() == old->allObjKeys.length());
237   for (i=0; i<old->allObjKeys.length(); i++)
238     allObjKeys.push_back(old->allObjKeys[i]);
239 #endif
240   pe = old->pe;
241   red.storedCallback = old->red.storedCallback;
242   red.storedClient = old->red.storedClient;
243   red.storedClientParam = old->red.storedClientParam;
244   red.redNo = old->red.redNo;
245   needRebuild = 0;
246   asm_msg = NULL;
247   asm_fill = 0;
248 }
249
250 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
251
252
253
254
255 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndex *al, int n)
256 {
257     // Create a multicast entry
258     mCastEntry *entry = new mCastEntry(aid);
259     // Push all the section member indices into the entry
260     for (int i=0; i<n; i++) {
261         entry->allElem.push_back(al[i]);
262 #if CMK_LBDB_ON
263         const LDObjid key = idx2LDObjid(al[i]);
264         entry->allObjKeys.push_back(key);
265 #endif
266     }
267     //  entry->aid = aid;
268     _id.get_aid() = aid;
269     _id.get_val() = entry;              // allocate table for this section
270     // 
271     initCookie(_id);
272 }
273
274
275
276
277 void CkMulticastMgr::setSection(CkSectionInfo &id)
278 {
279   initCookie(id);
280 }
281
282
283
284
285 /// @warning: This is deprecated
286 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
287 {
288   CkArrayID aid = proxy.ckGetArrayID();
289   CkSectionInfo &_id = proxy.ckGetSectionInfo();
290
291   mCastEntry *entry = new mCastEntry(aid);
292
293   const CkArrayIndex *al = proxy.ckGetArrayElements();
294   for (int i=0; i<proxy.ckGetNumElements(); i++) {
295     entry->allElem.push_back(al[i]);
296 #if CMK_LBDB_ON
297     const LDObjid key = idx2LDObjid(al[i]);
298     entry->allObjKeys.push_back(key);
299 #endif
300   }
301   _id.get_type() = MulticastMsg;
302   _id.get_aid() = aid;
303   _id.get_val() = entry;                // allocate table for this section
304   initCookie(_id);
305 }
306
307
308
309
310 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
311 {
312   CkSectionInfo &info = proxy.ckGetSectionInfo();
313
314   int oldpe = info.get_pe();
315   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
316
317   CkArrayID aid = proxy.ckGetArrayID();
318   CkSectionID *sid = proxy.ckGetSectionIDs();
319   mCastEntry *entry = new mCastEntry(aid);
320
321   mCastEntry *oldentry = (mCastEntry *)info.get_val();
322   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
323
324   const CkArrayIndex *al = sid->_elems;
325   CmiAssert(info.get_aid() == aid);
326   prepareCookie(entry, *sid, al, sid->_nElems, aid);
327
328   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
329
330     // store old tree info
331   entry->oldtree.set(oldentry, oldpe);
332
333     // obsolete old tree
334   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
335
336   // find reduction number
337   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
338 }
339
340
341
342
343 /// Build a mCastEntry object with relevant section info and set the section cookie to point to this object
344 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndex *al, int count, CkArrayID aid)
345 {
346   for (int i=0; i<count; i++) {
347     entry->allElem.push_back(al[i]);
348 #if CMK_LBDB_ON
349     const LDObjid key = idx2LDObjid(al[i]);
350     entry->allObjKeys.push_back(key);
351 #endif
352   }
353   sid._cookie.get_type() = MulticastMsg;
354   sid._cookie.get_aid() = aid;
355   sid._cookie.get_val() = entry;        // allocate table for this section
356   sid._cookie.get_pe() = CkMyPe();
357 }
358
359
360
361
362 // this is used
363 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
364 {
365   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
366   int numSubSections = proxy->ckGetNumSubSections();
367   for (int i=0; i<numSubSections; i++)
368   {
369       CkArrayID aid = proxy->ckGetArrayIDn(i);
370       mCastEntry *entry = new mCastEntry(aid);
371       CkSectionID *sid = &( proxy->ckGetSectionID(i) );
372       const CkArrayIndex *al = proxy->ckGetArrayElements(i);
373       prepareCookie(entry, *sid, al, proxy->ckGetNumElements(i), aid);
374       initCookie(sid->_cookie);
375   }
376 }
377
378
379
380
381 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
382 {
383   mCastEntry *entry = (mCastEntry *)s.get_val();
384   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
385   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
386 }
387
388 // now that we get reduction number from the old cookie,
389 // we continue to build the spanning tree
390 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
391 {
392   mCastEntry *entry = (mCastEntry *)s.get_val();
393   entry->red.redNo = red;
394
395   initCookie(s);
396
397   // TODO delete old tree
398 }
399
400
401
402
403 void CkMulticastMgr::initCookie(CkSectionInfo s)
404 {
405     mCastEntry *entry = (mCastEntry *)s.get_val();
406     int n = entry->allElem.length();
407     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
408     // Create and initialize a setup message
409     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
410     msg->nIdx = n;
411     msg->parent = CkSectionInfo(entry->getAid());
412     msg->rootSid = s;
413     msg->redNo = entry->red.redNo;
414     // Fill the message with the section member indices and their last known locations
415     CkArray *array = CProxy_ArrayBase(s.get_aid()).ckLocalBranch();
416     for (int i=0; i<n; i++) {
417       msg->arrIdx[i] = entry->allElem[i];
418       int ape = array->lastKnown(entry->allElem[i]);
419       CmiAssert(ape >=0 && ape < CkNumPes());
420       msg->lastKnown[i] = ape;
421     }
422     // Trigger the spanning tree build
423     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
424     mCastGrp[CkMyPe()].setup(msg);
425 }
426
427
428
429
430 void CkMulticastMgr::teardown(CkSectionInfo cookie)
431 {
432     int i;
433     mCastEntry *sect = (mCastEntry *)cookie.get_val();
434     // Mark this section as obsolete
435     sect->setObsolete();
436     // Release the buffered messages 
437     releaseBufferedReduceMsgs(sect);
438     // Propagate the teardown to each of your children
439     CProxy_CkMulticastMgr mp(thisgroup);
440     for (i=0; i<sect->children.length(); i++)
441         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
442 }
443
444
445
446
447 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
448 {
449     int i;
450     mCastEntry *sect = (mCastEntry *)cookie.get_val();
451     // Reset the root section info
452     sect->rootSid = newroot;
453     // Mark this section as obsolete
454     sect->setObsolete();
455     // Release the buffered messages 
456     releaseBufferedReduceMsgs(sect);
457     // Propagate the teardown to each of your children
458     CProxy_CkMulticastMgr mp(thisgroup);
459     for (i=0; i<sect->children.length(); i++)
460         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
461 }
462
463
464
465
466 void CkMulticastMgr::freeup(CkSectionInfo cookie)
467 {
468   mCastEntry *sect = (mCastEntry *)cookie.get_val();
469   CProxy_CkMulticastMgr mp(thisgroup);
470   // Parse through all the section members on this PE and...
471   while (sect) 
472   {
473       // Free their children
474       for (int i=0; i<sect->children.length(); i++)
475           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
476       // Free the cookie itself
477       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
478       mCastEntry *oldc= sect->oldc;
479       delete sect;
480       sect = oldc;
481   }
482 }
483
484
485
486
487 void CkMulticastMgr::setup(multicastSetupMsg *msg)
488 {
489     int i,j;
490     mCastEntry *entry;
491     CkArrayID aid = msg->rootSid.get_aid();
492     if (msg->parent.get_pe() == CkMyPe()) 
493       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
494     else 
495       entry = new mCastEntry(aid);
496     entry->aid = aid;
497     entry->pe = CkMyPe();
498     entry->rootSid = msg->rootSid;
499     entry->parentGrp = msg->parent;
500
501     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
502     entry->red.redNo = msg->redNo;
503
504     // Create a numPE sized array of vectors to hold the array elements in each PE
505     int numpes = CkNumPes();
506     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
507     // Sort each array index in the setup message based on last known location
508     for (i=0; i<msg->nIdx; i++) 
509     {
510       int lastKnown = msg->lastKnown[i];
511       // If msg->arrIdx[i] local, add it to a special local element list
512       if (lastKnown == CkMyPe())
513           entry->localElem.insertAtEnd(msg->arrIdx[i]);
514       // else, add it to the list corresponding to its PE
515       else
516           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
517     }
518
519     CkVec<int> mySubTreePEs;
520     mySubTreePEs.reserve(numpes);
521     // The first PE in my subtree should be me, the tree root (as required by the spanning tree builder)
522     mySubTreePEs.push_back(CkMyPe());
523     // Identify the child PEs in the tree, ie the PEs with section members on them
524     for (i=0; i<numpes; i++) 
525     {
526       if (i==CkMyPe()) continue;
527       if (lists[i].size()) 
528           mySubTreePEs.push_back(i);
529     }
530     // The number of multicast children can be limited by the spanning tree factor 
531     int num = mySubTreePEs.size() - 1, numchild = 0;
532     if (factor <= 0) numchild = num;
533     else numchild = num<factor?num:factor;
534   
535     entry->numChild = numchild;
536
537     // If there are any children, go about building a spanning tree
538     if (numchild) 
539     {
540         // Build the next generation of the spanning tree rooted at my PE
541         int *peListPtr = mySubTreePEs.getVec();
542         topo::SpanningTreeVertex *nextGenInfo;
543         nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
544         //CkAssert(nextGenInfo->childIndex.size() == numchild);
545         numchild = nextGenInfo->childIndex.size();
546         entry->numChild = numchild;
547
548         // Distribute the section members across the number of direct children (branches)
549         // Direct children are simply the first section member in each of the branch lists
550         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
551
552         // For each direct child, collate indices of all section members on the PEs in that branch
553         for (i=0; i < numchild; i++)
554         {
555             // Determine the indices of the first and last PEs in this branch of my sub-tree
556             int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
557             if (i < numchild-1)
558                 childEndIndex = nextGenInfo->childIndex[i+1];
559             else
560                 childEndIndex = mySubTreePEs.size();
561             // For each PE in this branch, add the section members on that PE to a list
562             for (j = childStartIndex; j < childEndIndex; j++)
563             {
564                 int pe = mySubTreePEs[j];
565                 for (int k=0; k<lists[pe].size(); k++)
566                     slots[i].push_back(lists[pe][k]);
567             }
568         }
569
570         // Ask each of your direct children to setup their branches
571         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
572         for (i=0; i<numchild; i++) 
573         {
574             // Give each child info about the number, indices and location of its children
575             int n = slots[i].length();
576             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
577             m->parent = CkSectionInfo(aid, entry);
578             m->nIdx = slots[i].length();
579             m->rootSid = msg->rootSid;
580             m->redNo = msg->redNo;
581             for (j=0; j<slots[i].length(); j++) 
582             {
583                 m->arrIdx[j] = slots[i][j].idx;
584                 m->lastKnown[j] = slots[i][j].pe;
585             }
586             int childroot = slots[i][0].pe;
587             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
588             // Send the message to the child
589             mCastGrp[childroot].setup(m);
590         }
591         delete [] slots;
592         delete nextGenInfo;
593     }
594     // else, tell yourself that your children are ready
595     else 
596     {
597         childrenReady(entry);
598     }
599     delete [] lists;
600     delete msg;
601 }
602
603
604
605
606 void CkMulticastMgr::childrenReady(mCastEntry *entry)
607 {
608     // Mark this entry as ready
609     entry->setReady();
610     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
611     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
612     if (entry->hasParent()) 
613         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
614 #if SPLIT_MULTICAST
615     // clear packet buffer
616     while (!entry->packetBuf.isEmpty()) 
617     {
618         mCastPacket *packet = entry->packetBuf.deq();
619         packet->cookie.get_val() = entry;
620         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->offset, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
621         delete [] packet->data;
622         delete packet;
623     }
624 #endif
625     // clear msg buffer
626     while (!entry->msgBuf.isEmpty()) 
627     {
628         multicastGrpMsg *newmsg = entry->msgBuf.deq();
629         DEBUGF(("[%d] release buffer %p ep:%d\n", CkMyPe(), newmsg, newmsg->ep));
630         newmsg->_cookie.get_val() = entry;
631         mCastGrp[CkMyPe()].recvMsg(newmsg);
632     }
633     // release reduction msgs
634     releaseFutureReduceMsgs(entry);
635 }
636
637
638
639
640 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
641 {
642   mCastEntry *entry = (mCastEntry *)sid.get_val();
643   entry->children.push_back(child);
644   if (entry->children.length() == entry->numChild) {
645     childrenReady(entry);
646   }
647 }
648
649
650
651
652 // rebuild is called when root not migrated
653 // when rebuilding, all multicast msgs will be buffered.
654 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
655 {
656   // tear down old tree
657   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
658   CkAssert(curCookie->pe == CkMyPe());
659   // make sure I am the newest one
660   while (curCookie->newc) curCookie = curCookie->newc;
661   if (curCookie->isObsolete()) return;
662
663   //CmiPrintf("tree rebuild\n");
664   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
665
666   // build a chain
667   newCookie->oldc = curCookie;
668   curCookie->newc = newCookie;
669
670   sectId.get_val() = newCookie;
671
672   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
673
674   curCookie->setObsolete();
675
676   resetCookie(sectId);
677 }
678
679 void CkMulticastMgr::resetCookie(CkSectionInfo s)
680 {
681   mCastEntry *newCookie = (mCastEntry*)s.get_val();
682   mCastEntry *oldCookie = newCookie->oldc;
683
684   // get rid of old one
685   DEBUGF(("reset: oldc: %p\n", oldCookie));
686   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
687   int mype = CkMyPe();
688   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
689
690   // build a new one
691   initCookie(s);
692 }
693
694 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
695 {
696   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
697     // set an invalid cookie since we don't have it
698   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
699   for (int i=0; i< sid._nElems-1; i++) {
700      CProxyElement_ArrayBase ap(a, sid._elems[i]);
701      void *newMsg=CkCopyMsg((void **)&m);
702      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
703   }
704   if (sid._nElems > 0) {
705      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
706      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
707   }
708 }
709
710 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
711 {
712 #if CMK_MESSAGE_LOGGING
713         envelope *env = UsrToEnv(m);
714         env->flags = env->flags | CK_MULTICAST_MSG_MLOG;
715 #endif
716
717     for (int snum = 0; snum < nsid; snum++) {
718         void *msgCopy = m;
719         if (nsid - snum > 1)
720             msgCopy = CkCopyMsg(&m);
721         sendToSection(pd, ep, msgCopy, &(sid[snum]), opts);
722     }
723 }
724
725
726
727 void CkMulticastMgr::sendToSection(CkDelegateData *pd,int ep,void *m, CkSectionID *sid, int opts)
728 {
729   DEBUGF(("ArraySectionSend\n"));
730   multicastGrpMsg *msg = (multicastGrpMsg *)m;
731   msg->ep = ep;
732   CkSectionInfo &s = sid->_cookie;
733   mCastEntry *entry;
734
735   // If this section is rooted at this PE
736   if (s.get_pe() == CkMyPe()) {
737     entry = (mCastEntry *)s.get_val();   
738     if (NULL == entry)
739       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
740
741     // update entry pointer in case there is a newer one.
742     if (entry->newc) {
743       do { entry=entry->newc; } while (entry->newc);
744       s.get_val() = entry;
745     }
746
747 #if CMK_LBDB_ON
748     // fixme: running obj?
749     envelope *env = UsrToEnv(msg);
750     const LDOMHandle &om = CProxy_ArrayBase(s.get_aid()).ckLocMgr()->getOMHandle();
751     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
752 #endif
753
754     // The first time we need to rebuild the spanning tree, we do p2p sends to refresh lastKnown
755     if (entry->needRebuild == 1) {
756       msg->_cookie = s;
757       SimpleSend(ep, msg, s.get_aid(), *sid, opts);
758       entry->needRebuild = 2;
759       return;
760     }
761     // else the second time, we just rebuild cos now we'll have all the lastKnown PEs
762     else if (entry->needRebuild == 2) rebuild(s);
763   }
764   // else, if the root has migrated, we have a sub-optimal mcast
765   else {
766     // fixme - in this case, not recorded in LB
767     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
768   }
769
770   // update cookie
771   msg->_cookie = s;
772
773 #if SPLIT_MULTICAST
774   // split multicast msg into SPLIT_NUM copies
775   register envelope *env = UsrToEnv(m);
776   CkPackMessage(&env);
777   int totalsize = env->getTotalsize();
778   int packetSize = 0;
779   int totalcount = 0;
780   if(totalsize < split_threshold){
781     packetSize = totalsize;
782     totalcount = 1;
783   }else{
784     packetSize = split_size;
785     totalcount = totalsize/split_size;
786     if(totalsize%split_size) totalcount++; 
787     //packetSize = totalsize/SPLIT_NUM;
788     //if (totalsize%SPLIT_NUM) packetSize ++;
789     //totalcount = SPLIT_NUM;
790   }
791   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
792   int sizesofar = 0;
793   char *data = (char*) env;
794   if (totalcount == 1) {
795     // If the root of this section's tree is on this PE, then just propagate msg
796     if (s.get_pe() == CkMyPe()) {
797       CkUnpackMessage(&env);
798       msg = (multicastGrpMsg *)EnvToUsr(env);
799       recvMsg(msg);
800     }
801     // else send msg to root of section's spanning tree
802     else {
803       CProxy_CkMulticastMgr  mCastGrp(thisgroup);
804       msg = (multicastGrpMsg *)EnvToUsr(env);
805       mCastGrp[s.get_pe()].recvMsg(msg);
806     }
807     return;
808   }
809   for (int i=0; i<totalcount; i++) {
810     int mysize = packetSize;
811     if (mysize + sizesofar > totalsize) {
812       mysize = totalsize-sizesofar;
813     }
814     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
815     mCastGrp[s.get_pe()].recvPacket(s, sizesofar, mysize, data, i, totalcount, totalsize, 0);
816     sizesofar += mysize;
817     data += mysize;
818   }
819   CmiFree(env);
820 #else
821   if (s.get_pe() == CkMyPe()) {
822     recvMsg(msg);
823   }
824   else {
825     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
826     mCastGrp[s.get_pe()].recvMsg(msg);
827   }
828 #endif
829 }
830
831 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int offset, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
832 {
833   int i;
834   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
835
836
837   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
838     char *newdata = new char[n];
839     memcpy(newdata, data, n);
840     entry->packetBuf.enq(new mCastPacket(_cookie, offset, n, newdata, seqno, count, totalsize));
841 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
842     return;
843   }
844
845 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
846
847   // send to spanning tree children
848   // can not optimize using list send because the difference in cookie
849   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
850   for (i=0; i<entry->children.length(); i++) {
851     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], offset, n, data, seqno, count, totalsize, 0);
852   }
853
854   if (entry->asm_msg == NULL) {
855     CmiAssert(entry->asm_fill == 0);
856     entry->asm_msg = (char *)CmiAlloc(totalsize);
857   }
858   memcpy(entry->asm_msg+offset, data, n);
859   entry->asm_fill += n;
860   if (entry->asm_fill == totalsize) {
861     CkUnpackMessage((envelope **)&entry->asm_msg);
862     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
863     msg->_cookie = _cookie;
864 //    mCastGrp[CkMyPe()].recvMsg(msg);
865     sendToLocal(msg);
866     entry->asm_msg = NULL;
867     entry->asm_fill = 0;
868   }
869 //  if (fromBuffer) delete [] data;
870 }
871
872 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
873 {
874   int i;
875   CkSectionInfo &sectionInfo = msg->_cookie;
876   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
877   CmiAssert(entry->getAid() == sectionInfo.get_aid());
878
879   if (entry->notReady()) {
880     DEBUGF(("entry not ready, enq buffer %p\n", msg));
881     entry->msgBuf.enq(msg);
882     return;
883   }
884
885   // send to spanning tree children
886   // can not optimize using list send because the difference in cookie
887   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
888   for (i=0; i<entry->children.length(); i++) {
889     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
890     newmsg->_cookie = entry->children[i];
891     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
892   }
893
894   sendToLocal(msg);
895 }
896
897 void CkMulticastMgr::sendToLocal(multicastGrpMsg *msg)
898 {
899   int i;
900   CkSectionInfo &sectionInfo = msg->_cookie;
901   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
902   CmiAssert(entry->getAid() == sectionInfo.get_aid());
903   CkGroupID aid = sectionInfo.get_aid();
904
905   // send to local
906   int nLocal = entry->localElem.length();
907   DEBUGF(("[%d] send to local %d elems\n", CkMyPe(), nLocal));
908   for (i=0; i<nLocal-1; i++) {
909     CProxyElement_ArrayBase ap(aid, entry->localElem[i]);
910     if (_entryTable[msg->ep]->noKeep) {
911       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.get_aid(), entry->localElem[i], CK_MSG_KEEP);
912     }
913     else {
914       // send through scheduler queue
915       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
916       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
917     }
918     // use CK_MSG_DONTFREE so that the message can be reused
919     // the drawback of this scheme bypassing queue is that 
920     // if # of local element is huge, this leads to a long time occupying CPU
921     // also load balancer seems not be able to correctly instrument load
922 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
923     //CmiNetworkProgressAfter(3);
924   }
925   if (nLocal) {
926     CProxyElement_ArrayBase ap(aid, entry->localElem[nLocal-1]);
927     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
928 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
929   }
930   else {
931     CkAssert (entry->rootSid.get_pe() == CkMyPe());
932     delete msg;
933   }
934 }
935
936
937
938 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
939 {
940   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
941   if (CkMcastBaseMsg::checkMagic(m) == 0) {
942     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
943     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
944   }
945   // ignore invalid cookie sent by SimpleSend
946   if (m->gpe() != -1) {
947     id.get_type() = MulticastMsg;
948     id.get_pe() = m->gpe();
949     id.get_val() = m->cookie();
950   }
951   // note: retain old redNo
952 }
953
954 // Reduction
955
956 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
957 {
958   CkCallback *sectionCB;
959   int numSubSections = proxy.ckGetNumSubSections();
960   // If its a cross-array section,
961   if (numSubSections > 1)
962   {
963       /** @warning: Each instantiation is a mem leak! :o
964        * The class is trivially small, but there's one instantiation for each
965        * section delegated to CkMulticast. The objects need to live as long as
966        * their section exists and is used. The idea of 'destroying' an array
967        * section is still academic, and hence no effort has been made to charge
968        * some 'owner' entity with the task of deleting this object.
969        *
970        * Reimplementing delegated x-array reductions will make this consideration moot
971        */
972       // Configure the final cross-section reducer
973       ck::impl::XArraySectionReducer *red =
974           new ck::impl::XArraySectionReducer(numSubSections, cb);
975       // Configure the subsection callback to deposit with the final reducer
976       sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
977   }
978   // else, just direct the reduction to the actual client cb
979   else
980       sectionCB = cb;
981   // Wire the sections together by storing the subsection cb in each sectionID
982   for (int i=0; i<numSubSections; i++)
983   {
984       CkSectionInfo &sInfo = proxy.ckGetSectionID(i)._cookie;
985       mCastEntry *entry = (mCastEntry *)sInfo.get_val();
986       entry->red.storedCallback = sectionCB;
987   }
988 }
989
990 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
991 {
992   CkSectionInfo &id = proxy.ckGetSectionInfo();
993   mCastEntry *entry = (mCastEntry *)id.get_val();
994   entry->red.storedClient = fn;
995   entry->red.storedClientParam = param;
996 }
997
998 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
999 {
1000   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
1001   msg->reducer = type;
1002   msg->sid = id;
1003   msg->sourceFlag = 1;   // from array element
1004   msg->redNo = id.get_redNo();
1005   msg->gcount = 1;
1006   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
1007   msg->callback = cb;
1008   msg->userFlag=userFlag;
1009   return msg;
1010 }
1011
1012
1013
1014 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
1015 {
1016   CkCallback cb;
1017   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
1018 }
1019
1020
1021 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
1022 {
1023   if (id.get_val() == NULL || id.get_redNo() == -1) 
1024     CmiAbort("contribute: SectionID is not initialized\n");
1025
1026   int nFrags;
1027   if (-1 == fragSize) {         // no frag
1028     nFrags = 1;
1029     fragSize = dataSize;
1030   }
1031   else {
1032     CmiAssert (dataSize >= fragSize);
1033     nFrags = dataSize/fragSize;
1034     if (dataSize%fragSize) nFrags++;
1035   }
1036
1037   if (MAXFRAGS < nFrags) {
1038     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
1039     CmiAbort ("frag size too small\n");
1040   }
1041
1042   int mpe = id.get_pe();
1043   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1044
1045   // break the message into k-piece fragments
1046   int fSize = fragSize;
1047   for (int i=0; i<nFrags; i++) {
1048     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
1049       fSize = dataSize%fragSize;
1050     }
1051
1052     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
1053
1054     // initialize the new msg
1055     msg->reducer            = type;
1056     msg->sid                = id;
1057     msg->nFrags             = nFrags;
1058     msg->fragNo             = i;
1059     msg->sourceFlag         = 1;
1060     msg->redNo              = id.get_redNo();
1061     msg->gcount             = 1;
1062     msg->rebuilt            = (mpe == CkMyPe())?0:1;
1063     msg->callback           = cb;
1064     msg->userFlag           = userFlag;
1065
1066     mCastGrp[mpe].recvRedMsg(msg);
1067
1068     data = (void*)(((char*)data) + fSize);
1069   }
1070
1071   id.get_redNo()++;
1072   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
1073 }
1074
1075 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
1076                                               mCastEntry* entry,
1077                                               reductionInfo& redInfo) {
1078   int i;
1079   int dataSize = 0;
1080   int nFrags   = redInfo.msgs[0][0]->nFrags;
1081
1082   // to avoid memcpy and allocation cost for non-pipelined reductions
1083   if (1 == nFrags) {
1084     CkReductionMsg* msg = redInfo.msgs[0][0];
1085
1086     // free up the msg slot
1087     redInfo.msgs[0].length() = 0;
1088
1089     return msg;
1090   }
1091
1092   for (i=0; i<nFrags; i++) {
1093     dataSize += redInfo.msgs[i][0]->dataSize;
1094   }
1095
1096   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1097
1098   // initialize msg header
1099   msg->redNo      = redInfo.msgs[0][0]->redNo;
1100   msg->reducer    = redInfo.msgs[0][0]->reducer;
1101   msg->sid        = id;
1102   msg->nFrags     = nFrags;
1103
1104   // I guess following fields need not be initialized
1105   msg->sourceFlag = 2;
1106   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1107   msg->callback   = redInfo.msgs[0][0]->callback;
1108   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1109
1110   byte* data = (byte*)msg->getData ();
1111   for (i=0; i<nFrags; i++) {
1112     // copy data from fragments to msg
1113     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1114     data += redInfo.msgs[i][0]->dataSize;
1115
1116     // free fragments
1117     delete redInfo.msgs[i][0];
1118     redInfo.msgs[i].length() = 0;    
1119   }
1120
1121   return msg;
1122 }
1123
1124
1125
1126 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1127                                      mCastEntry* entry, reductionInfo& redInfo,
1128                                      int currentTreeUp) {
1129
1130     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1131     reductionMsgs& rmsgs = redInfo.msgs[index];
1132     int dataSize         = rmsgs[0]->dataSize;
1133     int i;
1134     int oldRedNo = redInfo.redNo;
1135     int nFrags   = rmsgs[0]->nFrags;
1136     int fragNo   = rmsgs[0]->fragNo;
1137     int userFlag = rmsgs[0]->userFlag;
1138
1139     // Figure out (from one of the msg fragments) which reducer function to use
1140     CkReduction::reducerType reducer = rmsgs[0]->reducer;
1141     CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1142     CkAssert(NULL != f);
1143
1144     // Check if migration occurred in any of the subtrees, and pick one valid callback
1145     CkCallback msg_cb;
1146     int rebuilt = 0;
1147     for (i=0; i<rmsgs.length(); i++) {
1148         if (rmsgs[i]->rebuilt) rebuilt = 1;
1149         if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1150     }
1151
1152     // Perform the actual reduction
1153     CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec());
1154     newmsg->redNo  = redInfo.redNo;
1155     newmsg->nFrags = nFrags;
1156     newmsg->fragNo = fragNo;
1157     newmsg->userFlag = userFlag;
1158     newmsg->reducer = reducer;
1159
1160     // Increment the number of fragments processed
1161     redInfo.npProcessed ++;
1162
1163     // Delete all the fragments which are no longer needed
1164     for (i=0; i<rmsgs.length(); i++)
1165         if (rmsgs[i]!=newmsg) delete rmsgs[i];
1166     rmsgs.length() = 0;
1167
1168     // If I am not the tree root
1169     if (entry->hasParent()) {
1170         // send up to parent
1171         newmsg->sid        = entry->parentGrp;
1172         newmsg->sourceFlag = 2;
1173         newmsg->redNo      = oldRedNo; ///< @todo: redundant, duplicate assignment?
1174         newmsg->gcount     = redInfo.gcount [index];
1175         newmsg->rebuilt    = rebuilt;
1176         newmsg->callback   = msg_cb;
1177         DEBUGF(("[%d] ckmulticast: send %p to parent %d\n", CkMyPe(), entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1178         mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1179     } else {
1180         newmsg->sid = id;
1181         // Buffer the reduced fragment
1182         rmsgs.push_back (newmsg);
1183         // If all the fragments have been reduced
1184         if (redInfo.npProcessed == nFrags) {
1185             // Combine the fragments
1186             newmsg = combineFrags (id, entry, redInfo);
1187             // Set the reference number based on the user flag at the contribute call
1188             CkSetRefNum(newmsg, userFlag);
1189             // Trigger the appropriate reduction client
1190             if ( !msg_cb.isInvalid() )
1191                 msg_cb.send(newmsg);
1192             else if (redInfo.storedCallback != NULL)
1193                 redInfo.storedCallback->send(newmsg);
1194             else if (redInfo.storedClient != NULL) {
1195                 redInfo.storedClient(id, redInfo.storedClientParam, dataSize, newmsg->data);
1196                 delete newmsg;
1197             }
1198             else
1199                 CmiAbort("Did you forget to register a reduction client?");
1200
1201             DEBUGF(("ckmulticast: redn client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1202             //
1203             if (currentTreeUp) {
1204                 if (entry->oldc) {
1205                     // free old tree on same processor;
1206                     mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1207                     entry->oldc = NULL;
1208                 }
1209                 if (entry->hasOldtree()) {
1210                     // free old tree on old processor
1211                     int oldpe = entry->oldtree.pe;
1212                     mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1213                     entry->oldtree.clear();
1214                 }
1215             }
1216             // Indicate if a tree rebuild is required
1217             if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1218         }
1219     }
1220 }
1221
1222
1223
1224 /**
1225  * Called from:
1226  *   - contribute(): calls PE specified in the cookie
1227  *   - reduceFragment(): calls parent PE
1228  *   - recvRedMsg(): calls root PE (if tree is obsolete)
1229  *   - releaseFutureRedMsgs: calls this PE
1230  *   - releaseBufferedRedMsgs: calls root PE
1231  */
1232 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1233 {
1234     int i;
1235     /// Grab the section info embedded in the redn msg
1236     CkSectionInfo id = msg->sid;
1237     /// ... and get at the ptr which shows me which cookie to use
1238     mCastEntry *entry = (mCastEntry *)id.get_val();
1239     CmiAssert(entry!=NULL);
1240
1241     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1242
1243     int updateReduceNo = 0;
1244
1245     //-------------------------------------------------------------------------
1246     /// If this cookie is obsolete
1247     if (entry->isObsolete()) {
1248         // Send up to root
1249         DEBUGF(("[%d] ckmulticast: section cookie obsolete. Will send to root %d\n", CkMyPe(), entry->rootSid.get_pe()));
1250
1251         /// If I am the root, traverse the linked list of cookies to get the latest
1252         if (!entry->hasParent()) {
1253             mCastEntry *newentry = entry->newc;
1254             while (newentry && newentry->newc) newentry=newentry->newc;
1255             if (newentry) entry = newentry;
1256             CmiAssert(entry!=NULL);
1257         }
1258
1259         ///
1260         if (!entry->hasParent() && !entry->isObsolete()) {
1261             /// Indicate it is not on old spanning tree
1262             msg->sourceFlag = 0;
1263             /// Flag the redn as coming from an old tree and that the new entry cookie needs to know the new redn num.
1264             updateReduceNo  = 1;
1265         }
1266         /// If I am not the root or this latest cookie is also obsolete
1267         else {
1268             // Ensure that you're here with reason
1269             CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1270             // Edit the msg so that the recipient knows where to find its cookie
1271             msg->sid = entry->rootSid;
1272             msg->sourceFlag = 0;
1273             // Send the msg directly to the root of the redn tree
1274             mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1275             return;
1276         }
1277     }
1278
1279     /// Grab the locally stored redn info
1280     reductionInfo &redInfo = entry->red;
1281
1282     //-------------------------------------------------------------------------
1283     /// If you've received a msg from a previous redn, something has gone horribly wrong somewhere!
1284     if (msg->redNo < redInfo.redNo) {
1285         CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1286         CmiAbort("Could never happen! \n");
1287     }
1288
1289     //-------------------------------------------------------------------------
1290     /// If the current tree is not yet ready or if you've received a msg for a future redn, buffer the msg
1291     if (entry->notReady() || msg->redNo > redInfo.redNo) {
1292         DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1293         redInfo.futureMsgs.push_back(msg);
1294         return;
1295     }
1296
1297     //-------------------------------------------------------------------------
1298     const int index = msg->fragNo;
1299     // New contribution from an ArrayElement
1300     if (msg->sourceFlag == 1) {
1301         redInfo.lcount [index] ++;
1302     }
1303     // Redn from a child
1304     if (msg->sourceFlag == 2) {
1305         redInfo.ccount [index] ++;
1306     }
1307     // Total elems that have contributed the indexth fragment
1308     redInfo.gcount [index] += msg->gcount;
1309
1310     // Check if message is of proper size
1311     if ((0 != redInfo.msgs[index].length()) && (msg->dataSize != (redInfo.msgs [index][0]->dataSize)))
1312     CmiAbort("Reduction data are not of same length!");
1313
1314     //-------------------------------------------------------------------------
1315     // Buffer the msg
1316     redInfo.msgs [index].push_back(msg);
1317
1318     //-------------------------------------------------------------------------
1319     /// Flag if this fragment can be reduced (if all local elements and children have contributed this fragment)
1320     int currentTreeUp = 0;
1321     if (redInfo.lcount [index] == entry->localElem.length() && redInfo.ccount [index] == entry->children.length())
1322         currentTreeUp = 1;
1323
1324     /// Flag (only at the redn root) if all array elements contributed all their fragments
1325     int mixTreeUp = 0;
1326     if (!entry->hasParent()) {
1327         mixTreeUp = 1;
1328         for (int i=0; i<msg->nFrags; i++)
1329             if (entry->allElem.length() != redInfo.gcount [i])
1330                 mixTreeUp = 0;
1331     }
1332
1333     //-------------------------------------------------------------------------
1334     /// If this fragment can be reduced, or if I am the root and have received all fragments from all elements
1335     if (currentTreeUp || mixTreeUp)
1336     {
1337         const int nFrags = msg->nFrags;
1338         /// Reduce this fragment
1339         reduceFragment (index, id, entry, redInfo, currentTreeUp);
1340
1341         // If migration happened, and my sub-tree reconstructed itself,
1342         // share the current reduction number with myself and all my children
1343         if (updateReduceNo)
1344             mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1345
1346         /// If all the fragments for the current reduction have been processed
1347         if (redInfo.npProcessed == nFrags) {
1348
1349             /// Increment the reduction number in all of this section's cookies
1350             entry->incReduceNo();
1351
1352             /// Reset bookkeeping counters
1353             for (i=0; i<nFrags; i++) {
1354                 redInfo.lcount [i] = 0;
1355                 redInfo.ccount [i] = 0;
1356                 redInfo.gcount [i] = 0;
1357             }
1358             redInfo.npProcessed = 0;
1359             /// Now that, the current redn is done, release any pending msgs from future redns
1360             releaseFutureReduceMsgs(entry);
1361         }
1362     }
1363 }
1364
1365
1366
1367 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1368 {
1369   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1370
1371   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1372     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1373     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1374   }
1375   entry->red.futureMsgs.length() = 0;
1376 }
1377
1378
1379
1380 // these messages have to be sent to root
1381 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1382 {
1383   int i;
1384   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1385
1386   for (int j=0; j<MAXFRAGS; j++) {
1387     for (i=0; i<entry->red.msgs[j].length(); i++) {
1388       CkReductionMsg *msg = entry->red.msgs[j][i];
1389       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1390       msg->sid = entry->rootSid;
1391       msg->sourceFlag = 0;
1392       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1393     }
1394     entry->red.msgs[j].length() = 0;
1395   }
1396
1397
1398   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1399     CkReductionMsg *msg = entry->red.futureMsgs[i];
1400     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1401     msg->sid = entry->rootSid;
1402     msg->sourceFlag = 0;
1403     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1404   }
1405   entry->red.futureMsgs.length() = 0;
1406 }
1407
1408
1409
1410 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1411 {
1412   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1413   if (entry->red.redNo < red)
1414     entry->red.redNo = red;
1415
1416   CProxy_CkMulticastMgr mp(thisgroup);
1417   for (int i=0; i<entry->children.length(); i++) {
1418     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1419   }
1420
1421   releaseFutureReduceMsgs(entry);
1422 }
1423
1424 #include "CkMulticast.def.h"
1425