879c33f24dfee433ec272aebb54d77439b3a8631
[charm.git] / src / libs / ck-libs / multicast / ckmulticast.C
1 /*
2  *  Charm++ support for array section multicast and reduction
3  *
4  *  written by Gengbin Zheng,   gzheng@uiuc.edu
5  *  on 12/2001
6  *
7  *  features:
8  *     using a spanning tree (factor defined in ckmulticast.h)
9  *     support pipelining via fragmentation  (SPLIT_MULTICAST)
10  *     support *any-time* migration, spanning tree will be rebuilt automatically
11  * */
12
13 #include "charm++.h"
14 #include "envelope.h"
15 #include "register.h"
16
17 #include "ckmulticast.h"
18 #include "spanningTreeStrategy.h"
19 #include "XArraySectionReducer.h"
20
21 #define DEBUGF(x)  // CkPrintf x;
22
23 // turn on or off fragmentation in multicast
24 #define SPLIT_MULTICAST  0 
25 // each multicast message is split into SPLIT_NUM fragments
26 #define SPLIT_NUM 20
27 #define SPLIT_SIZE (250000)
28
29 #define SPLIT_THRESHOLD (1000000)
30 // maximum number of fragments into which a message can be broken
31 #define MAXFRAGS 20
32
33 typedef CkQ<multicastGrpMsg *>   multicastGrpMsgBuf;
34 typedef CkVec<CkArrayIndexMax>   arrayIndexList;
35 typedef CkVec<CkSectionInfo>     sectionIdList;
36 typedef CkVec<CkReductionMsg *>  reductionMsgs;
37 typedef CkQ<int>                 PieceSize;
38 typedef CkVec<LDObjid>          ObjKeyList;
39 typedef unsigned char            byte;
40
41 class reductionInfo {
42 public:
43   int            lcount [MAXFRAGS]; /**< local elem collected */
44   int            ccount [MAXFRAGS]; /**< children node collected */
45   int            gcount [MAXFRAGS]; /**< total elem collected */
46   int            npProcessed;
47   CkCallback*    storedCallback;    /**< user callback */
48   redClientFn    storedClient;      /**< reduction client function */
49   void*          storedClientParam; /**< user provided data */
50   int            redNo;             /**< reduction sequence number */
51   reductionMsgs  msgs [MAXFRAGS];   /**< messages for this reduction */
52   reductionMsgs  futureMsgs;        /**< messages of future reductions */
53 public:
54   reductionInfo(): storedCallback(NULL), storedClientParam(NULL), redNo(0),
55                    npProcessed(0) {
56     for (int i=0; i<MAXFRAGS; i++) 
57       lcount [i] = ccount [i] = gcount [i] = 0;
58   }
59 };
60
61 /// cookie status
62 #define COOKIE_NOTREADY 0
63 #define COOKIE_READY    1
64 #define COOKIE_OLD      2
65
66 class mCastPacket {
67 public:
68   CkSectionInfo cookie;
69   int n;
70   char *data;
71   int seqno;
72   int count;
73   int totalsize;
74
75   mCastPacket(CkSectionInfo &_cookie, int _n, char *_d, int _s, int _c, int _t):
76                 cookie(_cookie), n(_n), data(_d), seqno(_s), count(_c), totalsize(_t) {}
77 };
78
79 typedef CkQ<mCastPacket *> multicastGrpPacketBuf;
80
81 class SectionLocation {
82 public:
83   mCastEntry *entry;
84   int         pe;
85 public:
86   SectionLocation(): entry(NULL), pe(-1) {}
87   SectionLocation( mCastEntry *e, int p) { set(e, p); }
88   inline void set(mCastEntry *e, int p) { entry = e; pe = p; }
89   inline void clear() { entry = NULL; pe = -1; }
90 };
91
92
93
94
95 /// Cookie for an array section 
96 class mCastEntry 
97 {
98     public:
99         /// Array ID 
100         CkArrayID     aid;
101         /// Spanning tree parent
102         CkSectionInfo parentGrp;
103         /// List of direct children
104         sectionIdList children;
105         /// Number of direct children
106         int numChild;
107         /// List of all tree member array indices (Only useful on the tree root)
108         arrayIndexList allElem;
109         /// Only useful on root for LB
110         ObjKeyList     allObjKeys;
111         /// List of array elements on local PE
112         arrayIndexList localElem;
113         /// Should always be myPE
114         int pe;
115         /// Section ID of the root
116         CkSectionInfo rootSid;
117         multicastGrpMsgBuf msgBuf;
118         /// Buffer storing the pending packets
119         multicastGrpPacketBuf packetBuf;
120         /// For multicast packetization
121         char *asm_msg;
122         int   asm_fill;
123         /// Linked list of entries on the same processor
124         mCastEntry *oldc, *newc;
125         /// Old spanning tree
126         SectionLocation   oldtree;
127         // for reduction
128         reductionInfo red;
129         //
130         char needRebuild;
131     private:
132         char flag;
133     public:
134         mCastEntry(CkArrayID _aid): aid(_aid), numChild(0), asm_msg(NULL), asm_fill(0),
135                     oldc(NULL), newc(NULL), needRebuild(0), flag(COOKIE_NOTREADY) {}
136         mCastEntry(mCastEntry *);
137         /// Check if this tree is only a branch and has a parent
138         inline int hasParent() { return parentGrp.get_val()?1:0; }
139         /// Is this tree obsolete?
140         inline int isObsolete() { return (flag == COOKIE_OLD); }
141         /// Make the current tree obsolete
142         inline void setObsolete() { flag=COOKIE_OLD; }
143         /// Check if this (branch of the) tree is ready for use
144         inline int notReady() { return (flag == COOKIE_NOTREADY); }
145         /// Mark this (branch of the) tree as ready for use
146         inline void setReady() { flag=COOKIE_READY; }
147         /// Increment the reduction number for all the section members on this PE
148         inline void incReduceNo() {
149             red.redNo ++;
150             for (mCastEntry *next = newc; next; next=next->newc) 
151                 next->red.redNo++;
152         }
153         /// Get a handle on the array ID this tree is a member of
154         inline CkArrayID getAid() { return aid; }
155         inline int hasOldtree() { return oldtree.entry != NULL; }
156         inline void print() {
157             CmiPrintf("[%d] mCastEntry: %p, numChild: %d pe: %d flag: %d asm_msg:%p asm_fill:%d\n", CkMyPe(), this, numChild, pe, flag, asm_msg, asm_fill);
158         }
159 };
160
161
162
163
164 class cookieMsg: public CMessage_cookieMsg {
165 public:
166   CkSectionInfo cookie;
167 public:
168   cookieMsg() {};
169   cookieMsg(CkSectionInfo m): cookie(m) {};
170 };
171
172
173
174
175 /// multicast tree setup message
176 class multicastSetupMsg: public CMessage_multicastSetupMsg {
177 public:
178   int  nIdx;
179   CkArrayIndexMax *arrIdx;
180   int      *lastKnown;
181   CkSectionInfo parent;
182   CkSectionInfo rootSid;
183   int redNo;
184 };
185
186
187
188
189 /// message send in spanning tree
190 class multicastGrpMsg: public CkMcastBaseMsg, public CMessage_multicastGrpMsg {
191 };
192
193
194 extern void CkPackMessage(envelope **pEnv);
195 extern void CkUnpackMessage(envelope **pEnv);
196
197
198
199 void _ckMulticastInit(void)
200 {
201 /*
202   CkDisableTracing(CkIndex_CkMulticastMgr::recvMsg(0));
203   CkDisableTracing(CkIndex_CkMulticastMgr::recvRedMsg(0));
204 */
205 }
206
207
208 mCastEntry::mCastEntry (mCastEntry *old): 
209   numChild(0), oldc(NULL), newc(NULL), flag(COOKIE_NOTREADY)
210 {
211   int i;
212   aid = old->aid;
213   parentGrp = old->parentGrp;
214   for (i=0; i<old->allElem.length(); i++)
215     allElem.push_back(old->allElem[i]);
216 #if CMK_LBDB_ON
217   CmiAssert(old->allElem.length() == old->allObjKeys.length());
218   for (i=0; i<old->allObjKeys.length(); i++)
219     allObjKeys.push_back(old->allObjKeys[i]);
220 #endif
221   pe = old->pe;
222   red.storedCallback = old->red.storedCallback;
223   red.storedClient = old->red.storedClient;
224   red.storedClientParam = old->red.storedClientParam;
225   red.redNo = old->red.redNo;
226   needRebuild = 0;
227   asm_msg = NULL;
228   asm_fill = 0;
229 }
230
231 extern LDObjid idx2LDObjid(const CkArrayIndex &idx);    // cklocation.C
232
233
234
235
236 void CkMulticastMgr::setSection(CkSectionInfo &_id, CkArrayID aid, CkArrayIndexMax *al, int n)
237 {
238     // Create a multicast entry
239     mCastEntry *entry = new mCastEntry(aid);
240     // Push all the section member indices into the entry
241     for (int i=0; i<n; i++) {
242         entry->allElem.push_back(al[i]);
243 #if CMK_LBDB_ON
244         const LDObjid key = idx2LDObjid(al[i]);
245         entry->allObjKeys.push_back(key);
246 #endif
247     }
248     //  entry->aid = aid;
249     _id.aid = aid;
250     _id.get_val() = entry;              // allocate table for this section
251     // 
252     initCookie(_id);
253 }
254
255
256
257
258 void CkMulticastMgr::setSection(CkSectionInfo &id)
259 {
260   initCookie(id);
261 }
262
263
264
265
266 /// @warning: This is deprecated
267 void CkMulticastMgr::setSection(CProxySection_ArrayElement &proxy)
268 {
269   CkArrayID aid = proxy.ckGetArrayID();
270   CkSectionInfo &_id = proxy.ckGetSectionInfo();
271
272   mCastEntry *entry = new mCastEntry(aid);
273
274   const CkArrayIndexMax *al = proxy.ckGetArrayElements();
275   for (int i=0; i<proxy.ckGetNumElements(); i++) {
276     entry->allElem.push_back(al[i]);
277 #if CMK_LBDB_ON
278     const LDObjid key = idx2LDObjid(al[i]);
279     entry->allObjKeys.push_back(key);
280 #endif
281   }
282   _id.type = MulticastMsg;
283   _id.aid = aid;
284   _id.get_val() = entry;                // allocate table for this section
285   initCookie(_id);
286 }
287
288
289
290
291 void CkMulticastMgr::resetSection(CProxySection_ArrayElement &proxy)
292 {
293   CkSectionInfo &info = proxy.ckGetSectionInfo();
294
295   int oldpe = info.get_pe();
296   if (oldpe == CkMyPe()) return;        // we don't have to recreate one
297
298   CkArrayID aid = proxy.ckGetArrayID();
299   CkSectionID *sid = proxy.ckGetSectionIDs();
300   mCastEntry *entry = new mCastEntry(aid);
301
302   mCastEntry *oldentry = (mCastEntry *)info.get_val();
303   DEBUGF(("[%d] resetSection: old entry:%p new entry:%p\n", CkMyPe(), oldentry, entry));
304
305   const CkArrayIndexMax *al = sid->_elems;
306   CmiAssert(info.aid == aid);
307   prepareCookie(entry, *sid, al, sid->_nElems, aid);
308
309   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
310
311     // store old tree info
312   entry->oldtree.set(oldentry, oldpe);
313
314     // obsolete old tree
315   mCastGrp[oldpe].retire(CkSectionInfo(oldpe, oldentry, 0, entry->getAid()), info);
316
317   // find reduction number
318   mCastGrp[oldpe].retrieveCookie(CkSectionInfo(oldpe, oldentry, 0, aid), info);
319 }
320
321
322
323
324 /// Build a mCastEntry object with relevant section info and set the section cookie to point to this object
325 void CkMulticastMgr::prepareCookie(mCastEntry *entry, CkSectionID &sid, const CkArrayIndexMax *al, int count, CkArrayID aid)
326 {
327   for (int i=0; i<count; i++) {
328     entry->allElem.push_back(al[i]);
329 #if CMK_LBDB_ON
330     const LDObjid key = idx2LDObjid(al[i]);
331     entry->allObjKeys.push_back(key);
332 #endif
333   }
334   sid._cookie.type = MulticastMsg;
335   sid._cookie.aid = aid;
336   sid._cookie.get_val() = entry;        // allocate table for this section
337   sid._cookie.get_pe() = CkMyPe();
338 }
339
340
341
342
343 // this is used
344 void CkMulticastMgr::initDelegateMgr(CProxy *cproxy)
345 {
346   CProxySection_ArrayBase *proxy = (CProxySection_ArrayBase *)cproxy;
347   int numSubSections = proxy->ckGetNumSubSections();
348   for (int i=0; i<numSubSections; i++)
349   {
350       CkArrayID aid = proxy->ckGetArrayIDn(i);
351       mCastEntry *entry = new mCastEntry(aid);
352       CkSectionID *sid = &( proxy->ckGetSectionID(i) );
353       const CkArrayIndexMax *al = proxy->ckGetArrayElements(i);
354       prepareCookie(entry, *sid, al, proxy->ckGetNumElements(i), aid);
355       initCookie(sid->_cookie);
356   }
357 }
358
359
360
361
362 void CkMulticastMgr::retrieveCookie(CkSectionInfo s, CkSectionInfo srcInfo)
363 {
364   mCastEntry *entry = (mCastEntry *)s.get_val();
365   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
366   mCastGrp[srcInfo.get_pe()].recvCookieInfo(srcInfo, entry->red.redNo);
367 }
368
369 // now that we get reduction number from the old cookie,
370 // we continue to build the spanning tree
371 void CkMulticastMgr::recvCookieInfo(CkSectionInfo s, int red)
372 {
373   mCastEntry *entry = (mCastEntry *)s.get_val();
374   entry->red.redNo = red;
375
376   initCookie(s);
377
378   // TODO delete old tree
379 }
380
381
382
383
384 void CkMulticastMgr::initCookie(CkSectionInfo s)
385 {
386     mCastEntry *entry = (mCastEntry *)s.get_val();
387     int n = entry->allElem.length();
388     DEBUGF(("init: %d elems %p\n", n, s.get_val()));
389     // Create and initialize a setup message
390     multicastSetupMsg *msg = new (n, n, 0) multicastSetupMsg;
391     msg->nIdx = n;
392     msg->parent = CkSectionInfo(entry->getAid());
393     msg->rootSid = s;
394     msg->redNo = entry->red.redNo;
395     // Fill the message with the section member indices and their last known locations
396     CkArray *array = CProxy_ArrayBase(s.aid).ckLocalBranch();
397     for (int i=0; i<n; i++) {
398       msg->arrIdx[i] = entry->allElem[i];
399       int ape = array->lastKnown(entry->allElem[i]);
400       CmiAssert(ape >=0 && ape < CkNumPes());
401       msg->lastKnown[i] = ape;
402     }
403     // Trigger the spanning tree build
404     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
405     mCastGrp[CkMyPe()].setup(msg);
406 }
407
408
409
410
411 void CkMulticastMgr::teardown(CkSectionInfo cookie)
412 {
413     int i;
414     mCastEntry *sect = (mCastEntry *)cookie.get_val();
415     // Mark this section as obsolete
416     sect->setObsolete();
417     // Release the buffered messages 
418     releaseBufferedReduceMsgs(sect);
419     // Propagate the teardown to each of your children
420     CProxy_CkMulticastMgr mp(thisgroup);
421     for (i=0; i<sect->children.length(); i++)
422         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
423 }
424
425
426
427
428 void CkMulticastMgr::retire(CkSectionInfo cookie, CkSectionInfo newroot)
429 {
430     int i;
431     mCastEntry *sect = (mCastEntry *)cookie.get_val();
432     // Reset the root section info
433     sect->rootSid = newroot;
434     // Mark this section as obsolete
435     sect->setObsolete();
436     // Release the buffered messages 
437     releaseBufferedReduceMsgs(sect);
438     // Propagate the teardown to each of your children
439     CProxy_CkMulticastMgr mp(thisgroup);
440     for (i=0; i<sect->children.length(); i++)
441         mp[sect->children[i].get_pe()].teardown(sect->children[i]);
442 }
443
444
445
446
447 void CkMulticastMgr::freeup(CkSectionInfo cookie)
448 {
449   mCastEntry *sect = (mCastEntry *)cookie.get_val();
450   CProxy_CkMulticastMgr mp(thisgroup);
451   // Parse through all the section members on this PE and...
452   while (sect) 
453   {
454       // Free their children
455       for (int i=0; i<sect->children.length(); i++)
456           mp[ sect->children[i].get_pe() ].freeup(sect->children[i]);
457       // Free the cookie itself
458       DEBUGF(("[%d] Free up on %p\n", CkMyPe(), sect));
459       mCastEntry *oldc= sect->oldc;
460       delete sect;
461       sect = oldc;
462   }
463 }
464
465
466
467
468 void CkMulticastMgr::setup(multicastSetupMsg *msg)
469 {
470     int i,j;
471     mCastEntry *entry;
472     CkArrayID aid = msg->rootSid.aid;
473     if (msg->parent.get_pe() == CkMyPe()) 
474       entry = (mCastEntry *)msg->rootSid.get_val(); //sid.val;
475     else 
476       entry = new mCastEntry(aid);
477     entry->aid = aid;
478     entry->pe = CkMyPe();
479     entry->rootSid = msg->rootSid;
480     entry->parentGrp = msg->parent;
481
482     DEBUGF(("[%d] setup: %p redNo: %d => %d with %d elems\n", CkMyPe(), entry, entry->red.redNo, msg->redNo, msg->nIdx));
483     entry->red.redNo = msg->redNo;
484
485     // Create a numPE sized array of vectors to hold the array elements in each PE
486     int numpes = CkNumPes();
487     arrayIndexPosList *lists = new arrayIndexPosList[numpes];
488     // Sort each array index in the setup message based on last known location
489     for (i=0; i<msg->nIdx; i++) 
490     {
491       int lastKnown = msg->lastKnown[i];
492       // If msg->arrIdx[i] local, add it to a special local element list
493       if (lastKnown == CkMyPe())
494           entry->localElem.insertAtEnd(msg->arrIdx[i]);
495       // else, add it to the list corresponding to its PE
496       else
497           lists[lastKnown].push_back(IndexPos(msg->arrIdx[i], lastKnown));
498     }
499
500     CkVec<int> mySubTreePEs;
501     mySubTreePEs.reserve(numpes);
502     // The first PE in my subtree should be me, the tree root (as required by the spanning tree builder)
503     mySubTreePEs.push_back(CkMyPe());
504     // Identify the child PEs in the tree, ie the PEs with section members on them
505     for (i=0; i<numpes; i++) 
506     {
507       if (i==CkMyPe()) continue;
508       if (lists[i].size()) 
509           mySubTreePEs.push_back(i);
510     }
511     // The number of multicast children can be limited by the spanning tree factor 
512     int num = mySubTreePEs.size() - 1, numchild = 0;
513     if (factor <= 0) numchild = num;
514     else numchild = num<factor?num:factor;
515   
516     entry->numChild = numchild;
517
518     // If there are any children, go about building a spanning tree
519     if (numchild) 
520     {
521         // Build the next generation of the spanning tree rooted at my PE
522         int *peListPtr = mySubTreePEs.getVec();
523         topo::SpanningTreeVertex *nextGenInfo;
524         nextGenInfo = topo::buildSpanningTreeGeneration(peListPtr,peListPtr + mySubTreePEs.size(),numchild);
525         //CkAssert(nextGenInfo->childIndex.size() == numchild);
526         numchild = nextGenInfo->childIndex.size();
527         entry->numChild = numchild;
528
529         // Distribute the section members across the number of direct children (branches)
530         // Direct children are simply the first section member in each of the branch lists
531         arrayIndexPosList *slots = new arrayIndexPosList[numchild];
532
533         // For each direct child, collate indices of all section members on the PEs in that branch
534         for (i=0; i < numchild; i++)
535         {
536             // Determine the indices of the first and last PEs in this branch of my sub-tree
537             int childStartIndex = nextGenInfo->childIndex[i], childEndIndex;
538             if (i < numchild-1)
539                 childEndIndex = nextGenInfo->childIndex[i+1];
540             else
541                 childEndIndex = mySubTreePEs.size();
542             // For each PE in this branch, add the section members on that PE to a list
543             for (j = childStartIndex; j < childEndIndex; j++)
544             {
545                 int pe = mySubTreePEs[j];
546                 for (int k=0; k<lists[pe].size(); k++)
547                     slots[i].push_back(lists[pe][k]);
548             }
549         }
550
551         // Ask each of your direct children to setup their branches
552         CProxy_CkMulticastMgr  mCastGrp(thisgroup);
553         for (i=0; i<numchild; i++) 
554         {
555             // Give each child info about the number, indices and location of its children
556             int n = slots[i].length();
557             multicastSetupMsg *m = new (n, n, 0) multicastSetupMsg;
558             m->parent = CkSectionInfo(aid, entry);
559             m->nIdx = slots[i].length();
560             m->rootSid = msg->rootSid;
561             m->redNo = msg->redNo;
562             for (j=0; j<slots[i].length(); j++) 
563             {
564                 m->arrIdx[j] = slots[i][j].idx;
565                 m->lastKnown[j] = slots[i][j].pe;
566             }
567             int childroot = slots[i][0].pe;
568             DEBUGF(("[%d] call set up %d numelem:%d\n", CkMyPe(), childroot, n));
569             // Send the message to the child
570             mCastGrp[childroot].setup(m);
571         }
572         delete [] slots;
573         delete nextGenInfo;
574     }
575     // else, tell yourself that your children are ready
576     else 
577     {
578         childrenReady(entry);
579     }
580     delete [] lists;
581     delete msg;
582 }
583
584
585
586
587 void CkMulticastMgr::childrenReady(mCastEntry *entry)
588 {
589     // Mark this entry as ready
590     entry->setReady();
591     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
592     DEBUGF(("[%d] entry %p childrenReady with %d elems.\n", CkMyPe(), entry, entry->allElem.length()));
593     if (entry->hasParent()) 
594         mCastGrp[entry->parentGrp.get_pe()].recvCookie(entry->parentGrp, CkSectionInfo(entry->getAid(), entry));
595 #if SPLIT_MULTICAST
596     // clear packet buffer
597     while (!entry->packetBuf.isEmpty()) 
598     {
599         mCastPacket *packet = entry->packetBuf.deq();
600         packet->cookie.get_val() = entry;
601         mCastGrp[CkMyPe()].recvPacket(packet->cookie, packet->n, packet->data, packet->seqno, packet->count, packet->totalsize, 1);
602         delete [] packet->data;
603         delete packet;
604     }
605 #else
606     // clear msg buffer
607     while (!entry->msgBuf.isEmpty()) 
608     {
609         multicastGrpMsg *newmsg = entry->msgBuf.deq();
610         DEBUGF(("[%d] release buffer %p %d\n", CkMyPe(), newmsg, newmsg->ep));
611         newmsg->_cookie.get_val() = entry;
612         mCastGrp[CkMyPe()].recvMsg(newmsg);
613     }
614 #endif
615     // release reduction msgs
616     releaseFutureReduceMsgs(entry);
617 }
618
619
620
621
622 void CkMulticastMgr::recvCookie(CkSectionInfo sid, CkSectionInfo child)
623 {
624   mCastEntry *entry = (mCastEntry *)sid.get_val();
625   entry->children.push_back(child);
626   if (entry->children.length() == entry->numChild) {
627     childrenReady(entry);
628   }
629 }
630
631
632
633
634 // rebuild is called when root not migrated
635 // when rebuilding, all multicast msgs will be buffered.
636 void CkMulticastMgr::rebuild(CkSectionInfo &sectId)
637 {
638   // tear down old tree
639   mCastEntry *curCookie = (mCastEntry*)sectId.get_val();
640   CkAssert(curCookie->pe == CkMyPe());
641   // make sure I am the newest one
642   while (curCookie->newc) curCookie = curCookie->newc;
643   if (curCookie->isObsolete()) return;
644
645   //CmiPrintf("tree rebuild\n");
646   mCastEntry *newCookie = new mCastEntry(curCookie);  // allocate table for this section
647
648   // build a chain
649   newCookie->oldc = curCookie;
650   curCookie->newc = newCookie;
651
652   sectId.get_val() = newCookie;
653
654   DEBUGF(("rebuild: redNo:%d oldc:%p newc;%p\n", newCookie->red.redNo, curCookie, newCookie));
655
656   curCookie->setObsolete();
657
658   resetCookie(sectId);
659 }
660
661 void CkMulticastMgr::resetCookie(CkSectionInfo s)
662 {
663   mCastEntry *newCookie = (mCastEntry*)s.get_val();
664   mCastEntry *oldCookie = newCookie->oldc;
665
666   // get rid of old one
667   DEBUGF(("reset: oldc: %p\n", oldCookie));
668   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
669   int mype = CkMyPe();
670   mCastGrp[mype].teardown(CkSectionInfo(mype, oldCookie, 0, oldCookie->getAid()));
671
672   // build a new one
673   initCookie(s);
674 }
675
676 void CkMulticastMgr::SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts)
677 {
678   DEBUGF(("[%d] SimpleSend: nElems:%d\n", CkMyPe(), sid._nElems));
679     // set an invalid cookie since we don't have it
680   ((multicastGrpMsg *)m)->_cookie = CkSectionInfo(-1, NULL, 0, a);
681   for (int i=0; i< sid._nElems-1; i++) {
682      CProxyElement_ArrayBase ap(a, sid._elems[i]);
683      void *newMsg=CkCopyMsg((void **)&m);
684      ap.ckSend((CkArrayMessage *)newMsg,ep,opts|CK_MSG_LB_NOTRACE);
685   }
686   if (sid._nElems > 0) {
687      CProxyElement_ArrayBase ap(a, sid._elems[sid._nElems-1]);
688      ap.ckSend((CkArrayMessage *)m,ep,opts|CK_MSG_LB_NOTRACE);
689   }
690 }
691
692 void CkMulticastMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid, CkSectionID *sid, int opts)
693 {
694     for (int snum = 0; snum < nsid; snum++) {
695         void *msgCopy = m;
696         if (nsid - snum > 1)
697             msgCopy = CkCopyMsg(&m);
698         sendToSection(pd, ep, msgCopy, &(sid[snum]), opts);
699     }
700 }
701
702
703
704 void CkMulticastMgr::sendToSection(CkDelegateData *pd,int ep,void *m, CkSectionID *sid, int opts)
705 {
706             DEBUGF(("ArraySectionSend\n"));
707
708   multicastGrpMsg *msg = (multicastGrpMsg *)m;
709 //  msg->aid = a;
710   msg->ep = ep;
711
712   CkSectionInfo &s = sid->_cookie;
713
714   mCastEntry *entry;
715   if (s.get_pe() == CkMyPe()) {
716     entry = (mCastEntry *)s.get_val();   
717     if (entry == NULL) {
718       CmiAbort("Unknown array section, Did you forget to register the array section to CkMulticastMgr using setSection()?");
719     }
720
721     // update entry pointer in case there is a newer one.
722     if (entry->newc) {
723       do { entry=entry->newc; } while (entry->newc);
724       s.get_val() = entry;
725     }
726
727 #if CMK_LBDB_ON
728     // fixme: running obj?
729     envelope *env = UsrToEnv(msg);
730     const LDOMHandle &om = CProxy_ArrayBase(s.aid).ckLocMgr()->getOMHandle();
731     LBDatabaseObj()->MulticastSend(om,entry->allObjKeys.getVec(),entry->allObjKeys.size(),env->getTotalsize());
732 #endif
733
734     // first time need to rebuild, we do simple send to refresh lastKnown
735     if (entry->needRebuild == 1) {
736       msg->_cookie = s;
737       SimpleSend(ep, msg, s.aid, *sid, opts);
738       entry->needRebuild = 2;
739       return;
740     }
741     else if (entry->needRebuild == 2) rebuild(s);
742   }
743   else {
744     // fixme - in this case, not recorded in LB
745     CmiPrintf("Warning: Multicast not optimized after multicast root migrated. \n");
746   }
747
748   // don't need packing here
749 /*
750   register envelope *env = UsrToEnv(m);
751   CkPackMessage(&env);
752   m = EnvToUsr(env);
753 */
754
755   // update cookie
756   msg->_cookie = s;
757
758 #if SPLIT_MULTICAST
759   // split multicast msg into SPLIT_NUM copies
760   register envelope *env = UsrToEnv(m);
761   CkPackMessage(&env);
762   int totalsize = env->getTotalsize();
763   int packetSize = 0;
764   int totalcount = 0;
765   if(totalsize < SPLIT_THRESHOLD){
766     packetSize = totalsize;
767     totalcount = 1;
768   }else{
769     packetSize = SPLIT_SIZE;
770     totalcount = totalsize/SPLIT_SIZE;
771     if(totalcount%SPLIT_SIZE) totalcount++; 
772     //packetSize = totalsize/SPLIT_NUM;
773     //if (totalsize%SPLIT_NUM) packetSize ++;
774     //totalcount = SPLIT_NUM;
775   }
776   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
777   int sizesofar = 0;
778   char *data = (char*) env;
779   for (int i=0; i<totalcount; i++) {
780     int mysize = packetSize;
781     if (mysize + sizesofar > totalsize) {
782       mysize = totalsize-sizesofar;
783     }
784     //CmiPrintf("[%d] send to %d : mysize: %d total: %d \n", CkMyPe(), s.get_pe(), mysize, totalsize);
785     mCastGrp[s.get_pe()].recvPacket(s, mysize, data, i, totalcount, totalsize, 0);
786     sizesofar += mysize;
787     data += mysize;
788   }
789   CmiFree(env);
790 #else
791   if (s.get_pe() == CkMyPe()) {
792     recvMsg(msg);
793   }
794   else {
795     CProxy_CkMulticastMgr  mCastGrp(thisgroup);
796     mCastGrp[s.get_pe()].recvMsg(msg);
797   }
798 #endif
799 }
800
801 void CkMulticastMgr::recvPacket(CkSectionInfo &_cookie, int n, char *data, int seqno, int count, int totalsize, int fromBuffer)
802 {
803   int i;
804   mCastEntry *entry = (mCastEntry *)_cookie.get_val();
805
806
807   if (!fromBuffer && (entry->notReady() || !entry->packetBuf.isEmpty())) {
808     char *newdata = new char[n];
809     memcpy(newdata, data, n);
810     entry->packetBuf.enq(new mCastPacket(_cookie, n, newdata, seqno, count, totalsize));
811 //CmiPrintf("[%d] Buffered recvPacket: seqno: %d %d frombuf:%d empty:%d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry->packetBuf.isEmpty(),entry);
812     return;
813   }
814
815 //CmiPrintf("[%d] recvPacket ready: seqno: %d %d buffer: %d entry:%p\n", CkMyPe(), seqno, count, fromBuffer, entry);
816
817   // send to spanning tree children
818   // can not optimize using list send because the difference in cookie
819   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
820   for (i=0; i<entry->children.length(); i++) {
821     mCastGrp[entry->children[i].get_pe()].recvPacket(entry->children[i], n, data, seqno, count, totalsize, 0);
822   }
823
824   if (seqno == 0) {
825     if (entry->asm_msg != NULL || entry->asm_fill != 0) {
826       entry->print();
827       CmiAssert(entry->asm_msg == NULL && entry->asm_fill==0);
828     }
829     entry->asm_msg = (char *)CmiAlloc(totalsize);
830   }
831   memcpy(entry->asm_msg+entry->asm_fill, data, n);
832   entry->asm_fill += n;
833   if (seqno + 1 == count) {
834     CmiAssert(entry->asm_fill == totalsize);
835     CkUnpackMessage((envelope **)&entry->asm_msg);
836     multicastGrpMsg *msg = (multicastGrpMsg *)EnvToUsr((envelope*)entry->asm_msg);
837     msg->_cookie = _cookie;
838 //    mCastGrp[CkMyPe()].recvMsg(msg);
839     recvMsg(msg);
840     entry->asm_msg = NULL;
841     entry->asm_fill = 0;
842   }
843 //  if (fromBuffer) delete [] data;
844 }
845
846 void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
847 {
848   int i;
849   CkSectionInfo &sectionInfo = msg->_cookie;
850   mCastEntry *entry = (mCastEntry *)msg->_cookie.get_val();
851   CmiAssert(entry->getAid() == sectionInfo.aid);
852
853 #if ! SPLIT_MULTICAST
854   if (entry->notReady()) {
855     DEBUGF(("entry not ready, enq buffer %p\n", msg));
856     entry->msgBuf.enq(msg);
857     return;
858   }
859
860   // send to spanning tree children
861   // can not optimize using list send because the difference in cookie
862   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
863   for (i=0; i<entry->children.length(); i++) {
864     multicastGrpMsg *newmsg = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
865     newmsg->_cookie = entry->children[i];
866     mCastGrp[entry->children[i].get_pe()].recvMsg(newmsg);
867   }
868 #endif
869
870   // send to local
871   int nLocal = entry->localElem.length();
872   DEBUGF(("send to local %d\n", nLocal));
873   for (i=0; i<nLocal-1; i++) {
874     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[i]);
875     if (_entryTable[msg->ep]->noKeep) {
876       CkSendMsgArrayInline(msg->ep, msg, sectionInfo.aid, entry->localElem[i], CK_MSG_KEEP);
877     }
878     else {
879       // send through scheduler queue
880       multicastGrpMsg *newm = (multicastGrpMsg *)CkCopyMsg((void **)&msg);
881       ap.ckSend((CkArrayMessage *)newm, msg->ep, CK_MSG_LB_NOTRACE);
882     }
883     // use CK_MSG_DONTFREE so that the message can be reused
884     // the drawback of this scheme bypassing queue is that 
885     // if # of local element is huge, this leads to a long time occupying CPU
886     // also load balancer seems not be able to correctly instrument load
887 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[i], CK_MSG_KEEP);
888     //CmiNetworkProgressAfter(3);
889   }
890   if (nLocal) {
891     CProxyElement_ArrayBase ap(sectionInfo.aid, entry->localElem[nLocal-1]);
892     ap.ckSend((CkArrayMessage *)msg, msg->ep, CK_MSG_LB_NOTRACE);
893 //    CkSendMsgArrayInline(msg->ep, msg, msg->aid, entry->localElem[nLocal-1]);
894   }
895   else {
896     CkAssert (entry->rootSid.get_pe() == CkMyPe());
897     delete msg;
898   }
899 }
900
901 // user function
902 // to retrieve section info from a multicast msg
903 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
904 {
905   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
906   if (CkMcastBaseMsg::checkMagic(m) == 0) {
907     CmiPrintf("ERROR: This is not a CkMulticast message!\n");
908     CmiAbort("Did you remember to do CkMulticast delegation, and inherit multicast message from CkMcastBaseMsg in correct order?");
909   }
910   // ignore invalid cookie sent by SimpleSend
911   if (m->gpe() != -1) {
912     id.type = MulticastMsg;
913     id.get_pe() = m->gpe();
914     id.get_val() = m->cookie();
915   }
916   // note: retain old redNo
917 }
918
919 // Reduction
920
921 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, CkCallback *cb)
922 {
923   CkCallback *sectionCB;
924   int numSubSections = proxy.ckGetNumSubSections();
925   // If its a cross-array section,
926   if (numSubSections > 1)
927   {
928       /** @warning: Each instantiation is a mem leak! :o
929        * The class is trivially small, but there's one instantiation for each
930        * section delegated to CkMulticast. The objects need to live as long as
931        * their section exists and is used. The idea of 'destroying' an array
932        * section is still academic, and hence no effort has been made to charge
933        * some 'owner' entity with the task of deleting this object.
934        *
935        * Reimplementing delegated x-array reductions will make this consideration moot
936        */
937       // Configure the final cross-section reducer
938       ck::impl::XArraySectionReducer *red =
939           new ck::impl::XArraySectionReducer(numSubSections, cb);
940       // Configure the subsection callback to deposit with the final reducer
941       sectionCB = new CkCallback(ck::impl::processSectionContribution, red);
942   }
943   // else, just direct the reduction to the actual client cb
944   else
945       sectionCB = cb;
946   // Wire the sections together by storing the subsection cb in each sectionID
947   for (int i=0; i<numSubSections; i++)
948   {
949       CkSectionInfo &sInfo = proxy.ckGetSectionID(i)._cookie;
950       mCastEntry *entry = (mCastEntry *)sInfo.get_val();
951       entry->red.storedCallback = sectionCB;
952   }
953 }
954
955 void CkMulticastMgr::setReductionClient(CProxySection_ArrayElement &proxy, redClientFn fn,void *param)
956 {
957   CkSectionInfo &id = proxy.ckGetSectionInfo();
958   mCastEntry *entry = (mCastEntry *)id.get_val();
959   entry->red.storedClient = fn;
960   entry->red.storedClientParam = param;
961 }
962
963 inline CkReductionMsg *CkMulticastMgr::buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag)
964 {
965   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, data);
966   msg->reducer = type;
967   msg->sid = id;
968   msg->sourceFlag = 1;   // from array element
969   msg->redNo = id.get_redNo();
970   msg->gcount = 1;
971   msg->rebuilt = (id.get_pe() == CkMyPe())?0:1;
972   msg->callback = cb;
973   msg->userFlag=userFlag;
974   return msg;
975 }
976
977
978
979 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, int userFlag, int fragSize)
980 {
981   CkCallback cb;
982   contribute(dataSize, data, type, id, cb, userFlag, fragSize);
983 }
984
985
986 void CkMulticastMgr::contribute(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag, int fragSize)
987 {
988   if (id.get_val() == NULL || id.get_redNo() == -1) 
989     CmiAbort("contribute: SectionID is not initialized\n");
990
991   int nFrags;
992   if (-1 == fragSize) {         // no frag
993     nFrags = 1;
994     fragSize = dataSize;
995   }
996   else {
997     CmiAssert (dataSize >= fragSize);
998     nFrags = dataSize/fragSize;
999     if (dataSize%fragSize) nFrags++;
1000   }
1001
1002   if (MAXFRAGS < nFrags) {
1003     CmiPrintf ("Recompile CkMulticast library for fragmenting msgs into more than %d fragments\n", MAXFRAGS);
1004     CmiAbort ("frag size too small\n");
1005   }
1006
1007   int mpe = id.get_pe();
1008   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1009
1010   // break the message into k-piece fragments
1011   int fSize = fragSize;
1012   for (int i=0; i<nFrags; i++) {
1013     if ((0 != i) && ((nFrags-1) == i) && (0 != dataSize%fragSize)) {
1014       fSize = dataSize%fragSize;
1015     }
1016
1017     CkReductionMsg *msg = CkReductionMsg::buildNew(fSize, data);
1018
1019     // initialize the new msg
1020     msg->reducer            = type;
1021     msg->sid                = id;
1022     msg->nFrags             = nFrags;
1023     msg->fragNo             = i;
1024     msg->sourceFlag         = 1;
1025     msg->redNo              = id.get_redNo();
1026     msg->gcount             = 1;
1027     msg->rebuilt            = (mpe == CkMyPe())?0:1;
1028     msg->callback           = cb;
1029     msg->userFlag           = userFlag;
1030
1031     mCastGrp[mpe].recvRedMsg(msg);
1032
1033     data = (void*)(((char*)data) + fSize);
1034   }
1035
1036   id.get_redNo()++;
1037   DEBUGF(("[%d] val: %d %p\n", CkMyPe(), id.get_pe(), id.get_val()));
1038 }
1039
1040 CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id, 
1041                                               mCastEntry* entry,
1042                                               reductionInfo& redInfo) {
1043   int i;
1044   int dataSize = 0;
1045   int nFrags   = redInfo.msgs[0][0]->nFrags;
1046
1047   // to avoid memcpy and allocation cost for non-pipelined reductions
1048   if (1 == nFrags) {
1049     CkReductionMsg* msg = redInfo.msgs[0][0];
1050
1051     // free up the msg slot
1052     redInfo.msgs[0].length() = 0;
1053
1054     return msg;
1055   }
1056
1057   for (i=0; i<nFrags; i++) {
1058     dataSize += redInfo.msgs[i][0]->dataSize;
1059   }
1060
1061   CkReductionMsg *msg = CkReductionMsg::buildNew(dataSize, NULL);
1062
1063   // initialize msg header
1064   msg->redNo      = redInfo.msgs[0][0]->redNo;
1065   msg->reducer    = redInfo.msgs[0][0]->reducer;
1066   msg->sid        = id;
1067   msg->nFrags     = nFrags;
1068
1069   // I guess following fields need not be initialized
1070   msg->sourceFlag = 2;
1071   msg->rebuilt    = redInfo.msgs[0][0]->rebuilt;
1072   msg->callback   = redInfo.msgs[0][0]->callback;
1073   msg->userFlag   = redInfo.msgs[0][0]->userFlag;
1074
1075   byte* data = (byte*)msg->getData ();
1076   for (i=0; i<nFrags; i++) {
1077     // copy data from fragments to msg
1078     memcpy(data, redInfo.msgs[i][0]->getData(), redInfo.msgs[i][0]->dataSize);
1079     data += redInfo.msgs[i][0]->dataSize;
1080
1081     // free fragments
1082     delete redInfo.msgs[i][0];
1083     redInfo.msgs[i].length() = 0;    
1084   }
1085
1086   return msg;
1087 }
1088
1089 void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
1090                                      mCastEntry* entry, reductionInfo& redInfo,
1091                                      int& updateReduceNo, int currentTreeUp){
1092
1093   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1094   reductionMsgs& rmsgs = redInfo.msgs[index];
1095   int dataSize         = rmsgs[0]->dataSize;
1096   CkReduction::reducerType reducer = rmsgs[0]->reducer;
1097   int i;
1098   int oldRedNo = redInfo.redNo;
1099   int nFrags   = rmsgs[0]->nFrags;
1100   int fragNo   = rmsgs[0]->fragNo;
1101   int userFlag   = rmsgs[0]->userFlag;
1102                                                                                 
1103   // reduce msgs
1104   CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
1105   CkAssert(NULL != f);
1106
1107   // check valid callback in msg and check if migration happened
1108   CkCallback msg_cb;
1109   int rebuilt = 0;
1110   for (i=0; i<rmsgs.length(); i++) {
1111     if (rmsgs[i]->rebuilt) rebuilt = 1;
1112     if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
1113   }
1114
1115   CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec()); 
1116   newmsg->redNo  = redInfo.redNo;
1117   newmsg->nFrags = nFrags;
1118   newmsg->fragNo = fragNo;
1119   newmsg->userFlag = userFlag;
1120   newmsg->reducer = reducer;
1121
1122   // increment num-frags processed
1123   redInfo.npProcessed ++;
1124
1125   // check if migration and free messages
1126   for (i=0; i<rmsgs.length(); i++) {
1127     if (rmsgs[i]!=newmsg) delete rmsgs[i];
1128   }
1129   rmsgs.length() = 0;
1130
1131   if (redInfo.npProcessed == nFrags) {
1132     entry->incReduceNo();
1133     DEBUGF(("Advanced entry:%p redNo: %d\n", entry, entry->red.redNo));
1134   }
1135   if (updateReduceNo) mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
1136                                                                                 
1137   if (entry->hasParent()) {
1138     // send up to parent
1139     newmsg->sid        = entry->parentGrp;
1140     newmsg->sourceFlag = 2;
1141     newmsg->redNo      = oldRedNo;
1142     newmsg->gcount     = redInfo.gcount [index];
1143     newmsg->rebuilt    = rebuilt;
1144     newmsg->callback   = msg_cb;
1145     DEBUGF(("send to parent %p: %d\n", entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
1146     mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
1147   } else { // root
1148     newmsg->sid = id;
1149     // buffer message
1150     rmsgs.push_back (newmsg);
1151
1152     //if (entry->allElem.length() == redInfo.gcount) {
1153     if (redInfo.npProcessed == nFrags) {
1154
1155       newmsg = combineFrags (id, entry, redInfo);
1156       CkSetRefNum(newmsg, userFlag);
1157
1158       if (!msg_cb.isInvalid()) {
1159         msg_cb.send(newmsg);
1160       }
1161       else if (redInfo.storedCallback != NULL) {
1162         redInfo.storedCallback->send(newmsg);
1163       }
1164       else if (redInfo.storedClient != NULL) {
1165         redInfo.storedClient(id, redInfo.storedClientParam, dataSize,
1166            newmsg->data);
1167         delete newmsg;
1168       }
1169       else
1170         CmiAbort("Did you forget to register a reduction client?");
1171                                                                                 
1172       DEBUGF(("Reduction client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
1173       if (currentTreeUp) {
1174         if (entry->oldc) {
1175             // free old tree on same processor;
1176           mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
1177           entry->oldc = NULL;
1178         }
1179         if (entry->hasOldtree()) {
1180             // free old tree on old processor
1181           int oldpe = entry->oldtree.pe;
1182           mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
1183           entry->oldtree.clear();
1184         }
1185       }
1186       if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
1187     }
1188   }
1189 }
1190
1191 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
1192 {
1193   int i;
1194   CkSectionInfo id = msg->sid;
1195   mCastEntry *entry = (mCastEntry *)id.get_val();
1196   CmiAssert(entry!=NULL);
1197 //CmiPrintf("[%d] recvRedMsg: entry: %p\n", CkMyPe(), entry);
1198
1199   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1200
1201   int updateReduceNo = 0;
1202
1203   // update entry if obsolete
1204   if (entry->isObsolete()) {
1205       // send up to root
1206     DEBUGF(("[%d] entry obsolete-send to root %d\n", CkMyPe(), entry->rootSid.pe));
1207     if (!entry->hasParent()) { //rootSid.pe == CkMyPe()
1208       // I am root, set to the new cookie if there is
1209       mCastEntry *newentry = entry->newc;
1210       while (newentry && newentry->newc) newentry=newentry->newc;
1211       if (newentry) entry = newentry;
1212       CmiAssert(entry!=NULL);
1213     }
1214     if (!entry->hasParent() && !entry->isObsolete()) {
1215        // root find the latest cookie that is not obsolete
1216       msg->sourceFlag = 0;       // indicate it is not on old spanning tree
1217       updateReduceNo = 1;        // reduce from old tree, new entry need update.
1218     }
1219     else {
1220       CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
1221       // entry is obsolete, send to root directly
1222       msg->sid = entry->rootSid;
1223
1224       msg->sourceFlag = 0;
1225       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1226       return;
1227     }
1228   }
1229
1230   reductionInfo &redInfo = entry->red;
1231
1232   DEBUGF(("[%d] msg %p red:%d, entry:%p redno:%d\n", CkMyPe(), msg, msg->redNo, entry, entry->red.redNo));
1233   // old message come, ignore
1234   if (msg->redNo < redInfo.redNo) {
1235     CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
1236     CmiAbort("Could never happen! \n");
1237   }
1238   if (entry->notReady() || msg->redNo > redInfo.redNo) {
1239     DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
1240     redInfo.futureMsgs.push_back(msg);
1241     return;
1242   }
1243
1244   DEBUGF(("[%d] recvRedMsg rebuilt:%d red:%d\n", CkMyPe(), msg->rebuilt, redInfo.redNo));
1245
1246   const int index = msg->fragNo;
1247
1248   // buffer this msg
1249   if (msg->sourceFlag == 1) {
1250     // new reduction message from ArrayElement
1251     redInfo.lcount [index] ++;
1252   }
1253
1254   if (msg->sourceFlag == 2) {
1255     redInfo.ccount [index] ++;
1256   }
1257
1258   redInfo.gcount [index] += msg->gcount;
1259
1260   // buffer the msg
1261   // first check if message is of proper size
1262   if ((0 != redInfo.msgs[index].length()) && 
1263       (msg->dataSize != (redInfo.msgs [index][0]->dataSize))) {
1264     CmiAbort("Reduction data are not of same length!");
1265   }
1266
1267   redInfo.msgs [index].push_back(msg);
1268
1269   const int numFragsRcvd = redInfo.msgs [index].length();
1270
1271   DEBUGF(("[%d] index:%d lcount:%d-%d, ccount:%d-%d, gcount:%d-%d root:%d\n", CkMyPe(),index, entry->red.lcount[index],entry->localElem.length(), entry->red.ccount[index], entry->children.length(), entry->red.gcount[index], entry->allElem.length(), !entry->hasParent()));
1272
1273   int currentTreeUp = 0;
1274   if (redInfo.lcount [index] == entry->localElem.length() &&
1275       redInfo.ccount [index] == entry->children.length())
1276       currentTreeUp = 1;
1277
1278   int mixTreeUp = 0;
1279   const int numElems = entry->allElem.length();
1280   
1281   if (!entry->hasParent()) {
1282     mixTreeUp = 1;
1283     for (int i=0; i<msg->nFrags; i++) {
1284       if (entry->allElem.length() != redInfo.gcount [i]) {
1285         mixTreeUp = 0;
1286       }
1287     }
1288   }
1289
1290   if (currentTreeUp || mixTreeUp)
1291   {
1292     const int nFrags = msg->nFrags;  
1293     
1294     // msg from children contain only one fragment
1295     reduceFragment (index, id, entry, redInfo, updateReduceNo, 
1296                     currentTreeUp);
1297
1298     if (redInfo.npProcessed == nFrags) {
1299       // reset counters
1300       for (i=0; i<nFrags; i++) {
1301         redInfo.lcount [i] = 0;
1302         redInfo.ccount [i] = 0;
1303         redInfo.gcount [i] = 0;
1304       }
1305       redInfo.npProcessed = 0;
1306
1307       // release future msgs
1308       releaseFutureReduceMsgs(entry);
1309     }
1310   }
1311 }
1312
1313 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
1314 {
1315   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1316
1317   for (int i=0; i<entry->red.futureMsgs.length(); i++) {
1318     DEBUGF(("releaseFutureReduceMsgs: %p\n", entry->red.futureMsgs[i]));
1319     mCastGrp[CkMyPe()].recvRedMsg(entry->red.futureMsgs[i]);
1320   }
1321   entry->red.futureMsgs.length() = 0;
1322 }
1323
1324 // these messages have to be sent to root
1325 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
1326 {
1327   int i;
1328   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
1329
1330   for (int j=0; j<MAXFRAGS; j++) {
1331     for (i=0; i<entry->red.msgs[j].length(); i++) {
1332       CkReductionMsg *msg = entry->red.msgs[j][i];
1333       DEBUGF(("releaseBufferedReduceMsgs:%p red:%d in entry:%p\n", msg, msg->redNo, entry));
1334       msg->sid = entry->rootSid;
1335       msg->sourceFlag = 0;
1336       mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1337     }
1338     entry->red.msgs[j].length() = 0;
1339   }
1340
1341
1342   for (i=0; i<entry->red.futureMsgs.length(); i++) {
1343     CkReductionMsg *msg = entry->red.futureMsgs[i];
1344     DEBUGF(("releaseBufferedFutureReduceMsgs: %p red:%d in entry: %p\n", msg,msg->redNo, entry));
1345     msg->sid = entry->rootSid;
1346     msg->sourceFlag = 0;
1347     mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
1348   }
1349   entry->red.futureMsgs.length() = 0;
1350 }
1351
1352 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
1353 {
1354   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
1355   if (entry->red.redNo < red)
1356     entry->red.redNo = red;
1357
1358   CProxy_CkMulticastMgr mp(thisgroup);
1359   for (int i=0; i<entry->children.length(); i++) {
1360     mp[entry->children[i].get_pe()].updateRedNo((mCastEntry *)entry->children[i].get_val(), red);
1361   }
1362
1363   releaseFutureReduceMsgs(entry);
1364 }
1365
1366 #if 0
1367 ////////////////////////////////////////////////////////////////////////////////
1368 /////
1369 ///////////////// Builtin Reducer Functions //////////////
1370 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1371 {CkAbort("ERROR! Called the invalid reducer!\n");return NULL;}
1372
1373 /* A simple reducer, like sum_int, looks like this:
1374 static CkReductionMsg *sum_int(int nMsg,CkReductionMsg **msg)
1375 {
1376   int i,ret=0;
1377   for (i=0;i<nMsg;i++)
1378     ret+=*(int *)(msg[i]->data);
1379   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1380 }
1381 */
1382
1383 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1384 static CkReductionMsg *name(int nMsg, CkReductionMsg **msg)\
1385 {\
1386   int m,i;\
1387   int nElem=msg[0]->getSize()/sizeof(dataType);\
1388   dataType *ret=(dataType *)(msg[0]->getData());\
1389   for (m=1;m<nMsg;m++)\
1390   {\
1391     dataType *value=(dataType *)(msg[m]->getData());\
1392     for (i=0;i<nElem;i++)\
1393     {\
1394       loop\
1395     }\
1396   }\
1397   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret);\
1398 }
1399
1400 //Use this macro for reductions that have the same type for all inputs
1401 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1402   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1403   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1404   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1405
1406
1407 //Compute the sum the numbers passed by each element.
1408 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1409
1410 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1411
1412 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1413
1414 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1415
1416 CkReduction::reducerFn CkMulticastMgr::reducerTable[CkMulticastMgr::MAXREDUCERS]={
1417     ::invalid_reducer,
1418   //Compute the sum the numbers passed by each element.
1419     ::sum_int,::sum_float,::sum_double,
1420     ::product_int,::product_float,::product_double,
1421     ::max_int,::max_float,::max_double,
1422     ::min_int,::min_float,::min_double
1423 };
1424 #endif
1425
1426 #include "CkMulticast.def.h"