some code docs for ckmulticast
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentation  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  * */
12
13 #include "charm++.h"
14 #include "envelope.h"
15 #include "register.h"
16
17 #include "ckmulticast.h"
18 #include "spanningTreeStrategy.h"
19
20 #define DEBUGF(x)  // CkPrintf x;
21
22 // turn on or off fragmentation in multicast
23 #define SPLIT_MULTICAST 0
24 // each multicast message is split into SPLIT_NUM fragments
25 #define SPLIT_NUM 2
26
27 // maximum number of fragments into which a message can be broken
28 #define MAXFRAGS 5
29
30 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
31 typedef CkVec<CkArrayIndexMax>   arrayIndexList;
32 typedef CkVec<CkSectionInfo>     sectionIdList;
33 typedef CkVec<CkReductionMsg *>  reductionMsgs;
34 typedef CkQ<int>                 PieceSize;
35 typedef CkVec<LDObjid>          ObjKeyList;
36 typedef unsigned char            byte;
37
38 class reductionInfo {
39 public:
40   int            lcount [MAXFRAGS]; /**< local elem collected */
41   int            ccount [MAXFRAGS]; /**< children node collected */
42   int            gcount [MAXFRAGS]; /**< total elem collected */
43   int            npProcessed;
44   CkCallback*    storedCallback;    /**< user callback */
45   redClientFn    storedClient;      /**< reduction client function */
46   void*          storedClientParam; /**< user provided data */
47   int            redNo;             /**< reduction sequence number */
48   reductionMsgs  msgs [MAXFRAGS];   /**< messages for this reduction */
49   reductionMsgs  futureMsgs;        /**< messages of future reductions */
50 public:
51   reductionInfo(): storedCallback(NULL), storedClientParam(NULL), redNo(0),
52                    npProcessed(0) {
53     for (int i=0; i<MAXFRAGS; i++) 
54       lcount [i] = ccount [i] = gcount [i] = 0;
55   }
56 };
57
58 /// cookie status
59 #define COOKIE_NOTREADY 0
60 #define COOKIE_READY    1
61 #define COOKIE_OLD      2
62
63 class mCastPacket {
64 public:
65   CkSectionInfo cookie;
66   int n;
67   char *data;
68   int seqno;
69   int count;
70   int totalsize;
71
72   mCastPacket(CkSectionInfo &_cookie, int _n, char *_d, int _s, int _c, int _t):
73                 cookie(_cookie), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
74 };
75
76 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
77
78 class SectionLocation {
79 public:
80   mCastEntry *entry;
81   int         pe;
82 public:
83   SectionLocation(): entry(NULL), pe(-1) {}
84   SectionLocation( mCastEntry *e, int p) { set(e, p); }
85   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
86   inline void clear() { entry = NULL; pe = -1; }
87 };
88
89
90
91
92 /// Cookie for an array section 
93 class mCastEntry 
94 {
95     public:
96         /// Array ID 
97         CkArrayID     aid;
98         /// Spanning tree parent
99         CkSectionInfo parentGrp;
100         /// List of direct children
101         sectionIdList children;
102         /// Number of direct children
103         int numChild;
104         /// List of all tree member array indices (Only useful on the tree root)
105         arrayIndexList allElem;
106         /// Only useful on root for LB
107         ObjKeyList     allObjKeys;
108         /// List of array elements on local PE
109         arrayIndexList localElem;
110         /// Should always be myPE
111         int pe;
112         /// Section ID of the root
113         CkSectionInfo rootSid;
114         multicastGrpMsgBuf msgBuf;
115         /// Buffer storing the pending packets
116         multicastGrpPacketBuf packetBuf;
117         /// For multicast packetization
118         char *asm_msg;
119         int   asm_fill;
120         /// Linked list of entries on the same processor
121         mCastEntry *oldc, *newc;
122         /// Old spanning tree
123         SectionLocation   oldtree;
124         // for reduction
125         reductionInfo red;
126         //
127         char needRebuild;
128     private:
129         char flag;
130     public:
131         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
132                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
133         mCastEntry(mCastEntry *);
134         /// Check if this tree is only a branch and has a parent
135         inline int hasParent() { return parentGrp.get_val()?1:0; }
136         /// Is this tree obsolete?
137         inline int isObsolete() { return (flag == COOKIE_OLD); }
138         /// Make the current tree obsolete
139         inline void setObsolete() { flag=COOKIE_OLD; }
140         /// Check if this (branch of the) tree is ready for use
141         inline int notReady() { return (flag == COOKIE_NOTREADY); }
142         /// Mark this (branch of the) tree as ready for use
143         inline void setReady() { flag=COOKIE_READY; }
144         /// Increment the reduction number for all the section members on this PE
145         inline void incReduceNo() {
146             red.redNo ++;
147             for (mCastEntry *next = newc; next; next=next->newc) 
148                 next->red.redNo++;
149         }
150         /// Get a handle on the array ID this tree is a member of
151         inline CkArrayID getAid() { return aid; }
152         inline int hasOldtree() { return oldtree.entry != NULL; }
153         inline void print() {
154             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
155         }
156 };
157
158
159
160
161 class cookieMsg: public CMessage_cookieMsg {
162 public:
163   CkSectionInfo cookie;
164 public:
165   cookieMsg() {};
166   cookieMsg(CkSectionInfo m): cookie(m) {};
167 };
168
169
170
171
172 /// multicast tree setup message
173 class multicastSetupMsg: public CMessage_multicastSetupMsg {
174 public:
175   int  nIdx;
176   CkArrayIndexMax *arrIdx;
177   int      *lastKnown;
178   CkSectionInfo parent;
179   CkSectionInfo rootSid;
180   int redNo;
181 };
182
183
184
185
186 /// message send in spanning tree
187 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
188 };
189
190
191 extern void CkPackMessage(envelope **pEnv);
192 extern void CkUnpackMessage(envelope **pEnv);
193
194
195
196 void _ckMulticastInit(void)
197 {
198 /*
199   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
200   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
201 */
202 }
203
204
205 mCastEntry::mCastEntry (mCastEntry *old): 
206   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
207 {
208   int i;
209   aid = old->aid;
210   parentGrp = old->parentGrp;
211   for (i=0; i<old->allElem.length(); i++)
212     allElem.push_back(old->allElem[i]);
213 #if CMK_LBDB_ON
214   CmiAssert(old->allElem.length() == old->allObjKeys.length());
215   for (i=0; i<old->allObjKeys.length(); i++)
216     allObjKeys.push_back(old->allObjKeys[i]);
217 #endif
218   pe = old->pe;
219   red.storedCallback = old->red.storedCallback;
220   red.storedClient = old->red.storedClient;
221   red.storedClientParam = old->red.storedClientParam;
222   red.redNo = old->red.redNo;
223   needRebuild = 0;
224   asm_msg = NULL;
225   asm_fill = 0;
226 }
227
228 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
229
230
231
232
233 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndexMax *al, int n)
234 {
235     // Create a multicast entry
236     mCastEntry *entry = new mCastEntry(aid);
237     // Push all the section member indices into the entry
238     for (int i=0; i<n; i++) {
239         entry->allElem.push_back(al[i]);
240 #if CMK_LBDB_ON
241         const LDObjid key = idx2LDObjid(al[i]);
242         entry->allObjKeys.push_back(key);
243 #endif
244     }
245     //  entry->aid = aid;
246     _id.aid = aid;
247     _id.get_val() = entry;              // allocate table for this section
248     // 
249     initCookie(_id);
250 }
251
252
253
254
255 void CkMulticastMgr::setSection(CkSectionInfo &id)
256 {
257   initCookie(id);
258 }
259
260
261
262
263 /// @warning: This is deprecated
264 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
265 {
266   CkArrayID aid = proxy.ckGetArrayID();
267   CkSectionInfo &_id = proxy.ckGetSectionInfo();
268
269   mCastEntry *entry = new mCastEntry(aid);
270
271   const CkArrayIndexMax *al = proxy.ckGetArrayElements();
272   for (int i=0; i<proxy.ckGetNumElements(); i++) {
273     entry->allElem.push_back(al[i]);
274 #if CMK_LBDB_ON
275     const LDObjid key = idx2LDObjid(al[i]);
276     entry->allObjKeys.push_back(key);
277 #endif
278   }
279   _id.type = MulticastMsg;
280   _id.aid = aid;
281   _id.get_val() = entry;                // allocate table for this section
282   initCookie(_id);
283 }
284
285
286
287
288 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
289 {
290   CkSectionInfo &info = proxy.ckGetSectionInfo();
291
292   int oldpe = info.get_pe();
293   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
294
295   CkArrayID aid = proxy.ckGetArrayID();
296   CkSectionID *sid = proxy.ckGetSectionIDs();
297   mCastEntry *entry = new mCastEntry(aid);
298
299   mCastEntry *oldentry = (mCastEntry *)info.get_val();
300   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
301
302   const CkArrayIndexMax *al = sid->_elems;
303   CmiAssert(info.aid == aid);
304   prepareCookie(entry, *sid, al, sid->_nElems, aid);
305
306   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
307
308     // store old tree info
309   entry->oldtree.set(oldentry, oldpe);
310
311     // obsolete old tree
312   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
313
314   // find reduction number
315   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
316 }
317
318
319
320
321 /// Build a mCastEntry object with relevant section info and set the section cookie to point to this object
322 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndexMax *al, int count, CkArrayID aid)
323 {
324   for (int i=0; i<count; i++) {
325     entry->allElem.push_back(al[i]);
326 #if CMK_LBDB_ON
327     const LDObjid key = idx2LDObjid(al[i]);
328     entry->allObjKeys.push_back(key);
329 #endif
330   }
331   sid._cookie.type = MulticastMsg;
332   sid._cookie.aid = aid;
333   sid._cookie.get_val() = entry;        // allocate table for this section
334   sid._cookie.get_pe() = CkMyPe();
335 }
336
337
338
339
340 // this is used
341 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
342 {
343   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
344   CkArrayID aid = proxy->ckGetArrayID();
345   CkSectionID *sid = proxy->ckGetSectionIDs();
346
347   mCastEntry *entry = new mCastEntry(aid);
348
349   const CkArrayIndexMax *al = proxy->ckGetArrayElements();
350   prepareCookie(entry, *sid, al, proxy->ckGetNumElements(), aid);
351   initCookie(sid->_cookie);
352 }
353
354
355
356
357 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
358 {
359   mCastEntry *entry = (mCastEntry *)s.get_val();
360   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
361   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
362 }
363
364 // now that we get reduction number from the old cookie,
365 // we continue to build the spanning tree
366 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
367 {
368   mCastEntry *entry = (mCastEntry *)s.get_val();
369   entry->red.redNo = red;
370
371   initCookie(s);
372
373   // TODO delete old tree
374 }
375
376
377
378
379 void CkMulticastMgr::initCookie(CkSectionInfo s)
380 {
381     mCastEntry *entry = (mCastEntry *)s.get_val();
382     int n = entry->allElem.length();
383     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
384     // Create and initialize a setup message
385     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
386     msg->nIdx = n;
387     msg->parent = CkSectionInfo(entry->getAid());
388     msg->rootSid = s;
389     msg->redNo = entry->red.redNo;
390     // Fill the message with the section member indices and their last known locations
391     CkArray *array = CProxy_ArrayBase(s.aid).ckLocalBranch();
392     for (int i=0; i<n; i++) {
393       msg->arrIdx[i] = entry->allElem[i];
394       int ape = array->lastKnown(entry->allElem[i]);
395       CmiAssert(ape >=0 && ape < CkNumPes());
396       msg->lastKnown[i] = ape;
397     }
398     // Trigger the spanning tree build
399     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
400     mCastGrp[CkMyPe()].setup(msg);
401 }
402
403
404
405
406 void CkMulticastMgr::teardown(CkSectionInfo cookie)
407 {
408     int i;
409     mCastEntry *sect = (mCastEntry *)cookie.get_val();
410     // Mark this section as obsolete
411     sect->setObsolete();
412     // Release the buffered messages 
413     releaseBufferedReduceMsgs(sect);
414     // Propagate the teardown to each of your children
415     CProxy_CkMulticastMgr mp(thisgroup);
416     for (i=0; i<sect->children.length(); i++)
417         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
418 }
419
420
421
422
423 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
424 {
425     int i;
426     mCastEntry *sect = (mCastEntry *)cookie.get_val();
427     // Reset the root section info
428     sect->rootSid = newroot;
429     // Mark this section as obsolete
430     sect->setObsolete();
431     // Release the buffered messages 
432     releaseBufferedReduceMsgs(sect);
433     // Propagate the teardown to each of your children
434     CProxy_CkMulticastMgr mp(thisgroup);
435     for (i=0; i<sect->children.length(); i++)
436         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
437 }
438
439
440
441
442 void CkMulticastMgr::freeup(CkSectionInfo cookie)
443 {
444   mCastEntry *sect = (mCastEntry *)cookie.get_val();
445   CProxy_CkMulticastMgr mp(thisgroup);
446   // Parse through all the section members on this PE and...
447   while (sect) 
448   {
449       // Free their children
450       for (int i=0; i<sect->children.length(); i++)
451           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
452       // Free the cookie itself
453       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
454       mCastEntry *oldc= sect->oldc;
455       delete sect;
456       sect = oldc;
457   }
458 }
459
460
461
462
463 void CkMulticastMgr::setup(multicastSetupMsg *msg)
464 {
465     int i,j;
466     mCastEntry *entry;
467     CkArrayID aid = msg->rootSid.aid;
468     if (msg->parent.get_pe() == CkMyPe()) 
469       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
470     else 
471       entry = new mCastEntry(aid);
472     entry->aid = aid;
473     entry->pe = CkMyPe();
474     entry->rootSid = msg->rootSid;
475     entry->parentGrp = msg->parent;
476     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
477     entry->red.redNo = msg->redNo;
478
479     // Create a numPE sized array of vectors to hold the array elements in each PE
480     int numpes = CkNumPes();
481     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
482     // Sort each array index in the setup message based on last known location
483     for (i=0; i<msg->nIdx; i++) 
484     {
485       int lastKnown = msg->lastKnown[i];
486       // If msg->arrIdx[i] local, add it to a special local element list
487       if (lastKnown == CkMyPe())
488           entry->localElem.insertAtEnd(msg->arrIdx[i]);
489       // else, add it to the list corresponding to its PE
490       else
491           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
492     }
493
494     CkVec<int> mySubTreePEs;
495     mySubTreePEs.reserve(numpes);
496     // The first PE in my subtree should be me, the tree root (as required by the spanning tree builder)
497     mySubTreePEs.push_back(CkMyPe());
498     // Identify the child PEs in the tree, ie the PEs with section members on them
499     for (i=0; i<numpes; i++) 
500     {
501       if (i==CkMyPe()) continue;
502       if (lists[i].size()) 
503           mySubTreePEs.push_back(i);
504     }
505     // The number of multicast children can be limited by the spanning tree factor 
506     int num = mySubTreePEs.size() - 1, numchild = 0;
507     if (factor <= 0) numchild = num;
508     else numchild = num<factor?num:factor;
509   
510     entry->numChild = numchild;
511
512     // If there are any children, go about building a spanning tree
513     if (numchild) 
514     {
515         // Build the next generation of the spanning tree rooted at my PE
516         int *peListPtr = mySubTreePEs.getVec();
517         topo::SpanningTreeVertex *nextGenInfo;
518         nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
519         CkAssert(nextGenInfo->childIndex.size() == numchild);
520
521         // Distribute the section members across the number of direct children (branches)
522         // Direct children are simply the first section member in each of the branch lists
523         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
524
525         // For each direct child, collate indices of all section members on the PEs in that branch
526         for (i=0; i < numchild; i++)
527         {
528             // Determine the indices of the first and last PEs in this branch of my sub-tree
529             int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
530             if (i < numchild-1)
531                 childEndIndex = nextGenInfo->childIndex[i+1];
532             else
533                 childEndIndex = mySubTreePEs.size();
534             // For each PE in this branch, add the section members on that PE to a list
535             for (j = childStartIndex; j < childEndIndex; j++)
536             {
537                 int pe = mySubTreePEs[j];
538                 for (int k=0; k<lists[pe].size(); k++)
539                     slots[i].push_back(lists[pe][k]);
540             }
541         }
542
543         // Ask each of your direct children to setup their branches
544         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
545         for (i=0; i<numchild; i++) 
546         {
547             // Give each child info about the number, indices and location of its children
548             int n = slots[i].length();
549             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
550             m->parent = CkSectionInfo(aid, entry);
551             m->nIdx = slots[i].length();
552             m->rootSid = msg->rootSid;
553             m->redNo = msg->redNo;
554             for (j=0; j<slots[i].length(); j++) 
555             {
556                 m->arrIdx[j] = slots[i][j].idx;
557                 m->lastKnown[j] = slots[i][j].pe;
558             }
559             int childroot = slots[i][0].pe;
560             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
561             // Send the message to the child
562             mCastGrp[childroot].setup(m);
563         }
564         delete [] slots;
565         delete nextGenInfo;
566     }
567     // else, tell yourself that your children are ready
568     else 
569     {
570         childrenReady(entry);
571     }
572     delete [] lists;
573     delete msg;
574 }
575
576
577
578
579 void CkMulticastMgr::childrenReady(mCastEntry *entry)
580 {
581     // Mark this entry as ready
582     entry->setReady();
583     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
584     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
585     if (entry->hasParent()) 
586         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
587 #if SPLIT_MULTICAST
588     // clear packet buffer
589     while (!entry->packetBuf.isEmpty()) 
590     {
591         mCastPacket *packet = entry->packetBuf.deq();
592         packet->cookie.get_val() = entry;
593         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
594         delete [] packet->data;
595         delete packet;
596     }
597 #else
598     // clear msg buffer
599     while (!entry->msgBuf.isEmpty()) 
600     {
601         multicastGrpMsg *newmsg = entry->msgBuf.deq();
602         DEBUGF(("[%d] release buffer %p %d\n", CkMyPe(), newmsg, newmsg->ep));
603         newmsg->_cookie.get_val() = entry;
604         mCastGrp[CkMyPe()].recvMsg(newmsg);
605     }
606 #endif
607     // release reduction msgs
608     releaseFutureReduceMsgs(entry);
609 }
610
611
612
613
614 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
615 {
616   mCastEntry *entry = (mCastEntry *)sid.get_val();
617   entry->children.push_back(child);
618   if (entry->children.length() == entry->numChild) {
619     childrenReady(entry);
620   }
621 }
622
623
624
625
626 // rebuild is called when root not migrated
627 // when rebuilding, all multicast msgs will be buffered.
628 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
629 {
630   // tear down old tree
631   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
632   CkAssert(curCookie->pe == CkMyPe());
633   // make sure I am the newest one
634   while (curCookie->newc) curCookie = curCookie->newc;
635   if (curCookie->isObsolete()) return;
636
637   //CmiPrintf("tree rebuild\n");
638   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
639
640   // build a chain
641   newCookie->oldc = curCookie;
642   curCookie->newc = newCookie;
643
644   sectId.get_val() = newCookie;
645
646   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
647
648   curCookie->setObsolete();
649
650   resetCookie(sectId);
651 }
652
653 void CkMulticastMgr::resetCookie(CkSectionInfo s)
654 {
655   mCastEntry *newCookie = (mCastEntry*)s.get_val();
656   mCastEntry *oldCookie = newCookie->oldc;
657
658   // get rid of old one
659   DEBUGF(("reset: oldc: %p\n", oldCookie));
660   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
661   int mype = CkMyPe();
662   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
663
664   // build a new one
665   initCookie(s);
666 }
667
668 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
669 {
670   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
671     // set an invalid cookie since we don't have it
672   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
673   for (int i=0; i< sid._nElems-1; i++) {
674      CProxyElement_ArrayBase ap(a, sid._elems[i]);
675      void *newMsg=CkCopyMsg((void **)&m);
676      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
677   }
678   if (sid._nElems > 0) {
679      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
680      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
681   }
682 }
683
684 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
685 {
686   DEBUGF(("ArraySectionSend\n"));
687
688   multicastGrpMsg *msg = (multicastGrpMsg *)m;
689 //  msg->aid = a;
690   msg->ep = ep;
691
692   CkSectionInfo &s = sid->_cookie;
693   CmiAssert(nsid == 1);
694
695   mCastEntry *entry;
696   if (s.get_pe() == CkMyPe()) {
697     entry = (mCastEntry *)s.get_val();   
698     if (entry == NULL) {
699       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
700     }
701
702     // update entry pointer in case there is a newer one.
703     if (entry->newc) {
704       do { entry=entry->newc; } while (entry->newc);
705       s.get_val() = entry;
706     }
707
708 #if CMK_LBDB_ON
709     // fixme: running obj?
710     envelope *env = UsrToEnv(msg);
711     const LDOMHandle &om = CProxy_ArrayBase(s.aid).ckLocMgr()->getOMHandle();
712     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
713 #endif
714
715     // first time need to rebuild, we do simple send to refresh lastKnown
716     if (entry->needRebuild == 1) {
717       msg->_cookie = s;
718       SimpleSend(ep, msg, s.aid, *sid, opts);
719       entry->needRebuild = 2;
720       return;
721     }
722     else if (entry->needRebuild == 2) rebuild(s);
723   }
724   else {
725     // fixme - in this case, not recorded in LB
726     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
727   }
728
729   // don't need packing here
730 /*
731   register envelope *env = UsrToEnv(m);
732   CkPackMessage(&env);
733   m = EnvToUsr(env);
734 */
735
736   // update cookie
737   msg->_cookie = s;
738
739 #if SPLIT_MULTICAST
740   // split multicast msg into SPLIT_NUM copies
741   register envelope *env = UsrToEnv(m);
742   CkPackMessage(&env);
743   int totalsize = env->getTotalsize();
744   int packetSize = totalsize/SPLIT_NUM;
745   if (totalsize%SPLIT_NUM) packetSize ++;
746   int totalcount = SPLIT_NUM;
747   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
748   int sizesofar = 0;
749   char *data = (char*) env;
750   for (int i=0; i<totalcount; i++) {
751     int mysize = packetSize;
752     if (mysize + sizesofar > totalsize) {
753       mysize = totalsize-sizesofar;
754     }
755     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
756     mCastGrp[s.get_pe()].recvPacket(s, mysize, data, i, totalcount, totalsize, 0);
757     sizesofar += mysize;
758     data += mysize;
759   }
760   CmiFree(env);
761 #else
762   if (s.get_pe() == CkMyPe()) {
763     recvMsg(msg);
764   }
765   else {
766     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
767     mCastGrp[s.get_pe()].recvMsg(msg);
768   }
769 #endif
770 }
771
772 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
773 {
774   int i;
775   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
776
777
778   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
779     char *newdata = new char[n];
780     memcpy(newdata, data, n);
781     entry->packetBuf.enq(new mCastPacket(_cookie, n, newdata, seqno, count, totalsize));
782 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
783     return;
784   }
785
786 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
787
788   // send to spanning tree children
789   // can not optimize using list send because the difference in cookie
790   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
791   for (i=0; i<entry->children.length(); i++) {
792     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], n, data, seqno, count, totalsize, 0);
793   }
794
795   if (seqno == 0) {
796     if (entry->asm_msg != NULL || entry->asm_fill != 0) {
797       entry->print();
798       CmiAssert(entry->asm_msg == NULL && entry->asm_fill==0);
799     }
800     entry->asm_msg = (char *)CmiAlloc(totalsize);
801   }
802   memcpy(entry->asm_msg+entry->asm_fill, data, n);
803   entry->asm_fill += n;
804   if (seqno + 1 == count) {
805     CmiAssert(entry->asm_fill == totalsize);
806     CkUnpackMessage((envelope **)&entry->asm_msg);
807     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
808     msg->_cookie = _cookie;
809 //    mCastGrp[CkMyPe()].recvMsg(msg);
810     recvMsg(msg);
811     entry->asm_msg = NULL;
812     entry->asm_fill = 0;
813   }
814 //  if (fromBuffer) delete [] data;
815 }
816
817 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
818 {
819   int i;
820   CkSectionInfo &sectionInfo = msg->_cookie;
821   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
822   CmiAssert(entry->getAid() == sectionInfo.aid);
823
824 #if ! SPLIT_MULTICAST
825   if (entry->notReady()) {
826     DEBUGF(("entry not ready, enq buffer %p\n", msg));
827     entry->msgBuf.enq(msg);
828     return;
829   }
830
831   // send to spanning tree children
832   // can not optimize using list send because the difference in cookie
833   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
834   for (i=0; i<entry->children.length(); i++) {
835     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
836     newmsg->_cookie = entry->children[i];
837     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
838   }
839 #endif
840
841   // send to local
842   int nLocal = entry->localElem.length();
843   DEBUGF(("send to local %d\n", nLocal));
844   for (i=0; i<nLocal-1; i++) {
845     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[i]);
846     if (_entryTable[msg->ep]->noKeep) {
847       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.aid, entry->localElem[i], CK_MSG_KEEP);
848     }
849     else {
850       // send through scheduler queue
851       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
852       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
853     }
854     // use CK_MSG_DONTFREE so that the message can be reused
855     // the drawback of this scheme bypassing queue is that 
856     // if # of local element is huge, this leads to a long time occupying CPU
857     // also load balancer seems not be able to correctly instrument load
858 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
859     //CmiNetworkProgressAfter(3);
860   }
861   if (nLocal) {
862     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[nLocal-1]);
863     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
864 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
865   }
866   else {
867     CkAssert (entry->rootSid.get_pe() == CkMyPe());
868     delete msg;
869   }
870 }
871
872 // user function
873 // to retrieve section info from a multicast msg
874 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
875 {
876   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
877   if (CkMcastBaseMsg::checkMagic(m) == 0) {
878     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
879     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
880   }
881   // ignore invalid cookie sent by SimpleSend
882   if (m->gpe() != -1) {
883     id.type = MulticastMsg;
884     id.get_pe() = m->gpe();
885     id.get_val() = m->cookie();
886   }
887   // note: retain old redNo
888 }
889
890 // Reduction
891
892 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
893 {
894   CkSectionInfo &id = proxy.ckGetSectionInfo();
895   mCastEntry *entry = (mCastEntry *)id.get_val();
896   entry->red.storedCallback = cb;
897 }
898
899 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
900 {
901   CkSectionInfo &id = proxy.ckGetSectionInfo();
902   mCastEntry *entry = (mCastEntry *)id.get_val();
903   entry->red.storedClient = fn;
904   entry->red.storedClientParam = param;
905 }
906
907 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
908 {
909   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
910   msg->reducer = type;
911   msg->sid = id;
912   msg->sourceFlag = 1;   // from array element
913   msg->redNo = id.get_redNo();
914   msg->gcount = 1;
915   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
916   msg->callback = cb;
917   msg->userFlag=userFlag;
918   return msg;
919 }
920
921
922
923 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
924 {
925   CkCallback cb;
926   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
927 }
928
929
930 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
931 {
932   if (id.get_val() == NULL || id.get_redNo() == -1) 
933     CmiAbort("contribute: SectionID is not initialized\n");
934
935   int nFrags;
936   if (-1 == fragSize) {         // no frag
937     nFrags = 1;
938     fragSize = dataSize;
939   }
940   else {
941     CmiAssert (dataSize >= fragSize);
942     nFrags = dataSize/fragSize;
943     if (dataSize%fragSize) nFrags++;
944   }
945
946   if (MAXFRAGS < nFrags) {
947     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
948     CmiAbort ("frag size too small\n");
949   }
950
951   int mpe = id.get_pe();
952   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
953
954   // break the message into k-piece fragments
955   int fSize = fragSize;
956   for (int i=0; i<nFrags; i++) {
957     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
958       fSize = dataSize%fragSize;
959     }
960
961     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
962
963     // initialize the new msg
964     msg->reducer            = type;
965     msg->sid                = id;
966     msg->nFrags             = nFrags;
967     msg->fragNo             = i;
968     msg->sourceFlag         = 1;
969     msg->redNo              = id.get_redNo();
970     msg->gcount             = 1;
971     msg->rebuilt            = (mpe == CkMyPe())?0:1;
972     msg->callback           = cb;
973     msg->userFlag           = userFlag;
974
975     mCastGrp[mpe].recvRedMsg(msg);
976
977     data = (void*)(((char*)data) + fSize);
978   }
979
980   id.get_redNo()++;
981   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
982 }
983
984 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
985                                               mCastEntry* entry,
986                                               reductionInfo& redInfo) {
987   int i;
988   int dataSize = 0;
989   int nFrags   = redInfo.msgs[0][0]->nFrags;
990
991   // to avoid memcpy and allocation cost for non-pipelined reductions
992   if (1 == nFrags) {
993     CkReductionMsg* msg = redInfo.msgs[0][0];
994
995     // free up the msg slot
996     redInfo.msgs[0].length() = 0;
997
998     return msg;
999   }
1000
1001   for (i=0; i<nFrags; i++) {
1002     dataSize += redInfo.msgs[i][0]->dataSize;
1003   }
1004
1005   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1006
1007   // initialize msg header
1008   msg->redNo      = redInfo.msgs[0][0]->redNo;
1009   msg->reducer    = redInfo.msgs[0][0]->reducer;
1010   msg->sid        = id;
1011   msg->nFrags     = nFrags;
1012
1013   // I guess following fields need not be initialized
1014   msg->sourceFlag = 2;
1015   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1016   msg->callback   = redInfo.msgs[0][0]->callback;
1017   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1018
1019   byte* data = (byte*)msg->getData ();
1020   for (i=0; i<nFrags; i++) {
1021     // copy data from fragments to msg
1022     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1023     data += redInfo.msgs[i][0]->dataSize;
1024
1025     // free fragments
1026     delete redInfo.msgs[i][0];
1027     redInfo.msgs[i].length() = 0;    
1028   }
1029
1030   return msg;
1031 }
1032
1033 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1034                                      mCastEntry* entry, reductionInfo& redInfo,
1035                                      int& updateReduceNo, int currentTreeUp){
1036
1037   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1038   reductionMsgs& rmsgs = redInfo.msgs[index];
1039   int dataSize         = rmsgs[0]->dataSize;
1040   CkReduction::reducerType reducer = rmsgs[0]->reducer;
1041   int i;
1042   int oldRedNo = redInfo.redNo;
1043   int nFrags   = rmsgs[0]->nFrags;
1044   int fragNo   = rmsgs[0]->fragNo;
1045   int userFlag   = rmsgs[0]->userFlag;
1046                                                                                 
1047   // reduce msgs
1048   CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1049   CkAssert(NULL != f);
1050
1051   // check valid callback in msg and check if migration happened
1052   CkCallback msg_cb;
1053   int rebuilt = 0;
1054   for (i=0; i<rmsgs.length(); i++) {
1055     if (rmsgs[i]->rebuilt) rebuilt = 1;
1056     if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1057   }
1058
1059   CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec()); 
1060   newmsg->redNo  = redInfo.redNo;
1061   newmsg->nFrags = nFrags;
1062   newmsg->fragNo = fragNo;
1063   newmsg->userFlag = userFlag;
1064
1065   // increment num-frags processed
1066   redInfo.npProcessed ++;
1067
1068   // check if migration and free messages
1069   for (i=0; i<rmsgs.length(); i++) {
1070     if (rmsgs[i]!=newmsg) delete rmsgs[i];
1071   }
1072   rmsgs.length() = 0;
1073
1074   if (redInfo.npProcessed == nFrags) {
1075     entry->incReduceNo();
1076     DEBUGF(("Advanced entry:%p redNo: %d\n", entry, entry->red.redNo));
1077   }
1078   if (updateReduceNo) mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1079                                                                                 
1080   if (entry->hasParent()) {
1081     // send up to parent
1082     newmsg->sid        = entry->parentGrp;
1083     newmsg->reducer    = reducer;
1084     newmsg->sourceFlag = 2;
1085     newmsg->redNo      = oldRedNo;
1086     newmsg->gcount     = redInfo.gcount [index];
1087     newmsg->rebuilt    = rebuilt;
1088     newmsg->callback   = msg_cb;
1089     DEBUGF(("send to parent %p: %d\n", entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1090     mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1091   } else { // root
1092     newmsg->sid = id;
1093     // buffer message
1094     rmsgs.push_back (newmsg);
1095
1096     //if (entry->allElem.length() == redInfo.gcount) {
1097     if (redInfo.npProcessed == nFrags) {
1098
1099       newmsg = combineFrags (id, entry, redInfo);
1100       CkSetRefNum(newmsg, userFlag);
1101
1102       if (!msg_cb.isInvalid()) {
1103         msg_cb.send(newmsg);
1104       }
1105       else if (redInfo.storedCallback != NULL) {
1106         redInfo.storedCallback->send(newmsg);
1107       }
1108       else if (redInfo.storedClient != NULL) {
1109         redInfo.storedClient(id, redInfo.storedClientParam, dataSize,
1110            newmsg->data);
1111         delete newmsg;
1112       }
1113       else
1114         CmiAbort("Did you forget to register a reduction client?");
1115                                                                                 
1116       DEBUGF(("Reduction client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1117       if (currentTreeUp) {
1118         if (entry->oldc) {
1119             // free old tree on same processor;
1120           mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1121           entry->oldc = NULL;
1122         }
1123         if (entry->hasOldtree()) {
1124             // free old tree on old processor
1125           int oldpe = entry->oldtree.pe;
1126           mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1127           entry->oldtree.clear();
1128         }
1129       }
1130       if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1131     }
1132   }
1133 }
1134
1135 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1136 {
1137   int i;
1138   CkSectionInfo id = msg->sid;
1139   mCastEntry *entry = (mCastEntry *)id.get_val();
1140   CmiAssert(entry!=NULL);
1141 //CmiPrintf("[%d] recvRedMsg: entry: %p\n", CkMyPe(), entry);
1142
1143   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1144
1145   int updateReduceNo = 0;
1146
1147   // update entry if obsolete
1148   if (entry->isObsolete()) {
1149       // send up to root
1150     DEBUGF(("[%d] entry obsolete-send to root %d\n", CkMyPe(), entry->rootSid.pe));
1151     if (!entry->hasParent()) { //rootSid.pe == CkMyPe()
1152       // I am root, set to the new cookie if there is
1153       mCastEntry *newentry = entry->newc;
1154       while (newentry && newentry->newc) newentry=newentry->newc;
1155       if (newentry) entry = newentry;
1156       CmiAssert(entry!=NULL);
1157     }
1158     if (!entry->hasParent() && !entry->isObsolete()) {
1159        // root find the latest cookie that is not obsolete
1160       msg->sourceFlag = 0;       // indicate it is not on old spanning tree
1161       updateReduceNo = 1;        // reduce from old tree, new entry need update.
1162     }
1163     else {
1164       CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1165       // entry is obsolete, send to root directly
1166       msg->sid = entry->rootSid;
1167
1168       msg->sourceFlag = 0;
1169       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1170       return;
1171     }
1172   }
1173
1174   reductionInfo &redInfo = entry->red;
1175
1176   DEBUGF(("[%d] msg %p red:%d, entry:%p redno:%d\n", CkMyPe(), msg, msg->redNo, entry, entry->red.redNo));
1177   // old message come, ignore
1178   if (msg->redNo < redInfo.redNo) {
1179     CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1180     CmiAbort("Could never happen! \n");
1181   }
1182   if (entry->notReady() || msg->redNo > redInfo.redNo) {
1183     DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1184     redInfo.futureMsgs.push_back(msg);
1185     return;
1186   }
1187
1188   DEBUGF(("[%d] recvRedMsg rebuilt:%d red:%d\n", CkMyPe(), msg->rebuilt, redInfo.redNo));
1189
1190   const int index = msg->fragNo;
1191
1192   // buffer this msg
1193   if (msg->sourceFlag == 1) {
1194     // new reduction message from ArrayElement
1195     redInfo.lcount [index] ++;
1196   }
1197
1198   if (msg->sourceFlag == 2) {
1199     redInfo.ccount [index] ++;
1200   }
1201
1202   redInfo.gcount [index] += msg->gcount;
1203
1204   // buffer the msg
1205   // first check if message is of proper size
1206   if ((0 != redInfo.msgs[index].length()) && 
1207       (msg->dataSize != (redInfo.msgs [index][0]->dataSize))) {
1208     CmiAbort("Reduction data are not of same length!");
1209   }
1210
1211   redInfo.msgs [index].push_back(msg);
1212
1213   const int numFragsRcvd = redInfo.msgs [index].length();
1214
1215   DEBUGF(("[%d] index:%d lcount:%d-%d, ccount:%d-%d, gcount:%d-%d root:%d\n", CkMyPe(),index, entry->red.lcount[index],entry->localElem.length(), entry->red.ccount[index], entry->children.length(), entry->red.gcount[index], entry->allElem.length(), !entry->hasParent()));
1216
1217   int currentTreeUp = 0;
1218   if (redInfo.lcount [index] == entry->localElem.length() &&
1219       redInfo.ccount [index] == entry->children.length())
1220       currentTreeUp = 1;
1221
1222   int mixTreeUp = 0;
1223   const int numElems = entry->allElem.length();
1224   
1225   if (!entry->hasParent()) {
1226     mixTreeUp = 1;
1227     for (int i=0; i<msg->nFrags; i++) {
1228       if (entry->allElem.length() != redInfo.gcount [i]) {
1229         mixTreeUp = 0;
1230       }
1231     }
1232   }
1233
1234   if (currentTreeUp || mixTreeUp)
1235   {
1236     const int nFrags = msg->nFrags;  
1237     
1238     // msg from children contain only one fragment
1239     reduceFragment (index, id, entry, redInfo, updateReduceNo, 
1240                     currentTreeUp);
1241
1242     if (redInfo.npProcessed == nFrags) {
1243       // reset counters
1244       for (i=0; i<nFrags; i++) {
1245         redInfo.lcount [i] = 0;
1246         redInfo.ccount [i] = 0;
1247         redInfo.gcount [i] = 0;
1248       }
1249       redInfo.npProcessed = 0;
1250
1251       // release future msgs
1252       releaseFutureReduceMsgs(entry);
1253     }
1254   }
1255 }
1256
1257 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1258 {
1259   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1260
1261   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1262     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1263     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1264   }
1265   entry->red.futureMsgs.length() = 0;
1266 }
1267
1268 // these messages have to be sent to root
1269 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1270 {
1271   int i;
1272   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1273
1274   for (int j=0; j<MAXFRAGS; j++) {
1275     for (i=0; i<entry->red.msgs[j].length(); i++) {
1276       CkReductionMsg *msg = entry->red.msgs[j][i];
1277       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1278       msg->sid = entry->rootSid;
1279       msg->sourceFlag = 0;
1280       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1281     }
1282     entry->red.msgs[j].length() = 0;
1283   }
1284
1285
1286   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1287     CkReductionMsg *msg = entry->red.futureMsgs[i];
1288     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1289     msg->sid = entry->rootSid;
1290     msg->sourceFlag = 0;
1291     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1292   }
1293   entry->red.futureMsgs.length() = 0;
1294 }
1295
1296 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1297 {
1298   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1299   if (entry->red.redNo < red)
1300     entry->red.redNo = red;
1301
1302   CProxy_CkMulticastMgr mp(thisgroup);
1303   for (int i=0; i<entry->children.length(); i++) {
1304     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1305   }
1306
1307   releaseFutureReduceMsgs(entry);
1308 }
1309
1310 #if 0
1311 ////////////////////////////////////////////////////////////////////////////////
1312 /////
1313 ///////////////// Builtin Reducer Functions //////////////
1314 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1315 {CkAbort("ERROR! Called the invalid reducer!\n");return NULL;}
1316
1317 /* A simple reducer, like sum_int, looks like this:
1318 static CkReductionMsg *sum_int(int nMsg,CkReductionMsg **msg)
1319 {
1320   int i,ret=0;
1321   for (i=0;i<nMsg;i++)
1322     ret+=*(int *)(msg[i]->data);
1323   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1324 }
1325 */
1326
1327 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1328 static CkReductionMsg *name(int nMsg, CkReductionMsg **msg)\
1329 {\
1330   int m,i;\
1331   int nElem=msg[0]->getSize()/sizeof(dataType);\
1332   dataType *ret=(dataType *)(msg[0]->getData());\
1333   for (m=1;m<nMsg;m++)\
1334   {\
1335     dataType *value=(dataType *)(msg[m]->getData());\
1336     for (i=0;i<nElem;i++)\
1337     {\
1338       loop\
1339     }\
1340   }\
1341   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret);\
1342 }
1343
1344 //Use this macro for reductions that have the same type for all inputs
1345 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1346   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1347   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1348   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1349
1350
1351 //Compute the sum the numbers passed by each element.
1352 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1353
1354 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1355
1356 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1357
1358 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1359
1360 CkReduction::reducerFn CkMulticastMgr::reducerTable[CkMulticastMgr::MAXREDUCERS]={
1361     ::invalid_reducer,
1362   //Compute the sum the numbers passed by each element.
1363     ::sum_int,::sum_float,::sum_double,
1364     ::product_int,::product_float,::product_double,
1365     ::max_int,::max_float,::max_double,
1366     ::min_int,::min_float,::min_double
1367 };
1368 #endif
1369
1370 #include "CkMulticast.def.h"