Critical path header changes for the pics merge
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #include "pathHistory.h"
54
55 #if CMK_DEBUG_REDUCTIONS
56 //Debugging messages:
57 // Reduction mananger internal information:
58 #define DEBR(x) CkPrintf x
59 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
60 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
61
62 #define DEBN(x) CkPrintf x
63 #define AAN "Red Node%d "
64 #define ABN ,CkMyNode()
65
66 // For status and data messages from the builtin reducer functions.
67 #define RED_DEB(x) //CkPrintf x
68 #define DEBREVAC(x) CkPrintf x
69 #define DEB_TUPLE(x) CkPrintf x
70 #else
71 //No debugging info-- empty defines
72 #define DEBR(x) // CkPrintf x
73 #define DEBRMLOG(x) CkPrintf x
74 #define AA
75 #define AB
76 #define DEBN(x) //CkPrintf x
77 #define RED_DEB(x) //CkPrintf x
78 #define DEBREVAC(x) //CkPrintf x
79 #define DEB_TUPLE(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89   : CkReductionMgr(CkpvAccess(_currentGroupRednMgr))
90 {
91         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
92
93         creatingContributors();
94         contributorStamped(&reductionInfo);
95         contributorCreated(&reductionInfo);
96         doneCreatingContributors();
97 #if !GROUP_LEVEL_REDUCTION
98         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
99 #endif
100 }
101
102 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
103 {
104         creatingContributors();
105         contributorStamped(&reductionInfo);
106         contributorCreated(&reductionInfo);
107         doneCreatingContributors();
108 }
109
110 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
111                                     ((CkReductionMgr *)this),
112                                     reductionInfo,false)
113 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
114
115 CK_BARRIER_CONTRIBUTE_METHODS_DEF(Group,
116                                    ((CkReductionMgr *)this),
117                                    reductionInfo,false)
118
119
120
121 CkGroupInitCallback::CkGroupInitCallback(void) {}
122 /*
123 The callback is just used to tell the caller that this group
124 has been constructed.  (Now they can safely call CkLocalBranch)
125 */
126 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
127 {
128   m->call();
129   delete m;
130 }
131
132 /*
133 The callback is just used to tell the caller that this group
134 is constructed and ready to process other calls.
135 */
136 CkGroupReadyCallback::CkGroupReadyCallback(void)
137 {
138   _isReady = 0;
139 }
140 void
141 CkGroupReadyCallback::callBuffered(void)
142 {
143   int n = _msgs.length();
144   for(int i=0;i<n;i++)
145   {
146     CkGroupCallbackMsg *msg = _msgs.deq();
147     msg->call();
148     delete msg;
149   }
150 }
151 void
152 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
153 {
154   if(_isReady) {
155     msg->call();
156     delete msg;
157   } else {
158     _msgs.enq(msg);
159   }
160 }
161
162 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
163         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
164 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
165 {
166         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
167         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
168         b->fn(b->param,m->getSize(),m->getData());
169         delete m;
170 }
171
172 ///////////////// Reduction Manager //////////////////
173 /*
174 One CkReductionMgr runs a non-overlapping set of reductions.
175 It collects messages from all local contributors, then sends
176 the reduced message up the reduction tree to node zero, where
177 they're passed to the user's client function.
178 */
179
180 CkReductionMgr::CkReductionMgr(CProxy_CkArrayReductionMgr groupRednMgr)
181   :
182 #if !GROUP_LEVEL_REDUCTION
183   nodeProxy(groupRednMgr),
184 #endif
185   thisProxy(thisgroup),
186   isDestroying(false)
187
188 #ifdef BINOMIAL_TREE
189   init_BinomialTree();
190 #else
191   init_BinaryTree();
192 #endif
193   redNo=0;
194   completedRedNo = -1;
195   inProgress=false;
196   creating=false;
197   startRequested=false;
198   gcount=lcount=0;
199   nContrib=nRemote=0;
200   is_inactive = false;
201   maxStartRequest=0;
202 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
203         numImmigrantRecObjs = 0;
204         numEmigrantRecObjs = 0;
205 #endif
206   disableNotifyChildrenStart = false;
207
208   barrier_gCount=0;
209   barrier_nSource=0;
210   barrier_nContrib=barrier_nRemote=0;
211
212   DEBR((AA "In reductionMgr constructor at %d \n" AB,this));
213 }
214
215 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
216                                                     , isDestroying(false)
217 {
218   numKids = -1;
219   redNo=0;
220   completedRedNo = -1;
221   inProgress=false;
222   creating=false;
223   startRequested=false;
224   gcount=lcount=0;
225   nContrib=nRemote=0;
226   is_inactive = false;
227   maxStartRequest=0;
228   DEBR((AA "In reductionMgr migratable constructor at %d \n" AB,this));
229
230   barrier_gCount=0;
231   barrier_nSource=0;
232   barrier_nContrib=barrier_nRemote=0;
233
234 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
235   numImmigrantRecObjs = 0;
236   numEmigrantRecObjs = 0;
237 #endif
238
239 }
240
241 CkReductionMgr::~CkReductionMgr()
242 {
243 #if !GROUP_LEVEL_REDUCTION
244   if (CkMyRank() == 0) {
245     delete nodeProxy.ckLocalBranch();
246   }
247 #endif
248 }
249
250 void CkReductionMgr::flushStates()
251 {
252   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
253   redNo=0;
254   completedRedNo = -1;
255   inProgress=false;
256   creating=false;
257   startRequested=false;
258   nContrib=nRemote=0;
259   maxStartRequest=0;
260
261   while (!msgs.isEmpty()) { delete msgs.deq(); }
262   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
263   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
264   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
265
266   adjVec.length()=0;
267
268 #if ! GROUP_LEVEL_REDUCTION
269   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
270 #endif
271 }
272
273 //////////// Reduction Manager Client API /////////////
274
275 //Add the given client function.  Overwrites any previous client.
276 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
277 {
278   DEBR((AA "Setting reductionClient in ReductionMgr groupid %d\n" AB,thisgroup.idx));
279
280   if (CkMyPe()!=0)
281           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
282   storedCallback=*cb;
283 #if ! GROUP_LEVEL_REDUCTION
284   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
285   nodeProxy.ckSetReductionClient(callback);
286 #endif
287 }
288
289 ///////////////////////////// Contributor ////////////////////////
290 //Contributors keep a copy of this structure:
291
292 /*Contributor migration support:
293 */
294 void contributorInfo::pup(PUP::er &p)
295 {
296   p(redNo);
297 }
298
299 ////////////////////// Contributor list maintainance: /////////////////
300 //These just set and clear the "creating" flag to prevent
301 // reductions from finishing early because not all elements
302 // have been created.
303 void CkReductionMgr::creatingContributors(void)
304 {
305   DEBR((AA "Creating contributors...\n" AB));
306   creating=true;
307 }
308 void CkReductionMgr::doneCreatingContributors(void)
309 {
310   DEBR((AA "Done creating contributors...\n" AB));
311   creating=false;
312   checkIsActive();
313   if (startRequested) startReduction(redNo,CkMyPe());
314   finishReduction();
315 }
316
317 //A new contributor will be created
318 void CkReductionMgr::contributorStamped(contributorInfo *ci)
319 {
320   DEBR((AA "Contributor %p stamped\n" AB,ci));
321   //There is another contributor
322   gcount++;
323   if (inProgress)
324   {
325     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
326     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
327   } else
328     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
329 }
330
331 //A new contributor was actually created
332 void CkReductionMgr::contributorCreated(contributorInfo *ci)
333 {
334   DEBR((AA "Contributor %p created in grp %d\n" AB,ci,thisgroup.idx));
335   //We've got another contributor
336   lcount++;
337   //He may not need to contribute to some of our reductions:
338   for (int r=redNo;r<ci->redNo;r++)
339     adj(r).lcount--;//He won't be contributing to r here
340   checkIsActive();
341 }
342
343 /*Don't expect any more contributions from this one.
344 This is rather horrifying because we now have to make
345 sure the global element count accurately reflects all the
346 contributions the element made before it died-- these may stretch
347 far into the future.  The adj() vector is what saves us here.
348 */
349 void CkReductionMgr::contributorDied(contributorInfo *ci)
350 {
351 #if CMK_MEM_CHECKPOINT
352   // ignore from listener if it is during restart from crash
353   if (CkInRestarting()) return;
354 #endif
355
356   if (isDestroying) return;
357
358   DEBR((AA "Contributor %p(%d) died\n" AB,ci,ci->redNo));
359   //We lost a contributor
360   gcount--;
361
362   if (ci->redNo<redNo)
363   {//Must have been migrating during reductions-- root is waiting for his
364   // contribution, which will never come.
365     DEBR((AA "Dying guy %p must have been migrating-- he's at #%d!\n" AB,ci,ci->redNo));
366     for (int r=ci->redNo;r<redNo;r++)
367       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
368   }
369
370   //Add to the global count for all his future messages (wherever they are)
371   int r;
372   for (r=redNo;r<ci->redNo;r++)
373   {//He already contributed to this reduction, but won't show up in global count.
374     DEBR((AA "Dead guy %p left contribution for #%d\n" AB,ci,r));
375     adj(r).gcount++;
376   }
377
378   lcount--;
379   //He's already contributed to several reductions here
380   for (r=redNo;r<ci->redNo;r++)
381     adj(r).lcount++;//He'll be contributing to r here
382
383   // Check whether the death of this contributor made this pe go barren at this
384   // redNo
385   if (ci->redNo <= redNo) {
386     checkIsActive();
387   }
388   finishReduction();
389 }
390
391 //Migrating away (note that global count doesn't change)
392 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
393 {
394   DEBR((AA "Contributor %p(%d) migrating away\n" AB,ci,ci->redNo));
395   lcount--;//We lost a local
396   //He's already contributed to several reductions here
397   for (int r=redNo;r<ci->redNo;r++)
398     adj(r).lcount++;//He'll be contributing to r here
399
400   // Check whether this made this pe go barren at redNo
401   if (ci->redNo <= redNo) {
402     checkIsActive();
403   }
404   finishReduction();
405 }
406
407 //Migrating in (note that global count doesn't change)
408 void CkReductionMgr::contributorArriving(contributorInfo *ci)
409 {
410   DEBR((AA "Contributor %p(%d) migrating in\n" AB,ci,ci->redNo));
411   lcount++;//We gained a local
412 #if CMK_MEM_CHECKPOINT
413   // ignore from listener if it is during restart from crash
414   // because the ci may be old.
415   if (CkInRestarting()) return;
416 #endif
417   //He has already contributed (elsewhere) to several reductions:
418   for (int r=redNo;r<ci->redNo;r++)
419     adj(r).lcount--;//He won't be contributing to r here
420
421   // Check if the arrival of a new contributor makes this PE become active again
422   if (ci->redNo == redNo) {
423     checkIsActive();
424   }
425 }
426
427 //Contribute-- the given msg can contain any data.  The reducerType
428 // field of the message must be valid.
429 // Each contributor must contribute exactly once to the each reduction.
430 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
431 {
432 #if CMK_BIGSIM_CHARM
433   _TRACE_BG_TLINE_END(&(m->log));
434 #endif
435   DEBR((AA "Contributor %p contributed for %d in grp %d ismigratable %d \n" AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
436   //m->ci=ci;
437   m->redNo=ci->redNo++;
438   m->sourceFlag=-1;//A single contribution
439   m->gcount=0;
440
441 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
442
443         // if object is an immigrant recovery object, we send the contribution to the source PE
444         if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
445                 
446                 // turning on the message-logging bypass flag
447                 envelope *env = UsrToEnv(m);
448                 env->flags = env->flags | CK_BYPASS_DET_MLOG;
449         thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
450                 return;
451         }
452
453     Chare *oldObj = CpvAccess(_currentObj);
454     CpvAccess(_currentObj) = this;
455
456         // adding contribution
457         addContribution(m);
458
459     CpvAccess(_currentObj) = oldObj;
460 #else
461   addContribution(m);
462 #endif
463 }
464
465 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
466 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
467         //if(CkMyPe() == 2) CkPrintf("[%d] ---> Contributing Via Message\n",CkMyPe());
468         
469         // turning off bypassing flag
470         envelope *env = UsrToEnv(m);
471         env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
472
473         // adding contribution
474     addContribution(m);
475 }
476 #else
477 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
478 #endif
479
480 void CkReductionMgr::checkIsActive() {
481 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
482   return;
483 #endif
484
485   // Check the number of kids in the inactivelist before or at this redNo
486   std::map<int, int>::iterator it;
487   int c_inactive = 0;
488   for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
489     if (it->second <= redNo) {
490       DEBR((AA "Kid %d is inactive from redNo %d\n" AB, it->first, it->second));
491       c_inactive++;
492     }
493   }
494   DEBR((AA "CheckIsActive redNo %d, kids %d(inactive %d), lcount %d\n" AB, redNo,
495     numKids, c_inactive, lcount));
496
497   if(numKids == c_inactive && lcount == 0) {
498     if(!is_inactive) {
499       informParentInactive();
500     }
501     is_inactive = true;
502   } else if(is_inactive) {
503     is_inactive = false;
504   }
505 }
506
507 /*
508 * Add to the child to the inactiveList
509 */
510 void CkReductionMgr::checkAndAddToInactiveList(int id, int red_no) {
511   // If there is already a reduction in progress corresponding to red_no, then
512   // the time to call ReductionStarting is past so explicitly invoke
513   // ReductionStarting on the kid
514   if (inProgress && redNo == red_no) {
515     thisProxy[id].ReductionStarting(new CkReductionNumberMsg(red_no));
516   }
517
518   std::map<int, int>::iterator it;
519   it = inactiveList.find(id);
520   if (it == inactiveList.end()) {
521     inactiveList.insert(std::pair<int, int>(id, red_no));
522   } else {
523     it->second = red_no;
524   }
525   // If the red_no is redNo, then check whether this makes this PE inactive
526   if (redNo == red_no) {
527     checkIsActive();
528   }
529 }
530
531 /*
532 * This is invoked when a real contribution is received from the kid for a
533 * particular red_no
534 */
535 void CkReductionMgr::checkAndRemoveFromInactiveList(int id, int red_no) {
536   std::map<int, int>::iterator it;
537   it = inactiveList.find(id);
538   if (it == inactiveList.end()) {
539     return;
540   }
541   if (it->second <= red_no) {
542     inactiveList.erase(it);
543     DEBR((AA "Parent removing kid %d from inactivelist red_no %d\n" AB,
544       id, red_no));
545   }
546 }
547
548 // Inform parent that I am inactive
549 void CkReductionMgr::informParentInactive() {
550   if (hasParent()) {
551     DEBR((AA "Inform parent to add to inactivelist red_no %d\n" AB, redNo));
552     thisProxy[treeParent()].AddToInactiveList(
553       new CkReductionInactiveMsg(CkMyPe(), redNo));
554   }
555 }
556
557 /*
558 *  Send ReductionStarting message to all the inactive kids which are inactive
559 *  for the specified red_no
560 */
561 void CkReductionMgr::sendReductionStartingToKids(int red_no) {
562 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
563   for (int k=0;k<treeKids();k++)
564   {
565     DEBR((AA "Asking child PE %d to start #%d\n" AB,firstKid()+k,redNo));
566     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
567   }
568 #else
569   std::map<int, int>::iterator it;
570   for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
571     if (it->second <= red_no) {
572       DEBR((AA "Parent sending reductionstarting to inactive kid %d\n" AB,
573         it->first));
574       thisProxy[it->first].ReductionStarting(new CkReductionNumberMsg(red_no));
575     }
576   }
577 #endif
578 }
579
580
581 //////////// Reduction Manager Remote Entry Points /////////////
582 //Sent down the reduction tree (used by barren PEs)
583 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
584 {
585  if(CkMyPe()==0){
586         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
587         //delete m;
588         //return;
589  }
590  DEBR((AA " Group ReductionStarting called for redNo %d\n" AB,m->num));
591  int srcPE = (UsrToEnv(m))->getSrcPe();
592   if (isPresent(m->num) && !inProgress)
593   {
594     DEBR((AA "Starting reduction #%d at parent's request\n" AB,m->num));
595     startReduction(m->num,srcPE);
596     finishReduction();
597   } else if (isFuture(m->num)){
598 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
599           DEBR((AA "Asked to startfuture Reduction %d \n" AB,m->num));
600           if(maxStartRequest < m->num){
601                   maxStartRequest = m->num;
602           }
603  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
604           
605     }
606   else //is Past
607     DEBR((AA "Ignoring parent's late request to start #%d\n" AB,m->num));
608   delete m;
609 }
610
611 //Sent to root of reduction tree with reduction contribution
612 // of migrants that missed the main reduction.
613 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
614 {
615 #if GROUP_LEVEL_REDUCTION
616 #if CMK_BIGSIM_CHARM
617   _TRACE_BG_TLINE_END(&(m->log));
618 #endif
619   addContribution(m);
620 #else
621   m->secondaryCallback = m->callback;
622   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
623   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
624   nodeMgr->LateMigrantMsg(m);
625 /*      int len = finalMsgs.length();
626         finalMsgs.enq(m);
627 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
628         endArrayReduction();*/
629 #endif
630 }
631
632 //A late migrating contributor will never contribute to this reduction
633 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
634 {
635   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
636   DEBR((AA "Migrant died before contributing to #%d\n" AB,m->num));
637  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
638   adj(m->num).gcount--;//He won't be contributing to this one.
639   finishReduction();
640   delete m;
641 }
642
643 //////////// Reduction Manager State /////////////
644 void CkReductionMgr::startReduction(int number,int srcPE)
645 {
646   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
647   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
648   if (inProgress){
649         DEBR((AA "This reduction is already in progress\n" AB));
650         return;//This reduction already started
651   }
652   if (creating) //Don't start yet-- we're creating elements
653   {
654     DEBR((AA "Postponing start request #%d until we're done creating\n" AB,redNo));
655     startRequested=true;
656     return;
657   }
658
659 //If none of these cases, we need to start the reduction--
660   DEBR((AA "Starting reduction #%d  %d %d \n" AB,redNo,completedRedNo,number));
661   inProgress=true;
662  
663
664         /*
665                 FAULT_EVAC
666         */
667   if(!CmiNodeAlive(CkMyPe())){
668         return;
669   }
670
671   if(disableNotifyChildrenStart) return;
672  
673   //Sent start requests to our kids (in case they don't already know)
674 #if GROUP_LEVEL_REDUCTION
675   sendReductionStartingToKids(redNo);
676   //for (int k=0;k<treeKids();k++)
677   //{
678   //  DEBR((AA "Asking child PE %d to start #%d\n" AB,firstKid()+k,redNo));
679   //  thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
680   //}
681 #else
682   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
683 #endif
684         
685         /*
686   int temp;
687   //making it a broadcast done only by PE 0
688   if(!hasParent()){
689                 temp = completedRedNo+1;
690                 for(int i=temp;i<=number;i++){
691                         for(int j=0;j<CkNumPes();j++){
692                                 if(j != CkMyPe() && j != srcPE){
693                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
694                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
695                                         }
696                                 }
697                         }
698                 }       
699         }       else{
700                 temp = number;
701         }*/
702 /*  if(!hasParent()){
703                 temp = completedRedNo+1;
704         }       else{
705                 temp = number;
706         }
707         for(int i=temp;i<=number;i++){
708         //      DEBR((AA "Asking all child PEs to start #%d \n" AB,i));
709                 if(hasParent()){
710           // kick-start your parent too ...
711                         if(treeParent() != srcPE){
712                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
713 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
714     CpvAccess(_currentObj) = oldObj;
715 #endif
716                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
717                                 }       
718                         }       
719                 }
720           for (int k=0;k<treeKids();k++)
721           {
722                         if(firstKid()+k != srcPE){
723                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
724                             DEBR((AA "Asking child PE %d to start #%d\n" AB,kids[k],redNo));
725                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
726                                 }       
727                         }       
728         }
729         }
730         */
731 }       
732
733 /*Handle a message from one element for the reduction*/
734 void CkReductionMgr::addContribution(CkReductionMsg *m)
735 {
736   if (isPast(m->redNo))
737   {
738 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
739         CmiAbort("this version should not have late migrations");
740 #else
741         //We've moved on-- forward late contribution straight to root
742     DEBR((AA "Migrant gives late contribution for #%d!\n" AB,m->redNo));
743         // if (!hasParent()) //Root moved on too soon-- should never happen
744         //   CkAbort("Late reduction contribution received at root!\n");
745     thisProxy[0].LateMigrantMsg(m);
746 #endif
747   }
748   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
749     DEBR((AA "Contributor gives early contribution-- for #%d\n" AB,m->redNo));
750     futureMsgs.enq(m);
751   } else {// An ordinary contribution
752     DEBR((AA "Recv'd local contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
753    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
754     startReduction(m->redNo,CkMyPe());
755     msgs.enq(m);
756     nContrib++;
757     finishReduction();
758   }
759 }
760
761 /**function checks if it has got all contributions that it is supposed to
762 get at this processor. If it is done it sends the reduced result to the local
763 nodegroup */
764 void CkReductionMgr::finishReduction(void)
765 {
766   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
767   DEBR((AA "in finishReduction (inProgress=%d) in grp %d\n" AB,inProgress,thisgroup.idx));
768   if ((!inProgress) || creating){
769         DEBR((AA "Either not in Progress or creating\n" AB));
770         return;
771   }
772
773   bool partialReduction = false;
774
775   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
776 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
777         if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
778           if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
779             partialReduction = true;
780           }
781           else {
782             DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
783             return; //Need more local messages
784           }
785         }
786 #else
787   if (nContrib<(lcount+adj(redNo).lcount)){
788          if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
789            partialReduction = true;
790          }
791          else {
792            DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
793            return; //Need more local messages
794          }
795   }
796 #endif
797
798 #if GROUP_LEVEL_REDUCTION
799   if (nRemote<treeKids()) {
800     if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
801       partialReduction = true;
802     }
803     else {
804       DEBR((AA "Need more remote messages %d %d\n" AB,nRemote,treeKids()));
805       return; //Need more remote messages
806     }
807   }
808         
809 #endif
810  
811   DEBR((AA "Reducing data... %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
812   CkReductionMsg *result=reduceMessages();
813   result->redNo=redNo;
814
815   if (partialReduction) {
816     msgs.enq(result);
817     return;
818   }
819
820 #if GROUP_LEVEL_REDUCTION
821   if (hasParent())
822   {//Pass data up tree to parent
823     DEBR((AA "Passing reduced data up to parent node %d.\n" AB,treeParent()));
824     DEBR((AA "Message gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
825 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
826     result->gcount+=gcount+adj(redNo).gcount;
827 #else
828     result->gcount+=gcount+adj(redNo).gcount;
829 #endif
830     thisProxy[treeParent()].RecvMsg(result);
831   }
832   else 
833   {//We are root-- pass data to client
834     DEBR((AA "Final gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
835     int totalElements=result->gcount+gcount+adj(redNo).gcount;
836     if (totalElements>result->nSources()) 
837     {
838       DEBR((AA "Only got %d of %d contributions (c'mon, migrators!)\n" AB,result->nSources(),totalElements));
839       msgs.enq(result);
840       return; // Wait for migrants to contribute
841     } else if (totalElements<result->nSources()) {
842       DEBR((AA "Got %d of %d contributions\n" AB,result->nSources(),totalElements));
843 #if !defined(_FAULT_CAUSAL_)
844       CkAbort("ERROR! Too many contributions at root!\n");
845 #endif
846     }
847     DEBR((AA "Passing result to client function\n" AB));
848     CkSetRefNum(result, result->getUserFlag());
849     if (!result->callback.isInvalid())
850             result->callback.send(result);
851     else if (!storedCallback.isInvalid())
852             storedCallback.send(result);
853     else
854             CkAbort("No reduction client!\n"
855                     "You must register a client with either SetReductionClient or during contribute.\n");
856   }
857
858 #else
859   result->gcount+=gcount+adj(redNo).gcount;
860
861   result->secondaryCallback = result->callback;
862   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
863         DEBR((AA "Reduced mesg gcount %d localgcount %d\n" AB,result->gcount,gcount));
864
865   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
866
867  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
868   
869   // Find our node reduction manager, and pass reduction to him:
870   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
871   nodeMgr->contributeArrayReduction(result);
872 #endif
873
874   //House Keeping Operations will have to check later what needs to be changed
875   redNo++;
876   // Check after every reduction contribution whether this makes the PE inactive
877   // starting this redNo
878   checkIsActive();
879   //Shift the count adjustment vector down one slot (to match new redNo)
880   int i;
881 #if !GROUP_LEVEL_REDUCTION
882     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
883   if(CkMyPe()!=0)
884 #endif
885   {
886         completedRedNo++;
887         for (i=1;i<(int)(adjVec.length());i++){
888            adjVec[i-1]=adjVec[i];
889         }
890         adjVec.length()--;  
891   }
892
893   inProgress=false;
894   startRequested=false;
895   nRemote=nContrib=0;
896
897   //Look through the future queue for messages we can now handle
898   int n=futureMsgs.length();
899   for (i=0;i<n;i++)
900   {
901     CkReductionMsg *m=futureMsgs.deq();
902     if (m!=NULL) //One of these addContributions may have finished us.
903       addContribution(m);//<- if *still* early, puts it back in the queue
904   }
905 #if GROUP_LEVEL_REDUCTION
906   n=futureRemoteMsgs.length();
907   for (i=0;i<n;i++)
908   {
909     CkReductionMsg *m=futureRemoteMsgs.deq();
910     if (m!=NULL) {
911       RecvMsg(m);//<- if *still* early, puts it back in the queue
912     }
913   }
914 #endif
915
916   if(maxStartRequest >= redNo){
917           startReduction(redNo,CkMyPe());
918           finishReduction();
919   }
920  
921
922 }
923
924 //Sent up the reduction tree with reduced data
925   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
926 {
927 #if GROUP_LEVEL_REDUCTION
928 #if CMK_BIGSIM_CHARM
929   _TRACE_BG_TLINE_END(&m->log);
930 #endif
931   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
932     DEBR((AA "Recv'd remote contribution %d for #%d\n" AB,nRemote,m->redNo));
933     // If the remote contribution is real, then check whether we can remove the
934     // child from the inactiveList if it is in the list
935     if (m->nSources() > 0) {
936       checkAndRemoveFromInactiveList(m->fromPE, m->redNo);
937     }
938     startReduction(m->redNo, CkMyPe());
939     msgs.enq(m);
940     nRemote++;
941     finishReduction();
942   }
943   else if (isFuture(m->redNo)) {
944     DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
945     futureRemoteMsgs.enq(m);
946   } 
947   else CkAbort("Recv'd late remote contribution!\n");
948 #endif
949 }
950
951 void CkReductionMgr::AddToInactiveList(CkReductionInactiveMsg *m) {
952   int id = m->id;
953   int last_redno = m->redno;
954   delete m;
955
956   DEBR((AA "Parent add kid %d to inactive list from redno %d\n" AB,
957     id, last_redno));
958   checkAndAddToInactiveList(id, last_redno);
959
960   finishReduction();
961   if (last_redno <= redNo) {
962     checkIsActive();
963   }
964 }
965
966 //////////// Reduction Manager Utilities /////////////
967
968 //Return the countAdjustment struct for the given redNo:
969 countAdjustment &CkReductionMgr::adj(int number)
970 {
971   number-=completedRedNo;
972   number--;
973   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
974   //Pad the adjustment vector with zeros until it's at least number long
975   while ((int)(adjVec.length())<=number)
976     adjVec.push_back(countAdjustment());
977   return adjVec[number];
978 }
979
980 //Combine (& free) the current message vector msgs.
981 CkReductionMsg *CkReductionMgr::reduceMessages(void)
982 {
983 #if CMK_BIGSIM_CHARM
984   _TRACE_BG_END_EXECUTE(1);
985   void* _bgParentLog = NULL;
986   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
987 #endif
988   CkReductionMsg *ret=NULL;
989
990   //Look through the vector for a valid reducer, swapping out placeholder messages
991   CkReduction::reducerType r=CkReduction::invalid;
992   int msgs_gcount=0;//Reduced gcount
993   int msgs_nSources=0;//Reduced nSources
994   CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
995   CkCallback msgs_callback;
996   int i;
997   int nMsgs=0;
998   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
999   CkReductionMsg *m;
1000   bool isMigratableContributor;
1001
1002   // Copy message queue into msgArr, skipping placeholders:
1003   while (NULL!=(m=msgs.deq()))
1004   {
1005     msgs_gcount+=m->gcount;
1006     if (m->sourceFlag!=0)
1007     { //This is a real message from an element, not just a placeholder
1008       msgs_nSources+=m->nSources();
1009 #if CMK_BIGSIM_CHARM
1010       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
1011 #endif
1012
1013       // for "random" reducer type, only need to accept one message
1014       if (nMsgs == 0 || m->reducer != CkReduction::random) {
1015         msgArr[nMsgs++]=m;
1016         r=m->reducer;
1017         if (!m->callback.isInvalid()){
1018 #if CMK_ERROR_CHECKING
1019           if(nMsgs > 1 && !(msgs_callback == m->callback))
1020             CkAbort("mis-matched client callbacks in reduction messages\n");
1021 #endif
1022           msgs_callback=m->callback;
1023         }
1024         if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
1025           msgs_userFlag=m->userFlag;
1026         isMigratableContributor=m->isMigratableContributor();
1027       }
1028       else {
1029 #if CMK_ERROR_CHECKING
1030         if(!(msgs_callback == m->callback))
1031           CkAbort("mis-matched client callbacks in reduction messages\n");
1032 #endif  
1033         delete m;
1034       }
1035     }
1036     else
1037     { //This is just a placeholder message-- forget it
1038       delete m;
1039     }
1040   }
1041
1042   if (nMsgs==0||r==CkReduction::invalid)
1043   //No valid reducer in the whole vector
1044     ret=CkReductionMsg::buildNew(0,NULL);
1045   else
1046   {//Use the reducer to reduce the messages
1047                 //if there is only one msg to be reduced just return that message
1048     if(nMsgs == 1 &&
1049        msgArr[0]->reducer != CkReduction::set &&
1050        msgArr[0]->reducer != CkReduction::tuple) {
1051       ret = msgArr[0];
1052     }else{
1053       if (msgArr[0]->reducer == CkReduction::random) {
1054         // nMsgs > 1 indicates that reduction type is not random
1055         // this means any data with reducer type random was submitted
1056         // only so that counts would agree, and can be removed
1057         delete msgArr[0];
1058         msgArr[0] = msgArr[nMsgs - 1];
1059         nMsgs--;
1060       }
1061       CkReduction::reducerFn f=CkReduction::reducerTable[r].fn;
1062       ret=(*f)(nMsgs,msgArr);
1063     }
1064     ret->reducer=r;
1065   }
1066
1067
1068
1069 #if USE_CRITICAL_PATH_HEADER_ARRAY
1070
1071 #if CRITICAL_PATH_DEBUG > 3
1072   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
1073 #endif
1074
1075   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1076   path.updateMax(UsrToEnv(ret));
1077   // Combine the critical paths from all the reduction messages
1078   for (i=0;i<nMsgs;i++){
1079     if (msgArr[i]!=ret){
1080       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
1081       path.updateMax(UsrToEnv(msgArr[i]));
1082     }
1083   }
1084   
1085
1086 #if CRITICAL_PATH_DEBUG > 3
1087   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1088 #endif
1089   
1090   PathHistoryTableEntry tableEntry(path);
1091   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
1092   
1093 #endif
1094
1095         //Go back through the vector, deleting old messages
1096   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
1097   delete [] msgArr;
1098
1099   //Set the message counts
1100   ret->redNo=redNo;
1101   ret->gcount=msgs_gcount;
1102   ret->userFlag=msgs_userFlag;
1103   ret->callback=msgs_callback;
1104   ret->sourceFlag=msgs_nSources;
1105         ret->setMigratableContributor(isMigratableContributor);
1106   ret->fromPE = CkMyPe();
1107   DEBR((AA "Reduced gcount=%d; sourceFlag=%d\n" AB,ret->gcount,ret->sourceFlag));
1108
1109   return ret;
1110 }
1111
1112
1113 //Checkpointing utilities
1114 //pack-unpack method for CkReductionMsg
1115 //if packing pack the message and then unpack and return it
1116 //if unpacking allocate memory for it read it off disk and then unapck
1117 //and return it
1118 void CkReductionMgr::pup(PUP::er &p)
1119 {
1120 //We do not store the client function pointer or the client function parameter,
1121 //it is the responsibility of the programmer to correctly restore these
1122   CkGroupInitCallback::pup(p);
1123   p(redNo);
1124   p(completedRedNo);
1125   p(inProgress); p(creating); p(startRequested);
1126   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
1127   p|msgs;
1128   p|futureMsgs;
1129   p|futureRemoteMsgs;
1130   p|finalMsgs;
1131   p|adjVec;
1132 #if !GROUP_LEVEL_REDUCTION
1133   p|nodeProxy;
1134 #endif
1135   p|storedCallback;
1136     // handle CkReductionClientBundle
1137   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
1138   {
1139     CkReductionClientBundle *bd;
1140     if (p.isUnpacking()) 
1141       bd = new CkReductionClientBundle;
1142     else
1143       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
1144     p|*bd;
1145     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1146   }
1147
1148   // subtle --- Gengbin
1149   // Group : CkReductionMgr
1150   // CkArray: CkReductionMgr
1151   // lcount/gcount in Group is set in Group constructor
1152   // lcount/gcount in CkArray is not, it is set when array elements are created
1153   // we can not pup because inserting array elems will add the counters again
1154 //  p|lcount;
1155 //  p|gcount;
1156 //  p|lcount;
1157 //  //  p|gcount;
1158 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1159   if(p.isUnpacking()){
1160     thisProxy = thisgroup;
1161     maxStartRequest=0;
1162 #ifdef BINOMIAL_TREE
1163     init_BinomialTree();
1164 #else
1165     init_BinaryTree();
1166 #endif
1167     is_inactive = false;
1168     checkIsActive();
1169   }
1170
1171   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1172 }
1173
1174
1175 //Callback for doing Reduction through NodeGroups added by Sayantan
1176
1177 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1178         finalMsgs.enq(m);
1179         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1180         adj(m->redNo).mainRecvd = 1;
1181         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1182         endArrayReduction();
1183 }
1184
1185 void CkReductionMgr :: endArrayReduction(){
1186         CkReductionMsg *ret=NULL;
1187         int nMsgs=finalMsgs.length();
1188         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1189         //Look through the vector for a valid reducer, swapping out placeholder messages
1190         //CkPrintf("Length of Final Message %d \n",nMsgs);
1191         CkReduction::reducerType r=CkReduction::invalid;
1192         int msgs_gcount=0;//Reduced gcount
1193         int msgs_nSources=0;//Reduced nSources
1194         CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
1195         CkCallback msgs_callback;
1196         CkCallback msgs_secondaryCallback;
1197         CkVec<CkReductionMsg *> tempMsgs;
1198         int i;
1199         int numMsgs = 0;
1200         for (i=0;i<nMsgs;i++)
1201         {
1202           CkReductionMsg *m=finalMsgs.deq();
1203           if(m->redNo == completedRedNo +1){
1204             msgs_gcount+=m->gcount;
1205             if (m->sourceFlag!=0)
1206             { //This is a real message from an element, not just a placeholder
1207               msgs_nSources+=m->nSources();
1208
1209               // for "random" reducer type, only need to accept one message
1210               if (tempMsgs.length() == 0 || m->reducer != CkReduction::random) {
1211                 r=m->reducer;
1212                 if (!m->callback.isInvalid())
1213                   msgs_callback=m->callback;
1214                 if(!m->secondaryCallback.isInvalid())
1215                   msgs_secondaryCallback = m->secondaryCallback;
1216                 if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
1217                   msgs_userFlag=m->userFlag;
1218                 tempMsgs.push_back(m);
1219               }
1220               else {
1221                 delete m;
1222               }
1223             }
1224             else {
1225               delete m;
1226             }
1227           }else{
1228             finalMsgs.enq(m);
1229           }
1230         }
1231         numMsgs = tempMsgs.length();
1232
1233         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1234
1235         if(numMsgs == 0){
1236                 return;
1237         }
1238         if(adj(completedRedNo+1).mainRecvd == 0){
1239                 for(i=0;i<numMsgs;i++){
1240                         finalMsgs.enq(tempMsgs[i]);
1241                 }
1242                 return;
1243         }
1244
1245 /*
1246         NOT NEEDED ANYMORE DONE at nodegroup level
1247         if(msgs_gcount  > msgs_nSources){
1248                 for(i=0;i<numMsgs;i++){
1249                         finalMsgs.enq(tempMsgs[i]);
1250                 }
1251                 return;
1252         }*/
1253
1254         if (numMsgs==0||r==CkReduction::invalid)
1255                 //No valid reducer in the whole vector
1256                 ret=CkReductionMsg::buildNew(0,NULL);
1257         else{//Use the reducer to reduce the messages
1258                CkReduction::reducerFn f=CkReduction::reducerTable[r].fn;
1259                 // has to be corrected elements from above need to be put into a temporary vector
1260                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1261
1262                 if (numMsgs > 1 && msgArr[0]->reducer == CkReduction::random) {
1263                   // nMsgs > 1 indicates that reduction type is not "random"
1264                   // this means any data with reducer type random was submitted
1265                   // only so that counts would agree, and can be removed
1266                   delete msgArr[0];
1267                   msgArr[0] = msgArr[numMsgs - 1];
1268                   numMsgs--;
1269                 }
1270
1271                 ret=(*f)(numMsgs,msgArr);
1272                 ret->reducer=r;
1273
1274         }
1275
1276         
1277 #if USE_CRITICAL_PATH_HEADER_ARRAY
1278
1279 #if CRITICAL_PATH_DEBUG > 3
1280         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1281 #endif
1282
1283         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1284         path.updateMax(UsrToEnv(ret));
1285         // Combine the critical paths from all the reduction messages into the header for the new result
1286         for (i=0;i<numMsgs;i++){
1287           if (tempMsgs[i]!=ret){
1288             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1289             path.updateMax(UsrToEnv(tempMsgs[i]));
1290           } else {
1291             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1292           }
1293         }
1294         // Also consider the path which got us into this entry method
1295
1296 #if CRITICAL_PATH_DEBUG > 3
1297         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1298 #endif
1299
1300 #endif
1301   
1302
1303
1304
1305
1306         for(i = 0;i<numMsgs;i++){
1307                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1308         }
1309
1310         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1311         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1312
1313         ret->redNo=completedRedNo+1;
1314         ret->gcount=msgs_gcount;
1315         ret->userFlag=msgs_userFlag;
1316         ret->callback=msgs_callback;
1317         ret->secondaryCallback = msgs_secondaryCallback;
1318         ret->sourceFlag=msgs_nSources;
1319
1320         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1321
1322         CkSetRefNum(ret, ret->getUserFlag());
1323         if (!ret->secondaryCallback.isInvalid())
1324             ret->secondaryCallback.send(ret);
1325     else if (!storedCallback.isInvalid())
1326             storedCallback.send(ret);
1327     else{
1328       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1329             CkAbort("No reduction client!\n"
1330                     "You must register a client with either SetReductionClient or during contribute.\n");
1331     }
1332         completedRedNo++;
1333
1334         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1335
1336         for (i=1;i<(int)(adjVec.length());i++)
1337                 adjVec[i-1]=adjVec[i];
1338         adjVec.length()--;
1339         endArrayReduction();
1340 }
1341
1342 void CkReductionMgr::init_BinaryTree(){
1343         parent = (CkMyPe()-1)/TREE_WID;
1344         int firstkid = CkMyPe()*TREE_WID+1;
1345         numKids=CkNumPes()-firstkid;
1346         if (numKids>TREE_WID) numKids=TREE_WID;
1347         if (numKids<0) numKids=0;
1348
1349         for(int i=0;i<numKids;i++){
1350                 kids.push_back(firstkid+i);
1351                 newKids.push_back(firstkid+i);
1352         }
1353 }
1354
1355 void CkReductionMgr::init_BinomialTree(){
1356         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1357         /*upperSize = (unsigned )pow((double)2,depth);*/
1358         upperSize = (unsigned) 1 << depth;
1359         label = upperSize-CkMyPe()-1;
1360         int p=label;
1361         int count=0;
1362         while( p > 0){
1363                 if(p % 2 == 0)
1364                         break;
1365                 else{
1366                         p = p/2;
1367                         count++;
1368                 }
1369         }
1370         /*parent = label + rint(pow((double)2,count));*/
1371         parent = label + (1<<count);
1372         parent = upperSize -1 -parent;
1373         int temp;
1374         if(count != 0){
1375                 numKids = 0;
1376                 for(int i=0;i<count;i++){
1377                         /*temp = label - rint(pow((double)2,i));*/
1378                         temp = label - (1<<i);
1379                         temp = upperSize-1-temp;
1380                         if(temp <= CkNumPes()-1){
1381                                 kids.push_back(temp);
1382                                 numKids++;
1383                         }
1384                 }
1385         }else{
1386                 numKids = 0;
1387         //      kids = NULL;
1388         }
1389 }
1390
1391
1392 int CkReductionMgr::treeRoot(void)
1393 {
1394   return 0;
1395 }
1396 bool CkReductionMgr::hasParent(void) //Root Node
1397 {
1398   return (bool)(CkMyPe()!=treeRoot());
1399 }
1400 int CkReductionMgr::treeParent(void) //My parent Node
1401 {
1402   return parent;
1403 }
1404
1405 int CkReductionMgr::firstKid(void) //My first child Node
1406 {
1407   return CkMyPe()*TREE_WID+1;
1408 }
1409 int CkReductionMgr::treeKids(void)//Number of children in tree
1410 {
1411   return numKids;
1412 }
1413
1414
1415 //                simple "stateless" barrier
1416 //                no state checkpointed, for FT purpose
1417 //                require no overlapping barriers
1418 void CkReductionMgr::barrier(CkReductionMsg *m)
1419 {
1420   barrier_nContrib++;
1421   barrier_nSource++;
1422   if(!m->callback.isInvalid())
1423       barrier_storedCallback=m->callback;
1424   finishBarrier();
1425   delete m;
1426 }
1427
1428 void CkReductionMgr::finishBarrier(void)
1429 {
1430        if(barrier_nContrib<lcount){//need more local message
1431                DEBR(("[%d] current contrib:%d,lcount:%d\n",CkMyPe(),barrier_nContrib,lcount));
1432                return;
1433        }
1434        if(barrier_nRemote<treeKids()){//need more remote messages
1435                DEBR(("[%d] current remote:%d,kids:%d\n",CkMyPe(),barrier_nRemote,treeKids()));
1436                return;
1437        }
1438        CkReductionMsg * result = CkReductionMsg::buildNew(0,NULL);
1439        result->callback=barrier_storedCallback;
1440        result->sourceFlag=barrier_nSource;
1441        result->gcount=barrier_gCount;
1442        if(hasParent())
1443        {
1444                DEBR(("[%d]send to parent:%d\n",CkMyPe(),treeParent()));
1445                result->gcount+=gcount;
1446                thisProxy[treeParent()].Barrier_RecvMsg(result);
1447        }
1448        else{
1449                int totalElements=result->gcount+gcount;
1450                DEBR(("[%d]root,totalElements:%d,source:%d\n",CkMyPe(),totalElements,result->nSources()));
1451                if(totalElements<result->nSources()){
1452                        CkAbort("ERROR! Too many contributions at barrier root\n");
1453                }
1454                CkSetRefNum(result,result->getUserFlag());
1455                if(!result->callback.isInvalid())
1456                        result->callback.send(result);
1457                else if(!barrier_storedCallback.isInvalid())
1458                                barrier_storedCallback.send(result);
1459                else 
1460                        CkAbort("No reduction client!\n");
1461        }
1462        barrier_nRemote=barrier_nContrib=0;
1463        barrier_gCount=0;
1464        barrier_nSource=0;
1465 }
1466
1467 void CkReductionMgr::Barrier_RecvMsg(CkReductionMsg *m)
1468 {
1469        barrier_nRemote++;
1470        barrier_gCount+=m->gcount;
1471        barrier_nSource+=m->nSources();
1472        if(!m->callback.isInvalid())
1473                barrier_storedCallback=m->callback;
1474        finishBarrier();
1475 }
1476
1477
1478
1479 /////////////////////////////////////////////////////////////////////////
1480
1481 ////////////////////////////////
1482 //CkReductionMsg support
1483
1484 //ReductionMessage default private constructor-- does nothing
1485 CkReductionMsg::CkReductionMsg(){}
1486 CkReductionMsg::~CkReductionMsg(){}
1487
1488 //This define gives the distance from the start of the CkReductionMsg
1489 // object to the start of the user data area (just below last object field)
1490 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1491
1492 //"Constructor"-- builds and returns a new CkReductionMsg.
1493 //  the "data" array you specify will be copied into this object.
1494 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1495     CkReduction::reducerType reducer, CkReductionMsg *buf)
1496 {
1497   int len[1];
1498   len[0]=NdataSize;
1499   CkReductionMsg *ret = buf ? buf : new(len,0) CkReductionMsg();
1500
1501   ret->dataSize=NdataSize;
1502   if (srcData!=NULL && !buf)
1503     memcpy(ret->data,srcData,NdataSize);
1504   ret->userFlag=(CMK_REFNUM_TYPE)-1;
1505   ret->reducer=reducer;
1506   //ret->ci=NULL;
1507   ret->sourceFlag=-1000;
1508   ret->gcount=0;
1509   ret->migratableContributor = true;
1510 #if CMK_BIGSIM_CHARM
1511   ret->log = NULL;
1512 #endif
1513   return ret;
1514 }
1515
1516 // Charm kernel message runtime support:
1517 void *
1518 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1519 {
1520   int totalsize=ARM_DATASTART+(*sz);
1521   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1522   CkReductionMsg *ret = (CkReductionMsg *)
1523     CkAllocMsg(msgnum,totalsize,priobits);
1524   ret->data=(void *)(&ret->dataStorage);
1525   return (void *) ret;
1526 }
1527
1528 void *
1529 CkReductionMsg::pack(CkReductionMsg* in)
1530 {
1531   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1532   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1533   in->data = NULL;
1534   return (void*) in;
1535 }
1536
1537 CkReductionMsg* CkReductionMsg::unpack(void *in)
1538 {
1539   CkReductionMsg *ret = (CkReductionMsg *)in;
1540   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1541   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1542   ret->data=(void *)(&ret->dataStorage);
1543   return ret;
1544 }
1545
1546
1547 /////////////////////////////////////////////////////////////////////////////////////
1548 ///////////////// Builtin Reducer Functions //////////////
1549 /* A simple reducer, like sum_int, looks like this:
1550 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1551 {
1552   int i,ret=0;
1553   for (i=0;i<nMsg;i++)
1554     ret+=*(int *)(msg[i]->getData());
1555   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1556 }
1557
1558 To keep the code small and easy to change, the implementations below
1559 are built with preprocessor macros.
1560 */
1561
1562 //////////////// simple reducers ///////////////////
1563 /*A define used to quickly and tersely construct simple reductions.
1564 The basic idea is to use the first message's data array as
1565 (pre-initialized!) scratch space for folding in the other messages.
1566  */
1567
1568 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1569 {
1570         CkAbort("Called the invalid reducer type 0.  This probably\n"
1571                 "means you forgot to initialize your custom reducer index.\n");
1572         return NULL;
1573 }
1574
1575 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1576 {
1577   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1578 }
1579
1580 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1581 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1582 {\
1583   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1584   int m,i;\
1585   int nElem=msg[0]->getLength()/sizeof(dataType);\
1586   dataType *ret=(dataType *)(msg[0]->getData());\
1587   for (m=1;m<nMsg;m++)\
1588   {\
1589     dataType *value=(dataType *)(msg[m]->getData());\
1590     for (i=0;i<nElem;i++)\
1591     {\
1592       RED_DEB(("|\tmsg%d (from %d) [%d]=" typeStr "\n",m,msg[m]->sourceFlag,i,value[i]));\
1593       loop\
1594     }\
1595   }\
1596   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1597   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1598 }
1599
1600 //Use this macro for reductions that have the same type for all inputs
1601 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1602   SIMPLE_REDUCTION(nameBase##_char,char,"%c",loop) \
1603   SIMPLE_REDUCTION(nameBase##_short,short,"%h",loop) \
1604   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1605   SIMPLE_REDUCTION(nameBase##_long,long,"%ld",loop) \
1606   SIMPLE_REDUCTION(nameBase##_long_long,long long,"%lld",loop) \
1607   SIMPLE_REDUCTION(nameBase##_uchar,unsigned char,"%c",loop) \
1608   SIMPLE_REDUCTION(nameBase##_ushort,unsigned short,"%hu",loop) \
1609   SIMPLE_REDUCTION(nameBase##_uint,unsigned int,"%u",loop) \
1610   SIMPLE_REDUCTION(nameBase##_ulong,unsigned long,"%lu",loop) \
1611   SIMPLE_REDUCTION(nameBase##_ulong_long,unsigned long long,"%llu",loop) \
1612   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1613   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1614
1615 //Compute the sum the numbers passed by each element.
1616 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1617
1618 //Compute the product of the numbers passed by each element.
1619 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1620
1621 //Compute the largest number passed by any element.
1622 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1623
1624 //Compute the smallest integer passed by any element.
1625 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1626
1627
1628 //Compute the logical AND of the integers passed by each element.
1629 // The resulting integer will be zero if any source integer is zero; else 1.
1630 SIMPLE_REDUCTION(logical_and,int,"%d",
1631         if (value[i]==0)
1632      ret[i]=0;
1633   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1634 )
1635
1636 //Compute the logical OR of the integers passed by each element.
1637 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1638 SIMPLE_REDUCTION(logical_or,int,"%d",
1639   if (value[i]!=0)
1640            ret[i]=1;
1641   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1642 )
1643
1644 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1645 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1646 SIMPLE_REDUCTION(bitvec_xor,int,"%d",ret[i]^=value[i];)
1647
1648 //Select one random message to pass on
1649 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1650   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::random, msg[0]);
1651 }
1652
1653 /////////////// concat ////////////////
1654 /*
1655 This reducer simply appends the data it recieves from each element,
1656 without any housekeeping data to separate them.
1657 */
1658 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1659 {
1660   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1661   //Figure out how big a message we'll need
1662   int i,retSize=0;
1663   for (i=0;i<nMsg;i++)
1664       retSize+=msg[i]->getSize();
1665
1666   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1667
1668   //Allocate a new message
1669   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1670
1671   //Copy the source message data into the return message
1672   char *cur=(char *)(ret->getData());
1673   for (i=0;i<nMsg;i++) {
1674     int messageBytes=msg[i]->getSize();
1675     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1676     cur+=messageBytes;
1677   }
1678   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1679   return ret;
1680 }
1681
1682 /////////////// set ////////////////
1683 /*
1684 This reducer appends the data it recieves from each element
1685 along with some housekeeping data indicating contribution boundaries.
1686 The message data is thus a list of reduction_set_element structures
1687 terminated by a dummy reduction_set_element with a sourceElement of -1.
1688 */
1689
1690 //This rounds an integer up to the nearest multiple of sizeof(double)
1691 static const int alignSize=sizeof(double);
1692 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1693
1694 //This gives the size (in bytes) of a reduction_set_element
1695 static int SET_SIZE(int dataSize)
1696 {return SET_ALIGN(sizeof(int)+dataSize);}
1697
1698 //This returns a pointer to the next reduction_set_element in the list
1699 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1700 {
1701   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1702   return (CkReduction::setElement *)next;
1703 }
1704
1705 //Combine the data passed by each element into an list of reduction_set_elements.
1706 // Each element may contribute arbitrary data (with arbitrary length).
1707 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1708 {
1709   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1710   //Figure out how big a message we'll need
1711   int i,retSize=0;
1712   for (i=0;i<nMsg;i++) {
1713     if (!msg[i]->isFromUser())
1714     //This message is composite-- it will just be copied over (less terminating -1)
1715       retSize+=(msg[i]->getSize()-sizeof(int));
1716     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1717       retSize+=SET_SIZE(msg[i]->getSize());
1718   }
1719   retSize+=sizeof(int);//Leave room for terminating -1.
1720
1721   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1722
1723   //Allocate a new message
1724   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1725
1726   //Copy the source message data into the return message
1727   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1728   for (i=0;i<nMsg;i++)
1729     if (!msg[i]->isFromUser())
1730     {//This message is composite-- just copy it over (less terminating -1)
1731                         int messageBytes=msg[i]->getSize()-sizeof(int);
1732                         RED_DEB(("|\tc msg[%d] is %d bytes\n",i,msg[i]->getSize()));
1733                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1734                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1735     }
1736     else //This is a message from an element-- wrap it in a reduction_set_element
1737     {
1738       RED_DEB(("|\tu msg[%d] is %d bytes\n",i,msg[i]->getSize()));
1739       cur->dataSize=msg[i]->getSize();
1740       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1741       cur=SET_NEXT(cur);
1742     }
1743   cur->dataSize=-1;//Add a terminating -1.
1744   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1745   return ret;
1746 }
1747
1748 //Utility routine: get the next reduction_set_element in the list
1749 // if there is one, or return NULL if there are none.
1750 //To get all the elements, just keep feeding this procedure's output back to
1751 // its input until it returns NULL.
1752 CkReduction::setElement *CkReduction::setElement::next(void)
1753 {
1754   CkReduction::setElement *n=SET_NEXT(this);
1755   if (n->dataSize==-1)
1756     return NULL;//This is the end of the list
1757   else
1758     return n;//This is just another element
1759 }
1760
1761
1762 ///////// statisticsElement
1763
1764 CkReduction::statisticsElement::statisticsElement(double initialValue)
1765   : count(1)
1766   , mean(initialValue)
1767   , m2(0.0)
1768 {}
1769
1770 // statistics reducer
1771 // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
1772 // Chan, Tony F.; Golub, Gene H.; LeVeque, Randall J. (1979),
1773 // "Updating Formulae and a Pairwise Algorithm for Computing Sample Variances." (PDF),
1774 // Technical Report STAN-CS-79-773, Department of Computer Science, Stanford University.
1775 static CkReductionMsg* statistics(int nMsgs, CkReductionMsg** msg)
1776 {
1777   int nElem = msg[0]->getLength() / sizeof(CkReduction::statisticsElement);
1778   CkReduction::statisticsElement* ret = (CkReduction::statisticsElement*)(msg[0]->getData());
1779   for (int m = 1; m < nMsgs; m++)
1780   {
1781     CkReduction::statisticsElement* value = (CkReduction::statisticsElement*)(msg[m]->getData());
1782     for (int i = 0; i < nElem; i++)
1783     {
1784       double a_count = ret[i].count;
1785       ret[i].count += value[i].count;
1786       double delta = value[i].mean - ret[i].mean;
1787       ret[i].mean += delta * value[i].count / ret[i].count;
1788       ret[i].m2 += value[i].m2 + delta * delta * value[i].count * a_count / ret[i].count;
1789     }
1790   }
1791   return CkReductionMsg::buildNew(
1792     nElem*sizeof(CkReduction::statisticsElement),
1793     (void *)ret,
1794     CkReduction::invalid,
1795     msg[0]);
1796 }
1797
1798 ///////// tupleElement
1799
1800 CkReduction::tupleElement::tupleElement()
1801   : dataSize(0)
1802   , data(NULL)
1803   , reducer(CkReduction::invalid)
1804   , owns_data(false)
1805 {}
1806 CkReduction::tupleElement::tupleElement(size_t dataSize_, void* data_, CkReduction::reducerType reducer_)
1807   : dataSize(dataSize_)
1808   , data((char*)data_)
1809   , reducer(reducer_)
1810   , owns_data(false)
1811 {
1812 }
1813 CkReduction::tupleElement::tupleElement(CkReduction::tupleElement&& rhs_move)
1814   : dataSize(rhs_move.dataSize)
1815   , data(rhs_move.data)
1816   , reducer(rhs_move.reducer)
1817   , owns_data(rhs_move.owns_data)
1818 {
1819   rhs_move.dataSize = 0;
1820   rhs_move.data = 0;
1821   rhs_move.reducer = CkReduction::invalid;
1822   rhs_move.owns_data = false;
1823 }
1824 CkReduction::tupleElement& CkReduction::tupleElement::operator=(CkReduction::tupleElement&& rhs_move)
1825 {
1826   if (owns_data)
1827     delete[] data;
1828   dataSize = rhs_move.dataSize;
1829   data = rhs_move.data;
1830   reducer = rhs_move.reducer;
1831   owns_data = rhs_move.owns_data;
1832   rhs_move.dataSize = 0;
1833   rhs_move.data = 0;
1834   rhs_move.reducer = CkReduction::invalid;
1835   rhs_move.owns_data = false;
1836   return *this;
1837 }
1838 CkReduction::tupleElement::~tupleElement()
1839 {
1840   if (owns_data)
1841     delete[] data;
1842 }
1843
1844 void CkReduction::tupleElement::pup(PUP::er &p) {
1845   p|dataSize;
1846   // TODO - it might be better to pack these raw, then we don't have to
1847   //  transform & copy them out on unpacking, we could just use the message's
1848   //  memory directly
1849   if (p.isUnpacking()) {
1850     data = new char[dataSize];
1851     owns_data = true;
1852   }
1853   PUParray(p, data, dataSize);
1854   if (p.isUnpacking()){
1855     int temp;
1856     p|temp;
1857     reducer=(CkReduction::reducerType)temp;
1858   } else {
1859     int temp=(int)reducer;
1860     p|temp;
1861   }
1862 }
1863
1864 CkReductionMsg* CkReductionMsg::buildFromTuple(CkReduction::tupleElement* reductions, int num_reductions)
1865 {
1866   PUP::sizer ps;
1867   ps|num_reductions;
1868   PUParray(ps, reductions, num_reductions);
1869
1870   CkReductionMsg* msg = CkReductionMsg::buildNew(ps.size(), NULL, CkReduction::tuple);
1871   PUP::toMem p(msg->data);
1872   p|num_reductions;
1873   PUParray(p, reductions, num_reductions);
1874   if (p.size() != ps.size()) CmiAbort("Size mismatch packing CkReduction::tupleElement::tupleToBuffer\n");
1875   return msg;
1876 }
1877
1878 void CkReductionMsg::toTuple(CkReduction::tupleElement** out_reductions, int* num_reductions)
1879 {
1880   PUP::fromMem p(this->getData());
1881   p|(*num_reductions);
1882   *out_reductions = new CkReduction::tupleElement[*num_reductions];
1883   PUParray(p, *out_reductions, *num_reductions);
1884 }
1885
1886 // tuple reducer
1887 CkReductionMsg* CkReduction::tupleReduction(int num_messages, CkReductionMsg** messages)
1888 {
1889   CkReduction::tupleElement** tuple_data = new CkReduction::tupleElement*[num_messages];
1890   int num_reductions = 0;
1891   for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1892   {
1893     int itr_num_reductions = 0;
1894     messages[message_idx]->toTuple(&tuple_data[message_idx], &itr_num_reductions);
1895
1896     // each message must submit the same reductions
1897     if (num_reductions == 0)
1898       num_reductions = itr_num_reductions;
1899     else if (num_reductions != itr_num_reductions)
1900       CmiAbort("num_reductions mismatch in CkReduction::tupleReduction");
1901   }
1902
1903   DEB_TUPLE(("tupleReduction {\n  num_messages=%d,\n  num_reductions=%d,\n  length=%d\n",
1904            num_messages, num_reductions, messages[0]->getLength()));
1905
1906   CkReduction::tupleElement* return_data = new CkReduction::tupleElement[num_reductions];
1907   // using a raw buffer to avoid CkReductionMsg constructor/destructor, we want to manage
1908   //  the inner memory of these temps ourselves to avoid unneeded copies
1909   char* simulated_messages_buffer = new char[sizeof(CkReductionMsg) * num_reductions * num_messages];
1910   CkReductionMsg** simulated_messages = new CkReductionMsg*[num_messages];
1911
1912   // imagine the given data in a 2D layout where the messages are rows and reductions are columns
1913   // here we grab each column and run that reduction
1914
1915   for (int reduction_idx = 0; reduction_idx < num_reductions; ++reduction_idx)
1916   {
1917     DEB_TUPLE(("  reduction_idx=%d {\n", reduction_idx));
1918     CkReduction::reducerType reducerType = CkReduction::invalid;
1919     for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1920     {
1921       CkReduction::tupleElement* reductions = (CkReduction::tupleElement*)(tuple_data[message_idx]);
1922       CkReduction::tupleElement& element = reductions[reduction_idx];
1923       DEB_TUPLE(("    msg %d, sf=%d, length=%d : { dataSize=%d, data=%p, reducer=%d },\n",
1924                  message_idx, messages[message_idx]->sourceFlag, messages[message_idx]->getLength(), element.dataSize, element.data, element.reducer));
1925
1926       reducerType = element.reducer;
1927
1928       size_t sim_idx = (reduction_idx * num_messages + message_idx) * sizeof(CkReductionMsg);
1929       CkReductionMsg& simulated_message = *(CkReductionMsg*)&simulated_messages_buffer[sim_idx];
1930       simulated_message.dataSize = element.dataSize;
1931       simulated_message.data = element.data;
1932       simulated_message.reducer = element.reducer;
1933       simulated_message.sourceFlag = messages[message_idx]->sourceFlag;
1934       simulated_message.userFlag = messages[message_idx]->userFlag;
1935       simulated_message.gcount = messages[message_idx]->gcount;
1936       simulated_message.migratableContributor = messages[message_idx]->migratableContributor;
1937 #if CMK_BIGSIM_CHARM
1938       simulated_message.log = NULL;
1939 #endif
1940       simulated_messages[message_idx] = &simulated_message;
1941     }
1942
1943     // run the reduction and copy the result back to our data structure
1944     const auto& reducerFp = CkReduction::reducerTable[reducerType].fn;
1945     CkReductionMsg* result = reducerFp(num_messages, simulated_messages);
1946     DEB_TUPLE(("    result_len=%d\n  },\n", result->getLength()));
1947     return_data[reduction_idx] = CkReduction::tupleElement(result->getLength(), result->getData(), reducerType);
1948     // TODO - leak - the built in reducers all reuse message memory, so this is not safe to delete
1949     // delete result;
1950   }
1951
1952   CkReductionMsg* retval = CkReductionMsg::buildFromTuple(return_data, num_reductions);
1953   DEB_TUPLE(("} tupleReduction msg_size=%d\n", retval->getSize()));
1954
1955   for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1956     delete[] tuple_data[message_idx];
1957   delete[] tuple_data;
1958   delete[] return_data;
1959   delete[] simulated_messages_buffer;
1960   // note that although this is a 2d array, we don't need to delete the inner objects,
1961   //  their memory is tracked in simulated_messages_buffer
1962   delete[] simulated_messages;
1963   return retval;
1964 }
1965
1966
1967
1968 /////////////////// Reduction Function Table /////////////////////
1969 CkReduction::CkReduction() {} //Dummy private constructor
1970
1971 //Add the given reducer to the list.  Returns the new reducer's
1972 // reducerType.  Must be called in the same order on every node.
1973 CkReduction::reducerType CkReduction::addReducer(reducerFn fn, bool streamable)
1974 {
1975   reducerTable[nReducers].fn=fn;
1976   reducerTable[nReducers].streamable=streamable;
1977   return (reducerType)nReducers++;
1978 }
1979
1980 /*Reducer table: maps reducerTypes to reducerStructs.
1981 It's indexed by reducerType, so the order in this table
1982 must *exactly* match the reducerType enum declaration.
1983 The names don't have to match, but it helps.
1984 */
1985 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1986
1987 CkReduction::reducerStruct CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1988     CkReduction::reducerStruct(::invalid_reducer, true),
1989     CkReduction::reducerStruct(::nop, true),
1990     //Compute the sum the numbers passed by each element.
1991     CkReduction::reducerStruct(::sum_char, true),
1992     CkReduction::reducerStruct(::sum_short, true),
1993     CkReduction::reducerStruct(::sum_int, true),
1994     CkReduction::reducerStruct(::sum_long, true),
1995     CkReduction::reducerStruct(::sum_long_long, true),
1996     CkReduction::reducerStruct(::sum_uchar, true),
1997     CkReduction::reducerStruct(::sum_ushort, true),
1998     CkReduction::reducerStruct(::sum_uint, true),
1999     CkReduction::reducerStruct(::sum_ulong, true),
2000     CkReduction::reducerStruct(::sum_ulong_long, true),
2001     // The floating point sums are marked as unstreamable to avoid
2002     // implictly stating that they will always be precision oblivious.
2003     CkReduction::reducerStruct(::sum_float, false),
2004     CkReduction::reducerStruct(::sum_double, false),
2005
2006     //Compute the product the numbers passed by each element.
2007     CkReduction::reducerStruct(::product_char, true),
2008     CkReduction::reducerStruct(::product_short, true),
2009     CkReduction::reducerStruct(::product_int, true),
2010     CkReduction::reducerStruct(::product_long, true),
2011     CkReduction::reducerStruct(::product_long_long, true),
2012     CkReduction::reducerStruct(::product_uchar, true),
2013     CkReduction::reducerStruct(::product_ushort, true),
2014     CkReduction::reducerStruct(::product_uint, true),
2015     CkReduction::reducerStruct(::product_ulong, true),
2016     CkReduction::reducerStruct(::product_ulong_long, true),
2017     CkReduction::reducerStruct(::product_float, true),
2018     CkReduction::reducerStruct(::product_double, true),
2019
2020     //Compute the largest number passed by any element.
2021     CkReduction::reducerStruct(::max_char, true),
2022     CkReduction::reducerStruct(::max_short, true),
2023     CkReduction::reducerStruct(::max_int, true),
2024     CkReduction::reducerStruct(::max_long, true),
2025     CkReduction::reducerStruct(::max_long_long, true),
2026     CkReduction::reducerStruct(::max_uchar, true),
2027     CkReduction::reducerStruct(::max_ushort, true),
2028     CkReduction::reducerStruct(::max_uint, true),
2029     CkReduction::reducerStruct(::max_ulong, true),
2030     CkReduction::reducerStruct(::max_ulong_long, true),
2031     CkReduction::reducerStruct(::max_float, true),
2032     CkReduction::reducerStruct(::max_double, true),
2033
2034     //Compute the smallest number passed by any element.
2035     CkReduction::reducerStruct(::min_char, true),
2036     CkReduction::reducerStruct(::min_short, true),
2037     CkReduction::reducerStruct(::min_int, true),
2038     CkReduction::reducerStruct(::min_long, true),
2039     CkReduction::reducerStruct(::min_long_long, true),
2040     CkReduction::reducerStruct(::min_uchar, true),
2041     CkReduction::reducerStruct(::min_ushort, true),
2042     CkReduction::reducerStruct(::min_uint, true),
2043     CkReduction::reducerStruct(::min_ulong, true),
2044     CkReduction::reducerStruct(::min_ulong_long, true),
2045     CkReduction::reducerStruct(::min_float, true),
2046     CkReduction::reducerStruct(::min_double, true),
2047
2048     //Compute the logical AND of the integers passed by each element.
2049     // The resulting integer will be zero if any source integer is zero.
2050     CkReduction::reducerStruct(::logical_and, true),
2051
2052     //Compute the logical OR of the integers passed by each element.
2053     // The resulting integer will be 1 if any source integer is nonzero.
2054     CkReduction::reducerStruct(::logical_or, true),
2055
2056     // Compute the logical bitvector AND of the integers passed by each element.
2057     CkReduction::reducerStruct(::bitvec_and, true),
2058
2059     // Compute the logical bitvector OR of the integers passed by each element.
2060     CkReduction::reducerStruct(::bitvec_or, true),
2061     
2062     // Compute the logical bitvector XOR of the integers passed by each element.
2063     CkReduction::reducerStruct(::bitvec_xor, true),
2064
2065     // Select one of the messages at random to pass on
2066     CkReduction::reducerStruct(::random, true),
2067
2068     //Concatenate the (arbitrary) data passed by each element
2069     // This reduction is marked as unstreamable because of the n^2
2070     // work required to stream it
2071     CkReduction::reducerStruct(::concat, false),
2072
2073     //Combine the data passed by each element into an list of setElements.
2074     // Each element may contribute arbitrary data (with arbitrary length).
2075     // This reduction is marked as unstreamable because of the n^2
2076     // work required to stream it
2077     CkReduction::reducerStruct(::set, false),
2078
2079     CkReduction::reducerStruct(::statistics, true),
2080     CkReduction::reducerStruct(CkReduction::tupleReduction, false),
2081 };
2082
2083
2084
2085
2086
2087
2088
2089
2090 /********** Code added by Sayantan *********************/
2091 /** Locking is a big problem in the nodegroup code for smp.
2092  So a few assumptions have had to be made. There is one lock
2093  called lockEverything. It protects all the data structures 
2094  of the nodegroup reduction mgr. I tried grabbing it separately 
2095  for each datastructure, modifying it and then releasing it and
2096  then grabbing it again, for the next change.
2097  That doesn't really help because the interleaved execution of 
2098  different threads makes the state of the reduction manager 
2099  inconsistent. 
2100  
2101  1. Grab lockEverything before calling finishreduction or startReduction
2102     or doRecvMsg
2103  2. lockEverything is grabbed only in entry methods reductionStarting
2104     or RecvMesg or  addcontribution.
2105  ****/
2106  
2107 /**nodegroup reduction manager . Most of it is similar to the guy above***/
2108 NodeGroup::NodeGroup(void) {
2109   __nodelock=CmiCreateLock();
2110 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2111     mlogData->objID.type = TypeNodeGroup;
2112     mlogData->objID.data.group.onPE = CkMyNode();
2113 #endif
2114
2115 }
2116 NodeGroup::~NodeGroup() {
2117   CmiDestroyLock(__nodelock);
2118   CkpvAccess(_destroyingNodeGroup) = true;
2119 }
2120 void NodeGroup::pup(PUP::er &p)
2121 {
2122   CkNodeReductionMgr::pup(p);
2123   p|reductionInfo;
2124 }
2125
2126 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
2127
2128 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
2129   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
2130  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
2131   //ckLocalNodeBranch()->ckSetReductionClient(cb);
2132  }
2133
2134 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
2135                                     ((CkNodeReductionMgr *)this),
2136                                     reductionInfo,false)
2137
2138 /* this contribute also adds up the count across all messages it receives.
2139   Useful for summing up number of array elements who have contributed ****/ 
2140 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
2141         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
2142
2143
2144
2145 //#define BINOMIAL_TREE
2146
2147 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
2148   : thisProxy(thisgroup)
2149 {
2150 #ifdef BINOMIAL_TREE
2151   init_BinomialTree();
2152 #else
2153   init_BinaryTree();
2154 #endif
2155   storedCallback=NULL;
2156   redNo=0;
2157   inProgress=false;
2158   
2159   startRequested=false;
2160   gcount=CkNumNodes();
2161   lcount=1;
2162   nContrib=nRemote=0;
2163   lockEverything = CmiCreateLock();
2164
2165
2166   creating=false;
2167   interrupt = 0;
2168   DEBR((AA "In NodereductionMgr constructor at %d \n" AB,this));
2169         /*
2170                 FAULT_EVAC
2171         */
2172         blocked = false;
2173         maxModificationRedNo = INT_MAX;
2174         killed=0;
2175         additionalGCount = newAdditionalGCount = 0;
2176 }
2177
2178 CkNodeReductionMgr::~CkNodeReductionMgr()
2179 {
2180   CmiDestroyLock(lockEverything);
2181 }
2182
2183 void CkNodeReductionMgr::flushStates()
2184 {
2185  if(CkMyRank() == 0){
2186   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
2187   redNo=0;
2188   inProgress=false;
2189
2190   startRequested=false;
2191   gcount=CkNumNodes();
2192   lcount=1;
2193   nContrib=nRemote=0;
2194
2195   creating=false;
2196   interrupt = 0;
2197   while (!msgs.isEmpty()) { delete msgs.deq(); }
2198   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
2199   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
2200   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
2201   }
2202 }
2203
2204 //////////// Reduction Manager Client API /////////////
2205
2206 //Add the given client function.  Overwrites any previous client.
2207 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
2208 {
2209   DEBR((AA "Setting reductionClient in NodeReductionMgr %d at %d\n" AB,cb,this));
2210   if(cb->isInvalid()){
2211         DEBR((AA "Invalid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
2212   }else{
2213         DEBR((AA "Valid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
2214   }
2215
2216   if (CkMyNode()!=0)
2217           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
2218   delete storedCallback;
2219   storedCallback=cb;
2220 }
2221
2222
2223
2224 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
2225 {
2226 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2227     Chare *oldObj =CpvAccess(_currentObj);
2228     CpvAccess(_currentObj) = this;
2229 #endif
2230
2231   //m->ci=ci;
2232   m->redNo=ci->redNo++;
2233   m->sourceFlag=-1;//A single contribution
2234   m->gcount=0;
2235   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
2236   addContribution(m);
2237
2238 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2239     CpvAccess(_currentObj) = oldObj;
2240 #endif
2241 }
2242
2243
2244 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
2245 {
2246 #if CMK_BIGSIM_CHARM
2247   _TRACE_BG_TLINE_END(&m->log);
2248 #endif
2249 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2250     Chare *oldObj =CpvAccess(_currentObj);
2251     CpvAccess(_currentObj) = this;
2252 #endif
2253   //m->ci=ci;
2254   m->redNo=ci->redNo++;
2255   m->gcount=count;
2256   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
2257   addContribution(m);
2258   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
2259
2260 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2261     CpvAccess(_currentObj) = oldObj;
2262 #endif
2263 }
2264
2265
2266 //////////// Reduction Manager Remote Entry Points /////////////
2267
2268 //Sent down the reduction tree (used by barren PEs)
2269 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
2270 {
2271   DEBR((AA "[%d] Received reductionStarting redNo %d\n" AB, CkMyPe(), m->num));
2272   CmiLock(lockEverything);
2273         /*
2274                 FAULT_EVAC
2275         */
2276   if(blocked){
2277         delete m;
2278         CmiUnlock(lockEverything);
2279         return ;
2280   }
2281   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
2282   if (isPresent(m->num) && !inProgress)
2283   {
2284     DEBR((AA "Starting Node reduction #%d at parent's request\n" AB,m->num));
2285     startReduction(m->num,srcNode);
2286     finishReduction();
2287   } else if (isFuture(m->num)){
2288         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
2289   }
2290   else //is Past
2291     DEBR((AA "Ignoring node parent's late request to start #%d\n" AB,m->num));
2292   CmiUnlock(lockEverything);
2293   delete m;
2294
2295 }
2296
2297
2298 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
2299         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2300         /*
2301                 FAULT_EVAC
2302         */
2303         if(blocked){
2304                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
2305                 bufferedRemoteMsgs.enq(m);
2306                 return;
2307         }
2308         
2309         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
2310             //DEBR((AA "Recv'd remote contribution %d for #%d at %d\n" AB,nRemote,m->redNo,this));
2311             startReduction(m->redNo,CkMyNode());
2312             msgs.enq(m);
2313             nRemote++;
2314             finishReduction();
2315         }
2316         else {
2317             if (isFuture(m->redNo)) {
2318                    // DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
2319                     futureRemoteMsgs.enq(m);
2320             }else{
2321                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
2322                    CkAbort("Recv'd late remote contribution!\n");
2323             }
2324         }
2325         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2326 }
2327
2328 //Sent up the reduction tree with reduced data
2329 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
2330 {
2331 #if CMK_BIGSIM_CHARM
2332   _TRACE_BG_TLINE_END(&m->log);
2333 #endif
2334 #ifndef CMK_CPV_IS_SMP
2335 #if CMK_IMMEDIATE_MSG
2336         if(interrupt == 1){
2337                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
2338                 CpvAccess(_qd)->process(-1);
2339                 CmiDelayImmediate();
2340                 return;
2341         }
2342 #endif  
2343 #endif
2344    interrupt = 1;       
2345    CmiLock(lockEverything);   
2346    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2347    doRecvMsg(m);
2348    CmiUnlock(lockEverything);    
2349    interrupt = 0;
2350    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2351 }
2352
2353 void CkNodeReductionMgr::startReduction(int number,int srcNode)
2354 {
2355         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
2356         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
2357         if (inProgress){
2358                 DEBR((AA "This Node reduction is already in progress\n" AB));
2359                 return;//This reduction already started
2360         }
2361         if (creating) //Don't start yet-- we're creating elements
2362         {
2363                 DEBR((AA " Node Postponing start request #%d until we're done creating\n" AB,redNo));
2364                 startRequested=true;
2365                 return;
2366         }
2367         
2368         //If none of these cases, we need to start the reduction--
2369         DEBR((AA "Starting Node reduction #%d on %p srcNode %d\n" AB,redNo,this,srcNode));
2370         inProgress=true;
2371
2372         if(!_isNotifyChildInRed) return;
2373
2374         //Sent start requests to our kids (in case they don't already know)
2375         
2376         for (int k=0;k<treeKids();k++)
2377         {
2378 #ifdef BINOMIAL_TREE
2379                 DEBR((AA "Asking child Node %d to start #%d\n" AB,kids[k],redNo));
2380                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
2381 #else
2382                 if(kids[k] != srcNode){
2383                         DEBR((AA "Asking child Node %d to start #%d\n" AB,kids[k],redNo));
2384                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
2385                 }       
2386 #endif
2387         }
2388
2389         DEBR((AA "Asking all local groups to start #%d\n" AB,redNo));
2390         // in case, nodegroup does not has the attached red group,
2391         // it has to restart groups again
2392         startLocalGroupReductions(number);
2393 /*
2394         if (startLocalGroupReductions(number) == 0)
2395           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
2396 */
2397 }
2398
2399 // restart local groups until succeed
2400 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
2401   CmiLock(lockEverything);    
2402   if (startLocalGroupReductions(number) == 0)
2403     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
2404   CmiUnlock(lockEverything);    
2405 }
2406
2407 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
2408         /*
2409                 FAULT_EVAC
2410         */
2411         if(blocked){
2412                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
2413                 bufferedMsgs.enq(m);
2414                 return;
2415         }
2416         
2417         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
2418                 DEBR((AA "Contributor gives early node contribution-- for #%d\n" AB,m->redNo));
2419                 futureMsgs.enq(m);
2420         } else {// An ordinary contribution
2421                 DEBR((AA "Recv'd local node contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
2422                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
2423                 startReduction(m->redNo,CkMyNode());
2424                 msgs.enq(m);
2425                 nContrib++;
2426                 finishReduction();
2427         }
2428 }
2429
2430 //Handle a message from one element for the reduction
2431 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
2432 {
2433   interrupt = 1;
2434   CmiLock(lockEverything);
2435   doAddContribution(m);
2436   CmiUnlock(lockEverything);
2437   interrupt = 0;
2438 }
2439
2440 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
2441         CmiLock(lockEverything);   
2442         /*
2443                 FAULT_EVAC
2444         */
2445         if(blocked){
2446                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
2447                 bufferedMsgs.enq(m);
2448                 CmiUnlock(lockEverything);   
2449                 return;
2450         }
2451         
2452         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
2453                 DEBR((AA "Latemigrant gives early node contribution-- for #%d\n" AB,m->redNo));
2454 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
2455                 futureLateMigrantMsgs.enq(m);
2456         } else {// An ordinary contribution
2457                 DEBR((AA "Recv'd late migrant contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
2458 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
2459                 msgs.enq(m);
2460                 finishReduction();
2461         }
2462         CmiUnlock(lockEverything);   
2463 }
2464
2465
2466
2467
2468
2469 /** check if the nodegroup reduction is finished at this node. In that case send it
2470 up the reduction tree **/
2471
2472 void CkNodeReductionMgr::finishReduction(void)
2473 {
2474   DEBR((AA "in Nodegrp finishReduction %d treeKids %d \n" AB,inProgress,treeKids()));
2475   /***Check if reduction is finished in the next few ifs***/
2476   if ((!inProgress) || creating){
2477         DEBR((AA "Either not in Progress or creating\n" AB));
2478         return;
2479   }
2480
2481   bool partialReduction = false;
2482
2483   if (nContrib<(lcount)){
2484     if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
2485       partialReduction = true;
2486     }
2487     else {
2488       DEBR((AA "Nodegrp Need more local messages %d %d\n" AB,nContrib,(lcount)));
2489       return;//Need more local messages
2490     }
2491   }
2492   if (nRemote<treeKids()){
2493     if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
2494       partialReduction = true;
2495     }
2496     else {
2497       DEBR((AA "Nodegrp Need more Remote messages %d %d\n" AB,nRemote,treeKids()));
2498       return;//Need more remote messages
2499     }
2500   }
2501   if (nRemote>treeKids()){
2502
2503           interrupt = 0;
2504            CkAbort("Nodegrp Excess remote reduction message received!\n");
2505   }
2506
2507   DEBR((AA "Reducing node data...\n" AB));
2508
2509   /**reduce all messages received at this node **/
2510   CkReductionMsg *result=reduceMessages();
2511
2512   if (partialReduction) {
2513     msgs.enq(result);
2514     return;
2515   }
2516
2517   if (hasParent())
2518   {//Pass data up tree to parent
2519         if(CmiNodeAlive(CkMyNode()) || killed == 0){
2520         DEBR((AA "Passing reduced data up to parent node %d. \n" AB,treeParent()));
2521         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
2522                 /*
2523                         FAULT_EVAC
2524                 */
2525                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
2526             thisProxy[treeParent()].RecvMsg(result);
2527         }
2528
2529   }
2530   else
2531   {
2532                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2533                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
2534                         msgs.enq(result);
2535                         return;
2536                 }
2537                 result->gcount += additionalGCount;
2538           /** if the reduction is finished and I am the root of the reduction tree
2539           then call the reductionhandler and other stuff ***/
2540                 
2541
2542                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2543     CkSetRefNum(result, result->getUserFlag());
2544     if (!result->callback.isInvalid()){
2545       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2546             result->callback.send(result);
2547     }
2548     else if (storedCallback!=NULL){
2549       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2550             storedCallback->send(result);
2551     }
2552     else{
2553                 DEBR((AA "Invalid Callback \n" AB));
2554             CkAbort("No reduction client!\n"
2555                     "You must register a client with either SetReductionClient or during contribute.\n");
2556                 }
2557   }
2558
2559   // DEBR((AA "Reduction %d finished in group!\n" AB,redNo));
2560   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2561   redNo++;
2562         updateTree();
2563   int i;
2564   inProgress=false;
2565   startRequested=false;
2566   nRemote=nContrib=0;
2567
2568   //Look through the future queue for messages we can now handle
2569   int n=futureMsgs.length();
2570
2571   for (i=0;i<n;i++)
2572   {
2573     interrupt = 1;
2574
2575     CkReductionMsg *m=futureMsgs.deq();
2576
2577     interrupt = 0;
2578     if (m!=NULL){ //One of these addContributions may have finished us.
2579       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2580       doAddContribution(m);//<- if *still* early, puts it back in the queue
2581     }
2582   }
2583
2584   interrupt = 1;
2585
2586   n=futureRemoteMsgs.length();
2587
2588   interrupt = 0;
2589   for (i=0;i<n;i++)
2590   {
2591     interrupt = 1;
2592
2593     CkReductionMsg *m=futureRemoteMsgs.deq();
2594
2595     interrupt = 0;
2596     if (m!=NULL)
2597       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2598   }
2599   
2600   n = futureLateMigrantMsgs.length();
2601   for(i=0;i<n;i++){
2602     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2603     if(m != NULL){
2604       if(m->redNo == redNo){
2605         msgs.enq(m);
2606       }else{
2607         futureLateMigrantMsgs.enq(m);
2608       }
2609     }
2610   }
2611 }
2612
2613 //////////// Reduction Manager Utilities /////////////
2614
2615 void CkNodeReductionMgr::init_BinaryTree(){
2616         parent = (CkMyNode()-1)/TREE_WID;
2617         int firstkid = CkMyNode()*TREE_WID+1;
2618         numKids=CkNumNodes()-firstkid;
2619   if (numKids>TREE_WID) numKids=TREE_WID;
2620   if (numKids<0) numKids=0;
2621
2622         for(int i=0;i<numKids;i++){
2623                 kids.push_back(firstkid+i);
2624                 newKids.push_back(firstkid+i);
2625         }
2626 }
2627
2628 void CkNodeReductionMgr::init_BinomialTree(){
2629         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2630         /*upperSize = (unsigned )pow((double)2,depth);*/
2631         upperSize = (unsigned) 1 << depth;
2632         label = upperSize-CkMyNode()-1;
2633         int p=label;
2634         int count=0;
2635         while( p > 0){
2636                 if(p % 2 == 0)
2637                         break;
2638                 else{
2639                         p = p/2;
2640                         count++;
2641                 }
2642         }
2643         /*parent = label + rint(pow((double)2,count));*/
2644         parent = label + (1<<count);
2645         parent = upperSize -1 -parent;
2646         int temp;
2647         if(count != 0){
2648                 numKids = 0;
2649                 for(int i=0;i<count;i++){
2650                         /*temp = label - rint(pow((double)2,i));*/
2651                         temp = label - (1<<i);
2652                         temp = upperSize-1-temp;
2653                         if(temp <= CkNumNodes()-1){
2654                 //              kids[numKids] = temp;
2655                                 kids.push_back(temp);
2656                                 numKids++;
2657                         }
2658                 }
2659         }else{
2660                 numKids = 0;
2661         //      kids = NULL;
2662         }
2663 }
2664
2665
2666 int CkNodeReductionMgr::treeRoot(void)
2667 {
2668   return 0;
2669 }
2670 bool CkNodeReductionMgr::hasParent(void) //Root Node
2671 {
2672   return (bool)(CkMyNode()!=treeRoot());
2673 }
2674 int CkNodeReductionMgr::treeParent(void) //My parent Node
2675 {
2676   return parent;
2677 }
2678
2679 int CkNodeReductionMgr::firstKid(void) //My first child Node
2680 {
2681   return CkMyNode()*TREE_WID+1;
2682 }
2683 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2684 {
2685 #ifdef BINOMIAL_TREE
2686         return numKids;
2687 #else
2688 /*  int nKids=CkNumNodes()-firstKid();
2689   if (nKids>TREE_WID) nKids=TREE_WID;
2690   if (nKids<0) nKids=0;
2691   return nKids;*/
2692         return numKids;
2693 #endif
2694 }
2695
2696 //Combine (& free) the current message vector msgs.
2697 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2698 {
2699 #if CMK_BIGSIM_CHARM
2700   _TRACE_BG_END_EXECUTE(1);
2701   void* _bgParentLog = NULL;
2702   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2703 #endif
2704   CkReductionMsg *ret=NULL;
2705
2706   //Look through the vector for a valid reducer, swapping out placeholder messages
2707   CkReduction::reducerType r=CkReduction::invalid;
2708   int msgs_gcount=0;//Reduced gcount
2709   int msgs_nSources=0;//Reduced nSources
2710   CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
2711   CkCallback msgs_callback;
2712   CkCallback msgs_secondaryCallback;
2713   int i;
2714   int nMsgs=0;
2715   CkReductionMsg *m;
2716   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2717   bool isMigratableContributor;
2718         
2719
2720   while(NULL!=(m=msgs.deq()))
2721   {
2722     DEBR((AA "***** gcount=%d; sourceFlag=%d ismigratable %d \n" AB,m->gcount,m->nSources(),m->isMigratableContributor()));       
2723     msgs_gcount+=m->gcount;
2724     if (m->sourceFlag!=0)
2725     { //This is a real message from an element, not just a placeholder
2726       msgs_nSources+=m->nSources();
2727 #if CMK_BIGSIM_CHARM
2728       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2729 #endif
2730
2731       if (nMsgs == 0 || m->reducer != CkReduction::random) {
2732         msgArr[nMsgs++]=m;
2733         r=m->reducer;
2734         if (!m->callback.isInvalid()){
2735 #if CMK_ERROR_CHECKING
2736           if(nMsgs > 1 && !(msgs_callback == m->callback))
2737             CkAbort("mis-matched client callbacks in reduction messages\n");
2738 #endif  
2739           msgs_callback=m->callback;
2740         }
2741         if(!m->secondaryCallback.isInvalid()){
2742           msgs_secondaryCallback = m->secondaryCallback;
2743         }
2744         if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
2745           msgs_userFlag=m->userFlag;
2746         isMigratableContributor= m->isMigratableContributor();
2747       }
2748       else {
2749 #if CMK_ERROR_CHECKING
2750         if(!(msgs_callback == m->callback))
2751           CkAbort("mis-matched client callbacks in reduction messages\n");
2752 #endif  
2753         delete m;
2754       }
2755     }
2756     else
2757     { //This is just a placeholder message-- replace it
2758       delete m;
2759     }
2760   }
2761
2762   if (nMsgs==0||r==CkReduction::invalid)
2763   //No valid reducer in the whole vector
2764     ret=CkReductionMsg::buildNew(0,NULL);
2765   else
2766   {//Use the reducer to reduce the messages
2767     if(nMsgs == 1){
2768       ret = msgArr[0];
2769     }else{
2770       if (msgArr[0]->reducer == CkReduction::random) {
2771         // nMsgs > 1 indicates that reduction type is not random
2772         // this means any data with reducer type random was submitted
2773         // only so that counts would agree, and can be removed
2774         delete msgArr[0];
2775         msgArr[0] = msgArr[nMsgs - 1];
2776         nMsgs--;
2777       }
2778       CkReduction::reducerFn f=CkReduction::reducerTable[r].fn;
2779       ret=(*f)(nMsgs,msgArr);
2780     }
2781     ret->reducer=r;
2782   }
2783
2784         
2785 #if USE_CRITICAL_PATH_HEADER_ARRAY
2786 #if CRITICAL_PATH_DEBUG > 3
2787         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2788 #endif
2789         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2790         path.updateMax(UsrToEnv(ret));
2791         // Combine the critical paths from all the reduction messages into the header for the new result
2792         for (i=0;i<nMsgs;i++){
2793           if (msgArr[i]!=ret){
2794             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2795             path.updateMax(UsrToEnv(msgArr[i]));
2796           } else {
2797             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2798           }
2799         }
2800 #if CRITICAL_PATH_DEBUG > 3
2801         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2802 #endif
2803
2804 #endif
2805
2806
2807         //Go back through the vector, deleting old messages
2808   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2809   delete [] msgArr;
2810   //Set the message counts
2811   ret->redNo=redNo;
2812   ret->gcount=msgs_gcount;
2813   ret->userFlag=msgs_userFlag;
2814   ret->callback=msgs_callback;
2815   ret->secondaryCallback = msgs_secondaryCallback;
2816   ret->sourceFlag=msgs_nSources;
2817   ret->setMigratableContributor(isMigratableContributor);
2818   DEBR((AA "Node Reduced gcount=%d; sourceFlag=%d\n" AB,ret->gcount,ret->sourceFlag));
2819 #if CMK_BIGSIM_CHARM
2820   _TRACE_BG_TLINE_END(&ret->log);
2821 #endif
2822
2823   return ret;
2824 }
2825
2826 void CkNodeReductionMgr::pup(PUP::er &p)
2827 {
2828 //We do not store the client function pointer or the client function parameter,
2829 //it is the responsibility of the programmer to correctly restore these
2830   IrrGroup::pup(p);
2831   p(redNo);
2832   p(inProgress); p(creating); p(startRequested);
2833   p(lcount);
2834   p(nContrib); p(nRemote);
2835   p(interrupt);
2836   p|msgs;
2837   p|futureMsgs;
2838   p|futureRemoteMsgs;
2839   p|futureLateMigrantMsgs;
2840   p|parent;
2841   p|additionalGCount;
2842   p|newAdditionalGCount;
2843   if(p.isUnpacking()) {
2844     gcount=CkNumNodes();
2845     thisProxy = thisgroup;
2846     lockEverything = CmiCreateLock();
2847 #ifdef BINOMIAL_TREE
2848     init_BinomialTree();
2849 #else
2850     init_BinaryTree();
2851 #endif          
2852   }
2853   p | blocked;
2854   p | maxModificationRedNo;
2855
2856 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2857   int isnull = (storedCallback == NULL);
2858   p | isnull;
2859   if (!isnull) {
2860     if (p.isUnpacking()) {
2861       storedCallback = new CkCallback;
2862     }
2863     p|*storedCallback;
2864   }
2865 #endif
2866
2867 }
2868
2869 /*
2870         FAULT_EVAC
2871         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2872         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2873         reduction tree. 
2874 */
2875 void CkNodeReductionMgr::evacuate(){
2876         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2877         if(treeKids() == 0){
2878         /*
2879                 if the node going down is a leaf
2880         */
2881                 oldleaf=true;
2882                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2883                 /*
2884                         Need to ask parent for the reduction number that it has seen. 
2885                         Since it is a leaf, the tree does not need to be rewired. 
2886                         We reuse the oldparent type of tree modification message to get 
2887                         the parent to block and tell us about the highest reduction number it has seen.
2888                         
2889                 */
2890                 int data[2];
2891                 data[0]=CkMyNode();
2892                 data[1]=getTotalGCount()+additionalGCount;
2893                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2894                 newParent = treeParent();
2895         }else{
2896                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2897                 oldleaf= false;
2898         /*
2899                 It is not a leaf. It needs to rewire the tree around itself.
2900                 It also needs to decide on a reduction No after which the new tree will be used
2901                 Till it decides on the new tree and the redNo at which it becomes valid,
2902                 all received messages will be buffered
2903         */
2904                 newParent = kids[0];
2905                 for(int i=numKids-1;i>=0;i--){
2906                         newKids.remove(i);
2907                 }
2908                 /*
2909                         Ask everybody for the highest reduction number they have seen and
2910                         also tell them about the new tree
2911                 */
2912                 /*
2913                         Tell parent about its new child;
2914                 */
2915                 int oldParentData[2];
2916                 oldParentData[0] = CkMyNode();
2917                 oldParentData[1] = newParent;
2918                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2919
2920                 /*
2921                         Tell the other children about their new parent
2922                 */
2923                 int childrenData=newParent;
2924                 for(int i=1;i<numKids;i++){
2925                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2926                 }
2927                 
2928                 /*
2929                         Tell newParent (1st child) about its new children,
2930                         the current node and its children except the newParent
2931                 */
2932                 int *newParentData = new int[numKids+2];
2933                 for(int i=1;i<numKids;i++){
2934                         newParentData[i] = kids[i];
2935                 }
2936                 newParentData[0] = CkMyNode();
2937                 newParentData[numKids] = parent;
2938                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2939                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2940         }
2941         readyDeletion = false;
2942         blocked = true;
2943         numModificationReplies = 0;
2944         tempModificationRedNo = findMaxRedNo();
2945 }
2946
2947 /*
2948         Depending on the code, use the data to change the tree
2949         1. OLDPARENT : replace the old child with a new one
2950         2. OLDCHILDREN: replace the parent
2951         3. NEWPARENT:  add the children and change the parent
2952         4. LEAFPARENT: delete the old child
2953 */
2954
2955 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2956         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2957         int sender;
2958         newKids = kids;
2959         readyDeletion = false;
2960         newAdditionalGCount = additionalGCount;
2961         switch(code){
2962                 case OLDPARENT: 
2963                         for(int i=0;i<numKids;i++){
2964                                 if(newKids[i] == data[0]){
2965                                         newKids[i] = data[1];
2966                                         break;
2967                                 }
2968                         }
2969                         sender = data[0];
2970                         newParent = parent;
2971                         break;
2972                 case OLDCHILDREN:
2973                         newParent = data[0];
2974                         sender = parent;
2975                         break;
2976                 case NEWPARENT:
2977                         for(int i=0;i<size-2;i++){
2978                                 newKids.push_back(data[i]);
2979                         }
2980                         newParent = data[size-2];
2981                         newAdditionalGCount += data[size-1];
2982                         sender = parent;
2983                         break;
2984                 case LEAFPARENT:
2985                         for(int i=0;i<numKids;i++){
2986                                 if(newKids[i] == data[0]){
2987                                         newKids.remove(i);
2988                                         break;
2989                                 }
2990                         }
2991                         sender = data[0];
2992                         newParent = parent;
2993                         newAdditionalGCount += data[1];
2994                         break;
2995         };
2996         blocked = true;
2997         int maxRedNo = findMaxRedNo();
2998         
2999         thisProxy[sender].collectMaxRedNo(maxRedNo);
3000 }
3001
3002 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
3003         /*
3004                 Find out the maximum redNo that has been seen by 
3005                 the affected nodes
3006         */
3007         numModificationReplies++;
3008         if(maxRedNo > tempModificationRedNo){
3009                 tempModificationRedNo = maxRedNo;
3010         }
3011         if(numModificationReplies == numKids+1){
3012                 maxModificationRedNo = tempModificationRedNo;
3013                 /*
3014                         when all the affected nodes have replied, tell them the maximum.
3015                         Unblock yourself. deal with the buffered messages local and remote
3016                 */
3017                 if(maxModificationRedNo == -1){
3018                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
3019                 }else{
3020                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
3021                 }
3022                 thisProxy[parent].unblockNode(maxModificationRedNo);
3023                 for(int i=0;i<numKids;i++){
3024                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
3025                 }
3026                 blocked = false;
3027                 updateTree();
3028                 clearBlockedMsgs();
3029         }
3030 }
3031
3032 void CkNodeReductionMgr::unblockNode(int maxRedNo){
3033         maxModificationRedNo = maxRedNo;
3034         updateTree();
3035         blocked = false;
3036         clearBlockedMsgs();
3037 }
3038
3039
3040 void CkNodeReductionMgr::clearBlockedMsgs(){
3041         int len = bufferedMsgs.length();
3042         for(int i=0;i<len;i++){
3043                 CkReductionMsg *m = bufferedMsgs.deq();
3044                 doAddContribution(m);
3045         }
3046         len = bufferedRemoteMsgs.length();
3047         for(int i=0;i<len;i++){
3048                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
3049                 doRecvMsg(m);
3050         }
3051
3052 }
3053 /*
3054         if the reduction number exceeds the maxModificationRedNo, change the tree
3055         to become the new one
3056 */
3057
3058 void CkNodeReductionMgr::updateTree(){
3059         if(redNo > maxModificationRedNo){
3060                 parent = newParent;
3061                 kids = newKids;
3062                 maxModificationRedNo = INT_MAX;
3063                 numKids = kids.size();
3064                 readyDeletion = true;
3065                 additionalGCount = newAdditionalGCount;
3066                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
3067                 for(int i=0;i<(int)(newKids.size());i++){
3068                         DEBREVAC(("%d ",newKids[i]));
3069                 }
3070                 DEBREVAC(("\n"));
3071         //      startReduction(redNo,CkMyNode());
3072         }else{
3073                 if(maxModificationRedNo != INT_MAX){
3074                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
3075                         startReduction(redNo,CkMyNode());
3076                         finishReduction();
3077                 }       
3078         }
3079 }
3080
3081
3082 void CkNodeReductionMgr::doneEvacuate(){
3083         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
3084 /*      if(oldleaf){
3085                 
3086                         It used to be a leaf
3087                         Then as soon as future messages have been emptied you can 
3088                         send the parent a message telling them that they are not going
3089                         to receive anymore messages from this child
3090                 
3091                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
3092                 while(futureMsgs.length() != 0){
3093                         int n = futureMsgs.length();
3094                         for(int i=0;i<n;i++){
3095                                 CkReductionMsg *m = futureMsgs.deq();
3096                                 if(isPresent(m->redNo)){
3097                                         msgs.enq(m);
3098                                 }else{
3099                                         futureMsgs.enq(m);
3100                                 }
3101                         }
3102                         CkReductionMsg *result = reduceMessages();
3103                         thisProxy[treeParent()].RecvMsg(result);
3104                         redNo++;
3105                 }
3106                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
3107                 thisProxy[treeParent()].DeleteChild(CkMyNode());
3108         }else{*/
3109                 if(readyDeletion){
3110                         thisProxy[treeParent()].DeleteChild(CkMyNode());
3111                 }else{
3112                         thisProxy[newParent].DeleteNewChild(CkMyNode());
3113                 }
3114 //      }
3115 }
3116
3117 void CkNodeReductionMgr::DeleteChild(int deletedChild){
3118         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
3119         for(int i=0;i<numKids;i++){
3120                 if(kids[i] == deletedChild){
3121                         kids.remove(i);
3122                         break;
3123                 }
3124         }
3125         numKids = kids.length();
3126         finishReduction();
3127 }
3128
3129 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
3130         for(int i=0;i<(int)(newKids.length());i++){
3131                 if(newKids[i] == deletedChild){
3132                         newKids.remove(i);
3133                         break;
3134                 }
3135         }
3136         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
3137         for(int i=0;i<(int)(newKids.size());i++){
3138                 DEBREVAC(("%d ",newKids[i]));
3139         }
3140         DEBREVAC(("\n"));
3141         finishReduction();
3142 }
3143
3144 int CkNodeReductionMgr::findMaxRedNo(){
3145         int max = redNo;
3146         for(int i=0;i<futureRemoteMsgs.length();i++){
3147                 if(futureRemoteMsgs[i]->redNo  > max){
3148                         max = futureRemoteMsgs[i]->redNo;
3149                 }
3150         }
3151         /*
3152                 if redNo is max (that is no future message) and the current reduction has not started
3153                 then tree can be changed before the reduction redNo can be started
3154         */ 
3155         if(redNo == max && msgs.length() == 0){
3156                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
3157                 max--;
3158         }
3159         return max;
3160 }
3161
3162 // initnode call. check the size of reduction table
3163 void CkReductionMgr::sanitycheck()
3164 {
3165 #if CMK_ERROR_CHECKING
3166   int count = 0;
3167   while (CkReduction::reducerTable[count].fn != NULL) count++;
3168   CmiAssert(CkReduction::nReducers == count);
3169 #endif
3170 }
3171
3172 #include "CkReduction.def.h"