CkMulticast: Support multicasts to cross-array sections
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentation  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  * */
12
13 #include "charm++.h"
14 #include "envelope.h"
15 #include "register.h"
16
17 #include "ckmulticast.h"
18 #include "spanningTreeStrategy.h"
19
20 #define DEBUGF(x)  // CkPrintf x;
21
22 // turn on or off fragmentation in multicast
23 #define SPLIT_MULTICAST 0
24 // each multicast message is split into SPLIT_NUM fragments
25 #define SPLIT_NUM 2
26
27 // maximum number of fragments into which a message can be broken
28 #define MAXFRAGS 5
29
30 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
31 typedef CkVec<CkArrayIndexMax>   arrayIndexList;
32 typedef CkVec<CkSectionInfo>     sectionIdList;
33 typedef CkVec<CkReductionMsg *>  reductionMsgs;
34 typedef CkQ<int>                 PieceSize;
35 typedef CkVec<LDObjid>          ObjKeyList;
36 typedef unsigned char            byte;
37
38 class reductionInfo {
39 public:
40   int            lcount [MAXFRAGS]; /**< local elem collected */
41   int            ccount [MAXFRAGS]; /**< children node collected */
42   int            gcount [MAXFRAGS]; /**< total elem collected */
43   int            npProcessed;
44   CkCallback*    storedCallback;    /**< user callback */
45   redClientFn    storedClient;      /**< reduction client function */
46   void*          storedClientParam; /**< user provided data */
47   int            redNo;             /**< reduction sequence number */
48   reductionMsgs  msgs [MAXFRAGS];   /**< messages for this reduction */
49   reductionMsgs  futureMsgs;        /**< messages of future reductions */
50 public:
51   reductionInfo(): storedCallback(NULL), storedClientParam(NULL), redNo(0),
52                    npProcessed(0) {
53     for (int i=0; i<MAXFRAGS; i++) 
54       lcount [i] = ccount [i] = gcount [i] = 0;
55   }
56 };
57
58 /// cookie status
59 #define COOKIE_NOTREADY 0
60 #define COOKIE_READY    1
61 #define COOKIE_OLD      2
62
63 class mCastPacket {
64 public:
65   CkSectionInfo cookie;
66   int n;
67   char *data;
68   int seqno;
69   int count;
70   int totalsize;
71
72   mCastPacket(CkSectionInfo &_cookie, int _n, char *_d, int _s, int _c, int _t):
73                 cookie(_cookie), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
74 };
75
76 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
77
78 class SectionLocation {
79 public:
80   mCastEntry *entry;
81   int         pe;
82 public:
83   SectionLocation(): entry(NULL), pe(-1) {}
84   SectionLocation( mCastEntry *e, int p) { set(e, p); }
85   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
86   inline void clear() { entry = NULL; pe = -1; }
87 };
88
89
90
91
92 /// Cookie for an array section 
93 class mCastEntry 
94 {
95     public:
96         /// Array ID 
97         CkArrayID     aid;
98         /// Spanning tree parent
99         CkSectionInfo parentGrp;
100         /// List of direct children
101         sectionIdList children;
102         /// Number of direct children
103         int numChild;
104         /// List of all tree member array indices (Only useful on the tree root)
105         arrayIndexList allElem;
106         /// Only useful on root for LB
107         ObjKeyList     allObjKeys;
108         /// List of array elements on local PE
109         arrayIndexList localElem;
110         /// Should always be myPE
111         int pe;
112         /// Section ID of the root
113         CkSectionInfo rootSid;
114         multicastGrpMsgBuf msgBuf;
115         /// Buffer storing the pending packets
116         multicastGrpPacketBuf packetBuf;
117         /// For multicast packetization
118         char *asm_msg;
119         int   asm_fill;
120         /// Linked list of entries on the same processor
121         mCastEntry *oldc, *newc;
122         /// Old spanning tree
123         SectionLocation   oldtree;
124         // for reduction
125         reductionInfo red;
126         //
127         char needRebuild;
128     private:
129         char flag;
130     public:
131         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
132                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
133         mCastEntry(mCastEntry *);
134         /// Check if this tree is only a branch and has a parent
135         inline int hasParent() { return parentGrp.get_val()?1:0; }
136         /// Is this tree obsolete?
137         inline int isObsolete() { return (flag == COOKIE_OLD); }
138         /// Make the current tree obsolete
139         inline void setObsolete() { flag=COOKIE_OLD; }
140         /// Check if this (branch of the) tree is ready for use
141         inline int notReady() { return (flag == COOKIE_NOTREADY); }
142         /// Mark this (branch of the) tree as ready for use
143         inline void setReady() { flag=COOKIE_READY; }
144         /// Increment the reduction number for all the section members on this PE
145         inline void incReduceNo() {
146             red.redNo ++;
147             for (mCastEntry *next = newc; next; next=next->newc) 
148                 next->red.redNo++;
149         }
150         /// Get a handle on the array ID this tree is a member of
151         inline CkArrayID getAid() { return aid; }
152         inline int hasOldtree() { return oldtree.entry != NULL; }
153         inline void print() {
154             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
155         }
156 };
157
158
159
160
161 class cookieMsg: public CMessage_cookieMsg {
162 public:
163   CkSectionInfo cookie;
164 public:
165   cookieMsg() {};
166   cookieMsg(CkSectionInfo m): cookie(m) {};
167 };
168
169
170
171
172 /// multicast tree setup message
173 class multicastSetupMsg: public CMessage_multicastSetupMsg {
174 public:
175   int  nIdx;
176   CkArrayIndexMax *arrIdx;
177   int      *lastKnown;
178   CkSectionInfo parent;
179   CkSectionInfo rootSid;
180   int redNo;
181 };
182
183
184
185
186 /// message send in spanning tree
187 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
188 };
189
190
191 extern void CkPackMessage(envelope **pEnv);
192 extern void CkUnpackMessage(envelope **pEnv);
193
194
195
196 void _ckMulticastInit(void)
197 {
198 /*
199   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
200   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
201 */
202 }
203
204
205 mCastEntry::mCastEntry (mCastEntry *old): 
206   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
207 {
208   int i;
209   aid = old->aid;
210   parentGrp = old->parentGrp;
211   for (i=0; i<old->allElem.length(); i++)
212     allElem.push_back(old->allElem[i]);
213 #if CMK_LBDB_ON
214   CmiAssert(old->allElem.length() == old->allObjKeys.length());
215   for (i=0; i<old->allObjKeys.length(); i++)
216     allObjKeys.push_back(old->allObjKeys[i]);
217 #endif
218   pe = old->pe;
219   red.storedCallback = old->red.storedCallback;
220   red.storedClient = old->red.storedClient;
221   red.storedClientParam = old->red.storedClientParam;
222   red.redNo = old->red.redNo;
223   needRebuild = 0;
224   asm_msg = NULL;
225   asm_fill = 0;
226 }
227
228 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
229
230
231
232
233 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndexMax *al, int n)
234 {
235     // Create a multicast entry
236     mCastEntry *entry = new mCastEntry(aid);
237     // Push all the section member indices into the entry
238     for (int i=0; i<n; i++) {
239         entry->allElem.push_back(al[i]);
240 #if CMK_LBDB_ON
241         const LDObjid key = idx2LDObjid(al[i]);
242         entry->allObjKeys.push_back(key);
243 #endif
244     }
245     //  entry->aid = aid;
246     _id.aid = aid;
247     _id.get_val() = entry;              // allocate table for this section
248     // 
249     initCookie(_id);
250 }
251
252
253
254
255 void CkMulticastMgr::setSection(CkSectionInfo &id)
256 {
257   initCookie(id);
258 }
259
260
261
262
263 /// @warning: This is deprecated
264 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
265 {
266   CkArrayID aid = proxy.ckGetArrayID();
267   CkSectionInfo &_id = proxy.ckGetSectionInfo();
268
269   mCastEntry *entry = new mCastEntry(aid);
270
271   const CkArrayIndexMax *al = proxy.ckGetArrayElements();
272   for (int i=0; i<proxy.ckGetNumElements(); i++) {
273     entry->allElem.push_back(al[i]);
274 #if CMK_LBDB_ON
275     const LDObjid key = idx2LDObjid(al[i]);
276     entry->allObjKeys.push_back(key);
277 #endif
278   }
279   _id.type = MulticastMsg;
280   _id.aid = aid;
281   _id.get_val() = entry;                // allocate table for this section
282   initCookie(_id);
283 }
284
285
286
287
288 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
289 {
290   CkSectionInfo &info = proxy.ckGetSectionInfo();
291
292   int oldpe = info.get_pe();
293   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
294
295   CkArrayID aid = proxy.ckGetArrayID();
296   CkSectionID *sid = proxy.ckGetSectionIDs();
297   mCastEntry *entry = new mCastEntry(aid);
298
299   mCastEntry *oldentry = (mCastEntry *)info.get_val();
300   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
301
302   const CkArrayIndexMax *al = sid->_elems;
303   CmiAssert(info.aid == aid);
304   prepareCookie(entry, *sid, al, sid->_nElems, aid);
305
306   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
307
308     // store old tree info
309   entry->oldtree.set(oldentry, oldpe);
310
311     // obsolete old tree
312   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
313
314   // find reduction number
315   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
316 }
317
318
319
320
321 /// Build a mCastEntry object with relevant section info and set the section cookie to point to this object
322 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndexMax *al, int count, CkArrayID aid)
323 {
324   for (int i=0; i<count; i++) {
325     entry->allElem.push_back(al[i]);
326 #if CMK_LBDB_ON
327     const LDObjid key = idx2LDObjid(al[i]);
328     entry->allObjKeys.push_back(key);
329 #endif
330   }
331   sid._cookie.type = MulticastMsg;
332   sid._cookie.aid = aid;
333   sid._cookie.get_val() = entry;        // allocate table for this section
334   sid._cookie.get_pe() = CkMyPe();
335 }
336
337
338
339
340 // this is used
341 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
342 {
343   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
344   int numSubSections = proxy->ckGetNumSubSections();
345   for (int i=0; i<numSubSections; i++)
346   {
347       CkArrayID aid = proxy->ckGetArrayIDn(i);
348       mCastEntry *entry = new mCastEntry(aid);
349       CkSectionID *sid = &( proxy->ckGetSectionID(i) );
350       const CkArrayIndexMax *al = proxy->ckGetArrayElements(i);
351       prepareCookie(entry, *sid, al, proxy->ckGetNumElements(i), aid);
352       initCookie(sid->_cookie);
353   }
354 }
355
356
357
358
359 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
360 {
361   mCastEntry *entry = (mCastEntry *)s.get_val();
362   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
363   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
364 }
365
366 // now that we get reduction number from the old cookie,
367 // we continue to build the spanning tree
368 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
369 {
370   mCastEntry *entry = (mCastEntry *)s.get_val();
371   entry->red.redNo = red;
372
373   initCookie(s);
374
375   // TODO delete old tree
376 }
377
378
379
380
381 void CkMulticastMgr::initCookie(CkSectionInfo s)
382 {
383     mCastEntry *entry = (mCastEntry *)s.get_val();
384     int n = entry->allElem.length();
385     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
386     // Create and initialize a setup message
387     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
388     msg->nIdx = n;
389     msg->parent = CkSectionInfo(entry->getAid());
390     msg->rootSid = s;
391     msg->redNo = entry->red.redNo;
392     // Fill the message with the section member indices and their last known locations
393     CkArray *array = CProxy_ArrayBase(s.aid).ckLocalBranch();
394     for (int i=0; i<n; i++) {
395       msg->arrIdx[i] = entry->allElem[i];
396       int ape = array->lastKnown(entry->allElem[i]);
397       CmiAssert(ape >=0 && ape < CkNumPes());
398       msg->lastKnown[i] = ape;
399     }
400     // Trigger the spanning tree build
401     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
402     mCastGrp[CkMyPe()].setup(msg);
403 }
404
405
406
407
408 void CkMulticastMgr::teardown(CkSectionInfo cookie)
409 {
410     int i;
411     mCastEntry *sect = (mCastEntry *)cookie.get_val();
412     // Mark this section as obsolete
413     sect->setObsolete();
414     // Release the buffered messages 
415     releaseBufferedReduceMsgs(sect);
416     // Propagate the teardown to each of your children
417     CProxy_CkMulticastMgr mp(thisgroup);
418     for (i=0; i<sect->children.length(); i++)
419         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
420 }
421
422
423
424
425 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
426 {
427     int i;
428     mCastEntry *sect = (mCastEntry *)cookie.get_val();
429     // Reset the root section info
430     sect->rootSid = newroot;
431     // Mark this section as obsolete
432     sect->setObsolete();
433     // Release the buffered messages 
434     releaseBufferedReduceMsgs(sect);
435     // Propagate the teardown to each of your children
436     CProxy_CkMulticastMgr mp(thisgroup);
437     for (i=0; i<sect->children.length(); i++)
438         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
439 }
440
441
442
443
444 void CkMulticastMgr::freeup(CkSectionInfo cookie)
445 {
446   mCastEntry *sect = (mCastEntry *)cookie.get_val();
447   CProxy_CkMulticastMgr mp(thisgroup);
448   // Parse through all the section members on this PE and...
449   while (sect) 
450   {
451       // Free their children
452       for (int i=0; i<sect->children.length(); i++)
453           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
454       // Free the cookie itself
455       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
456       mCastEntry *oldc= sect->oldc;
457       delete sect;
458       sect = oldc;
459   }
460 }
461
462
463
464
465 void CkMulticastMgr::setup(multicastSetupMsg *msg)
466 {
467     int i,j;
468     mCastEntry *entry;
469     CkArrayID aid = msg->rootSid.aid;
470     if (msg->parent.get_pe() == CkMyPe()) 
471       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
472     else 
473       entry = new mCastEntry(aid);
474     entry->aid = aid;
475     entry->pe = CkMyPe();
476     entry->rootSid = msg->rootSid;
477     entry->parentGrp = msg->parent;
478     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
479     entry->red.redNo = msg->redNo;
480
481     // Create a numPE sized array of vectors to hold the array elements in each PE
482     int numpes = CkNumPes();
483     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
484     // Sort each array index in the setup message based on last known location
485     for (i=0; i<msg->nIdx; i++) 
486     {
487       int lastKnown = msg->lastKnown[i];
488       // If msg->arrIdx[i] local, add it to a special local element list
489       if (lastKnown == CkMyPe())
490           entry->localElem.insertAtEnd(msg->arrIdx[i]);
491       // else, add it to the list corresponding to its PE
492       else
493           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
494     }
495
496     CkVec<int> mySubTreePEs;
497     mySubTreePEs.reserve(numpes);
498     // The first PE in my subtree should be me, the tree root (as required by the spanning tree builder)
499     mySubTreePEs.push_back(CkMyPe());
500     // Identify the child PEs in the tree, ie the PEs with section members on them
501     for (i=0; i<numpes; i++) 
502     {
503       if (i==CkMyPe()) continue;
504       if (lists[i].size()) 
505           mySubTreePEs.push_back(i);
506     }
507     // The number of multicast children can be limited by the spanning tree factor 
508     int num = mySubTreePEs.size() - 1, numchild = 0;
509     if (factor <= 0) numchild = num;
510     else numchild = num<factor?num:factor;
511   
512     entry->numChild = numchild;
513
514     // If there are any children, go about building a spanning tree
515     if (numchild) 
516     {
517         // Build the next generation of the spanning tree rooted at my PE
518         int *peListPtr = mySubTreePEs.getVec();
519         topo::SpanningTreeVertex *nextGenInfo;
520         nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
521         CkAssert(nextGenInfo->childIndex.size() == numchild);
522
523         // Distribute the section members across the number of direct children (branches)
524         // Direct children are simply the first section member in each of the branch lists
525         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
526
527         // For each direct child, collate indices of all section members on the PEs in that branch
528         for (i=0; i < numchild; i++)
529         {
530             // Determine the indices of the first and last PEs in this branch of my sub-tree
531             int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
532             if (i < numchild-1)
533                 childEndIndex = nextGenInfo->childIndex[i+1];
534             else
535                 childEndIndex = mySubTreePEs.size();
536             // For each PE in this branch, add the section members on that PE to a list
537             for (j = childStartIndex; j < childEndIndex; j++)
538             {
539                 int pe = mySubTreePEs[j];
540                 for (int k=0; k<lists[pe].size(); k++)
541                     slots[i].push_back(lists[pe][k]);
542             }
543         }
544
545         // Ask each of your direct children to setup their branches
546         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
547         for (i=0; i<numchild; i++) 
548         {
549             // Give each child info about the number, indices and location of its children
550             int n = slots[i].length();
551             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
552             m->parent = CkSectionInfo(aid, entry);
553             m->nIdx = slots[i].length();
554             m->rootSid = msg->rootSid;
555             m->redNo = msg->redNo;
556             for (j=0; j<slots[i].length(); j++) 
557             {
558                 m->arrIdx[j] = slots[i][j].idx;
559                 m->lastKnown[j] = slots[i][j].pe;
560             }
561             int childroot = slots[i][0].pe;
562             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
563             // Send the message to the child
564             mCastGrp[childroot].setup(m);
565         }
566         delete [] slots;
567         delete nextGenInfo;
568     }
569     // else, tell yourself that your children are ready
570     else 
571     {
572         childrenReady(entry);
573     }
574     delete [] lists;
575     delete msg;
576 }
577
578
579
580
581 void CkMulticastMgr::childrenReady(mCastEntry *entry)
582 {
583     // Mark this entry as ready
584     entry->setReady();
585     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
586     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
587     if (entry->hasParent()) 
588         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
589 #if SPLIT_MULTICAST
590     // clear packet buffer
591     while (!entry->packetBuf.isEmpty()) 
592     {
593         mCastPacket *packet = entry->packetBuf.deq();
594         packet->cookie.get_val() = entry;
595         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
596         delete [] packet->data;
597         delete packet;
598     }
599 #else
600     // clear msg buffer
601     while (!entry->msgBuf.isEmpty()) 
602     {
603         multicastGrpMsg *newmsg = entry->msgBuf.deq();
604         DEBUGF(("[%d] release buffer %p %d\n", CkMyPe(), newmsg, newmsg->ep));
605         newmsg->_cookie.get_val() = entry;
606         mCastGrp[CkMyPe()].recvMsg(newmsg);
607     }
608 #endif
609     // release reduction msgs
610     releaseFutureReduceMsgs(entry);
611 }
612
613
614
615
616 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
617 {
618   mCastEntry *entry = (mCastEntry *)sid.get_val();
619   entry->children.push_back(child);
620   if (entry->children.length() == entry->numChild) {
621     childrenReady(entry);
622   }
623 }
624
625
626
627
628 // rebuild is called when root not migrated
629 // when rebuilding, all multicast msgs will be buffered.
630 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
631 {
632   // tear down old tree
633   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
634   CkAssert(curCookie->pe == CkMyPe());
635   // make sure I am the newest one
636   while (curCookie->newc) curCookie = curCookie->newc;
637   if (curCookie->isObsolete()) return;
638
639   //CmiPrintf("tree rebuild\n");
640   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
641
642   // build a chain
643   newCookie->oldc = curCookie;
644   curCookie->newc = newCookie;
645
646   sectId.get_val() = newCookie;
647
648   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
649
650   curCookie->setObsolete();
651
652   resetCookie(sectId);
653 }
654
655 void CkMulticastMgr::resetCookie(CkSectionInfo s)
656 {
657   mCastEntry *newCookie = (mCastEntry*)s.get_val();
658   mCastEntry *oldCookie = newCookie->oldc;
659
660   // get rid of old one
661   DEBUGF(("reset: oldc: %p\n", oldCookie));
662   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
663   int mype = CkMyPe();
664   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
665
666   // build a new one
667   initCookie(s);
668 }
669
670 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
671 {
672   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
673     // set an invalid cookie since we don't have it
674   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
675   for (int i=0; i< sid._nElems-1; i++) {
676      CProxyElement_ArrayBase ap(a, sid._elems[i]);
677      void *newMsg=CkCopyMsg((void **)&m);
678      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
679   }
680   if (sid._nElems > 0) {
681      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
682      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
683   }
684 }
685
686 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
687 {
688     for (int snum = 0; snum < nsid; snum++) {
689         void *msgCopy = m;
690         if (nsid - snum > 1)
691             msgCopy = CkCopyMsg(&m);
692         sendToSection(pd, ep, msgCopy, &(sid[snum]), opts);
693     }
694 }
695
696
697
698 void CkMulticastMgr::sendToSection(CkDelegateData *pd,int ep,void *m, CkSectionID *sid, int opts)
699 {
700             DEBUGF(("ArraySectionSend\n"));
701
702   multicastGrpMsg *msg = (multicastGrpMsg *)m;
703 //  msg->aid = a;
704   msg->ep = ep;
705
706   CkSectionInfo &s = sid->_cookie;
707
708   mCastEntry *entry;
709   if (s.get_pe() == CkMyPe()) {
710     entry = (mCastEntry *)s.get_val();   
711     if (entry == NULL) {
712       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
713     }
714
715     // update entry pointer in case there is a newer one.
716     if (entry->newc) {
717       do { entry=entry->newc; } while (entry->newc);
718       s.get_val() = entry;
719     }
720
721 #if CMK_LBDB_ON
722     // fixme: running obj?
723     envelope *env = UsrToEnv(msg);
724     const LDOMHandle &om = CProxy_ArrayBase(s.aid).ckLocMgr()->getOMHandle();
725     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
726 #endif
727
728     // first time need to rebuild, we do simple send to refresh lastKnown
729     if (entry->needRebuild == 1) {
730       msg->_cookie = s;
731       SimpleSend(ep, msg, s.aid, *sid, opts);
732       entry->needRebuild = 2;
733       return;
734     }
735     else if (entry->needRebuild == 2) rebuild(s);
736   }
737   else {
738     // fixme - in this case, not recorded in LB
739     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
740   }
741
742   // don't need packing here
743 /*
744   register envelope *env = UsrToEnv(m);
745   CkPackMessage(&env);
746   m = EnvToUsr(env);
747 */
748
749   // update cookie
750   msg->_cookie = s;
751
752 #if SPLIT_MULTICAST
753   // split multicast msg into SPLIT_NUM copies
754   register envelope *env = UsrToEnv(m);
755   CkPackMessage(&env);
756   int totalsize = env->getTotalsize();
757   int packetSize = totalsize/SPLIT_NUM;
758   if (totalsize%SPLIT_NUM) packetSize ++;
759   int totalcount = SPLIT_NUM;
760   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
761   int sizesofar = 0;
762   char *data = (char*) env;
763   for (int i=0; i<totalcount; i++) {
764     int mysize = packetSize;
765     if (mysize + sizesofar > totalsize) {
766       mysize = totalsize-sizesofar;
767     }
768     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
769     mCastGrp[s.get_pe()].recvPacket(s, mysize, data, i, totalcount, totalsize, 0);
770     sizesofar += mysize;
771     data += mysize;
772   }
773   CmiFree(env);
774 #else
775   if (s.get_pe() == CkMyPe()) {
776     recvMsg(msg);
777   }
778   else {
779     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
780     mCastGrp[s.get_pe()].recvMsg(msg);
781   }
782 #endif
783 }
784
785 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
786 {
787   int i;
788   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
789
790
791   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
792     char *newdata = new char[n];
793     memcpy(newdata, data, n);
794     entry->packetBuf.enq(new mCastPacket(_cookie, n, newdata, seqno, count, totalsize));
795 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
796     return;
797   }
798
799 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
800
801   // send to spanning tree children
802   // can not optimize using list send because the difference in cookie
803   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
804   for (i=0; i<entry->children.length(); i++) {
805     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], n, data, seqno, count, totalsize, 0);
806   }
807
808   if (seqno == 0) {
809     if (entry->asm_msg != NULL || entry->asm_fill != 0) {
810       entry->print();
811       CmiAssert(entry->asm_msg == NULL && entry->asm_fill==0);
812     }
813     entry->asm_msg = (char *)CmiAlloc(totalsize);
814   }
815   memcpy(entry->asm_msg+entry->asm_fill, data, n);
816   entry->asm_fill += n;
817   if (seqno + 1 == count) {
818     CmiAssert(entry->asm_fill == totalsize);
819     CkUnpackMessage((envelope **)&entry->asm_msg);
820     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
821     msg->_cookie = _cookie;
822 //    mCastGrp[CkMyPe()].recvMsg(msg);
823     recvMsg(msg);
824     entry->asm_msg = NULL;
825     entry->asm_fill = 0;
826   }
827 //  if (fromBuffer) delete [] data;
828 }
829
830 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
831 {
832   int i;
833   CkSectionInfo &sectionInfo = msg->_cookie;
834   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
835   CmiAssert(entry->getAid() == sectionInfo.aid);
836
837 #if ! SPLIT_MULTICAST
838   if (entry->notReady()) {
839     DEBUGF(("entry not ready, enq buffer %p\n", msg));
840     entry->msgBuf.enq(msg);
841     return;
842   }
843
844   // send to spanning tree children
845   // can not optimize using list send because the difference in cookie
846   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
847   for (i=0; i<entry->children.length(); i++) {
848     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
849     newmsg->_cookie = entry->children[i];
850     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
851   }
852 #endif
853
854   // send to local
855   int nLocal = entry->localElem.length();
856   DEBUGF(("send to local %d\n", nLocal));
857   for (i=0; i<nLocal-1; i++) {
858     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[i]);
859     if (_entryTable[msg->ep]->noKeep) {
860       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.aid, entry->localElem[i], CK_MSG_KEEP);
861     }
862     else {
863       // send through scheduler queue
864       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
865       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
866     }
867     // use CK_MSG_DONTFREE so that the message can be reused
868     // the drawback of this scheme bypassing queue is that 
869     // if # of local element is huge, this leads to a long time occupying CPU
870     // also load balancer seems not be able to correctly instrument load
871 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
872     //CmiNetworkProgressAfter(3);
873   }
874   if (nLocal) {
875     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[nLocal-1]);
876     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
877 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
878   }
879   else {
880     CkAssert (entry->rootSid.get_pe() == CkMyPe());
881     delete msg;
882   }
883 }
884
885 // user function
886 // to retrieve section info from a multicast msg
887 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
888 {
889   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
890   if (CkMcastBaseMsg::checkMagic(m) == 0) {
891     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
892     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
893   }
894   // ignore invalid cookie sent by SimpleSend
895   if (m->gpe() != -1) {
896     id.type = MulticastMsg;
897     id.get_pe() = m->gpe();
898     id.get_val() = m->cookie();
899   }
900   // note: retain old redNo
901 }
902
903 // Reduction
904
905 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
906 {
907   CkSectionInfo &id = proxy.ckGetSectionInfo();
908   mCastEntry *entry = (mCastEntry *)id.get_val();
909   entry->red.storedCallback = cb;
910 }
911
912 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
913 {
914   CkSectionInfo &id = proxy.ckGetSectionInfo();
915   mCastEntry *entry = (mCastEntry *)id.get_val();
916   entry->red.storedClient = fn;
917   entry->red.storedClientParam = param;
918 }
919
920 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
921 {
922   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
923   msg->reducer = type;
924   msg->sid = id;
925   msg->sourceFlag = 1;   // from array element
926   msg->redNo = id.get_redNo();
927   msg->gcount = 1;
928   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
929   msg->callback = cb;
930   msg->userFlag=userFlag;
931   return msg;
932 }
933
934
935
936 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
937 {
938   CkCallback cb;
939   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
940 }
941
942
943 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
944 {
945   if (id.get_val() == NULL || id.get_redNo() == -1) 
946     CmiAbort("contribute: SectionID is not initialized\n");
947
948   int nFrags;
949   if (-1 == fragSize) {         // no frag
950     nFrags = 1;
951     fragSize = dataSize;
952   }
953   else {
954     CmiAssert (dataSize >= fragSize);
955     nFrags = dataSize/fragSize;
956     if (dataSize%fragSize) nFrags++;
957   }
958
959   if (MAXFRAGS < nFrags) {
960     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
961     CmiAbort ("frag size too small\n");
962   }
963
964   int mpe = id.get_pe();
965   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
966
967   // break the message into k-piece fragments
968   int fSize = fragSize;
969   for (int i=0; i<nFrags; i++) {
970     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
971       fSize = dataSize%fragSize;
972     }
973
974     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
975
976     // initialize the new msg
977     msg->reducer            = type;
978     msg->sid                = id;
979     msg->nFrags             = nFrags;
980     msg->fragNo             = i;
981     msg->sourceFlag         = 1;
982     msg->redNo              = id.get_redNo();
983     msg->gcount             = 1;
984     msg->rebuilt            = (mpe == CkMyPe())?0:1;
985     msg->callback           = cb;
986     msg->userFlag           = userFlag;
987
988     mCastGrp[mpe].recvRedMsg(msg);
989
990     data = (void*)(((char*)data) + fSize);
991   }
992
993   id.get_redNo()++;
994   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
995 }
996
997 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
998                                               mCastEntry* entry,
999                                               reductionInfo& redInfo) {
1000   int i;
1001   int dataSize = 0;
1002   int nFrags   = redInfo.msgs[0][0]->nFrags;
1003
1004   // to avoid memcpy and allocation cost for non-pipelined reductions
1005   if (1 == nFrags) {
1006     CkReductionMsg* msg = redInfo.msgs[0][0];
1007
1008     // free up the msg slot
1009     redInfo.msgs[0].length() = 0;
1010
1011     return msg;
1012   }
1013
1014   for (i=0; i<nFrags; i++) {
1015     dataSize += redInfo.msgs[i][0]->dataSize;
1016   }
1017
1018   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1019
1020   // initialize msg header
1021   msg->redNo      = redInfo.msgs[0][0]->redNo;
1022   msg->reducer    = redInfo.msgs[0][0]->reducer;
1023   msg->sid        = id;
1024   msg->nFrags     = nFrags;
1025
1026   // I guess following fields need not be initialized
1027   msg->sourceFlag = 2;
1028   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1029   msg->callback   = redInfo.msgs[0][0]->callback;
1030   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1031
1032   byte* data = (byte*)msg->getData ();
1033   for (i=0; i<nFrags; i++) {
1034     // copy data from fragments to msg
1035     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1036     data += redInfo.msgs[i][0]->dataSize;
1037
1038     // free fragments
1039     delete redInfo.msgs[i][0];
1040     redInfo.msgs[i].length() = 0;    
1041   }
1042
1043   return msg;
1044 }
1045
1046 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1047                                      mCastEntry* entry, reductionInfo& redInfo,
1048                                      int& updateReduceNo, int currentTreeUp){
1049
1050   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1051   reductionMsgs& rmsgs = redInfo.msgs[index];
1052   int dataSize         = rmsgs[0]->dataSize;
1053   CkReduction::reducerType reducer = rmsgs[0]->reducer;
1054   int i;
1055   int oldRedNo = redInfo.redNo;
1056   int nFrags   = rmsgs[0]->nFrags;
1057   int fragNo   = rmsgs[0]->fragNo;
1058   int userFlag   = rmsgs[0]->userFlag;
1059                                                                                 
1060   // reduce msgs
1061   CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1062   CkAssert(NULL != f);
1063
1064   // check valid callback in msg and check if migration happened
1065   CkCallback msg_cb;
1066   int rebuilt = 0;
1067   for (i=0; i<rmsgs.length(); i++) {
1068     if (rmsgs[i]->rebuilt) rebuilt = 1;
1069     if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1070   }
1071
1072   CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec()); 
1073   newmsg->redNo  = redInfo.redNo;
1074   newmsg->nFrags = nFrags;
1075   newmsg->fragNo = fragNo;
1076   newmsg->userFlag = userFlag;
1077
1078   // increment num-frags processed
1079   redInfo.npProcessed ++;
1080
1081   // check if migration and free messages
1082   for (i=0; i<rmsgs.length(); i++) {
1083     if (rmsgs[i]!=newmsg) delete rmsgs[i];
1084   }
1085   rmsgs.length() = 0;
1086
1087   if (redInfo.npProcessed == nFrags) {
1088     entry->incReduceNo();
1089     DEBUGF(("Advanced entry:%p redNo: %d\n", entry, entry->red.redNo));
1090   }
1091   if (updateReduceNo) mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1092                                                                                 
1093   if (entry->hasParent()) {
1094     // send up to parent
1095     newmsg->sid        = entry->parentGrp;
1096     newmsg->reducer    = reducer;
1097     newmsg->sourceFlag = 2;
1098     newmsg->redNo      = oldRedNo;
1099     newmsg->gcount     = redInfo.gcount [index];
1100     newmsg->rebuilt    = rebuilt;
1101     newmsg->callback   = msg_cb;
1102     DEBUGF(("send to parent %p: %d\n", entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1103     mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1104   } else { // root
1105     newmsg->sid = id;
1106     // buffer message
1107     rmsgs.push_back (newmsg);
1108
1109     //if (entry->allElem.length() == redInfo.gcount) {
1110     if (redInfo.npProcessed == nFrags) {
1111
1112       newmsg = combineFrags (id, entry, redInfo);
1113       CkSetRefNum(newmsg, userFlag);
1114
1115       if (!msg_cb.isInvalid()) {
1116         msg_cb.send(newmsg);
1117       }
1118       else if (redInfo.storedCallback != NULL) {
1119         redInfo.storedCallback->send(newmsg);
1120       }
1121       else if (redInfo.storedClient != NULL) {
1122         redInfo.storedClient(id, redInfo.storedClientParam, dataSize,
1123            newmsg->data);
1124         delete newmsg;
1125       }
1126       else
1127         CmiAbort("Did you forget to register a reduction client?");
1128                                                                                 
1129       DEBUGF(("Reduction client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1130       if (currentTreeUp) {
1131         if (entry->oldc) {
1132             // free old tree on same processor;
1133           mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1134           entry->oldc = NULL;
1135         }
1136         if (entry->hasOldtree()) {
1137             // free old tree on old processor
1138           int oldpe = entry->oldtree.pe;
1139           mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1140           entry->oldtree.clear();
1141         }
1142       }
1143       if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1144     }
1145   }
1146 }
1147
1148 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1149 {
1150   int i;
1151   CkSectionInfo id = msg->sid;
1152   mCastEntry *entry = (mCastEntry *)id.get_val();
1153   CmiAssert(entry!=NULL);
1154 //CmiPrintf("[%d] recvRedMsg: entry: %p\n", CkMyPe(), entry);
1155
1156   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1157
1158   int updateReduceNo = 0;
1159
1160   // update entry if obsolete
1161   if (entry->isObsolete()) {
1162       // send up to root
1163     DEBUGF(("[%d] entry obsolete-send to root %d\n", CkMyPe(), entry->rootSid.pe));
1164     if (!entry->hasParent()) { //rootSid.pe == CkMyPe()
1165       // I am root, set to the new cookie if there is
1166       mCastEntry *newentry = entry->newc;
1167       while (newentry && newentry->newc) newentry=newentry->newc;
1168       if (newentry) entry = newentry;
1169       CmiAssert(entry!=NULL);
1170     }
1171     if (!entry->hasParent() && !entry->isObsolete()) {
1172        // root find the latest cookie that is not obsolete
1173       msg->sourceFlag = 0;       // indicate it is not on old spanning tree
1174       updateReduceNo = 1;        // reduce from old tree, new entry need update.
1175     }
1176     else {
1177       CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1178       // entry is obsolete, send to root directly
1179       msg->sid = entry->rootSid;
1180
1181       msg->sourceFlag = 0;
1182       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1183       return;
1184     }
1185   }
1186
1187   reductionInfo &redInfo = entry->red;
1188
1189   DEBUGF(("[%d] msg %p red:%d, entry:%p redno:%d\n", CkMyPe(), msg, msg->redNo, entry, entry->red.redNo));
1190   // old message come, ignore
1191   if (msg->redNo < redInfo.redNo) {
1192     CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1193     CmiAbort("Could never happen! \n");
1194   }
1195   if (entry->notReady() || msg->redNo > redInfo.redNo) {
1196     DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1197     redInfo.futureMsgs.push_back(msg);
1198     return;
1199   }
1200
1201   DEBUGF(("[%d] recvRedMsg rebuilt:%d red:%d\n", CkMyPe(), msg->rebuilt, redInfo.redNo));
1202
1203   const int index = msg->fragNo;
1204
1205   // buffer this msg
1206   if (msg->sourceFlag == 1) {
1207     // new reduction message from ArrayElement
1208     redInfo.lcount [index] ++;
1209   }
1210
1211   if (msg->sourceFlag == 2) {
1212     redInfo.ccount [index] ++;
1213   }
1214
1215   redInfo.gcount [index] += msg->gcount;
1216
1217   // buffer the msg
1218   // first check if message is of proper size
1219   if ((0 != redInfo.msgs[index].length()) && 
1220       (msg->dataSize != (redInfo.msgs [index][0]->dataSize))) {
1221     CmiAbort("Reduction data are not of same length!");
1222   }
1223
1224   redInfo.msgs [index].push_back(msg);
1225
1226   const int numFragsRcvd = redInfo.msgs [index].length();
1227
1228   DEBUGF(("[%d] index:%d lcount:%d-%d, ccount:%d-%d, gcount:%d-%d root:%d\n", CkMyPe(),index, entry->red.lcount[index],entry->localElem.length(), entry->red.ccount[index], entry->children.length(), entry->red.gcount[index], entry->allElem.length(), !entry->hasParent()));
1229
1230   int currentTreeUp = 0;
1231   if (redInfo.lcount [index] == entry->localElem.length() &&
1232       redInfo.ccount [index] == entry->children.length())
1233       currentTreeUp = 1;
1234
1235   int mixTreeUp = 0;
1236   const int numElems = entry->allElem.length();
1237   
1238   if (!entry->hasParent()) {
1239     mixTreeUp = 1;
1240     for (int i=0; i<msg->nFrags; i++) {
1241       if (entry->allElem.length() != redInfo.gcount [i]) {
1242         mixTreeUp = 0;
1243       }
1244     }
1245   }
1246
1247   if (currentTreeUp || mixTreeUp)
1248   {
1249     const int nFrags = msg->nFrags;  
1250     
1251     // msg from children contain only one fragment
1252     reduceFragment (index, id, entry, redInfo, updateReduceNo, 
1253                     currentTreeUp);
1254
1255     if (redInfo.npProcessed == nFrags) {
1256       // reset counters
1257       for (i=0; i<nFrags; i++) {
1258         redInfo.lcount [i] = 0;
1259         redInfo.ccount [i] = 0;
1260         redInfo.gcount [i] = 0;
1261       }
1262       redInfo.npProcessed = 0;
1263
1264       // release future msgs
1265       releaseFutureReduceMsgs(entry);
1266     }
1267   }
1268 }
1269
1270 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1271 {
1272   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1273
1274   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1275     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1276     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1277   }
1278   entry->red.futureMsgs.length() = 0;
1279 }
1280
1281 // these messages have to be sent to root
1282 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1283 {
1284   int i;
1285   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1286
1287   for (int j=0; j<MAXFRAGS; j++) {
1288     for (i=0; i<entry->red.msgs[j].length(); i++) {
1289       CkReductionMsg *msg = entry->red.msgs[j][i];
1290       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1291       msg->sid = entry->rootSid;
1292       msg->sourceFlag = 0;
1293       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1294     }
1295     entry->red.msgs[j].length() = 0;
1296   }
1297
1298
1299   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1300     CkReductionMsg *msg = entry->red.futureMsgs[i];
1301     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1302     msg->sid = entry->rootSid;
1303     msg->sourceFlag = 0;
1304     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1305   }
1306   entry->red.futureMsgs.length() = 0;
1307 }
1308
1309 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1310 {
1311   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1312   if (entry->red.redNo < red)
1313     entry->red.redNo = red;
1314
1315   CProxy_CkMulticastMgr mp(thisgroup);
1316   for (int i=0; i<entry->children.length(); i++) {
1317     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1318   }
1319
1320   releaseFutureReduceMsgs(entry);
1321 }
1322
1323 #if 0
1324 ////////////////////////////////////////////////////////////////////////////////
1325 /////
1326 ///////////////// Builtin Reducer Functions //////////////
1327 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1328 {CkAbort("ERROR! Called the invalid reducer!\n");return NULL;}
1329
1330 /* A simple reducer, like sum_int, looks like this:
1331 static CkReductionMsg *sum_int(int nMsg,CkReductionMsg **msg)
1332 {
1333   int i,ret=0;
1334   for (i=0;i<nMsg;i++)
1335     ret+=*(int *)(msg[i]->data);
1336   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1337 }
1338 */
1339
1340 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1341 static CkReductionMsg *name(int nMsg, CkReductionMsg **msg)\
1342 {\
1343   int m,i;\
1344   int nElem=msg[0]->getSize()/sizeof(dataType);\
1345   dataType *ret=(dataType *)(msg[0]->getData());\
1346   for (m=1;m<nMsg;m++)\
1347   {\
1348     dataType *value=(dataType *)(msg[m]->getData());\
1349     for (i=0;i<nElem;i++)\
1350     {\
1351       loop\
1352     }\
1353   }\
1354   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret);\
1355 }
1356
1357 //Use this macro for reductions that have the same type for all inputs
1358 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1359   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1360   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1361   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1362
1363
1364 //Compute the sum the numbers passed by each element.
1365 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1366
1367 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1368
1369 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1370
1371 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1372
1373 CkReduction::reducerFn CkMulticastMgr::reducerTable[CkMulticastMgr::MAXREDUCERS]={
1374     ::invalid_reducer,
1375   //Compute the sum the numbers passed by each element.
1376     ::sum_int,::sum_float,::sum_double,
1377     ::product_int,::product_float,::product_double,
1378     ::max_int,::max_float,::max_double,
1379     ::min_int,::min_float,::min_double
1380 };
1381 #endif
1382
1383 #include "CkMulticast.def.h"