Examples: 3D Jacobi using SDAG and Parameter marshalling, from Bigsim
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentatio  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  *     SMP node aware     (SMP_AWARE)
12  * */
13
14 #include "charm++.h"
15 #include "envelope.h"
16 #include "register.h"
17
18 #include "ckmulticast.h"
19
20 #define DEBUGF(x)  // CkPrintf x;
21
22 // turn on or off fragmentation in multicast
23 #define SPLIT_MULTICAST 0
24 // each multicast message is split into SPLIT_NUM fragments
25 #define SPLIT_NUM 2
26
27 // maximum number of fragments into which a message can be broken
28 #define MAXFRAGS 5
29
30 #define SMP_AWARE                     1
31
32 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
33 typedef CkVec<CkArrayIndexMax>   arrayIndexList;
34 typedef CkVec<CkSectionInfo>     sectionIdList;
35 typedef CkVec<CkReductionMsg *>  reductionMsgs;
36 typedef CkQ<int>                 PieceSize;
37 typedef CkVec<LDObjid>          ObjKeyList;
38 typedef unsigned char            byte;
39
40 class reductionInfo {
41 public:
42   int            lcount [MAXFRAGS]; /**< local elem collected */
43   int            ccount [MAXFRAGS]; /**< children node collected */
44   int            gcount [MAXFRAGS]; /**< total elem collected */
45   int            npProcessed;
46   CkCallback*    storedCallback;    /**< user callback */
47   redClientFn    storedClient;      /**< reduction client function */
48   void*          storedClientParam; /**< user provided data */
49   int            redNo;             /**< reduction sequence number */
50   reductionMsgs  msgs [MAXFRAGS];   /**< messages for this reduction */
51   reductionMsgs  futureMsgs;        /**< messages of future reductions */
52 public:
53   reductionInfo(): storedCallback(NULL), storedClientParam(NULL), redNo(0),
54                    npProcessed(0) {
55     for (int i=0; i<MAXFRAGS; i++) 
56       lcount [i] = ccount [i] = gcount [i] = 0;
57   }
58 };
59
60 /// cookie status
61 #define COOKIE_NOTREADY 0
62 #define COOKIE_READY    1
63 #define COOKIE_OLD      2
64
65 class mCastPacket {
66 public:
67   CkSectionInfo cookie;
68   int n;
69   char *data;
70   int seqno;
71   int count;
72   int totalsize;
73
74   mCastPacket(CkSectionInfo &_cookie, int _n, char *_d, int _s, int _c, int _t):
75                 cookie(_cookie), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
76 };
77
78 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
79
80 class SectionLocation {
81 public:
82   mCastEntry *entry;
83   int         pe;
84 public:
85   SectionLocation(): entry(NULL), pe(-1) {}
86   SectionLocation( mCastEntry *e, int p) { set(e, p); }
87   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
88   inline void clear() { entry = NULL; pe = -1; }
89 };
90
91
92
93
94 /// Cookie for an array section 
95 class mCastEntry 
96 {
97     public:
98         /// Array ID 
99         CkArrayID     aid;
100         /// Spanning tree parent
101         CkSectionInfo parentGrp;
102         /// List of direct children
103         sectionIdList children;
104         /// Number of direct children
105         int numChild;
106         /// List of all tree member array indices (Only useful on the tree root)
107         arrayIndexList allElem;
108         /// Only useful on root for LB
109         ObjKeyList     allObjKeys;
110         /// List of array elements on local PE
111         arrayIndexList localElem;
112         /// Should always be myPE
113         int pe;
114         /// Section ID of the root
115         CkSectionInfo rootSid;
116         multicastGrpMsgBuf msgBuf;
117         /// Buffer storing the pending packets
118         multicastGrpPacketBuf packetBuf;
119         /// For multicast packetization
120         char *asm_msg;
121         int   asm_fill;
122         /// Linked list of entries on the same processor
123         mCastEntry *oldc, *newc;
124         /// Old spanning tree
125         SectionLocation   oldtree;
126         // for reduction
127         reductionInfo red;
128         //
129         char needRebuild;
130     private:
131         char flag;
132     public:
133         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
134                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
135         mCastEntry(mCastEntry *);
136         /// Check if this tree is only a branch and has a parent
137         inline int hasParent() { return parentGrp.get_val()?1:0; }
138         /// Is this tree obsolete?
139         inline int isObsolete() { return (flag == COOKIE_OLD); }
140         /// Make the current tree obsolete
141         inline void setObsolete() { flag=COOKIE_OLD; }
142         /// Check if this (branch of the) tree is ready for use
143         inline int notReady() { return (flag == COOKIE_NOTREADY); }
144         /// Mark this (branch of the) tree as ready for use
145         inline void setReady() { flag=COOKIE_READY; }
146         /// Increment the reduction number for all the section members on this PE
147         inline void incReduceNo() {
148             red.redNo ++;
149             for (mCastEntry *next = newc; next; next=next->newc) 
150                 next->red.redNo++;
151         }
152         /// Get a handle on the array ID this tree is a member of
153         inline CkArrayID getAid() { return aid; }
154         inline int hasOldtree() { return oldtree.entry != NULL; }
155         inline void print() {
156             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
157         }
158 };
159
160
161
162
163 class cookieMsg: public CMessage_cookieMsg {
164 public:
165   CkSectionInfo cookie;
166 public:
167   cookieMsg() {};
168   cookieMsg(CkSectionInfo m): cookie(m) {};
169 };
170
171
172
173
174 /// multicast tree setup message
175 class multicastSetupMsg: public CMessage_multicastSetupMsg {
176 public:
177   int  nIdx;
178   CkArrayIndexMax *arrIdx;
179   int      *lastKnown;
180   CkSectionInfo parent;
181   CkSectionInfo rootSid;
182   int redNo;
183 };
184
185
186
187
188 /// message send in spanning tree
189 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
190 };
191
192
193 extern void CkPackMessage(envelope **pEnv);
194 extern void CkUnpackMessage(envelope **pEnv);
195
196
197
198 void _ckMulticastInit(void)
199 {
200 /*
201   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
202   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
203 */
204 }
205
206
207 mCastEntry::mCastEntry (mCastEntry *old): 
208   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
209 {
210   int i;
211   aid = old->aid;
212   parentGrp = old->parentGrp;
213   for (i=0; i<old->allElem.length(); i++)
214     allElem.push_back(old->allElem[i]);
215 #if CMK_LBDB_ON
216   CmiAssert(old->allElem.length() == old->allObjKeys.length());
217   for (i=0; i<old->allObjKeys.length(); i++)
218     allObjKeys.push_back(old->allObjKeys[i]);
219 #endif
220   pe = old->pe;
221   red.storedCallback = old->red.storedCallback;
222   red.storedClient = old->red.storedClient;
223   red.storedClientParam = old->red.storedClientParam;
224   red.redNo = old->red.redNo;
225   needRebuild = 0;
226   asm_msg = NULL;
227   asm_fill = 0;
228 }
229
230 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
231
232
233
234
235 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndexMax *al, int n)
236 {
237     // Create a multicast entry
238     mCastEntry *entry = new mCastEntry(aid);
239     // Push all the section member indices into the entry
240     for (int i=0; i<n; i++) {
241         entry->allElem.push_back(al[i]);
242 #if CMK_LBDB_ON
243         const LDObjid key = idx2LDObjid(al[i]);
244         entry->allObjKeys.push_back(key);
245 #endif
246     }
247     //  entry->aid = aid;
248     _id.aid = aid;
249     _id.get_val() = entry;              // allocate table for this section
250     // 
251     initCookie(_id);
252 }
253
254
255
256
257 void CkMulticastMgr::setSection(CkSectionInfo &id)
258 {
259   initCookie(id);
260 }
261
262
263
264
265 /// @warning: This is deprecated
266 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
267 {
268   CkArrayID aid = proxy.ckGetArrayID();
269   CkSectionInfo &_id = proxy.ckGetSectionInfo();
270
271   mCastEntry *entry = new mCastEntry(aid);
272
273   const CkArrayIndexMax *al = proxy.ckGetArrayElements();
274   for (int i=0; i<proxy.ckGetNumElements(); i++) {
275     entry->allElem.push_back(al[i]);
276 #if CMK_LBDB_ON
277     const LDObjid key = idx2LDObjid(al[i]);
278     entry->allObjKeys.push_back(key);
279 #endif
280   }
281   _id.type = MulticastMsg;
282   _id.aid = aid;
283   _id.get_val() = entry;                // allocate table for this section
284   initCookie(_id);
285 }
286
287
288
289
290 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
291 {
292   CkSectionInfo &info = proxy.ckGetSectionInfo();
293
294   int oldpe = info.get_pe();
295   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
296
297   CkArrayID aid = proxy.ckGetArrayID();
298   CkSectionID *sid = proxy.ckGetSectionIDs();
299   mCastEntry *entry = new mCastEntry(aid);
300
301   mCastEntry *oldentry = (mCastEntry *)info.get_val();
302   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
303
304   const CkArrayIndexMax *al = sid->_elems;
305   CmiAssert(info.aid == aid);
306   prepareCookie(entry, *sid, al, sid->_nElems, aid);
307
308   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
309
310     // store old tree info
311   entry->oldtree.set(oldentry, oldpe);
312
313     // obsolete old tree
314   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
315
316   // find reduction number
317   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
318 }
319
320
321
322
323 // prepare a mCastEntry entry and set up in CkSectionID
324 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndexMax *al, int count, CkArrayID aid)
325 {
326   for (int i=0; i<count; i++) {
327     entry->allElem.push_back(al[i]);
328 #if CMK_LBDB_ON
329     const LDObjid key = idx2LDObjid(al[i]);
330     entry->allObjKeys.push_back(key);
331 #endif
332   }
333   sid._cookie.type = MulticastMsg;
334   sid._cookie.aid = aid;
335   sid._cookie.get_val() = entry;        // allocate table for this section
336   sid._cookie.get_pe() = CkMyPe();
337 }
338
339
340
341
342 // this is used
343 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
344 {
345   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
346   CkArrayID aid = proxy->ckGetArrayID();
347   CkSectionID *sid = proxy->ckGetSectionIDs();
348
349   mCastEntry *entry = new mCastEntry(aid);
350
351   const CkArrayIndexMax *al = proxy->ckGetArrayElements();
352   prepareCookie(entry, *sid, al, proxy->ckGetNumElements(), aid);
353   initCookie(sid->_cookie);
354 }
355
356
357
358
359 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
360 {
361   mCastEntry *entry = (mCastEntry *)s.get_val();
362   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
363   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
364 }
365
366 // now that we get reduction number from the old cookie,
367 // we continue to build the spanning tree
368 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
369 {
370   mCastEntry *entry = (mCastEntry *)s.get_val();
371   entry->red.redNo = red;
372
373   initCookie(s);
374
375   // TODO delete old tree
376 }
377
378
379
380
381 void CkMulticastMgr::initCookie(CkSectionInfo s)
382 {
383     mCastEntry *entry = (mCastEntry *)s.get_val();
384     int n = entry->allElem.length();
385     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
386     // Create and initialize a setup message
387     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
388     msg->nIdx = n;
389     msg->parent = CkSectionInfo(entry->getAid());
390     msg->rootSid = s;
391     msg->redNo = entry->red.redNo;
392     // Fill the message with the section member indices and their last known locations
393     CkArray *array = CProxy_ArrayBase(s.aid).ckLocalBranch();
394     for (int i=0; i<n; i++) {
395       msg->arrIdx[i] = entry->allElem[i];
396       int ape = array->lastKnown(entry->allElem[i]);
397       CmiAssert(ape >=0 && ape < CkNumPes());
398       msg->lastKnown[i] = ape;
399     }
400     // Trigger the spanning tree build
401     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
402     mCastGrp[CkMyPe()].setup(msg);
403 }
404
405
406
407
408 void CkMulticastMgr::teardown(CkSectionInfo cookie)
409 {
410     int i;
411     mCastEntry *sect = (mCastEntry *)cookie.get_val();
412     // Mark this section as obsolete
413     sect->setObsolete();
414     // Release the buffered messages 
415     releaseBufferedReduceMsgs(sect);
416     // Propagate the teardown to each of your children
417     CProxy_CkMulticastMgr mp(thisgroup);
418     for (i=0; i<sect->children.length(); i++)
419         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
420 }
421
422
423
424
425 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
426 {
427     int i;
428     mCastEntry *sect = (mCastEntry *)cookie.get_val();
429     // Reset the root section info
430     sect->rootSid = newroot;
431     // Mark this section as obsolete
432     sect->setObsolete();
433     // Release the buffered messages 
434     releaseBufferedReduceMsgs(sect);
435     // Propagate the teardown to each of your children
436     CProxy_CkMulticastMgr mp(thisgroup);
437     for (i=0; i<sect->children.length(); i++)
438         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
439 }
440
441
442
443
444 void CkMulticastMgr::freeup(CkSectionInfo cookie)
445 {
446   mCastEntry *sect = (mCastEntry *)cookie.get_val();
447   CProxy_CkMulticastMgr mp(thisgroup);
448   // Parse through all the section members on this PE and...
449   while (sect) 
450   {
451       // Free their children
452       for (int i=0; i<sect->children.length(); i++)
453           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
454       // Free the cookie itself
455       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
456       mCastEntry *oldc= sect->oldc;
457       delete sect;
458       sect = oldc;
459   }
460 }
461
462
463
464
465 void CkMulticastMgr::setup(multicastSetupMsg *msg)
466 {
467     int i,j;
468     mCastEntry *entry;
469     CkArrayID aid = msg->rootSid.aid;
470     if (msg->parent.get_pe() == CkMyPe()) 
471       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
472     else 
473       entry = new mCastEntry(aid);
474     entry->aid = aid;
475     entry->pe = CkMyPe();
476     entry->rootSid = msg->rootSid;
477     entry->parentGrp = msg->parent;
478     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
479     entry->red.redNo = msg->redNo;
480
481     // Create a numPE sized array of vectors to hold the array elements in each PE
482     int numpes = CkNumPes();
483     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
484     // Sort each array index in the setup message based on last known location
485     for (i=0; i<msg->nIdx; i++) 
486     {
487       int lastKnown = msg->lastKnown[i];
488       // If msg->arrIdx[i] local, add it to a special local element list
489       if (lastKnown == CkMyPe())
490           entry->localElem.insertAtEnd(msg->arrIdx[i]);
491       // else, add it to the list corresponding to its PE
492       else
493           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
494     }
495
496     // Determine the number of children in the multicast tree
497     // (divide into MAXMCASTCHILDREN slots)
498     int numchild = 0;
499     int num = 0;
500     // Count the number of PEs with section members on them
501     for (i=0; i<numpes; i++) 
502     {
503       if (i==CkMyPe()) continue;
504       if (lists[i].length()) num++;
505     }
506     // The number of multicast children can be limited by the spanning tree factor 
507     if (factor <= 0) numchild = num;
508     else numchild = num<factor?num:factor;
509   
510     entry->numChild = numchild;
511
512     // If there are any children, go about building a spanning tree
513     if (numchild) 
514     {
515         // Distribute the section members across the number of direct children (branches)
516         // Direct children are simply the first section member in each of the branch lists
517         //CkPrintf("[%d] numchild: %d\n", CkMyPe(), numchild);
518         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
519 #if !SMP_AWARE
520         num = 0;
521         for (i=0; i<numpes; i++) 
522         {
523             if (i==CkMyPe()) continue;
524             if (lists[i].length() == 0) continue;
525             for (j=0; j<lists[i].length(); j++)
526                 slots[num].push_back(lists[i][j]);
527             num = (num+1) % numchild;
528         }
529 #else
530         num = 0;
531         // SMP node aware, put SMP processors on the direct children
532         // first pass, find SMP processors
533         for (i=0; i<numpes; i++) 
534         {
535             if (i==CkMyPe()) continue;
536             if (lists[i].length() == 0) continue;
537             if (CmiPeOnSamePhysicalNode(i, CkMyPe())==1) {
538               // CkPrintf("[%d] child: %d\n", CkMyPe(), i);
539               for (j=0; j<lists[i].length(); j++)
540                 slots[num].push_back(lists[i][j]);
541               lists[i].length() = 0;
542               num = (num+1) % numchild;
543             }
544         }
545         // second pass, fill the rest
546         for (i=0; i<numpes; i++) 
547         {
548             if (i==CkMyPe()) continue;
549             if (lists[i].length() == 0) continue;
550             // CkPrintf("[%d] child: %d\n", CkMyPe(), i);
551             for (j=0; j<lists[i].length(); j++)
552                 slots[num].push_back(lists[i][j]);
553             num = (num+1) % numchild;
554         }
555 #endif
556
557         // Ask each of your direct children to setup their branches
558         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
559         for (i=0; i<numchild; i++) 
560         {
561             // Give each child info about the number, indices and location of its children
562             int n = slots[i].length();
563             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
564             m->parent = CkSectionInfo(aid, entry);
565             m->nIdx = slots[i].length();
566             m->rootSid = msg->rootSid;
567             m->redNo = msg->redNo;
568             for (j=0; j<slots[i].length(); j++) 
569             {
570                 m->arrIdx[j] = slots[i][j].idx;
571                 m->lastKnown[j] = slots[i][j].pe;
572             }
573             int childroot = slots[i][0].pe;
574             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
575             // Send the message to the child
576             mCastGrp[childroot].setup(m);
577         }
578         delete [] slots;
579     }
580     // else, tell yourself that your children are ready
581     else 
582     {
583         childrenReady(entry);
584     }
585     delete [] lists;
586 }
587
588
589
590
591 void CkMulticastMgr::childrenReady(mCastEntry *entry)
592 {
593     // Mark this entry as ready
594     entry->setReady();
595     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
596     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
597     if (entry->hasParent()) 
598         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
599 #if SPLIT_MULTICAST
600     // clear packet buffer
601     while (!entry->packetBuf.isEmpty()) 
602     {
603         mCastPacket *packet = entry->packetBuf.deq();
604         packet->cookie.get_val() = entry;
605         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
606         delete [] packet->data;
607         delete packet;
608     }
609 #else
610     // clear msg buffer
611     while (!entry->msgBuf.isEmpty()) 
612     {
613         multicastGrpMsg *newmsg = entry->msgBuf.deq();
614         DEBUGF(("[%d] release buffer %p %d\n", CkMyPe(), newmsg, newmsg->ep));
615         newmsg->_cookie.get_val() = entry;
616         mCastGrp[CkMyPe()].recvMsg(newmsg);
617     }
618 #endif
619     // release reduction msgs
620     releaseFutureReduceMsgs(entry);
621 }
622
623
624
625
626 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
627 {
628   mCastEntry *entry = (mCastEntry *)sid.get_val();
629   entry->children.push_back(child);
630   if (entry->children.length() == entry->numChild) {
631     childrenReady(entry);
632   }
633 }
634
635
636
637
638 // rebuild is called when root not migrated
639 // when rebuilding, all multicast msgs will be buffered.
640 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
641 {
642   // tear down old tree
643   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
644   CkAssert(curCookie->pe == CkMyPe());
645   // make sure I am the newest one
646   while (curCookie->newc) curCookie = curCookie->newc;
647   if (curCookie->isObsolete()) return;
648
649   //CmiPrintf("tree rebuild\n");
650   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
651
652   // build a chain
653   newCookie->oldc = curCookie;
654   curCookie->newc = newCookie;
655
656   sectId.get_val() = newCookie;
657
658   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
659
660   curCookie->setObsolete();
661
662   resetCookie(sectId);
663 }
664
665 // mark old cookie spanning tree as old and 
666 // build a new one
667 void CkMulticastMgr::resetCookie(CkSectionInfo s)
668 {
669   mCastEntry *newCookie = (mCastEntry*)s.get_val();
670   mCastEntry *oldCookie = newCookie->oldc;
671
672   // get rid of old one
673   DEBUGF(("reset: oldc: %p\n", oldCookie));
674   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
675   int mype = CkMyPe();
676   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
677
678   // build a new one
679   initCookie(s);
680 }
681
682 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
683 {
684   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
685     // set an invalid cookie since we don't have it
686   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
687   for (int i=0; i< sid._nElems-1; i++) {
688      CProxyElement_ArrayBase ap(a, sid._elems[i]);
689      void *newMsg=CkCopyMsg((void **)&m);
690      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
691   }
692   if (sid._nElems > 0) {
693      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
694      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
695   }
696 }
697
698 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
699 {
700   DEBUGF(("ArraySectionSend\n"));
701
702   multicastGrpMsg *msg = (multicastGrpMsg *)m;
703 //  msg->aid = a;
704   msg->ep = ep;
705
706   CkSectionInfo &s = sid->_cookie;
707   CmiAssert(nsid == 1);
708
709   mCastEntry *entry;
710   if (s.get_pe() == CkMyPe()) {
711     entry = (mCastEntry *)s.get_val();   
712     if (entry == NULL) {
713       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
714     }
715
716     // update entry pointer in case there is a newer one.
717     if (entry->newc) {
718       do { entry=entry->newc; } while (entry->newc);
719       s.get_val() = entry;
720     }
721
722 #if CMK_LBDB_ON
723     // fixme: running obj?
724     envelope *env = UsrToEnv(msg);
725     const LDOMHandle &om = CProxy_ArrayBase(s.aid).ckLocMgr()->getOMHandle();
726     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
727 #endif
728
729     // first time need to rebuild, we do simple send to refresh lastKnown
730     if (entry->needRebuild == 1) {
731       msg->_cookie = s;
732       SimpleSend(ep, msg, s.aid, *sid, opts);
733       entry->needRebuild = 2;
734       return;
735     }
736     else if (entry->needRebuild == 2) rebuild(s);
737   }
738   else {
739     // fixme - in this case, not recorded in LB
740     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
741   }
742
743   // don't need packing here
744 /*
745   register envelope *env = UsrToEnv(m);
746   CkPackMessage(&env);
747   m = EnvToUsr(env);
748 */
749
750   // update cookie
751   msg->_cookie = s;
752
753 #if SPLIT_MULTICAST
754   // split multicast msg into SPLIT_NUM copies
755   register envelope *env = UsrToEnv(m);
756   CkPackMessage(&env);
757   int totalsize = env->getTotalsize();
758   int packetSize = totalsize/SPLIT_NUM;
759   if (totalsize%SPLIT_NUM) packetSize ++;
760   int totalcount = SPLIT_NUM;
761   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
762   int sizesofar = 0;
763   char *data = (char*) env;
764   for (int i=0; i<totalcount; i++) {
765     int mysize = packetSize;
766     if (mysize + sizesofar > totalsize) {
767       mysize = totalsize-sizesofar;
768     }
769     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
770     mCastGrp[s.get_pe()].recvPacket(s, mysize, data, i, totalcount, totalsize, 0);
771     sizesofar += mysize;
772     data += mysize;
773   }
774   CmiFree(env);
775 #else
776   if (s.get_pe() == CkMyPe()) {
777     recvMsg(msg);
778   }
779   else {
780     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
781     mCastGrp[s.get_pe()].recvMsg(msg);
782   }
783 #endif
784 }
785
786 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
787 {
788   int i;
789   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
790
791
792   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
793     char *newdata = new char[n];
794     memcpy(newdata, data, n);
795     entry->packetBuf.enq(new mCastPacket(_cookie, n, newdata, seqno, count, totalsize));
796 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
797     return;
798   }
799
800 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
801
802   // send to spanning tree children
803   // can not optimize using list send because the difference in cookie
804   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
805   for (i=0; i<entry->children.length(); i++) {
806     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], n, data, seqno, count, totalsize, 0);
807   }
808
809   if (seqno == 0) {
810     if (entry->asm_msg != NULL || entry->asm_fill != 0) {
811       entry->print();
812       CmiAssert(entry->asm_msg == NULL && entry->asm_fill==0);
813     }
814     entry->asm_msg = (char *)CmiAlloc(totalsize);
815   }
816   memcpy(entry->asm_msg+entry->asm_fill, data, n);
817   entry->asm_fill += n;
818   if (seqno + 1 == count) {
819     CmiAssert(entry->asm_fill == totalsize);
820     CkUnpackMessage((envelope **)&entry->asm_msg);
821     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
822     msg->_cookie = _cookie;
823 //    mCastGrp[CkMyPe()].recvMsg(msg);
824     recvMsg(msg);
825     entry->asm_msg = NULL;
826     entry->asm_fill = 0;
827   }
828 //  if (fromBuffer) delete [] data;
829 }
830
831 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
832 {
833   int i;
834   CkSectionInfo &sectionInfo = msg->_cookie;
835   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
836   CmiAssert(entry->getAid() == sectionInfo.aid);
837
838 #if ! SPLIT_MULTICAST
839   if (entry->notReady()) {
840     DEBUGF(("entry not ready, enq buffer %p\n", msg));
841     entry->msgBuf.enq(msg);
842     return;
843   }
844
845   // send to spanning tree children
846   // can not optimize using list send because the difference in cookie
847   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
848   for (i=0; i<entry->children.length(); i++) {
849     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
850     newmsg->_cookie = entry->children[i];
851     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
852   }
853 #endif
854
855   // send to local
856   int nLocal = entry->localElem.length();
857   DEBUGF(("send to local %d\n", nLocal));
858   for (i=0; i<nLocal-1; i++) {
859     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[i]);
860     if (_entryTable[msg->ep]->noKeep) {
861       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.aid, entry->localElem[i], CK_MSG_KEEP);
862     }
863     else {
864       // send through scheduler queue
865       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
866       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
867     }
868     // use CK_MSG_DONTFREE so that the message can be reused
869     // the drawback of this scheme bypassing queue is that 
870     // if # of local element is huge, this leads to a long time occupying CPU
871     // also load balancer seems not be able to correctly instrument load
872 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
873     //CmiNetworkProgressAfter(3);
874   }
875   if (nLocal) {
876     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[nLocal-1]);
877     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
878 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
879   }
880   else {
881     CkAssert (entry->rootSid.get_pe() == CkMyPe());
882     delete msg;
883   }
884 }
885
886 // user function
887 // to retrieve section info from a multicast msg
888 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
889 {
890   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
891   if (CkMcastBaseMsg::checkMagic(m) == 0) {
892     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
893     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
894   }
895   // ignore invalid cookie sent by SimpleSend
896   if (m->gpe() != -1) {
897     id.type = MulticastMsg;
898     id.get_pe() = m->gpe();
899     id.get_val() = m->cookie();
900   }
901   // note: retain old redNo
902 }
903
904 // Reduction
905
906 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
907 {
908   CkSectionInfo &id = proxy.ckGetSectionInfo();
909   mCastEntry *entry = (mCastEntry *)id.get_val();
910   entry->red.storedCallback = cb;
911 }
912
913 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
914 {
915   CkSectionInfo &id = proxy.ckGetSectionInfo();
916   mCastEntry *entry = (mCastEntry *)id.get_val();
917   entry->red.storedClient = fn;
918   entry->red.storedClientParam = param;
919 }
920
921 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
922 {
923   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
924   msg->reducer = type;
925   msg->sid = id;
926   msg->sourceFlag = 1;   // from array element
927   msg->redNo = id.get_redNo();
928   msg->gcount = 1;
929   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
930   msg->callback = cb;
931   msg->userFlag=userFlag;
932   return msg;
933 }
934
935
936
937 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
938 {
939   CkCallback cb;
940   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
941 }
942
943
944 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
945 {
946   if (id.get_val() == NULL || id.get_redNo() == -1) 
947     CmiAbort("contribute: SectionID is not initialized\n");
948
949   int nFrags;
950   if (-1 == fragSize) {         // no frag
951     nFrags = 1;
952     fragSize = dataSize;
953   }
954   else {
955     CmiAssert (dataSize >= fragSize);
956     nFrags = dataSize/fragSize;
957     if (dataSize%fragSize) nFrags++;
958   }
959
960   if (MAXFRAGS < nFrags) {
961     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
962     CmiAbort ("frag size too small\n");
963   }
964
965   int mpe = id.get_pe();
966   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
967
968   // break the message into k-piece fragments
969   int fSize = fragSize;
970   for (int i=0; i<nFrags; i++) {
971     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
972       fSize = dataSize%fragSize;
973     }
974
975     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
976
977     // initialize the new msg
978     msg->reducer            = type;
979     msg->sid                = id;
980     msg->nFrags             = nFrags;
981     msg->fragNo             = i;
982     msg->sourceFlag         = 1;
983     msg->redNo              = id.get_redNo();
984     msg->gcount             = 1;
985     msg->rebuilt            = (mpe == CkMyPe())?0:1;
986     msg->callback           = cb;
987     msg->userFlag           = userFlag;
988
989     mCastGrp[mpe].recvRedMsg(msg);
990
991     data = (void*)(((char*)data) + fSize);
992   }
993
994   id.get_redNo()++;
995   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
996 }
997
998 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
999                                               mCastEntry* entry,
1000                                               reductionInfo& redInfo) {
1001   int i;
1002   int dataSize = 0;
1003   int nFrags   = redInfo.msgs[0][0]->nFrags;
1004
1005   // to avoid memcpy and allocation cost for non-pipelined reductions
1006   if (1 == nFrags) {
1007     CkReductionMsg* msg = redInfo.msgs[0][0];
1008
1009     // free up the msg slot
1010     redInfo.msgs[0].length() = 0;
1011
1012     return msg;
1013   }
1014
1015   for (i=0; i<nFrags; i++) {
1016     dataSize += redInfo.msgs[i][0]->dataSize;
1017   }
1018
1019   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1020
1021   // initialize msg header
1022   msg->redNo      = redInfo.msgs[0][0]->redNo;
1023   msg->reducer    = redInfo.msgs[0][0]->reducer;
1024   msg->sid        = id;
1025   msg->nFrags     = nFrags;
1026
1027   // I guess following fields need not be initialized
1028   msg->sourceFlag = 2;
1029   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1030   msg->callback   = redInfo.msgs[0][0]->callback;
1031   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1032
1033   byte* data = (byte*)msg->getData ();
1034   for (i=0; i<nFrags; i++) {
1035     // copy data from fragments to msg
1036     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1037     data += redInfo.msgs[i][0]->dataSize;
1038
1039     // free fragments
1040     delete redInfo.msgs[i][0];
1041     redInfo.msgs[i].length() = 0;    
1042   }
1043
1044   return msg;
1045 }
1046
1047 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1048                                      mCastEntry* entry, reductionInfo& redInfo,
1049                                      int& updateReduceNo, int currentTreeUp){
1050
1051   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1052   reductionMsgs& rmsgs = redInfo.msgs[index];
1053   int dataSize         = rmsgs[0]->dataSize;
1054   CkReduction::reducerType reducer = rmsgs[0]->reducer;
1055   int i;
1056   int oldRedNo = redInfo.redNo;
1057   int nFrags   = rmsgs[0]->nFrags;
1058   int fragNo   = rmsgs[0]->fragNo;
1059   int userFlag   = rmsgs[0]->userFlag;
1060                                                                                 
1061   // reduce msgs
1062   CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1063   CkAssert(NULL != f);
1064
1065   // check valid callback in msg and check if migration happened
1066   CkCallback msg_cb;
1067   int rebuilt = 0;
1068   for (i=0; i<rmsgs.length(); i++) {
1069     if (rmsgs[i]->rebuilt) rebuilt = 1;
1070     if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1071   }
1072
1073   CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec()); 
1074   newmsg->redNo  = redInfo.redNo;
1075   newmsg->nFrags = nFrags;
1076   newmsg->fragNo = fragNo;
1077   newmsg->userFlag = userFlag;
1078
1079   // increment num-frags processed
1080   redInfo.npProcessed ++;
1081
1082   // check if migration and free messages
1083   for (i=0; i<rmsgs.length(); i++) {
1084     if (rmsgs[i]!=newmsg) delete rmsgs[i];
1085   }
1086   rmsgs.length() = 0;
1087
1088   if (redInfo.npProcessed == nFrags) {
1089     entry->incReduceNo();
1090     DEBUGF(("Advanced entry:%p redNo: %d\n", entry, entry->red.redNo));
1091   }
1092   if (updateReduceNo) mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1093                                                                                 
1094   if (entry->hasParent()) {
1095     // send up to parent
1096     newmsg->sid        = entry->parentGrp;
1097     newmsg->reducer    = reducer;
1098     newmsg->sourceFlag = 2;
1099     newmsg->redNo      = oldRedNo;
1100     newmsg->gcount     = redInfo.gcount [index];
1101     newmsg->rebuilt    = rebuilt;
1102     newmsg->callback   = msg_cb;
1103     DEBUGF(("send to parent %p: %d\n", entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1104     mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1105   } else { // root
1106     newmsg->sid = id;
1107     // buffer message
1108     rmsgs.push_back (newmsg);
1109
1110     //if (entry->allElem.length() == redInfo.gcount) {
1111     if (redInfo.npProcessed == nFrags) {
1112
1113       newmsg = combineFrags (id, entry, redInfo);
1114
1115       if (!msg_cb.isInvalid()) {
1116         msg_cb.send(newmsg);
1117       }
1118       else if (redInfo.storedCallback != NULL) {
1119         redInfo.storedCallback->send(newmsg);
1120       }
1121       else if (redInfo.storedClient != NULL) {
1122         redInfo.storedClient(id, redInfo.storedClientParam, dataSize,
1123            newmsg->data);
1124         delete newmsg;
1125       }
1126       else
1127         CmiAbort("Did you forget to register a reduction client?");
1128                                                                                 
1129       DEBUGF(("Reduction client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1130       if (currentTreeUp) {
1131         if (entry->oldc) {
1132             // free old tree on same processor;
1133           mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1134           entry->oldc = NULL;
1135         }
1136         if (entry->hasOldtree()) {
1137             // free old tree on old processor
1138           int oldpe = entry->oldtree.pe;
1139           mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1140           entry->oldtree.clear();
1141         }
1142       }
1143       if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1144     }
1145   }
1146 }
1147
1148 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1149 {
1150   int i;
1151   CkSectionInfo id = msg->sid;
1152   mCastEntry *entry = (mCastEntry *)id.get_val();
1153   CmiAssert(entry!=NULL);
1154 //CmiPrintf("[%d] recvRedMsg: entry: %p\n", CkMyPe(), entry);
1155
1156   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1157
1158   int updateReduceNo = 0;
1159
1160   // update entry if obsolete
1161   if (entry->isObsolete()) {
1162       // send up to root
1163     DEBUGF(("[%d] entry obsolete-send to root %d\n", CkMyPe(), entry->rootSid.pe));
1164     if (!entry->hasParent()) { //rootSid.pe == CkMyPe()
1165       // I am root, set to the new cookie if there is
1166       mCastEntry *newentry = entry->newc;
1167       while (newentry && newentry->newc) newentry=newentry->newc;
1168       if (newentry) entry = newentry;
1169       CmiAssert(entry!=NULL);
1170     }
1171     if (!entry->hasParent() && !entry->isObsolete()) {
1172        // root find the latest cookie that is not obsolete
1173       msg->sourceFlag = 0;       // indicate it is not on old spanning tree
1174       updateReduceNo = 1;        // reduce from old tree, new entry need update.
1175     }
1176     else {
1177       CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1178       // entry is obsolete, send to root directly
1179       msg->sid = entry->rootSid;
1180
1181       msg->sourceFlag = 0;
1182       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1183       return;
1184     }
1185   }
1186
1187   reductionInfo &redInfo = entry->red;
1188
1189   DEBUGF(("[%d] msg %p red:%d, entry:%p redno:%d\n", CkMyPe(), msg, msg->redNo, entry, entry->red.redNo));
1190   // old message come, ignore
1191   if (msg->redNo < redInfo.redNo) {
1192     CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1193     CmiAbort("Could never happen! \n");
1194   }
1195   if (entry->notReady() || msg->redNo > redInfo.redNo) {
1196     DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1197     redInfo.futureMsgs.push_back(msg);
1198     return;
1199   }
1200
1201   DEBUGF(("[%d] recvRedMsg rebuilt:%d red:%d\n", CkMyPe(), msg->rebuilt, redInfo.redNo));
1202
1203   const int index = msg->fragNo;
1204
1205   // buffer this msg
1206   if (msg->sourceFlag == 1) {
1207     // new reduction message from ArrayElement
1208     redInfo.lcount [index] ++;
1209   }
1210
1211   if (msg->sourceFlag == 2) {
1212     redInfo.ccount [index] ++;
1213   }
1214
1215   redInfo.gcount [index] += msg->gcount;
1216
1217   // buffer the msg
1218   // first check if message is of proper size
1219   if ((0 != redInfo.msgs[index].length()) && 
1220       (msg->dataSize != (redInfo.msgs [index][0]->dataSize))) {
1221     CmiAbort("Reduction data are not of same length!");
1222   }
1223
1224   redInfo.msgs [index].push_back(msg);
1225
1226   const int numFragsRcvd = redInfo.msgs [index].length();
1227
1228   DEBUGF(("[%d] index:%d lcount:%d-%d, ccount:%d-%d, gcount:%d-%d root:%d\n", CkMyPe(),index, entry->red.lcount[index],entry->localElem.length(), entry->red.ccount[index], entry->children.length(), entry->red.gcount[index], entry->allElem.length(), !entry->hasParent()));
1229
1230   int currentTreeUp = 0;
1231   if (redInfo.lcount [index] == entry->localElem.length() &&
1232       redInfo.ccount [index] == entry->children.length())
1233       currentTreeUp = 1;
1234
1235   int mixTreeUp = 0;
1236   const int numElems = entry->allElem.length();
1237   
1238   if (!entry->hasParent()) {
1239     mixTreeUp = 1;
1240     for (int i=0; i<msg->nFrags; i++) {
1241       if (entry->allElem.length() != redInfo.gcount [i]) {
1242         mixTreeUp = 0;
1243       }
1244     }
1245   }
1246
1247   if (currentTreeUp || mixTreeUp)
1248   {
1249     const int nFrags = msg->nFrags;  
1250     
1251     // msg from children contain only one fragment
1252     reduceFragment (index, id, entry, redInfo, updateReduceNo, 
1253                     currentTreeUp);
1254
1255     if (redInfo.npProcessed == nFrags) {
1256       // reset counters
1257       for (i=0; i<nFrags; i++) {
1258         redInfo.lcount [i] = 0;
1259         redInfo.ccount [i] = 0;
1260         redInfo.gcount [i] = 0;
1261       }
1262       redInfo.npProcessed = 0;
1263
1264       // release future msgs
1265       releaseFutureReduceMsgs(entry);
1266     }
1267   }
1268 }
1269
1270 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1271 {
1272   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1273
1274   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1275     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1276     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1277   }
1278   entry->red.futureMsgs.length() = 0;
1279 }
1280
1281 // these messages have to be sent to root
1282 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1283 {
1284   int i;
1285   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1286
1287   for (int j=0; j<MAXFRAGS; j++) {
1288     for (i=0; i<entry->red.msgs[j].length(); i++) {
1289       CkReductionMsg *msg = entry->red.msgs[j][i];
1290       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1291       msg->sid = entry->rootSid;
1292       msg->sourceFlag = 0;
1293       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1294     }
1295     entry->red.msgs[j].length() = 0;
1296   }
1297
1298
1299   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1300     CkReductionMsg *msg = entry->red.futureMsgs[i];
1301     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1302     msg->sid = entry->rootSid;
1303     msg->sourceFlag = 0;
1304     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1305   }
1306   entry->red.futureMsgs.length() = 0;
1307 }
1308
1309 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1310 {
1311   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1312   if (entry->red.redNo < red)
1313     entry->red.redNo = red;
1314
1315   CProxy_CkMulticastMgr mp(thisgroup);
1316   for (int i=0; i<entry->children.length(); i++) {
1317     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1318   }
1319
1320   releaseFutureReduceMsgs(entry);
1321 }
1322
1323 #if 0
1324 ////////////////////////////////////////////////////////////////////////////////
1325 /////
1326 ///////////////// Builtin Reducer Functions //////////////
1327 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1328 {CkAbort("ERROR! Called the invalid reducer!\n");return NULL;}
1329
1330 /* A simple reducer, like sum_int, looks like this:
1331 static CkReductionMsg *sum_int(int nMsg,CkReductionMsg **msg)
1332 {
1333   int i,ret=0;
1334   for (i=0;i<nMsg;i++)
1335     ret+=*(int *)(msg[i]->data);
1336   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1337 }
1338 */
1339
1340 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1341 static CkReductionMsg *name(int nMsg, CkReductionMsg **msg)\
1342 {\
1343   int m,i;\
1344   int nElem=msg[0]->getSize()/sizeof(dataType);\
1345   dataType *ret=(dataType *)(msg[0]->getData());\
1346   for (m=1;m<nMsg;m++)\
1347   {\
1348     dataType *value=(dataType *)(msg[m]->getData());\
1349     for (i=0;i<nElem;i++)\
1350     {\
1351       loop\
1352     }\
1353   }\
1354   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret);\
1355 }
1356
1357 //Use this macro for reductions that have the same type for all inputs
1358 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1359   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1360   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1361   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1362
1363
1364 //Compute the sum the numbers passed by each element.
1365 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1366
1367 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1368
1369 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1370
1371 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1372
1373 CkReduction::reducerFn CkMulticastMgr::reducerTable[CkMulticastMgr::MAXREDUCERS]={
1374     ::invalid_reducer,
1375   //Compute the sum the numbers passed by each element.
1376     ::sum_int,::sum_float,::sum_double,
1377     ::product_int,::product_float,::product_double,
1378     ::max_int,::max_float,::max_double,
1379     ::min_int,::min_float,::min_double
1380 };
1381 #endif
1382
1383 #include "CkMulticast.def.h"