Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentation  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  * */
12
13 #include "charm++.h"
14 #include "envelope.h"
15 #include "register.h"
16
17 #include "ckmulticast.h"
18 #include "spanningTreeStrategy.h"
19
20 #define DEBUGF(x)  // CkPrintf x;
21
22 // turn on or off fragmentation in multicast
23 #define SPLIT_MULTICAST 0
24 // each multicast message is split into SPLIT_NUM fragments
25 #define SPLIT_NUM 2
26
27 // maximum number of fragments into which a message can be broken
28 #define MAXFRAGS 5
29
30 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
31 typedef CkVec<CkArrayIndexMax>   arrayIndexList;
32 typedef CkVec<CkSectionInfo>     sectionIdList;
33 typedef CkVec<CkReductionMsg *>  reductionMsgs;
34 typedef CkQ<int>                 PieceSize;
35 typedef CkVec<LDObjid>          ObjKeyList;
36 typedef unsigned char            byte;
37
38 class reductionInfo {
39 public:
40   int            lcount [MAXFRAGS]; /**< local elem collected */
41   int            ccount [MAXFRAGS]; /**< children node collected */
42   int            gcount [MAXFRAGS]; /**< total elem collected */
43   int            npProcessed;
44   CkCallback*    storedCallback;    /**< user callback */
45   redClientFn    storedClient;      /**< reduction client function */
46   void*          storedClientParam; /**< user provided data */
47   int            redNo;             /**< reduction sequence number */
48   reductionMsgs  msgs [MAXFRAGS];   /**< messages for this reduction */
49   reductionMsgs  futureMsgs;        /**< messages of future reductions */
50 public:
51   reductionInfo(): storedCallback(NULL), storedClientParam(NULL), redNo(0),
52                    npProcessed(0) {
53     for (int i=0; i<MAXFRAGS; i++) 
54       lcount [i] = ccount [i] = gcount [i] = 0;
55   }
56 };
57
58 /// cookie status
59 #define COOKIE_NOTREADY 0
60 #define COOKIE_READY    1
61 #define COOKIE_OLD      2
62
63 class mCastPacket {
64 public:
65   CkSectionInfo cookie;
66   int n;
67   char *data;
68   int seqno;
69   int count;
70   int totalsize;
71
72   mCastPacket(CkSectionInfo &_cookie, int _n, char *_d, int _s, int _c, int _t):
73                 cookie(_cookie), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
74 };
75
76 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
77
78 class SectionLocation {
79 public:
80   mCastEntry *entry;
81   int         pe;
82 public:
83   SectionLocation(): entry(NULL), pe(-1) {}
84   SectionLocation( mCastEntry *e, int p) { set(e, p); }
85   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
86   inline void clear() { entry = NULL; pe = -1; }
87 };
88
89
90
91
92 /// Cookie for an array section 
93 class mCastEntry 
94 {
95     public:
96         /// Array ID 
97         CkArrayID     aid;
98         /// Spanning tree parent
99         CkSectionInfo parentGrp;
100         /// List of direct children
101         sectionIdList children;
102         /// Number of direct children
103         int numChild;
104         /// List of all tree member array indices (Only useful on the tree root)
105         arrayIndexList allElem;
106         /// Only useful on root for LB
107         ObjKeyList     allObjKeys;
108         /// List of array elements on local PE
109         arrayIndexList localElem;
110         /// Should always be myPE
111         int pe;
112         /// Section ID of the root
113         CkSectionInfo rootSid;
114         multicastGrpMsgBuf msgBuf;
115         /// Buffer storing the pending packets
116         multicastGrpPacketBuf packetBuf;
117         /// For multicast packetization
118         char *asm_msg;
119         int   asm_fill;
120         /// Linked list of entries on the same processor
121         mCastEntry *oldc, *newc;
122         /// Old spanning tree
123         SectionLocation   oldtree;
124         // for reduction
125         reductionInfo red;
126         //
127         char needRebuild;
128     private:
129         char flag;
130     public:
131         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
132                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
133         mCastEntry(mCastEntry *);
134         /// Check if this tree is only a branch and has a parent
135         inline int hasParent() { return parentGrp.get_val()?1:0; }
136         /// Is this tree obsolete?
137         inline int isObsolete() { return (flag == COOKIE_OLD); }
138         /// Make the current tree obsolete
139         inline void setObsolete() { flag=COOKIE_OLD; }
140         /// Check if this (branch of the) tree is ready for use
141         inline int notReady() { return (flag == COOKIE_NOTREADY); }
142         /// Mark this (branch of the) tree as ready for use
143         inline void setReady() { flag=COOKIE_READY; }
144         /// Increment the reduction number for all the section members on this PE
145         inline void incReduceNo() {
146             red.redNo ++;
147             for (mCastEntry *next = newc; next; next=next->newc) 
148                 next->red.redNo++;
149         }
150         /// Get a handle on the array ID this tree is a member of
151         inline CkArrayID getAid() { return aid; }
152         inline int hasOldtree() { return oldtree.entry != NULL; }
153         inline void print() {
154             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
155         }
156 };
157
158
159
160
161 class cookieMsg: public CMessage_cookieMsg {
162 public:
163   CkSectionInfo cookie;
164 public:
165   cookieMsg() {};
166   cookieMsg(CkSectionInfo m): cookie(m) {};
167 };
168
169
170
171
172 /// multicast tree setup message
173 class multicastSetupMsg: public CMessage_multicastSetupMsg {
174 public:
175   int  nIdx;
176   CkArrayIndexMax *arrIdx;
177   int      *lastKnown;
178   CkSectionInfo parent;
179   CkSectionInfo rootSid;
180   int redNo;
181 };
182
183
184
185
186 /// message send in spanning tree
187 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
188 };
189
190
191 extern void CkPackMessage(envelope **pEnv);
192 extern void CkUnpackMessage(envelope **pEnv);
193
194
195
196 void _ckMulticastInit(void)
197 {
198 /*
199   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
200   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
201 */
202 }
203
204
205 mCastEntry::mCastEntry (mCastEntry *old): 
206   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
207 {
208   int i;
209   aid = old->aid;
210   parentGrp = old->parentGrp;
211   for (i=0; i<old->allElem.length(); i++)
212     allElem.push_back(old->allElem[i]);
213 #if CMK_LBDB_ON
214   CmiAssert(old->allElem.length() == old->allObjKeys.length());
215   for (i=0; i<old->allObjKeys.length(); i++)
216     allObjKeys.push_back(old->allObjKeys[i]);
217 #endif
218   pe = old->pe;
219   red.storedCallback = old->red.storedCallback;
220   red.storedClient = old->red.storedClient;
221   red.storedClientParam = old->red.storedClientParam;
222   red.redNo = old->red.redNo;
223   needRebuild = 0;
224   asm_msg = NULL;
225   asm_fill = 0;
226 }
227
228 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
229
230
231
232
233 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndexMax *al, int n)
234 {
235     // Create a multicast entry
236     mCastEntry *entry = new mCastEntry(aid);
237     // Push all the section member indices into the entry
238     for (int i=0; i<n; i++) {
239         entry->allElem.push_back(al[i]);
240 #if CMK_LBDB_ON
241         const LDObjid key = idx2LDObjid(al[i]);
242         entry->allObjKeys.push_back(key);
243 #endif
244     }
245     //  entry->aid = aid;
246     _id.aid = aid;
247     _id.get_val() = entry;              // allocate table for this section
248     // 
249     initCookie(_id);
250 }
251
252
253
254
255 void CkMulticastMgr::setSection(CkSectionInfo &id)
256 {
257   initCookie(id);
258 }
259
260
261
262
263 /// @warning: This is deprecated
264 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
265 {
266   CkArrayID aid = proxy.ckGetArrayID();
267   CkSectionInfo &_id = proxy.ckGetSectionInfo();
268
269   mCastEntry *entry = new mCastEntry(aid);
270
271   const CkArrayIndexMax *al = proxy.ckGetArrayElements();
272   for (int i=0; i<proxy.ckGetNumElements(); i++) {
273     entry->allElem.push_back(al[i]);
274 #if CMK_LBDB_ON
275     const LDObjid key = idx2LDObjid(al[i]);
276     entry->allObjKeys.push_back(key);
277 #endif
278   }
279   _id.type = MulticastMsg;
280   _id.aid = aid;
281   _id.get_val() = entry;                // allocate table for this section
282   initCookie(_id);
283 }
284
285
286
287
288 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
289 {
290   CkSectionInfo &info = proxy.ckGetSectionInfo();
291
292   int oldpe = info.get_pe();
293   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
294
295   CkArrayID aid = proxy.ckGetArrayID();
296   CkSectionID *sid = proxy.ckGetSectionIDs();
297   mCastEntry *entry = new mCastEntry(aid);
298
299   mCastEntry *oldentry = (mCastEntry *)info.get_val();
300   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
301
302   const CkArrayIndexMax *al = sid->_elems;
303   CmiAssert(info.aid == aid);
304   prepareCookie(entry, *sid, al, sid->_nElems, aid);
305
306   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
307
308     // store old tree info
309   entry->oldtree.set(oldentry, oldpe);
310
311     // obsolete old tree
312   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
313
314   // find reduction number
315   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
316 }
317
318
319
320
321 // prepare a mCastEntry entry and set up in CkSectionID
322 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndexMax *al, int count, CkArrayID aid)
323 {
324   for (int i=0; i<count; i++) {
325     entry->allElem.push_back(al[i]);
326 #if CMK_LBDB_ON
327     const LDObjid key = idx2LDObjid(al[i]);
328     entry->allObjKeys.push_back(key);
329 #endif
330   }
331   sid._cookie.type = MulticastMsg;
332   sid._cookie.aid = aid;
333   sid._cookie.get_val() = entry;        // allocate table for this section
334   sid._cookie.get_pe() = CkMyPe();
335 }
336
337
338
339
340 // this is used
341 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
342 {
343   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
344   CkArrayID aid = proxy->ckGetArrayID();
345   CkSectionID *sid = proxy->ckGetSectionIDs();
346
347   mCastEntry *entry = new mCastEntry(aid);
348
349   const CkArrayIndexMax *al = proxy->ckGetArrayElements();
350   prepareCookie(entry, *sid, al, proxy->ckGetNumElements(), aid);
351   initCookie(sid->_cookie);
352 }
353
354
355
356
357 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
358 {
359   mCastEntry *entry = (mCastEntry *)s.get_val();
360   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
361   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
362 }
363
364 // now that we get reduction number from the old cookie,
365 // we continue to build the spanning tree
366 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
367 {
368   mCastEntry *entry = (mCastEntry *)s.get_val();
369   entry->red.redNo = red;
370
371   initCookie(s);
372
373   // TODO delete old tree
374 }
375
376
377
378
379 void CkMulticastMgr::initCookie(CkSectionInfo s)
380 {
381     mCastEntry *entry = (mCastEntry *)s.get_val();
382     int n = entry->allElem.length();
383     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
384     // Create and initialize a setup message
385     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
386     msg->nIdx = n;
387     msg->parent = CkSectionInfo(entry->getAid());
388     msg->rootSid = s;
389     msg->redNo = entry->red.redNo;
390     // Fill the message with the section member indices and their last known locations
391     CkArray *array = CProxy_ArrayBase(s.aid).ckLocalBranch();
392     for (int i=0; i<n; i++) {
393       msg->arrIdx[i] = entry->allElem[i];
394       int ape = array->lastKnown(entry->allElem[i]);
395       CmiAssert(ape >=0 && ape < CkNumPes());
396       msg->lastKnown[i] = ape;
397     }
398     // Trigger the spanning tree build
399     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
400     mCastGrp[CkMyPe()].setup(msg);
401 }
402
403
404
405
406 void CkMulticastMgr::teardown(CkSectionInfo cookie)
407 {
408     int i;
409     mCastEntry *sect = (mCastEntry *)cookie.get_val();
410     // Mark this section as obsolete
411     sect->setObsolete();
412     // Release the buffered messages 
413     releaseBufferedReduceMsgs(sect);
414     // Propagate the teardown to each of your children
415     CProxy_CkMulticastMgr mp(thisgroup);
416     for (i=0; i<sect->children.length(); i++)
417         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
418 }
419
420
421
422
423 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
424 {
425     int i;
426     mCastEntry *sect = (mCastEntry *)cookie.get_val();
427     // Reset the root section info
428     sect->rootSid = newroot;
429     // Mark this section as obsolete
430     sect->setObsolete();
431     // Release the buffered messages 
432     releaseBufferedReduceMsgs(sect);
433     // Propagate the teardown to each of your children
434     CProxy_CkMulticastMgr mp(thisgroup);
435     for (i=0; i<sect->children.length(); i++)
436         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
437 }
438
439
440
441
442 void CkMulticastMgr::freeup(CkSectionInfo cookie)
443 {
444   mCastEntry *sect = (mCastEntry *)cookie.get_val();
445   CProxy_CkMulticastMgr mp(thisgroup);
446   // Parse through all the section members on this PE and...
447   while (sect) 
448   {
449       // Free their children
450       for (int i=0; i<sect->children.length(); i++)
451           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
452       // Free the cookie itself
453       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
454       mCastEntry *oldc= sect->oldc;
455       delete sect;
456       sect = oldc;
457   }
458 }
459
460
461
462
463 void CkMulticastMgr::setup(multicastSetupMsg *msg)
464 {
465     int i,j;
466     mCastEntry *entry;
467     CkArrayID aid = msg->rootSid.aid;
468     if (msg->parent.get_pe() == CkMyPe()) 
469       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
470     else 
471       entry = new mCastEntry(aid);
472     entry->aid = aid;
473     entry->pe = CkMyPe();
474     entry->rootSid = msg->rootSid;
475     entry->parentGrp = msg->parent;
476     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
477     entry->red.redNo = msg->redNo;
478
479     // Create a numPE sized array of vectors to hold the array elements in each PE
480     int numpes = CkNumPes();
481     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
482     // Sort each array index in the setup message based on last known location
483     for (i=0; i<msg->nIdx; i++) 
484     {
485       int lastKnown = msg->lastKnown[i];
486       // If msg->arrIdx[i] local, add it to a special local element list
487       if (lastKnown == CkMyPe())
488           entry->localElem.insertAtEnd(msg->arrIdx[i]);
489       // else, add it to the list corresponding to its PE
490       else
491           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
492     }
493
494     CkVec<int> mySubTreePEs;
495     mySubTreePEs.reserve(numpes);
496     // The first PE in my subtree should be me, the tree root (as required by the spanning tree builder)
497     mySubTreePEs.push_back(CkMyPe());
498     // Identify the child PEs in the tree, ie the PEs with section members on them
499     for (i=0; i<numpes; i++) 
500     {
501       if (i==CkMyPe()) continue;
502       if (lists[i].size()) 
503           mySubTreePEs.push_back(i);
504     }
505     // The number of multicast children can be limited by the spanning tree factor 
506     int num = mySubTreePEs.size() - 1, numchild = 0;
507     if (factor <= 0) numchild = num;
508     else numchild = num<factor?num:factor;
509   
510     entry->numChild = numchild;
511
512     // If there are any children, go about building a spanning tree
513     if (numchild) 
514     {
515         // Build the next generation of the spanning tree rooted at my PE
516         int *peListPtr = mySubTreePEs.getVec();
517         topo::SpanningTreeVertex *nextGenInfo;
518         nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
519         CkAssert(nextGenInfo->childIndex.size() == numchild);
520
521         // Distribute the section members across the number of direct children (branches)
522         // Direct children are simply the first section member in each of the branch lists
523         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
524
525         // For each direct child, collate indices of all section members on the PEs in that branch
526         for (int i=0; i < numchild; i++)
527         {
528             // Determine the indices of the first and last PEs in this branch of my sub-tree
529             int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
530             if (i < numchild-1)
531                 childEndIndex = nextGenInfo->childIndex[i+1];
532             else
533                 childEndIndex = mySubTreePEs.size();
534             // For each PE in this branch, add the section members on that PE to a list
535             for (int j = childStartIndex; j < childEndIndex; j++)
536             {
537                 int pe = mySubTreePEs[j];
538                 for (int k=0; k<lists[pe].size(); k++)
539                     slots[i].push_back(lists[pe][k]);
540             }
541         }
542
543         // Ask each of your direct children to setup their branches
544         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
545         for (i=0; i<numchild; i++) 
546         {
547             // Give each child info about the number, indices and location of its children
548             int n = slots[i].length();
549             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
550             m->parent = CkSectionInfo(aid, entry);
551             m->nIdx = slots[i].length();
552             m->rootSid = msg->rootSid;
553             m->redNo = msg->redNo;
554             for (j=0; j<slots[i].length(); j++) 
555             {
556                 m->arrIdx[j] = slots[i][j].idx;
557                 m->lastKnown[j] = slots[i][j].pe;
558             }
559             int childroot = slots[i][0].pe;
560             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
561             // Send the message to the child
562             mCastGrp[childroot].setup(m);
563         }
564         delete [] slots;
565         delete nextGenInfo;
566     }
567     // else, tell yourself that your children are ready
568     else 
569     {
570         childrenReady(entry);
571     }
572     delete [] lists;
573 }
574
575
576
577
578 void CkMulticastMgr::childrenReady(mCastEntry *entry)
579 {
580     // Mark this entry as ready
581     entry->setReady();
582     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
583     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
584     if (entry->hasParent()) 
585         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
586 #if SPLIT_MULTICAST
587     // clear packet buffer
588     while (!entry->packetBuf.isEmpty()) 
589     {
590         mCastPacket *packet = entry->packetBuf.deq();
591         packet->cookie.get_val() = entry;
592         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
593         delete [] packet->data;
594         delete packet;
595     }
596 #else
597     // clear msg buffer
598     while (!entry->msgBuf.isEmpty()) 
599     {
600         multicastGrpMsg *newmsg = entry->msgBuf.deq();
601         DEBUGF(("[%d] release buffer %p %d\n", CkMyPe(), newmsg, newmsg->ep));
602         newmsg->_cookie.get_val() = entry;
603         mCastGrp[CkMyPe()].recvMsg(newmsg);
604     }
605 #endif
606     // release reduction msgs
607     releaseFutureReduceMsgs(entry);
608 }
609
610
611
612
613 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
614 {
615   mCastEntry *entry = (mCastEntry *)sid.get_val();
616   entry->children.push_back(child);
617   if (entry->children.length() == entry->numChild) {
618     childrenReady(entry);
619   }
620 }
621
622
623
624
625 // rebuild is called when root not migrated
626 // when rebuilding, all multicast msgs will be buffered.
627 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
628 {
629   // tear down old tree
630   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
631   CkAssert(curCookie->pe == CkMyPe());
632   // make sure I am the newest one
633   while (curCookie->newc) curCookie = curCookie->newc;
634   if (curCookie->isObsolete()) return;
635
636   //CmiPrintf("tree rebuild\n");
637   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
638
639   // build a chain
640   newCookie->oldc = curCookie;
641   curCookie->newc = newCookie;
642
643   sectId.get_val() = newCookie;
644
645   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
646
647   curCookie->setObsolete();
648
649   resetCookie(sectId);
650 }
651
652 // mark old cookie spanning tree as old and 
653 // build a new one
654 void CkMulticastMgr::resetCookie(CkSectionInfo s)
655 {
656   mCastEntry *newCookie = (mCastEntry*)s.get_val();
657   mCastEntry *oldCookie = newCookie->oldc;
658
659   // get rid of old one
660   DEBUGF(("reset: oldc: %p\n", oldCookie));
661   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
662   int mype = CkMyPe();
663   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
664
665   // build a new one
666   initCookie(s);
667 }
668
669 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
670 {
671   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
672     // set an invalid cookie since we don't have it
673   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
674   for (int i=0; i< sid._nElems-1; i++) {
675      CProxyElement_ArrayBase ap(a, sid._elems[i]);
676      void *newMsg=CkCopyMsg((void **)&m);
677      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
678   }
679   if (sid._nElems > 0) {
680      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
681      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
682   }
683 }
684
685 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
686 {
687   DEBUGF(("ArraySectionSend\n"));
688
689   multicastGrpMsg *msg = (multicastGrpMsg *)m;
690 //  msg->aid = a;
691   msg->ep = ep;
692
693   CkSectionInfo &s = sid->_cookie;
694   CmiAssert(nsid == 1);
695
696   mCastEntry *entry;
697   if (s.get_pe() == CkMyPe()) {
698     entry = (mCastEntry *)s.get_val();   
699     if (entry == NULL) {
700       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
701     }
702
703     // update entry pointer in case there is a newer one.
704     if (entry->newc) {
705       do { entry=entry->newc; } while (entry->newc);
706       s.get_val() = entry;
707     }
708
709 #if CMK_LBDB_ON
710     // fixme: running obj?
711     envelope *env = UsrToEnv(msg);
712     const LDOMHandle &om = CProxy_ArrayBase(s.aid).ckLocMgr()->getOMHandle();
713     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
714 #endif
715
716     // first time need to rebuild, we do simple send to refresh lastKnown
717     if (entry->needRebuild == 1) {
718       msg->_cookie = s;
719       SimpleSend(ep, msg, s.aid, *sid, opts);
720       entry->needRebuild = 2;
721       return;
722     }
723     else if (entry->needRebuild == 2) rebuild(s);
724   }
725   else {
726     // fixme - in this case, not recorded in LB
727     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
728   }
729
730   // don't need packing here
731 /*
732   register envelope *env = UsrToEnv(m);
733   CkPackMessage(&env);
734   m = EnvToUsr(env);
735 */
736
737   // update cookie
738   msg->_cookie = s;
739
740 #if SPLIT_MULTICAST
741   // split multicast msg into SPLIT_NUM copies
742   register envelope *env = UsrToEnv(m);
743   CkPackMessage(&env);
744   int totalsize = env->getTotalsize();
745   int packetSize = totalsize/SPLIT_NUM;
746   if (totalsize%SPLIT_NUM) packetSize ++;
747   int totalcount = SPLIT_NUM;
748   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
749   int sizesofar = 0;
750   char *data = (char*) env;
751   for (int i=0; i<totalcount; i++) {
752     int mysize = packetSize;
753     if (mysize + sizesofar > totalsize) {
754       mysize = totalsize-sizesofar;
755     }
756     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
757     mCastGrp[s.get_pe()].recvPacket(s, mysize, data, i, totalcount, totalsize, 0);
758     sizesofar += mysize;
759     data += mysize;
760   }
761   CmiFree(env);
762 #else
763   if (s.get_pe() == CkMyPe()) {
764     recvMsg(msg);
765   }
766   else {
767     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
768     mCastGrp[s.get_pe()].recvMsg(msg);
769   }
770 #endif
771 }
772
773 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
774 {
775   int i;
776   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
777
778
779   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
780     char *newdata = new char[n];
781     memcpy(newdata, data, n);
782     entry->packetBuf.enq(new mCastPacket(_cookie, n, newdata, seqno, count, totalsize));
783 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
784     return;
785   }
786
787 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
788
789   // send to spanning tree children
790   // can not optimize using list send because the difference in cookie
791   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
792   for (i=0; i<entry->children.length(); i++) {
793     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], n, data, seqno, count, totalsize, 0);
794   }
795
796   if (seqno == 0) {
797     if (entry->asm_msg != NULL || entry->asm_fill != 0) {
798       entry->print();
799       CmiAssert(entry->asm_msg == NULL && entry->asm_fill==0);
800     }
801     entry->asm_msg = (char *)CmiAlloc(totalsize);
802   }
803   memcpy(entry->asm_msg+entry->asm_fill, data, n);
804   entry->asm_fill += n;
805   if (seqno + 1 == count) {
806     CmiAssert(entry->asm_fill == totalsize);
807     CkUnpackMessage((envelope **)&entry->asm_msg);
808     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
809     msg->_cookie = _cookie;
810 //    mCastGrp[CkMyPe()].recvMsg(msg);
811     recvMsg(msg);
812     entry->asm_msg = NULL;
813     entry->asm_fill = 0;
814   }
815 //  if (fromBuffer) delete [] data;
816 }
817
818 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
819 {
820   int i;
821   CkSectionInfo &sectionInfo = msg->_cookie;
822   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
823   CmiAssert(entry->getAid() == sectionInfo.aid);
824
825 #if ! SPLIT_MULTICAST
826   if (entry->notReady()) {
827     DEBUGF(("entry not ready, enq buffer %p\n", msg));
828     entry->msgBuf.enq(msg);
829     return;
830   }
831
832   // send to spanning tree children
833   // can not optimize using list send because the difference in cookie
834   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
835   for (i=0; i<entry->children.length(); i++) {
836     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
837     newmsg->_cookie = entry->children[i];
838     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
839   }
840 #endif
841
842   // send to local
843   int nLocal = entry->localElem.length();
844   DEBUGF(("send to local %d\n", nLocal));
845   for (i=0; i<nLocal-1; i++) {
846     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[i]);
847     if (_entryTable[msg->ep]->noKeep) {
848       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.aid, entry->localElem[i], CK_MSG_KEEP);
849     }
850     else {
851       // send through scheduler queue
852       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
853       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
854     }
855     // use CK_MSG_DONTFREE so that the message can be reused
856     // the drawback of this scheme bypassing queue is that 
857     // if # of local element is huge, this leads to a long time occupying CPU
858     // also load balancer seems not be able to correctly instrument load
859 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
860     //CmiNetworkProgressAfter(3);
861   }
862   if (nLocal) {
863     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[nLocal-1]);
864     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
865 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
866   }
867   else {
868     CkAssert (entry->rootSid.get_pe() == CkMyPe());
869     delete msg;
870   }
871 }
872
873 // user function
874 // to retrieve section info from a multicast msg
875 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
876 {
877   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
878   if (CkMcastBaseMsg::checkMagic(m) == 0) {
879     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
880     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
881   }
882   // ignore invalid cookie sent by SimpleSend
883   if (m->gpe() != -1) {
884     id.type = MulticastMsg;
885     id.get_pe() = m->gpe();
886     id.get_val() = m->cookie();
887   }
888   // note: retain old redNo
889 }
890
891 // Reduction
892
893 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
894 {
895   CkSectionInfo &id = proxy.ckGetSectionInfo();
896   mCastEntry *entry = (mCastEntry *)id.get_val();
897   entry->red.storedCallback = cb;
898 }
899
900 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
901 {
902   CkSectionInfo &id = proxy.ckGetSectionInfo();
903   mCastEntry *entry = (mCastEntry *)id.get_val();
904   entry->red.storedClient = fn;
905   entry->red.storedClientParam = param;
906 }
907
908 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
909 {
910   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
911   msg->reducer = type;
912   msg->sid = id;
913   msg->sourceFlag = 1;   // from array element
914   msg->redNo = id.get_redNo();
915   msg->gcount = 1;
916   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
917   msg->callback = cb;
918   msg->userFlag=userFlag;
919   return msg;
920 }
921
922
923
924 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
925 {
926   CkCallback cb;
927   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
928 }
929
930
931 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
932 {
933   if (id.get_val() == NULL || id.get_redNo() == -1) 
934     CmiAbort("contribute: SectionID is not initialized\n");
935
936   int nFrags;
937   if (-1 == fragSize) {         // no frag
938     nFrags = 1;
939     fragSize = dataSize;
940   }
941   else {
942     CmiAssert (dataSize >= fragSize);
943     nFrags = dataSize/fragSize;
944     if (dataSize%fragSize) nFrags++;
945   }
946
947   if (MAXFRAGS < nFrags) {
948     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
949     CmiAbort ("frag size too small\n");
950   }
951
952   int mpe = id.get_pe();
953   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
954
955   // break the message into k-piece fragments
956   int fSize = fragSize;
957   for (int i=0; i<nFrags; i++) {
958     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
959       fSize = dataSize%fragSize;
960     }
961
962     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
963
964     // initialize the new msg
965     msg->reducer            = type;
966     msg->sid                = id;
967     msg->nFrags             = nFrags;
968     msg->fragNo             = i;
969     msg->sourceFlag         = 1;
970     msg->redNo              = id.get_redNo();
971     msg->gcount             = 1;
972     msg->rebuilt            = (mpe == CkMyPe())?0:1;
973     msg->callback           = cb;
974     msg->userFlag           = userFlag;
975
976     mCastGrp[mpe].recvRedMsg(msg);
977
978     data = (void*)(((char*)data) + fSize);
979   }
980
981   id.get_redNo()++;
982   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
983 }
984
985 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
986                                               mCastEntry* entry,
987                                               reductionInfo& redInfo) {
988   int i;
989   int dataSize = 0;
990   int nFrags   = redInfo.msgs[0][0]->nFrags;
991
992   // to avoid memcpy and allocation cost for non-pipelined reductions
993   if (1 == nFrags) {
994     CkReductionMsg* msg = redInfo.msgs[0][0];
995
996     // free up the msg slot
997     redInfo.msgs[0].length() = 0;
998
999     return msg;
1000   }
1001
1002   for (i=0; i<nFrags; i++) {
1003     dataSize += redInfo.msgs[i][0]->dataSize;
1004   }
1005
1006   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1007
1008   // initialize msg header
1009   msg->redNo      = redInfo.msgs[0][0]->redNo;
1010   msg->reducer    = redInfo.msgs[0][0]->reducer;
1011   msg->sid        = id;
1012   msg->nFrags     = nFrags;
1013
1014   // I guess following fields need not be initialized
1015   msg->sourceFlag = 2;
1016   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1017   msg->callback   = redInfo.msgs[0][0]->callback;
1018   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1019
1020   byte* data = (byte*)msg->getData ();
1021   for (i=0; i<nFrags; i++) {
1022     // copy data from fragments to msg
1023     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1024     data += redInfo.msgs[i][0]->dataSize;
1025
1026     // free fragments
1027     delete redInfo.msgs[i][0];
1028     redInfo.msgs[i].length() = 0;    
1029   }
1030
1031   return msg;
1032 }
1033
1034 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1035                                      mCastEntry* entry, reductionInfo& redInfo,
1036                                      int& updateReduceNo, int currentTreeUp){
1037
1038   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1039   reductionMsgs& rmsgs = redInfo.msgs[index];
1040   int dataSize         = rmsgs[0]->dataSize;
1041   CkReduction::reducerType reducer = rmsgs[0]->reducer;
1042   int i;
1043   int oldRedNo = redInfo.redNo;
1044   int nFrags   = rmsgs[0]->nFrags;
1045   int fragNo   = rmsgs[0]->fragNo;
1046   int userFlag   = rmsgs[0]->userFlag;
1047                                                                                 
1048   // reduce msgs
1049   CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1050   CkAssert(NULL != f);
1051
1052   // check valid callback in msg and check if migration happened
1053   CkCallback msg_cb;
1054   int rebuilt = 0;
1055   for (i=0; i<rmsgs.length(); i++) {
1056     if (rmsgs[i]->rebuilt) rebuilt = 1;
1057     if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1058   }
1059
1060   CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec()); 
1061   newmsg->redNo  = redInfo.redNo;
1062   newmsg->nFrags = nFrags;
1063   newmsg->fragNo = fragNo;
1064   newmsg->userFlag = userFlag;
1065
1066   // increment num-frags processed
1067   redInfo.npProcessed ++;
1068
1069   // check if migration and free messages
1070   for (i=0; i<rmsgs.length(); i++) {
1071     if (rmsgs[i]!=newmsg) delete rmsgs[i];
1072   }
1073   rmsgs.length() = 0;
1074
1075   if (redInfo.npProcessed == nFrags) {
1076     entry->incReduceNo();
1077     DEBUGF(("Advanced entry:%p redNo: %d\n", entry, entry->red.redNo));
1078   }
1079   if (updateReduceNo) mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1080                                                                                 
1081   if (entry->hasParent()) {
1082     // send up to parent
1083     newmsg->sid        = entry->parentGrp;
1084     newmsg->reducer    = reducer;
1085     newmsg->sourceFlag = 2;
1086     newmsg->redNo      = oldRedNo;
1087     newmsg->gcount     = redInfo.gcount [index];
1088     newmsg->rebuilt    = rebuilt;
1089     newmsg->callback   = msg_cb;
1090     DEBUGF(("send to parent %p: %d\n", entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1091     mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1092   } else { // root
1093     newmsg->sid = id;
1094     // buffer message
1095     rmsgs.push_back (newmsg);
1096
1097     //if (entry->allElem.length() == redInfo.gcount) {
1098     if (redInfo.npProcessed == nFrags) {
1099
1100       newmsg = combineFrags (id, entry, redInfo);
1101
1102       if (!msg_cb.isInvalid()) {
1103         msg_cb.send(newmsg);
1104       }
1105       else if (redInfo.storedCallback != NULL) {
1106         redInfo.storedCallback->send(newmsg);
1107       }
1108       else if (redInfo.storedClient != NULL) {
1109         redInfo.storedClient(id, redInfo.storedClientParam, dataSize,
1110            newmsg->data);
1111         delete newmsg;
1112       }
1113       else
1114         CmiAbort("Did you forget to register a reduction client?");
1115                                                                                 
1116       DEBUGF(("Reduction client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1117       if (currentTreeUp) {
1118         if (entry->oldc) {
1119             // free old tree on same processor;
1120           mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1121           entry->oldc = NULL;
1122         }
1123         if (entry->hasOldtree()) {
1124             // free old tree on old processor
1125           int oldpe = entry->oldtree.pe;
1126           mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1127           entry->oldtree.clear();
1128         }
1129       }
1130       if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1131     }
1132   }
1133 }
1134
1135 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1136 {
1137   int i;
1138   CkSectionInfo id = msg->sid;
1139   mCastEntry *entry = (mCastEntry *)id.get_val();
1140   CmiAssert(entry!=NULL);
1141 //CmiPrintf("[%d] recvRedMsg: entry: %p\n", CkMyPe(), entry);
1142
1143   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1144
1145   int updateReduceNo = 0;
1146
1147   // update entry if obsolete
1148   if (entry->isObsolete()) {
1149       // send up to root
1150     DEBUGF(("[%d] entry obsolete-send to root %d\n", CkMyPe(), entry->rootSid.pe));
1151     if (!entry->hasParent()) { //rootSid.pe == CkMyPe()
1152       // I am root, set to the new cookie if there is
1153       mCastEntry *newentry = entry->newc;
1154       while (newentry && newentry->newc) newentry=newentry->newc;
1155       if (newentry) entry = newentry;
1156       CmiAssert(entry!=NULL);
1157     }
1158     if (!entry->hasParent() && !entry->isObsolete()) {
1159        // root find the latest cookie that is not obsolete
1160       msg->sourceFlag = 0;       // indicate it is not on old spanning tree
1161       updateReduceNo = 1;        // reduce from old tree, new entry need update.
1162     }
1163     else {
1164       CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1165       // entry is obsolete, send to root directly
1166       msg->sid = entry->rootSid;
1167
1168       msg->sourceFlag = 0;
1169       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1170       return;
1171     }
1172   }
1173
1174   reductionInfo &redInfo = entry->red;
1175
1176   DEBUGF(("[%d] msg %p red:%d, entry:%p redno:%d\n", CkMyPe(), msg, msg->redNo, entry, entry->red.redNo));
1177   // old message come, ignore
1178   if (msg->redNo < redInfo.redNo) {
1179     CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1180     CmiAbort("Could never happen! \n");
1181   }
1182   if (entry->notReady() || msg->redNo > redInfo.redNo) {
1183     DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1184     redInfo.futureMsgs.push_back(msg);
1185     return;
1186   }
1187
1188   DEBUGF(("[%d] recvRedMsg rebuilt:%d red:%d\n", CkMyPe(), msg->rebuilt, redInfo.redNo));
1189
1190   const int index = msg->fragNo;
1191
1192   // buffer this msg
1193   if (msg->sourceFlag == 1) {
1194     // new reduction message from ArrayElement
1195     redInfo.lcount [index] ++;
1196   }
1197
1198   if (msg->sourceFlag == 2) {
1199     redInfo.ccount [index] ++;
1200   }
1201
1202   redInfo.gcount [index] += msg->gcount;
1203
1204   // buffer the msg
1205   // first check if message is of proper size
1206   if ((0 != redInfo.msgs[index].length()) && 
1207       (msg->dataSize != (redInfo.msgs [index][0]->dataSize))) {
1208     CmiAbort("Reduction data are not of same length!");
1209   }
1210
1211   redInfo.msgs [index].push_back(msg);
1212
1213   const int numFragsRcvd = redInfo.msgs [index].length();
1214
1215   DEBUGF(("[%d] index:%d lcount:%d-%d, ccount:%d-%d, gcount:%d-%d root:%d\n", CkMyPe(),index, entry->red.lcount[index],entry->localElem.length(), entry->red.ccount[index], entry->children.length(), entry->red.gcount[index], entry->allElem.length(), !entry->hasParent()));
1216
1217   int currentTreeUp = 0;
1218   if (redInfo.lcount [index] == entry->localElem.length() &&
1219       redInfo.ccount [index] == entry->children.length())
1220       currentTreeUp = 1;
1221
1222   int mixTreeUp = 0;
1223   const int numElems = entry->allElem.length();
1224   
1225   if (!entry->hasParent()) {
1226     mixTreeUp = 1;
1227     for (int i=0; i<msg->nFrags; i++) {
1228       if (entry->allElem.length() != redInfo.gcount [i]) {
1229         mixTreeUp = 0;
1230       }
1231     }
1232   }
1233
1234   if (currentTreeUp || mixTreeUp)
1235   {
1236     const int nFrags = msg->nFrags;  
1237     
1238     // msg from children contain only one fragment
1239     reduceFragment (index, id, entry, redInfo, updateReduceNo, 
1240                     currentTreeUp);
1241
1242     if (redInfo.npProcessed == nFrags) {
1243       // reset counters
1244       for (i=0; i<nFrags; i++) {
1245         redInfo.lcount [i] = 0;
1246         redInfo.ccount [i] = 0;
1247         redInfo.gcount [i] = 0;
1248       }
1249       redInfo.npProcessed = 0;
1250
1251       // release future msgs
1252       releaseFutureReduceMsgs(entry);
1253     }
1254   }
1255 }
1256
1257 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1258 {
1259   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1260
1261   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1262     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1263     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1264   }
1265   entry->red.futureMsgs.length() = 0;
1266 }
1267
1268 // these messages have to be sent to root
1269 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1270 {
1271   int i;
1272   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1273
1274   for (int j=0; j<MAXFRAGS; j++) {
1275     for (i=0; i<entry->red.msgs[j].length(); i++) {
1276       CkReductionMsg *msg = entry->red.msgs[j][i];
1277       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1278       msg->sid = entry->rootSid;
1279       msg->sourceFlag = 0;
1280       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1281     }
1282     entry->red.msgs[j].length() = 0;
1283   }
1284
1285
1286   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1287     CkReductionMsg *msg = entry->red.futureMsgs[i];
1288     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1289     msg->sid = entry->rootSid;
1290     msg->sourceFlag = 0;
1291     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1292   }
1293   entry->red.futureMsgs.length() = 0;
1294 }
1295
1296 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1297 {
1298   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1299   if (entry->red.redNo < red)
1300     entry->red.redNo = red;
1301
1302   CProxy_CkMulticastMgr mp(thisgroup);
1303   for (int i=0; i<entry->children.length(); i++) {
1304     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1305   }
1306
1307   releaseFutureReduceMsgs(entry);
1308 }
1309
1310 #if 0
1311 ////////////////////////////////////////////////////////////////////////////////
1312 /////
1313 ///////////////// Builtin Reducer Functions //////////////
1314 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1315 {CkAbort("ERROR! Called the invalid reducer!\n");return NULL;}
1316
1317 /* A simple reducer, like sum_int, looks like this:
1318 static CkReductionMsg *sum_int(int nMsg,CkReductionMsg **msg)
1319 {
1320   int i,ret=0;
1321   for (i=0;i<nMsg;i++)
1322     ret+=*(int *)(msg[i]->data);
1323   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1324 }
1325 */
1326
1327 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1328 static CkReductionMsg *name(int nMsg, CkReductionMsg **msg)\
1329 {\
1330   int m,i;\
1331   int nElem=msg[0]->getSize()/sizeof(dataType);\
1332   dataType *ret=(dataType *)(msg[0]->getData());\
1333   for (m=1;m<nMsg;m++)\
1334   {\
1335     dataType *value=(dataType *)(msg[m]->getData());\
1336     for (i=0;i<nElem;i++)\
1337     {\
1338       loop\
1339     }\
1340   }\
1341   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret);\
1342 }
1343
1344 //Use this macro for reductions that have the same type for all inputs
1345 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1346   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1347   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1348   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1349
1350
1351 //Compute the sum the numbers passed by each element.
1352 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1353
1354 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1355
1356 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1357
1358 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1359
1360 CkReduction::reducerFn CkMulticastMgr::reducerTable[CkMulticastMgr::MAXREDUCERS]={
1361     ::invalid_reducer,
1362   //Compute the sum the numbers passed by each element.
1363     ::sum_int,::sum_float,::sum_double,
1364     ::product_int,::product_float,::product_double,
1365     ::max_int,::max_float,::max_double,
1366     ::min_int,::min_float,::min_double
1367 };
1368 #endif
1369
1370 #include "CkMulticast.def.h"