54a89b9a325a8de48e130c8a3f1e3b5a97e4041b
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #define DEB_TUPLE(x) CkPrintf x
72 #else
73 //No debugging info-- empty defines
74 #define DEBR(x) // CkPrintf x
75 #define DEBRMLOG(x) CkPrintf x
76 #define AA
77 #define AB
78 #define DEBN(x) //CkPrintf x
79 #define RED_DEB(x) //CkPrintf x
80 #define DEBREVAC(x) //CkPrintf x
81 #define DEB_TUPLE(x) //CkPrintf x
82 #endif
83
84 #ifndef INT_MAX
85 #define INT_MAX 2147483647
86 #endif
87
88 extern int _inrestart;
89
90 Group::Group()
91   : CkReductionMgr(CkpvAccess(_currentGroupRednMgr))
92 {
93         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
94
95         creatingContributors();
96         contributorStamped(&reductionInfo);
97         contributorCreated(&reductionInfo);
98         doneCreatingContributors();
99 #if !GROUP_LEVEL_REDUCTION
100         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
101 #endif
102 }
103
104 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
105 {
106         creatingContributors();
107         contributorStamped(&reductionInfo);
108         contributorCreated(&reductionInfo);
109         doneCreatingContributors();
110 }
111
112 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
113                                     ((CkReductionMgr *)this),
114                                     reductionInfo,false)
115 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
116
117 CK_BARRIER_CONTRIBUTE_METHODS_DEF(Group,
118                                    ((CkReductionMgr *)this),
119                                    reductionInfo,false)
120
121
122
123 CkGroupInitCallback::CkGroupInitCallback(void) {}
124 /*
125 The callback is just used to tell the caller that this group
126 has been constructed.  (Now they can safely call CkLocalBranch)
127 */
128 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
129 {
130   m->call();
131   delete m;
132 }
133
134 /*
135 The callback is just used to tell the caller that this group
136 is constructed and ready to process other calls.
137 */
138 CkGroupReadyCallback::CkGroupReadyCallback(void)
139 {
140   _isReady = 0;
141 }
142 void
143 CkGroupReadyCallback::callBuffered(void)
144 {
145   int n = _msgs.length();
146   for(int i=0;i<n;i++)
147   {
148     CkGroupCallbackMsg *msg = _msgs.deq();
149     msg->call();
150     delete msg;
151   }
152 }
153 void
154 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
155 {
156   if(_isReady) {
157     msg->call();
158     delete msg;
159   } else {
160     _msgs.enq(msg);
161   }
162 }
163
164 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
165         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
166 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
167 {
168         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
169         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
170         b->fn(b->param,m->getSize(),m->getData());
171         delete m;
172 }
173
174 ///////////////// Reduction Manager //////////////////
175 /*
176 One CkReductionMgr runs a non-overlapping set of reductions.
177 It collects messages from all local contributors, then sends
178 the reduced message up the reduction tree to node zero, where
179 they're passed to the user's client function.
180 */
181
182 CkReductionMgr::CkReductionMgr(CProxy_CkArrayReductionMgr groupRednMgr)
183   :
184 #if !GROUP_LEVEL_REDUCTION
185   nodeProxy(groupRednMgr),
186 #endif
187   thisProxy(thisgroup),
188   isDestroying(false)
189
190 #ifdef BINOMIAL_TREE
191   init_BinomialTree();
192 #else
193   init_BinaryTree();
194 #endif
195   redNo=0;
196   completedRedNo = -1;
197   inProgress=false;
198   creating=false;
199   startRequested=false;
200   gcount=lcount=0;
201   nContrib=nRemote=0;
202   is_inactive = false;
203   maxStartRequest=0;
204 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
205         numImmigrantRecObjs = 0;
206         numEmigrantRecObjs = 0;
207 #endif
208   disableNotifyChildrenStart = false;
209
210   barrier_gCount=0;
211   barrier_nSource=0;
212   barrier_nContrib=barrier_nRemote=0;
213
214   DEBR((AA "In reductionMgr constructor at %d \n" AB,this));
215 }
216
217 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
218                                                     , isDestroying(false)
219 {
220   numKids = -1;
221   redNo=0;
222   completedRedNo = -1;
223   inProgress=false;
224   creating=false;
225   startRequested=false;
226   gcount=lcount=0;
227   nContrib=nRemote=0;
228   is_inactive = false;
229   maxStartRequest=0;
230   DEBR((AA "In reductionMgr migratable constructor at %d \n" AB,this));
231
232   barrier_gCount=0;
233   barrier_nSource=0;
234   barrier_nContrib=barrier_nRemote=0;
235
236 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
237   numImmigrantRecObjs = 0;
238   numEmigrantRecObjs = 0;
239 #endif
240
241 }
242
243 CkReductionMgr::~CkReductionMgr()
244 {
245 #if !GROUP_LEVEL_REDUCTION
246   if (CkMyRank() == 0) {
247     delete nodeProxy.ckLocalBranch();
248   }
249 #endif
250 }
251
252 void CkReductionMgr::flushStates()
253 {
254   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
255   redNo=0;
256   completedRedNo = -1;
257   inProgress=false;
258   creating=false;
259   startRequested=false;
260   nContrib=nRemote=0;
261   maxStartRequest=0;
262
263   while (!msgs.isEmpty()) { delete msgs.deq(); }
264   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
265   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
266   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
267
268   adjVec.length()=0;
269
270 #if ! GROUP_LEVEL_REDUCTION
271   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
272 #endif
273 }
274
275 //////////// Reduction Manager Client API /////////////
276
277 //Add the given client function.  Overwrites any previous client.
278 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
279 {
280   DEBR((AA "Setting reductionClient in ReductionMgr groupid %d\n" AB,thisgroup.idx));
281
282   if (CkMyPe()!=0)
283           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
284   storedCallback=*cb;
285 #if ! GROUP_LEVEL_REDUCTION
286   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
287   nodeProxy.ckSetReductionClient(callback);
288 #endif
289 }
290
291 ///////////////////////////// Contributor ////////////////////////
292 //Contributors keep a copy of this structure:
293
294 /*Contributor migration support:
295 */
296 void contributorInfo::pup(PUP::er &p)
297 {
298   p(redNo);
299 }
300
301 ////////////////////// Contributor list maintainance: /////////////////
302 //These just set and clear the "creating" flag to prevent
303 // reductions from finishing early because not all elements
304 // have been created.
305 void CkReductionMgr::creatingContributors(void)
306 {
307   DEBR((AA "Creating contributors...\n" AB));
308   creating=true;
309 }
310 void CkReductionMgr::doneCreatingContributors(void)
311 {
312   DEBR((AA "Done creating contributors...\n" AB));
313   creating=false;
314   checkIsActive();
315   if (startRequested) startReduction(redNo,CkMyPe());
316   finishReduction();
317 }
318
319 //A new contributor will be created
320 void CkReductionMgr::contributorStamped(contributorInfo *ci)
321 {
322   DEBR((AA "Contributor %p stamped\n" AB,ci));
323   //There is another contributor
324   gcount++;
325   if (inProgress)
326   {
327     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
328     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
329   } else
330     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
331 }
332
333 //A new contributor was actually created
334 void CkReductionMgr::contributorCreated(contributorInfo *ci)
335 {
336   DEBR((AA "Contributor %p created in grp %d\n" AB,ci,thisgroup.idx));
337   //We've got another contributor
338   lcount++;
339   //He may not need to contribute to some of our reductions:
340   for (int r=redNo;r<ci->redNo;r++)
341     adj(r).lcount--;//He won't be contributing to r here
342   checkIsActive();
343 }
344
345 /*Don't expect any more contributions from this one.
346 This is rather horrifying because we now have to make
347 sure the global element count accurately reflects all the
348 contributions the element made before it died-- these may stretch
349 far into the future.  The adj() vector is what saves us here.
350 */
351 void CkReductionMgr::contributorDied(contributorInfo *ci)
352 {
353 #if CMK_MEM_CHECKPOINT
354   // ignore from listener if it is during restart from crash
355   if (CkInRestarting()) return;
356 #endif
357
358   if (isDestroying) return;
359
360   DEBR((AA "Contributor %p(%d) died\n" AB,ci,ci->redNo));
361   //We lost a contributor
362   gcount--;
363
364   if (ci->redNo<redNo)
365   {//Must have been migrating during reductions-- root is waiting for his
366   // contribution, which will never come.
367     DEBR((AA "Dying guy %p must have been migrating-- he's at #%d!\n" AB,ci,ci->redNo));
368     for (int r=ci->redNo;r<redNo;r++)
369       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
370   }
371
372   //Add to the global count for all his future messages (wherever they are)
373   int r;
374   for (r=redNo;r<ci->redNo;r++)
375   {//He already contributed to this reduction, but won't show up in global count.
376     DEBR((AA "Dead guy %p left contribution for #%d\n" AB,ci,r));
377     adj(r).gcount++;
378   }
379
380   lcount--;
381   //He's already contributed to several reductions here
382   for (r=redNo;r<ci->redNo;r++)
383     adj(r).lcount++;//He'll be contributing to r here
384
385   // Check whether the death of this contributor made this pe go barren at this
386   // redNo
387   if (ci->redNo <= redNo) {
388     checkIsActive();
389   }
390   finishReduction();
391 }
392
393 //Migrating away (note that global count doesn't change)
394 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
395 {
396   DEBR((AA "Contributor %p(%d) migrating away\n" AB,ci,ci->redNo));
397   lcount--;//We lost a local
398   //He's already contributed to several reductions here
399   for (int r=redNo;r<ci->redNo;r++)
400     adj(r).lcount++;//He'll be contributing to r here
401
402   // Check whether this made this pe go barren at redNo
403   if (ci->redNo <= redNo) {
404     checkIsActive();
405   }
406   finishReduction();
407 }
408
409 //Migrating in (note that global count doesn't change)
410 void CkReductionMgr::contributorArriving(contributorInfo *ci)
411 {
412   DEBR((AA "Contributor %p(%d) migrating in\n" AB,ci,ci->redNo));
413   lcount++;//We gained a local
414 #if CMK_MEM_CHECKPOINT
415   // ignore from listener if it is during restart from crash
416   // because the ci may be old.
417   if (CkInRestarting()) return;
418 #endif
419   //He has already contributed (elsewhere) to several reductions:
420   for (int r=redNo;r<ci->redNo;r++)
421     adj(r).lcount--;//He won't be contributing to r here
422
423   // Check if the arrival of a new contributor makes this PE become active again
424   if (ci->redNo == redNo) {
425     checkIsActive();
426   }
427 }
428
429 //Contribute-- the given msg can contain any data.  The reducerType
430 // field of the message must be valid.
431 // Each contributor must contribute exactly once to the each reduction.
432 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
433 {
434 #if CMK_BIGSIM_CHARM
435   _TRACE_BG_TLINE_END(&(m->log));
436 #endif
437   DEBR((AA "Contributor %p contributed for %d in grp %d ismigratable %d \n" AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
438   //m->ci=ci;
439   m->redNo=ci->redNo++;
440   m->sourceFlag=-1;//A single contribution
441   m->gcount=0;
442
443 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
444
445         // if object is an immigrant recovery object, we send the contribution to the source PE
446         if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
447                 
448                 // turning on the message-logging bypass flag
449                 envelope *env = UsrToEnv(m);
450                 env->flags = env->flags | CK_BYPASS_DET_MLOG;
451         thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
452                 return;
453         }
454
455     Chare *oldObj = CpvAccess(_currentObj);
456     CpvAccess(_currentObj) = this;
457
458         // adding contribution
459         addContribution(m);
460
461     CpvAccess(_currentObj) = oldObj;
462 #else
463   addContribution(m);
464 #endif
465 }
466
467 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
468 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
469         //if(CkMyPe() == 2) CkPrintf("[%d] ---> Contributing Via Message\n",CkMyPe());
470         
471         // turning off bypassing flag
472         envelope *env = UsrToEnv(m);
473         env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
474
475         // adding contribution
476     addContribution(m);
477 }
478 #else
479 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
480 #endif
481
482 void CkReductionMgr::checkIsActive() {
483 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
484   return;
485 #endif
486
487   // Check the number of kids in the inactivelist before or at this redNo
488   std::map<int, int>::iterator it;
489   int c_inactive = 0;
490   for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
491     if (it->second <= redNo) {
492       DEBR((AA "Kid %d is inactive from redNo %d\n" AB, it->first, it->second));
493       c_inactive++;
494     }
495   }
496   DEBR((AA "CheckIsActive redNo %d, kids %d(inactive %d), lcount %d\n" AB, redNo,
497     numKids, c_inactive, lcount));
498
499   if(numKids == c_inactive && lcount == 0) {
500     if(!is_inactive) {
501       informParentInactive();
502     }
503     is_inactive = true;
504   } else if(is_inactive) {
505     is_inactive = false;
506   }
507 }
508
509 /*
510 * Add to the child to the inactiveList
511 */
512 void CkReductionMgr::checkAndAddToInactiveList(int id, int red_no) {
513   // If there is already a reduction in progress corresponding to red_no, then
514   // the time to call ReductionStarting is past so explicitly invoke
515   // ReductionStarting on the kid
516   if (inProgress && redNo == red_no) {
517     thisProxy[id].ReductionStarting(new CkReductionNumberMsg(red_no));
518   }
519
520   std::map<int, int>::iterator it;
521   it = inactiveList.find(id);
522   if (it == inactiveList.end()) {
523     inactiveList.insert(std::pair<int, int>(id, red_no));
524   } else {
525     it->second = red_no;
526   }
527   // If the red_no is redNo, then check whether this makes this PE inactive
528   if (redNo == red_no) {
529     checkIsActive();
530   }
531 }
532
533 /*
534 * This is invoked when a real contribution is received from the kid for a
535 * particular red_no
536 */
537 void CkReductionMgr::checkAndRemoveFromInactiveList(int id, int red_no) {
538   std::map<int, int>::iterator it;
539   it = inactiveList.find(id);
540   if (it == inactiveList.end()) {
541     return;
542   }
543   if (it->second <= red_no) {
544     inactiveList.erase(it);
545     DEBR((AA "Parent removing kid %d from inactivelist red_no %d\n" AB,
546       id, red_no));
547   }
548 }
549
550 // Inform parent that I am inactive
551 void CkReductionMgr::informParentInactive() {
552   if (hasParent()) {
553     DEBR((AA "Inform parent to add to inactivelist red_no %d\n" AB, redNo));
554     thisProxy[treeParent()].AddToInactiveList(
555       new CkReductionInactiveMsg(CkMyPe(), redNo));
556   }
557 }
558
559 /*
560 *  Send ReductionStarting message to all the inactive kids which are inactive
561 *  for the specified red_no
562 */
563 void CkReductionMgr::sendReductionStartingToKids(int red_no) {
564 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
565   for (int k=0;k<treeKids();k++)
566   {
567     DEBR((AA "Asking child PE %d to start #%d\n" AB,firstKid()+k,redNo));
568     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
569   }
570 #else
571   std::map<int, int>::iterator it;
572   for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
573     if (it->second <= red_no) {
574       DEBR((AA "Parent sending reductionstarting to inactive kid %d\n" AB,
575         it->first));
576       thisProxy[it->first].ReductionStarting(new CkReductionNumberMsg(red_no));
577     }
578   }
579 #endif
580 }
581
582
583 //////////// Reduction Manager Remote Entry Points /////////////
584 //Sent down the reduction tree (used by barren PEs)
585 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
586 {
587  if(CkMyPe()==0){
588         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
589         //delete m;
590         //return;
591  }
592  DEBR((AA " Group ReductionStarting called for redNo %d\n" AB,m->num));
593  int srcPE = (UsrToEnv(m))->getSrcPe();
594   if (isPresent(m->num) && !inProgress)
595   {
596     DEBR((AA "Starting reduction #%d at parent's request\n" AB,m->num));
597     startReduction(m->num,srcPE);
598     finishReduction();
599   } else if (isFuture(m->num)){
600 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
601           DEBR((AA "Asked to startfuture Reduction %d \n" AB,m->num));
602           if(maxStartRequest < m->num){
603                   maxStartRequest = m->num;
604           }
605  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
606           
607     }
608   else //is Past
609     DEBR((AA "Ignoring parent's late request to start #%d\n" AB,m->num));
610   delete m;
611 }
612
613 //Sent to root of reduction tree with reduction contribution
614 // of migrants that missed the main reduction.
615 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
616 {
617 #if GROUP_LEVEL_REDUCTION
618 #if CMK_BIGSIM_CHARM
619   _TRACE_BG_TLINE_END(&(m->log));
620 #endif
621   addContribution(m);
622 #else
623   m->secondaryCallback = m->callback;
624   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
625   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
626   nodeMgr->LateMigrantMsg(m);
627 /*      int len = finalMsgs.length();
628         finalMsgs.enq(m);
629 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
630         endArrayReduction();*/
631 #endif
632 }
633
634 //A late migrating contributor will never contribute to this reduction
635 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
636 {
637   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
638   DEBR((AA "Migrant died before contributing to #%d\n" AB,m->num));
639  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
640   adj(m->num).gcount--;//He won't be contributing to this one.
641   finishReduction();
642   delete m;
643 }
644
645 //////////// Reduction Manager State /////////////
646 void CkReductionMgr::startReduction(int number,int srcPE)
647 {
648   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
649   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
650   if (inProgress){
651         DEBR((AA "This reduction is already in progress\n" AB));
652         return;//This reduction already started
653   }
654   if (creating) //Don't start yet-- we're creating elements
655   {
656     DEBR((AA "Postponing start request #%d until we're done creating\n" AB,redNo));
657     startRequested=true;
658     return;
659   }
660
661 //If none of these cases, we need to start the reduction--
662   DEBR((AA "Starting reduction #%d  %d %d \n" AB,redNo,completedRedNo,number));
663   inProgress=true;
664  
665
666         /*
667                 FAULT_EVAC
668         */
669   if(!CmiNodeAlive(CkMyPe())){
670         return;
671   }
672
673   if(disableNotifyChildrenStart) return;
674  
675   //Sent start requests to our kids (in case they don't already know)
676 #if GROUP_LEVEL_REDUCTION
677   sendReductionStartingToKids(redNo);
678   //for (int k=0;k<treeKids();k++)
679   //{
680   //  DEBR((AA "Asking child PE %d to start #%d\n" AB,firstKid()+k,redNo));
681   //  thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
682   //}
683 #else
684   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
685 #endif
686         
687         /*
688   int temp;
689   //making it a broadcast done only by PE 0
690   if(!hasParent()){
691                 temp = completedRedNo+1;
692                 for(int i=temp;i<=number;i++){
693                         for(int j=0;j<CkNumPes();j++){
694                                 if(j != CkMyPe() && j != srcPE){
695                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
696                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
697                                         }
698                                 }
699                         }
700                 }       
701         }       else{
702                 temp = number;
703         }*/
704 /*  if(!hasParent()){
705                 temp = completedRedNo+1;
706         }       else{
707                 temp = number;
708         }
709         for(int i=temp;i<=number;i++){
710         //      DEBR((AA "Asking all child PEs to start #%d \n" AB,i));
711                 if(hasParent()){
712           // kick-start your parent too ...
713                         if(treeParent() != srcPE){
714                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
715 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
716     CpvAccess(_currentObj) = oldObj;
717 #endif
718                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
719                                 }       
720                         }       
721                 }
722           for (int k=0;k<treeKids();k++)
723           {
724                         if(firstKid()+k != srcPE){
725                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
726                             DEBR((AA "Asking child PE %d to start #%d\n" AB,kids[k],redNo));
727                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
728                                 }       
729                         }       
730         }
731         }
732         */
733 }       
734
735 /*Handle a message from one element for the reduction*/
736 void CkReductionMgr::addContribution(CkReductionMsg *m)
737 {
738   if (isPast(m->redNo))
739   {
740 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
741         CmiAbort("this version should not have late migrations");
742 #else
743         //We've moved on-- forward late contribution straight to root
744     DEBR((AA "Migrant gives late contribution for #%d!\n" AB,m->redNo));
745         // if (!hasParent()) //Root moved on too soon-- should never happen
746         //   CkAbort("Late reduction contribution received at root!\n");
747     thisProxy[0].LateMigrantMsg(m);
748 #endif
749   }
750   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
751     DEBR((AA "Contributor gives early contribution-- for #%d\n" AB,m->redNo));
752     futureMsgs.enq(m);
753   } else {// An ordinary contribution
754     DEBR((AA "Recv'd local contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
755    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
756     startReduction(m->redNo,CkMyPe());
757     msgs.enq(m);
758     nContrib++;
759     finishReduction();
760   }
761 }
762
763 /**function checks if it has got all contributions that it is supposed to
764 get at this processor. If it is done it sends the reduced result to the local
765 nodegroup */
766 void CkReductionMgr::finishReduction(void)
767 {
768   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
769   DEBR((AA "in finishReduction (inProgress=%d) in grp %d\n" AB,inProgress,thisgroup.idx));
770   if ((!inProgress) || creating){
771         DEBR((AA "Either not in Progress or creating\n" AB));
772         return;
773   }
774
775   bool partialReduction = false;
776
777   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
778 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
779         if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
780           if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
781             partialReduction = true;
782           }
783           else {
784             DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
785             return; //Need more local messages
786           }
787         }
788 #else
789   if (nContrib<(lcount+adj(redNo).lcount)){
790          if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
791            partialReduction = true;
792          }
793          else {
794            DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
795            return; //Need more local messages
796          }
797   }
798 #endif
799
800 #if GROUP_LEVEL_REDUCTION
801   if (nRemote<treeKids()) {
802     if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
803       partialReduction = true;
804     }
805     else {
806       DEBR((AA "Need more remote messages %d %d\n" AB,nRemote,treeKids()));
807       return; //Need more remote messages
808     }
809   }
810         
811 #endif
812  
813   DEBR((AA "Reducing data... %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
814   CkReductionMsg *result=reduceMessages();
815   result->redNo=redNo;
816
817   if (partialReduction) {
818     msgs.enq(result);
819     return;
820   }
821
822 #if GROUP_LEVEL_REDUCTION
823   if (hasParent())
824   {//Pass data up tree to parent
825     DEBR((AA "Passing reduced data up to parent node %d.\n" AB,treeParent()));
826     DEBR((AA "Message gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
827 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
828     result->gcount+=gcount+adj(redNo).gcount;
829 #else
830     result->gcount+=gcount+adj(redNo).gcount;
831 #endif
832     thisProxy[treeParent()].RecvMsg(result);
833   }
834   else 
835   {//We are root-- pass data to client
836     DEBR((AA "Final gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
837     int totalElements=result->gcount+gcount+adj(redNo).gcount;
838     if (totalElements>result->nSources()) 
839     {
840       DEBR((AA "Only got %d of %d contributions (c'mon, migrators!)\n" AB,result->nSources(),totalElements));
841       msgs.enq(result);
842       return; // Wait for migrants to contribute
843     } else if (totalElements<result->nSources()) {
844       DEBR((AA "Got %d of %d contributions\n" AB,result->nSources(),totalElements));
845 #if !defined(_FAULT_CAUSAL_)
846       CkAbort("ERROR! Too many contributions at root!\n");
847 #endif
848     }
849     DEBR((AA "Passing result to client function\n" AB));
850     CkSetRefNum(result, result->getUserFlag());
851     if (!result->callback.isInvalid())
852             result->callback.send(result);
853     else if (!storedCallback.isInvalid())
854             storedCallback.send(result);
855     else
856             CkAbort("No reduction client!\n"
857                     "You must register a client with either SetReductionClient or during contribute.\n");
858   }
859
860 #else
861   result->gcount+=gcount+adj(redNo).gcount;
862
863   result->secondaryCallback = result->callback;
864   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
865         DEBR((AA "Reduced mesg gcount %d localgcount %d\n" AB,result->gcount,gcount));
866
867   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
868
869  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
870   
871   // Find our node reduction manager, and pass reduction to him:
872   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
873   nodeMgr->contributeArrayReduction(result);
874 #endif
875
876   //House Keeping Operations will have to check later what needs to be changed
877   redNo++;
878   // Check after every reduction contribution whether this makes the PE inactive
879   // starting this redNo
880   checkIsActive();
881   //Shift the count adjustment vector down one slot (to match new redNo)
882   int i;
883 #if !GROUP_LEVEL_REDUCTION
884     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
885   if(CkMyPe()!=0)
886 #endif
887   {
888         completedRedNo++;
889         for (i=1;i<(int)(adjVec.length());i++){
890            adjVec[i-1]=adjVec[i];
891         }
892         adjVec.length()--;  
893   }
894
895   inProgress=false;
896   startRequested=false;
897   nRemote=nContrib=0;
898
899   //Look through the future queue for messages we can now handle
900   int n=futureMsgs.length();
901   for (i=0;i<n;i++)
902   {
903     CkReductionMsg *m=futureMsgs.deq();
904     if (m!=NULL) //One of these addContributions may have finished us.
905       addContribution(m);//<- if *still* early, puts it back in the queue
906   }
907 #if GROUP_LEVEL_REDUCTION
908   n=futureRemoteMsgs.length();
909   for (i=0;i<n;i++)
910   {
911     CkReductionMsg *m=futureRemoteMsgs.deq();
912     if (m!=NULL) {
913       RecvMsg(m);//<- if *still* early, puts it back in the queue
914     }
915   }
916 #endif
917
918   if(maxStartRequest >= redNo){
919           startReduction(redNo,CkMyPe());
920           finishReduction();
921   }
922  
923
924 }
925
926 //Sent up the reduction tree with reduced data
927   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
928 {
929 #if GROUP_LEVEL_REDUCTION
930 #if CMK_BIGSIM_CHARM
931   _TRACE_BG_TLINE_END(&m->log);
932 #endif
933   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
934     DEBR((AA "Recv'd remote contribution %d for #%d\n" AB,nRemote,m->redNo));
935     // If the remote contribution is real, then check whether we can remove the
936     // child from the inactiveList if it is in the list
937     if (m->nSources() > 0) {
938       checkAndRemoveFromInactiveList(m->fromPE, m->redNo);
939     }
940     startReduction(m->redNo, CkMyPe());
941     msgs.enq(m);
942     nRemote++;
943     finishReduction();
944   }
945   else if (isFuture(m->redNo)) {
946     DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
947     futureRemoteMsgs.enq(m);
948   } 
949   else CkAbort("Recv'd late remote contribution!\n");
950 #endif
951 }
952
953 void CkReductionMgr::AddToInactiveList(CkReductionInactiveMsg *m) {
954   int id = m->id;
955   int last_redno = m->redno;
956   delete m;
957
958   DEBR((AA "Parent add kid %d to inactive list from redno %d\n" AB,
959     id, last_redno));
960   checkAndAddToInactiveList(id, last_redno);
961
962   finishReduction();
963   if (last_redno <= redNo) {
964     checkIsActive();
965   }
966 }
967
968 //////////// Reduction Manager Utilities /////////////
969
970 //Return the countAdjustment struct for the given redNo:
971 countAdjustment &CkReductionMgr::adj(int number)
972 {
973   number-=completedRedNo;
974   number--;
975   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
976   //Pad the adjustment vector with zeros until it's at least number long
977   while ((int)(adjVec.length())<=number)
978     adjVec.push_back(countAdjustment());
979   return adjVec[number];
980 }
981
982 //Combine (& free) the current message vector msgs.
983 CkReductionMsg *CkReductionMgr::reduceMessages(void)
984 {
985 #if CMK_BIGSIM_CHARM
986   _TRACE_BG_END_EXECUTE(1);
987   void* _bgParentLog = NULL;
988   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
989 #endif
990   CkReductionMsg *ret=NULL;
991
992   //Look through the vector for a valid reducer, swapping out placeholder messages
993   CkReduction::reducerType r=CkReduction::invalid;
994   int msgs_gcount=0;//Reduced gcount
995   int msgs_nSources=0;//Reduced nSources
996   CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
997   CkCallback msgs_callback;
998   int i;
999   int nMsgs=0;
1000   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
1001   CkReductionMsg *m;
1002   bool isMigratableContributor;
1003
1004   // Copy message queue into msgArr, skipping placeholders:
1005   while (NULL!=(m=msgs.deq()))
1006   {
1007     msgs_gcount+=m->gcount;
1008     if (m->sourceFlag!=0)
1009     { //This is a real message from an element, not just a placeholder
1010       msgs_nSources+=m->nSources();
1011 #if CMK_BIGSIM_CHARM
1012       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
1013 #endif
1014
1015       // for "random" reducer type, only need to accept one message
1016       if (nMsgs == 0 || m->reducer != CkReduction::random) {
1017         msgArr[nMsgs++]=m;
1018         r=m->reducer;
1019         if (!m->callback.isInvalid()){
1020 #if CMK_ERROR_CHECKING
1021           if(nMsgs > 1 && !(msgs_callback == m->callback))
1022             CkAbort("mis-matched client callbacks in reduction messages\n");
1023 #endif
1024           msgs_callback=m->callback;
1025         }
1026         if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
1027           msgs_userFlag=m->userFlag;
1028         isMigratableContributor=m->isMigratableContributor();
1029       }
1030       else {
1031 #if CMK_ERROR_CHECKING
1032         if(!(msgs_callback == m->callback))
1033           CkAbort("mis-matched client callbacks in reduction messages\n");
1034 #endif  
1035         delete m;
1036       }
1037     }
1038     else
1039     { //This is just a placeholder message-- forget it
1040       delete m;
1041     }
1042   }
1043
1044   if (nMsgs==0||r==CkReduction::invalid)
1045   //No valid reducer in the whole vector
1046     ret=CkReductionMsg::buildNew(0,NULL);
1047   else
1048   {//Use the reducer to reduce the messages
1049                 //if there is only one msg to be reduced just return that message
1050     if(nMsgs == 1 &&
1051        msgArr[0]->reducer != CkReduction::set &&
1052        msgArr[0]->reducer != CkReduction::tuple) {
1053       ret = msgArr[0];
1054     }else{
1055       if (msgArr[0]->reducer == CkReduction::random) {
1056         // nMsgs > 1 indicates that reduction type is not random
1057         // this means any data with reducer type random was submitted
1058         // only so that counts would agree, and can be removed
1059         delete msgArr[0];
1060         msgArr[0] = msgArr[nMsgs - 1];
1061         nMsgs--;
1062       }
1063       CkReduction::reducerFn f=CkReduction::reducerTable[r].fn;
1064       ret=(*f)(nMsgs,msgArr);
1065     }
1066     ret->reducer=r;
1067   }
1068
1069
1070
1071 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1072
1073 #if CRITICAL_PATH_DEBUG > 3
1074   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
1075 #endif
1076
1077   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1078   path.updateMax(UsrToEnv(ret));
1079   // Combine the critical paths from all the reduction messages
1080   for (i=0;i<nMsgs;i++){
1081     if (msgArr[i]!=ret){
1082       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
1083       path.updateMax(UsrToEnv(msgArr[i]));
1084     }
1085   }
1086   
1087
1088 #if CRITICAL_PATH_DEBUG > 3
1089   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1090 #endif
1091   
1092   PathHistoryTableEntry tableEntry(path);
1093   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
1094   
1095 #endif
1096
1097         //Go back through the vector, deleting old messages
1098   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
1099   delete [] msgArr;
1100
1101   //Set the message counts
1102   ret->redNo=redNo;
1103   ret->gcount=msgs_gcount;
1104   ret->userFlag=msgs_userFlag;
1105   ret->callback=msgs_callback;
1106   ret->sourceFlag=msgs_nSources;
1107         ret->setMigratableContributor(isMigratableContributor);
1108   ret->fromPE = CkMyPe();
1109   DEBR((AA "Reduced gcount=%d; sourceFlag=%d\n" AB,ret->gcount,ret->sourceFlag));
1110
1111   return ret;
1112 }
1113
1114
1115 //Checkpointing utilities
1116 //pack-unpack method for CkReductionMsg
1117 //if packing pack the message and then unpack and return it
1118 //if unpacking allocate memory for it read it off disk and then unapck
1119 //and return it
1120 void CkReductionMgr::pup(PUP::er &p)
1121 {
1122 //We do not store the client function pointer or the client function parameter,
1123 //it is the responsibility of the programmer to correctly restore these
1124   CkGroupInitCallback::pup(p);
1125   p(redNo);
1126   p(completedRedNo);
1127   p(inProgress); p(creating); p(startRequested);
1128   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
1129   p|msgs;
1130   p|futureMsgs;
1131   p|futureRemoteMsgs;
1132   p|finalMsgs;
1133   p|adjVec;
1134 #if !GROUP_LEVEL_REDUCTION
1135   p|nodeProxy;
1136 #endif
1137   p|storedCallback;
1138     // handle CkReductionClientBundle
1139   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
1140   {
1141     CkReductionClientBundle *bd;
1142     if (p.isUnpacking()) 
1143       bd = new CkReductionClientBundle;
1144     else
1145       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
1146     p|*bd;
1147     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1148   }
1149
1150   // subtle --- Gengbin
1151   // Group : CkReductionMgr
1152   // CkArray: CkReductionMgr
1153   // lcount/gcount in Group is set in Group constructor
1154   // lcount/gcount in CkArray is not, it is set when array elements are created
1155   // we can not pup because inserting array elems will add the counters again
1156 //  p|lcount;
1157 //  p|gcount;
1158 //  p|lcount;
1159 //  //  p|gcount;
1160 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1161   if(p.isUnpacking()){
1162     thisProxy = thisgroup;
1163     maxStartRequest=0;
1164 #ifdef BINOMIAL_TREE
1165     init_BinomialTree();
1166 #else
1167     init_BinaryTree();
1168 #endif
1169     is_inactive = false;
1170     checkIsActive();
1171   }
1172
1173   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1174 }
1175
1176
1177 //Callback for doing Reduction through NodeGroups added by Sayantan
1178
1179 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1180         finalMsgs.enq(m);
1181         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1182         adj(m->redNo).mainRecvd = 1;
1183         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1184         endArrayReduction();
1185 }
1186
1187 void CkReductionMgr :: endArrayReduction(){
1188         CkReductionMsg *ret=NULL;
1189         int nMsgs=finalMsgs.length();
1190         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1191         //Look through the vector for a valid reducer, swapping out placeholder messages
1192         //CkPrintf("Length of Final Message %d \n",nMsgs);
1193         CkReduction::reducerType r=CkReduction::invalid;
1194         int msgs_gcount=0;//Reduced gcount
1195         int msgs_nSources=0;//Reduced nSources
1196         CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
1197         CkCallback msgs_callback;
1198         CkCallback msgs_secondaryCallback;
1199         CkVec<CkReductionMsg *> tempMsgs;
1200         int i;
1201         int numMsgs = 0;
1202         for (i=0;i<nMsgs;i++)
1203         {
1204           CkReductionMsg *m=finalMsgs.deq();
1205           if(m->redNo == completedRedNo +1){
1206             msgs_gcount+=m->gcount;
1207             if (m->sourceFlag!=0)
1208             { //This is a real message from an element, not just a placeholder
1209               msgs_nSources+=m->nSources();
1210
1211               // for "random" reducer type, only need to accept one message
1212               if (tempMsgs.length() == 0 || m->reducer != CkReduction::random) {
1213                 r=m->reducer;
1214                 if (!m->callback.isInvalid())
1215                   msgs_callback=m->callback;
1216                 if(!m->secondaryCallback.isInvalid())
1217                   msgs_secondaryCallback = m->secondaryCallback;
1218                 if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
1219                   msgs_userFlag=m->userFlag;
1220                 tempMsgs.push_back(m);
1221               }
1222               else {
1223                 delete m;
1224               }
1225             }
1226             else {
1227               delete m;
1228             }
1229           }else{
1230             finalMsgs.enq(m);
1231           }
1232         }
1233         numMsgs = tempMsgs.length();
1234
1235         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1236
1237         if(numMsgs == 0){
1238                 return;
1239         }
1240         if(adj(completedRedNo+1).mainRecvd == 0){
1241                 for(i=0;i<numMsgs;i++){
1242                         finalMsgs.enq(tempMsgs[i]);
1243                 }
1244                 return;
1245         }
1246
1247 /*
1248         NOT NEEDED ANYMORE DONE at nodegroup level
1249         if(msgs_gcount  > msgs_nSources){
1250                 for(i=0;i<numMsgs;i++){
1251                         finalMsgs.enq(tempMsgs[i]);
1252                 }
1253                 return;
1254         }*/
1255
1256         if (numMsgs==0||r==CkReduction::invalid)
1257                 //No valid reducer in the whole vector
1258                 ret=CkReductionMsg::buildNew(0,NULL);
1259         else{//Use the reducer to reduce the messages
1260                CkReduction::reducerFn f=CkReduction::reducerTable[r].fn;
1261                 // has to be corrected elements from above need to be put into a temporary vector
1262                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1263
1264                 if (numMsgs > 1 && msgArr[0]->reducer == CkReduction::random) {
1265                   // nMsgs > 1 indicates that reduction type is not "random"
1266                   // this means any data with reducer type random was submitted
1267                   // only so that counts would agree, and can be removed
1268                   delete msgArr[0];
1269                   msgArr[0] = msgArr[numMsgs - 1];
1270                   numMsgs--;
1271                 }
1272
1273                 ret=(*f)(numMsgs,msgArr);
1274                 ret->reducer=r;
1275
1276         }
1277
1278         
1279 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1280
1281 #if CRITICAL_PATH_DEBUG > 3
1282         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1283 #endif
1284
1285         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1286         path.updateMax(UsrToEnv(ret));
1287         // Combine the critical paths from all the reduction messages into the header for the new result
1288         for (i=0;i<numMsgs;i++){
1289           if (tempMsgs[i]!=ret){
1290             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1291             path.updateMax(UsrToEnv(tempMsgs[i]));
1292           } else {
1293             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1294           }
1295         }
1296         // Also consider the path which got us into this entry method
1297
1298 #if CRITICAL_PATH_DEBUG > 3
1299         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1300 #endif
1301
1302 #endif
1303   
1304
1305
1306
1307
1308         for(i = 0;i<numMsgs;i++){
1309                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1310         }
1311
1312         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1313         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1314
1315         ret->redNo=completedRedNo+1;
1316         ret->gcount=msgs_gcount;
1317         ret->userFlag=msgs_userFlag;
1318         ret->callback=msgs_callback;
1319         ret->secondaryCallback = msgs_secondaryCallback;
1320         ret->sourceFlag=msgs_nSources;
1321
1322         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1323
1324         CkSetRefNum(ret, ret->getUserFlag());
1325         if (!ret->secondaryCallback.isInvalid())
1326             ret->secondaryCallback.send(ret);
1327     else if (!storedCallback.isInvalid())
1328             storedCallback.send(ret);
1329     else{
1330       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1331             CkAbort("No reduction client!\n"
1332                     "You must register a client with either SetReductionClient or during contribute.\n");
1333     }
1334         completedRedNo++;
1335
1336         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1337
1338         for (i=1;i<(int)(adjVec.length());i++)
1339                 adjVec[i-1]=adjVec[i];
1340         adjVec.length()--;
1341         endArrayReduction();
1342 }
1343
1344 void CkReductionMgr::init_BinaryTree(){
1345         parent = (CkMyPe()-1)/TREE_WID;
1346         int firstkid = CkMyPe()*TREE_WID+1;
1347         numKids=CkNumPes()-firstkid;
1348         if (numKids>TREE_WID) numKids=TREE_WID;
1349         if (numKids<0) numKids=0;
1350
1351         for(int i=0;i<numKids;i++){
1352                 kids.push_back(firstkid+i);
1353                 newKids.push_back(firstkid+i);
1354         }
1355 }
1356
1357 void CkReductionMgr::init_BinomialTree(){
1358         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1359         /*upperSize = (unsigned )pow((double)2,depth);*/
1360         upperSize = (unsigned) 1 << depth;
1361         label = upperSize-CkMyPe()-1;
1362         int p=label;
1363         int count=0;
1364         while( p > 0){
1365                 if(p % 2 == 0)
1366                         break;
1367                 else{
1368                         p = p/2;
1369                         count++;
1370                 }
1371         }
1372         /*parent = label + rint(pow((double)2,count));*/
1373         parent = label + (1<<count);
1374         parent = upperSize -1 -parent;
1375         int temp;
1376         if(count != 0){
1377                 numKids = 0;
1378                 for(int i=0;i<count;i++){
1379                         /*temp = label - rint(pow((double)2,i));*/
1380                         temp = label - (1<<i);
1381                         temp = upperSize-1-temp;
1382                         if(temp <= CkNumPes()-1){
1383                                 kids.push_back(temp);
1384                                 numKids++;
1385                         }
1386                 }
1387         }else{
1388                 numKids = 0;
1389         //      kids = NULL;
1390         }
1391 }
1392
1393
1394 int CkReductionMgr::treeRoot(void)
1395 {
1396   return 0;
1397 }
1398 bool CkReductionMgr::hasParent(void) //Root Node
1399 {
1400   return (bool)(CkMyPe()!=treeRoot());
1401 }
1402 int CkReductionMgr::treeParent(void) //My parent Node
1403 {
1404   return parent;
1405 }
1406
1407 int CkReductionMgr::firstKid(void) //My first child Node
1408 {
1409   return CkMyPe()*TREE_WID+1;
1410 }
1411 int CkReductionMgr::treeKids(void)//Number of children in tree
1412 {
1413   return numKids;
1414 }
1415
1416
1417 //                simple "stateless" barrier
1418 //                no state checkpointed, for FT purpose
1419 //                require no overlapping barriers
1420 void CkReductionMgr::barrier(CkReductionMsg *m)
1421 {
1422   barrier_nContrib++;
1423   barrier_nSource++;
1424   if(!m->callback.isInvalid())
1425       barrier_storedCallback=m->callback;
1426   finishBarrier();
1427   delete m;
1428 }
1429
1430 void CkReductionMgr::finishBarrier(void)
1431 {
1432        if(barrier_nContrib<lcount){//need more local message
1433                DEBR(("[%d] current contrib:%d,lcount:%d\n",CkMyPe(),barrier_nContrib,lcount));
1434                return;
1435        }
1436        if(barrier_nRemote<treeKids()){//need more remote messages
1437                DEBR(("[%d] current remote:%d,kids:%d\n",CkMyPe(),barrier_nRemote,treeKids()));
1438                return;
1439        }
1440        CkReductionMsg * result = CkReductionMsg::buildNew(0,NULL);
1441        result->callback=barrier_storedCallback;
1442        result->sourceFlag=barrier_nSource;
1443        result->gcount=barrier_gCount;
1444        if(hasParent())
1445        {
1446                DEBR(("[%d]send to parent:%d\n",CkMyPe(),treeParent()));
1447                result->gcount+=gcount;
1448                thisProxy[treeParent()].Barrier_RecvMsg(result);
1449        }
1450        else{
1451                int totalElements=result->gcount+gcount;
1452                DEBR(("[%d]root,totalElements:%d,source:%d\n",CkMyPe(),totalElements,result->nSources()));
1453                if(totalElements<result->nSources()){
1454                        CkAbort("ERROR! Too many contributions at barrier root\n");
1455                }
1456                CkSetRefNum(result,result->getUserFlag());
1457                if(!result->callback.isInvalid())
1458                        result->callback.send(result);
1459                else if(!barrier_storedCallback.isInvalid())
1460                                barrier_storedCallback.send(result);
1461                else 
1462                        CkAbort("No reduction client!\n");
1463        }
1464        barrier_nRemote=barrier_nContrib=0;
1465        barrier_gCount=0;
1466        barrier_nSource=0;
1467 }
1468
1469 void CkReductionMgr::Barrier_RecvMsg(CkReductionMsg *m)
1470 {
1471        barrier_nRemote++;
1472        barrier_gCount+=m->gcount;
1473        barrier_nSource+=m->nSources();
1474        if(!m->callback.isInvalid())
1475                barrier_storedCallback=m->callback;
1476        finishBarrier();
1477 }
1478
1479
1480
1481 /////////////////////////////////////////////////////////////////////////
1482
1483 ////////////////////////////////
1484 //CkReductionMsg support
1485
1486 //ReductionMessage default private constructor-- does nothing
1487 CkReductionMsg::CkReductionMsg(){}
1488 CkReductionMsg::~CkReductionMsg(){}
1489
1490 //This define gives the distance from the start of the CkReductionMsg
1491 // object to the start of the user data area (just below last object field)
1492 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1493
1494 //"Constructor"-- builds and returns a new CkReductionMsg.
1495 //  the "data" array you specify will be copied into this object.
1496 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1497     CkReduction::reducerType reducer, CkReductionMsg *buf)
1498 {
1499   int len[1];
1500   len[0]=NdataSize;
1501   CkReductionMsg *ret = buf ? buf : new(len,0) CkReductionMsg();
1502
1503   ret->dataSize=NdataSize;
1504   if (srcData!=NULL && !buf)
1505     memcpy(ret->data,srcData,NdataSize);
1506   ret->userFlag=(CMK_REFNUM_TYPE)-1;
1507   ret->reducer=reducer;
1508   //ret->ci=NULL;
1509   ret->sourceFlag=-1000;
1510   ret->gcount=0;
1511   ret->migratableContributor = true;
1512 #if CMK_BIGSIM_CHARM
1513   ret->log = NULL;
1514 #endif
1515   return ret;
1516 }
1517
1518 // Charm kernel message runtime support:
1519 void *
1520 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1521 {
1522   int totalsize=ARM_DATASTART+(*sz);
1523   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1524   CkReductionMsg *ret = (CkReductionMsg *)
1525     CkAllocMsg(msgnum,totalsize,priobits);
1526   ret->data=(void *)(&ret->dataStorage);
1527   return (void *) ret;
1528 }
1529
1530 void *
1531 CkReductionMsg::pack(CkReductionMsg* in)
1532 {
1533   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1534   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1535   in->data = NULL;
1536   return (void*) in;
1537 }
1538
1539 CkReductionMsg* CkReductionMsg::unpack(void *in)
1540 {
1541   CkReductionMsg *ret = (CkReductionMsg *)in;
1542   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1543   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1544   ret->data=(void *)(&ret->dataStorage);
1545   return ret;
1546 }
1547
1548
1549 /////////////////////////////////////////////////////////////////////////////////////
1550 ///////////////// Builtin Reducer Functions //////////////
1551 /* A simple reducer, like sum_int, looks like this:
1552 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1553 {
1554   int i,ret=0;
1555   for (i=0;i<nMsg;i++)
1556     ret+=*(int *)(msg[i]->getData());
1557   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1558 }
1559
1560 To keep the code small and easy to change, the implementations below
1561 are built with preprocessor macros.
1562 */
1563
1564 //////////////// simple reducers ///////////////////
1565 /*A define used to quickly and tersely construct simple reductions.
1566 The basic idea is to use the first message's data array as
1567 (pre-initialized!) scratch space for folding in the other messages.
1568  */
1569
1570 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1571 {
1572         CkAbort("Called the invalid reducer type 0.  This probably\n"
1573                 "means you forgot to initialize your custom reducer index.\n");
1574         return NULL;
1575 }
1576
1577 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1578 {
1579   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1580 }
1581
1582 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1583 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1584 {\
1585   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1586   int m,i;\
1587   int nElem=msg[0]->getLength()/sizeof(dataType);\
1588   dataType *ret=(dataType *)(msg[0]->getData());\
1589   for (m=1;m<nMsg;m++)\
1590   {\
1591     dataType *value=(dataType *)(msg[m]->getData());\
1592     for (i=0;i<nElem;i++)\
1593     {\
1594       RED_DEB(("|\tmsg%d (from %d) [%d]=" typeStr "\n",m,msg[m]->sourceFlag,i,value[i]));\
1595       loop\
1596     }\
1597   }\
1598   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1599   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1600 }
1601
1602 //Use this macro for reductions that have the same type for all inputs
1603 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1604   SIMPLE_REDUCTION(nameBase##_char,char,"%c",loop) \
1605   SIMPLE_REDUCTION(nameBase##_short,short,"%h",loop) \
1606   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1607   SIMPLE_REDUCTION(nameBase##_long,long,"%ld",loop) \
1608   SIMPLE_REDUCTION(nameBase##_long_long,long long,"%lld",loop) \
1609   SIMPLE_REDUCTION(nameBase##_uchar,unsigned char,"%c",loop) \
1610   SIMPLE_REDUCTION(nameBase##_ushort,unsigned short,"%hu",loop) \
1611   SIMPLE_REDUCTION(nameBase##_uint,unsigned int,"%u",loop) \
1612   SIMPLE_REDUCTION(nameBase##_ulong,unsigned long,"%lu",loop) \
1613   SIMPLE_REDUCTION(nameBase##_ulong_long,unsigned long long,"%llu",loop) \
1614   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1615   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1616
1617 //Compute the sum the numbers passed by each element.
1618 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1619
1620 //Compute the product of the numbers passed by each element.
1621 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1622
1623 //Compute the largest number passed by any element.
1624 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1625
1626 //Compute the smallest integer passed by any element.
1627 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1628
1629
1630 //Compute the logical AND of the integers passed by each element.
1631 // The resulting integer will be zero if any source integer is zero; else 1.
1632 SIMPLE_REDUCTION(logical_and,int,"%d",
1633         if (value[i]==0)
1634      ret[i]=0;
1635   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1636 )
1637
1638 //Compute the logical OR of the integers passed by each element.
1639 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1640 SIMPLE_REDUCTION(logical_or,int,"%d",
1641   if (value[i]!=0)
1642            ret[i]=1;
1643   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1644 )
1645
1646 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1647 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1648 SIMPLE_REDUCTION(bitvec_xor,int,"%d",ret[i]^=value[i];)
1649
1650 //Select one random message to pass on
1651 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1652   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::random, msg[0]);
1653 }
1654
1655 /////////////// concat ////////////////
1656 /*
1657 This reducer simply appends the data it recieves from each element,
1658 without any housekeeping data to separate them.
1659 */
1660 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1661 {
1662   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1663   //Figure out how big a message we'll need
1664   int i,retSize=0;
1665   for (i=0;i<nMsg;i++)
1666       retSize+=msg[i]->getSize();
1667
1668   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1669
1670   //Allocate a new message
1671   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1672
1673   //Copy the source message data into the return message
1674   char *cur=(char *)(ret->getData());
1675   for (i=0;i<nMsg;i++) {
1676     int messageBytes=msg[i]->getSize();
1677     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1678     cur+=messageBytes;
1679   }
1680   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1681   return ret;
1682 }
1683
1684 /////////////// set ////////////////
1685 /*
1686 This reducer appends the data it recieves from each element
1687 along with some housekeeping data indicating contribution boundaries.
1688 The message data is thus a list of reduction_set_element structures
1689 terminated by a dummy reduction_set_element with a sourceElement of -1.
1690 */
1691
1692 //This rounds an integer up to the nearest multiple of sizeof(double)
1693 static const int alignSize=sizeof(double);
1694 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1695
1696 //This gives the size (in bytes) of a reduction_set_element
1697 static int SET_SIZE(int dataSize)
1698 {return SET_ALIGN(sizeof(int)+dataSize);}
1699
1700 //This returns a pointer to the next reduction_set_element in the list
1701 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1702 {
1703   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1704   return (CkReduction::setElement *)next;
1705 }
1706
1707 //Combine the data passed by each element into an list of reduction_set_elements.
1708 // Each element may contribute arbitrary data (with arbitrary length).
1709 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1710 {
1711   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1712   //Figure out how big a message we'll need
1713   int i,retSize=0;
1714   for (i=0;i<nMsg;i++) {
1715     if (!msg[i]->isFromUser())
1716     //This message is composite-- it will just be copied over (less terminating -1)
1717       retSize+=(msg[i]->getSize()-sizeof(int));
1718     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1719       retSize+=SET_SIZE(msg[i]->getSize());
1720   }
1721   retSize+=sizeof(int);//Leave room for terminating -1.
1722
1723   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1724
1725   //Allocate a new message
1726   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1727
1728   //Copy the source message data into the return message
1729   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1730   for (i=0;i<nMsg;i++)
1731     if (!msg[i]->isFromUser())
1732     {//This message is composite-- just copy it over (less terminating -1)
1733                         int messageBytes=msg[i]->getSize()-sizeof(int);
1734                         RED_DEB(("|\tc msg[%d] is %d bytes\n",i,msg[i]->getSize()));
1735                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1736                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1737     }
1738     else //This is a message from an element-- wrap it in a reduction_set_element
1739     {
1740       RED_DEB(("|\tu msg[%d] is %d bytes\n",i,msg[i]->getSize()));
1741       cur->dataSize=msg[i]->getSize();
1742       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1743       cur=SET_NEXT(cur);
1744     }
1745   cur->dataSize=-1;//Add a terminating -1.
1746   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1747   return ret;
1748 }
1749
1750 //Utility routine: get the next reduction_set_element in the list
1751 // if there is one, or return NULL if there are none.
1752 //To get all the elements, just keep feeding this procedure's output back to
1753 // its input until it returns NULL.
1754 CkReduction::setElement *CkReduction::setElement::next(void)
1755 {
1756   CkReduction::setElement *n=SET_NEXT(this);
1757   if (n->dataSize==-1)
1758     return NULL;//This is the end of the list
1759   else
1760     return n;//This is just another element
1761 }
1762
1763
1764 ///////// statisticsElement
1765
1766 CkReduction::statisticsElement::statisticsElement(double initialValue)
1767   : count(1)
1768   , mean(initialValue)
1769   , m2(0.0)
1770 {}
1771
1772 // statistics reducer
1773 // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
1774 // Chan, Tony F.; Golub, Gene H.; LeVeque, Randall J. (1979),
1775 // "Updating Formulae and a Pairwise Algorithm for Computing Sample Variances." (PDF),
1776 // Technical Report STAN-CS-79-773, Department of Computer Science, Stanford University.
1777 static CkReductionMsg* statistics(int nMsgs, CkReductionMsg** msg)
1778 {
1779   int nElem = msg[0]->getLength() / sizeof(CkReduction::statisticsElement);
1780   CkReduction::statisticsElement* ret = (CkReduction::statisticsElement*)(msg[0]->getData());
1781   for (int m = 1; m < nMsgs; m++)
1782   {
1783     CkReduction::statisticsElement* value = (CkReduction::statisticsElement*)(msg[m]->getData());
1784     for (int i = 0; i < nElem; i++)
1785     {
1786       double a_count = ret[i].count;
1787       ret[i].count += value[i].count;
1788       double delta = value[i].mean - ret[i].mean;
1789       ret[i].mean += delta * value[i].count / ret[i].count;
1790       ret[i].m2 += value[i].m2 + delta * delta * value[i].count * a_count / ret[i].count;
1791     }
1792   }
1793   return CkReductionMsg::buildNew(
1794     nElem*sizeof(CkReduction::statisticsElement),
1795     (void *)ret,
1796     CkReduction::invalid,
1797     msg[0]);
1798 }
1799
1800 ///////// tupleElement
1801
1802 CkReduction::tupleElement::tupleElement()
1803   : dataSize(0)
1804   , data(NULL)
1805   , reducer(CkReduction::invalid)
1806   , owns_data(false)
1807 {}
1808 CkReduction::tupleElement::tupleElement(size_t dataSize_, void* data_, CkReduction::reducerType reducer_)
1809   : dataSize(dataSize_)
1810   , data((char*)data_)
1811   , reducer(reducer_)
1812   , owns_data(false)
1813 {
1814 }
1815 CkReduction::tupleElement::tupleElement(CkReduction::tupleElement&& rhs_move)
1816   : dataSize(rhs_move.dataSize)
1817   , data(rhs_move.data)
1818   , reducer(rhs_move.reducer)
1819   , owns_data(rhs_move.owns_data)
1820 {
1821   rhs_move.dataSize = 0;
1822   rhs_move.data = 0;
1823   rhs_move.reducer = CkReduction::invalid;
1824   rhs_move.owns_data = false;
1825 }
1826 CkReduction::tupleElement& CkReduction::tupleElement::operator=(CkReduction::tupleElement&& rhs_move)
1827 {
1828   if (owns_data)
1829     delete[] data;
1830   dataSize = rhs_move.dataSize;
1831   data = rhs_move.data;
1832   reducer = rhs_move.reducer;
1833   owns_data = rhs_move.owns_data;
1834   rhs_move.dataSize = 0;
1835   rhs_move.data = 0;
1836   rhs_move.reducer = CkReduction::invalid;
1837   rhs_move.owns_data = false;
1838   return *this;
1839 }
1840 CkReduction::tupleElement::~tupleElement()
1841 {
1842   if (owns_data)
1843     delete[] data;
1844 }
1845
1846 void CkReduction::tupleElement::pup(PUP::er &p) {
1847   p|dataSize;
1848   // TODO - it might be better to pack these raw, then we don't have to
1849   //  transform & copy them out on unpacking, we could just use the message's
1850   //  memory directly
1851   if (p.isUnpacking()) {
1852     data = new char[dataSize];
1853     owns_data = true;
1854   }
1855   PUParray(p, data, dataSize);
1856   if (p.isUnpacking()){
1857     int temp;
1858     p|temp;
1859     reducer=(CkReduction::reducerType)temp;
1860   } else {
1861     int temp=(int)reducer;
1862     p|temp;
1863   }
1864 }
1865
1866 CkReductionMsg* CkReductionMsg::buildFromTuple(CkReduction::tupleElement* reductions, int num_reductions)
1867 {
1868   PUP::sizer ps;
1869   ps|num_reductions;
1870   PUParray(ps, reductions, num_reductions);
1871
1872   CkReductionMsg* msg = CkReductionMsg::buildNew(ps.size(), NULL, CkReduction::tuple);
1873   PUP::toMem p(msg->data);
1874   p|num_reductions;
1875   PUParray(p, reductions, num_reductions);
1876   if (p.size() != ps.size()) CmiAbort("Size mismatch packing CkReduction::tupleElement::tupleToBuffer\n");
1877   return msg;
1878 }
1879
1880 void CkReductionMsg::toTuple(CkReduction::tupleElement** out_reductions, int* num_reductions)
1881 {
1882   PUP::fromMem p(this->getData());
1883   p|(*num_reductions);
1884   *out_reductions = new CkReduction::tupleElement[*num_reductions];
1885   PUParray(p, *out_reductions, *num_reductions);
1886 }
1887
1888 // tuple reducer
1889 CkReductionMsg* CkReduction::tupleReduction(int num_messages, CkReductionMsg** messages)
1890 {
1891   CkReduction::tupleElement** tuple_data = new CkReduction::tupleElement*[num_messages];
1892   int num_reductions = 0;
1893   for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1894   {
1895     int itr_num_reductions = 0;
1896     messages[message_idx]->toTuple(&tuple_data[message_idx], &itr_num_reductions);
1897
1898     // each message must submit the same reductions
1899     if (num_reductions == 0)
1900       num_reductions = itr_num_reductions;
1901     else if (num_reductions != itr_num_reductions)
1902       CmiAbort("num_reductions mismatch in CkReduction::tupleReduction");
1903   }
1904
1905   DEB_TUPLE(("tupleReduction {\n  num_messages=%d,\n  num_reductions=%d,\n  length=%d\n",
1906            num_messages, num_reductions, messages[0]->getLength()));
1907
1908   CkReduction::tupleElement* return_data = new CkReduction::tupleElement[num_reductions];
1909   // using a raw buffer to avoid CkReductionMsg constructor/destructor, we want to manage
1910   //  the inner memory of these temps ourselves to avoid unneeded copies
1911   char* simulated_messages_buffer = new char[sizeof(CkReductionMsg) * num_reductions * num_messages];
1912   CkReductionMsg** simulated_messages = new CkReductionMsg*[num_messages];
1913
1914   // imagine the given data in a 2D layout where the messages are rows and reductions are columns
1915   // here we grab each column and run that reduction
1916
1917   for (int reduction_idx = 0; reduction_idx < num_reductions; ++reduction_idx)
1918   {
1919     DEB_TUPLE(("  reduction_idx=%d {\n", reduction_idx));
1920     CkReduction::reducerType reducerType = CkReduction::invalid;
1921     for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1922     {
1923       CkReduction::tupleElement* reductions = (CkReduction::tupleElement*)(tuple_data[message_idx]);
1924       CkReduction::tupleElement& element = reductions[reduction_idx];
1925       DEB_TUPLE(("    msg %d, sf=%d, length=%d : { dataSize=%d, data=%p, reducer=%d },\n",
1926                  message_idx, messages[message_idx]->sourceFlag, messages[message_idx]->getLength(), element.dataSize, element.data, element.reducer));
1927
1928       reducerType = element.reducer;
1929
1930       size_t sim_idx = (reduction_idx * num_messages + message_idx) * sizeof(CkReductionMsg);
1931       CkReductionMsg& simulated_message = *(CkReductionMsg*)&simulated_messages_buffer[sim_idx];
1932       simulated_message.dataSize = element.dataSize;
1933       simulated_message.data = element.data;
1934       simulated_message.reducer = element.reducer;
1935       simulated_message.sourceFlag = messages[message_idx]->sourceFlag;
1936       simulated_message.userFlag = messages[message_idx]->userFlag;
1937       simulated_message.gcount = messages[message_idx]->gcount;
1938       simulated_message.migratableContributor = messages[message_idx]->migratableContributor;
1939 #if CMK_BIGSIM_CHARM
1940       simulated_message.log = NULL;
1941 #endif
1942       simulated_messages[message_idx] = &simulated_message;
1943     }
1944
1945     // run the reduction and copy the result back to our data structure
1946     const auto& reducerFp = CkReduction::reducerTable[reducerType].fn;
1947     CkReductionMsg* result = reducerFp(num_messages, simulated_messages);
1948     DEB_TUPLE(("    result_len=%d\n  },\n", result->getLength()));
1949     return_data[reduction_idx] = CkReduction::tupleElement(result->getLength(), result->getData(), reducerType);
1950     // TODO - leak - the built in reducers all reuse message memory, so this is not safe to delete
1951     // delete result;
1952   }
1953
1954   CkReductionMsg* retval = CkReductionMsg::buildFromTuple(return_data, num_reductions);
1955   DEB_TUPLE(("} tupleReduction msg_size=%d\n", retval->getSize()));
1956
1957   for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1958     delete[] tuple_data[message_idx];
1959   delete[] tuple_data;
1960   delete[] return_data;
1961   delete[] simulated_messages_buffer;
1962   // note that although this is a 2d array, we don't need to delete the inner objects,
1963   //  their memory is tracked in simulated_messages_buffer
1964   delete[] simulated_messages;
1965   return retval;
1966 }
1967
1968
1969
1970 /////////////////// Reduction Function Table /////////////////////
1971 CkReduction::CkReduction() {} //Dummy private constructor
1972
1973 //Add the given reducer to the list.  Returns the new reducer's
1974 // reducerType.  Must be called in the same order on every node.
1975 CkReduction::reducerType CkReduction::addReducer(reducerFn fn, bool streamable)
1976 {
1977   reducerTable[nReducers].fn=fn;
1978   reducerTable[nReducers].streamable=streamable;
1979   return (reducerType)nReducers++;
1980 }
1981
1982 /*Reducer table: maps reducerTypes to reducerStructs.
1983 It's indexed by reducerType, so the order in this table
1984 must *exactly* match the reducerType enum declaration.
1985 The names don't have to match, but it helps.
1986 */
1987 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1988
1989 CkReduction::reducerStruct CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1990     CkReduction::reducerStruct(::invalid_reducer, true),
1991     CkReduction::reducerStruct(::nop, true),
1992     //Compute the sum the numbers passed by each element.
1993     CkReduction::reducerStruct(::sum_char, true),
1994     CkReduction::reducerStruct(::sum_short, true),
1995     CkReduction::reducerStruct(::sum_int, true),
1996     CkReduction::reducerStruct(::sum_long, true),
1997     CkReduction::reducerStruct(::sum_long_long, true),
1998     CkReduction::reducerStruct(::sum_uchar, true),
1999     CkReduction::reducerStruct(::sum_ushort, true),
2000     CkReduction::reducerStruct(::sum_uint, true),
2001     CkReduction::reducerStruct(::sum_ulong, true),
2002     CkReduction::reducerStruct(::sum_ulong_long, true),
2003     // The floating point sums are marked as unstreamable to avoid
2004     // implictly stating that they will always be precision oblivious.
2005     CkReduction::reducerStruct(::sum_float, false),
2006     CkReduction::reducerStruct(::sum_double, false),
2007
2008     //Compute the product the numbers passed by each element.
2009     CkReduction::reducerStruct(::product_char, true),
2010     CkReduction::reducerStruct(::product_short, true),
2011     CkReduction::reducerStruct(::product_int, true),
2012     CkReduction::reducerStruct(::product_long, true),
2013     CkReduction::reducerStruct(::product_long_long, true),
2014     CkReduction::reducerStruct(::product_uchar, true),
2015     CkReduction::reducerStruct(::product_ushort, true),
2016     CkReduction::reducerStruct(::product_uint, true),
2017     CkReduction::reducerStruct(::product_ulong, true),
2018     CkReduction::reducerStruct(::product_ulong_long, true),
2019     CkReduction::reducerStruct(::product_float, true),
2020     CkReduction::reducerStruct(::product_double, true),
2021
2022     //Compute the largest number passed by any element.
2023     CkReduction::reducerStruct(::max_char, true),
2024     CkReduction::reducerStruct(::max_short, true),
2025     CkReduction::reducerStruct(::max_int, true),
2026     CkReduction::reducerStruct(::max_long, true),
2027     CkReduction::reducerStruct(::max_long_long, true),
2028     CkReduction::reducerStruct(::max_uchar, true),
2029     CkReduction::reducerStruct(::max_ushort, true),
2030     CkReduction::reducerStruct(::max_uint, true),
2031     CkReduction::reducerStruct(::max_ulong, true),
2032     CkReduction::reducerStruct(::max_ulong_long, true),
2033     CkReduction::reducerStruct(::max_float, true),
2034     CkReduction::reducerStruct(::max_double, true),
2035
2036     //Compute the smallest number passed by any element.
2037     CkReduction::reducerStruct(::min_char, true),
2038     CkReduction::reducerStruct(::min_short, true),
2039     CkReduction::reducerStruct(::min_int, true),
2040     CkReduction::reducerStruct(::min_long, true),
2041     CkReduction::reducerStruct(::min_long_long, true),
2042     CkReduction::reducerStruct(::min_uchar, true),
2043     CkReduction::reducerStruct(::min_ushort, true),
2044     CkReduction::reducerStruct(::min_uint, true),
2045     CkReduction::reducerStruct(::min_ulong, true),
2046     CkReduction::reducerStruct(::min_ulong_long, true),
2047     CkReduction::reducerStruct(::min_float, true),
2048     CkReduction::reducerStruct(::min_double, true),
2049
2050     //Compute the logical AND of the integers passed by each element.
2051     // The resulting integer will be zero if any source integer is zero.
2052     CkReduction::reducerStruct(::logical_and, true),
2053
2054     //Compute the logical OR of the integers passed by each element.
2055     // The resulting integer will be 1 if any source integer is nonzero.
2056     CkReduction::reducerStruct(::logical_or, true),
2057
2058     // Compute the logical bitvector AND of the integers passed by each element.
2059     CkReduction::reducerStruct(::bitvec_and, true),
2060
2061     // Compute the logical bitvector OR of the integers passed by each element.
2062     CkReduction::reducerStruct(::bitvec_or, true),
2063     
2064     // Compute the logical bitvector XOR of the integers passed by each element.
2065     CkReduction::reducerStruct(::bitvec_xor, true),
2066
2067     // Select one of the messages at random to pass on
2068     CkReduction::reducerStruct(::random, true),
2069
2070     //Concatenate the (arbitrary) data passed by each element
2071     // This reduction is marked as unstreamable because of the n^2
2072     // work required to stream it
2073     CkReduction::reducerStruct(::concat, false),
2074
2075     //Combine the data passed by each element into an list of setElements.
2076     // Each element may contribute arbitrary data (with arbitrary length).
2077     // This reduction is marked as unstreamable because of the n^2
2078     // work required to stream it
2079     CkReduction::reducerStruct(::set, false),
2080
2081     CkReduction::reducerStruct(::statistics, true),
2082     CkReduction::reducerStruct(CkReduction::tupleReduction, false),
2083 };
2084
2085
2086
2087
2088
2089
2090
2091
2092 /********** Code added by Sayantan *********************/
2093 /** Locking is a big problem in the nodegroup code for smp.
2094  So a few assumptions have had to be made. There is one lock
2095  called lockEverything. It protects all the data structures 
2096  of the nodegroup reduction mgr. I tried grabbing it separately 
2097  for each datastructure, modifying it and then releasing it and
2098  then grabbing it again, for the next change.
2099  That doesn't really help because the interleaved execution of 
2100  different threads makes the state of the reduction manager 
2101  inconsistent. 
2102  
2103  1. Grab lockEverything before calling finishreduction or startReduction
2104     or doRecvMsg
2105  2. lockEverything is grabbed only in entry methods reductionStarting
2106     or RecvMesg or  addcontribution.
2107  ****/
2108  
2109 /**nodegroup reduction manager . Most of it is similar to the guy above***/
2110 NodeGroup::NodeGroup(void) {
2111   __nodelock=CmiCreateLock();
2112 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2113     mlogData->objID.type = TypeNodeGroup;
2114     mlogData->objID.data.group.onPE = CkMyNode();
2115 #endif
2116
2117 }
2118 NodeGroup::~NodeGroup() {
2119   CmiDestroyLock(__nodelock);
2120   CkpvAccess(_destroyingNodeGroup) = true;
2121 }
2122 void NodeGroup::pup(PUP::er &p)
2123 {
2124   CkNodeReductionMgr::pup(p);
2125   p|reductionInfo;
2126 }
2127
2128 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
2129
2130 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
2131   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
2132  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
2133   //ckLocalNodeBranch()->ckSetReductionClient(cb);
2134  }
2135
2136 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
2137                                     ((CkNodeReductionMgr *)this),
2138                                     reductionInfo,false)
2139
2140 /* this contribute also adds up the count across all messages it receives.
2141   Useful for summing up number of array elements who have contributed ****/ 
2142 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
2143         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
2144
2145
2146
2147 //#define BINOMIAL_TREE
2148
2149 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
2150   : thisProxy(thisgroup)
2151 {
2152 #ifdef BINOMIAL_TREE
2153   init_BinomialTree();
2154 #else
2155   init_BinaryTree();
2156 #endif
2157   storedCallback=NULL;
2158   redNo=0;
2159   inProgress=false;
2160   
2161   startRequested=false;
2162   gcount=CkNumNodes();
2163   lcount=1;
2164   nContrib=nRemote=0;
2165   lockEverything = CmiCreateLock();
2166
2167
2168   creating=false;
2169   interrupt = 0;
2170   DEBR((AA "In NodereductionMgr constructor at %d \n" AB,this));
2171         /*
2172                 FAULT_EVAC
2173         */
2174         blocked = false;
2175         maxModificationRedNo = INT_MAX;
2176         killed=0;
2177         additionalGCount = newAdditionalGCount = 0;
2178 }
2179
2180 CkNodeReductionMgr::~CkNodeReductionMgr()
2181 {
2182   CmiDestroyLock(lockEverything);
2183 }
2184
2185 void CkNodeReductionMgr::flushStates()
2186 {
2187  if(CkMyRank() == 0){
2188   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
2189   redNo=0;
2190   inProgress=false;
2191
2192   startRequested=false;
2193   gcount=CkNumNodes();
2194   lcount=1;
2195   nContrib=nRemote=0;
2196
2197   creating=false;
2198   interrupt = 0;
2199   while (!msgs.isEmpty()) { delete msgs.deq(); }
2200   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
2201   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
2202   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
2203   }
2204 }
2205
2206 //////////// Reduction Manager Client API /////////////
2207
2208 //Add the given client function.  Overwrites any previous client.
2209 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
2210 {
2211   DEBR((AA "Setting reductionClient in NodeReductionMgr %d at %d\n" AB,cb,this));
2212   if(cb->isInvalid()){
2213         DEBR((AA "Invalid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
2214   }else{
2215         DEBR((AA "Valid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
2216   }
2217
2218   if (CkMyNode()!=0)
2219           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
2220   delete storedCallback;
2221   storedCallback=cb;
2222 }
2223
2224
2225
2226 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
2227 {
2228 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2229     Chare *oldObj =CpvAccess(_currentObj);
2230     CpvAccess(_currentObj) = this;
2231 #endif
2232
2233   //m->ci=ci;
2234   m->redNo=ci->redNo++;
2235   m->sourceFlag=-1;//A single contribution
2236   m->gcount=0;
2237   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
2238   addContribution(m);
2239
2240 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2241     CpvAccess(_currentObj) = oldObj;
2242 #endif
2243 }
2244
2245
2246 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
2247 {
2248 #if CMK_BIGSIM_CHARM
2249   _TRACE_BG_TLINE_END(&m->log);
2250 #endif
2251 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2252     Chare *oldObj =CpvAccess(_currentObj);
2253     CpvAccess(_currentObj) = this;
2254 #endif
2255   //m->ci=ci;
2256   m->redNo=ci->redNo++;
2257   m->gcount=count;
2258   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
2259   addContribution(m);
2260   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
2261
2262 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2263     CpvAccess(_currentObj) = oldObj;
2264 #endif
2265 }
2266
2267
2268 //////////// Reduction Manager Remote Entry Points /////////////
2269
2270 //Sent down the reduction tree (used by barren PEs)
2271 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
2272 {
2273   DEBR((AA "[%d] Received reductionStarting redNo %d\n" AB, CkMyPe(), m->num));
2274   CmiLock(lockEverything);
2275         /*
2276                 FAULT_EVAC
2277         */
2278   if(blocked){
2279         delete m;
2280         CmiUnlock(lockEverything);
2281         return ;
2282   }
2283   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
2284   if (isPresent(m->num) && !inProgress)
2285   {
2286     DEBR((AA "Starting Node reduction #%d at parent's request\n" AB,m->num));
2287     startReduction(m->num,srcNode);
2288     finishReduction();
2289   } else if (isFuture(m->num)){
2290         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
2291   }
2292   else //is Past
2293     DEBR((AA "Ignoring node parent's late request to start #%d\n" AB,m->num));
2294   CmiUnlock(lockEverything);
2295   delete m;
2296
2297 }
2298
2299
2300 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
2301         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2302         /*
2303                 FAULT_EVAC
2304         */
2305         if(blocked){
2306                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
2307                 bufferedRemoteMsgs.enq(m);
2308                 return;
2309         }
2310         
2311         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
2312             //DEBR((AA "Recv'd remote contribution %d for #%d at %d\n" AB,nRemote,m->redNo,this));
2313             startReduction(m->redNo,CkMyNode());
2314             msgs.enq(m);
2315             nRemote++;
2316             finishReduction();
2317         }
2318         else {
2319             if (isFuture(m->redNo)) {
2320                    // DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
2321                     futureRemoteMsgs.enq(m);
2322             }else{
2323                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
2324                    CkAbort("Recv'd late remote contribution!\n");
2325             }
2326         }
2327         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2328 }
2329
2330 //Sent up the reduction tree with reduced data
2331 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
2332 {
2333 #if CMK_BIGSIM_CHARM
2334   _TRACE_BG_TLINE_END(&m->log);
2335 #endif
2336 #ifndef CMK_CPV_IS_SMP
2337 #if CMK_IMMEDIATE_MSG
2338         if(interrupt == 1){
2339                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
2340                 CpvAccess(_qd)->process(-1);
2341                 CmiDelayImmediate();
2342                 return;
2343         }
2344 #endif  
2345 #endif
2346    interrupt = 1;       
2347    CmiLock(lockEverything);   
2348    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2349    doRecvMsg(m);
2350    CmiUnlock(lockEverything);    
2351    interrupt = 0;
2352    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2353 }
2354
2355 void CkNodeReductionMgr::startReduction(int number,int srcNode)
2356 {
2357         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
2358         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
2359         if (inProgress){
2360                 DEBR((AA "This Node reduction is already in progress\n" AB));
2361                 return;//This reduction already started
2362         }
2363         if (creating) //Don't start yet-- we're creating elements
2364         {
2365                 DEBR((AA " Node Postponing start request #%d until we're done creating\n" AB,redNo));
2366                 startRequested=true;
2367                 return;
2368         }
2369         
2370         //If none of these cases, we need to start the reduction--
2371         DEBR((AA "Starting Node reduction #%d on %p srcNode %d\n" AB,redNo,this,srcNode));
2372         inProgress=true;
2373
2374         if(!_isNotifyChildInRed) return;
2375
2376         //Sent start requests to our kids (in case they don't already know)
2377         
2378         for (int k=0;k<treeKids();k++)
2379         {
2380 #ifdef BINOMIAL_TREE
2381                 DEBR((AA "Asking child Node %d to start #%d\n" AB,kids[k],redNo));
2382                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
2383 #else
2384                 if(kids[k] != srcNode){
2385                         DEBR((AA "Asking child Node %d to start #%d\n" AB,kids[k],redNo));
2386                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
2387                 }       
2388 #endif
2389         }
2390
2391         DEBR((AA "Asking all local groups to start #%d\n" AB,redNo));
2392         // in case, nodegroup does not has the attached red group,
2393         // it has to restart groups again
2394         startLocalGroupReductions(number);
2395 /*
2396         if (startLocalGroupReductions(number) == 0)
2397           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
2398 */
2399 }
2400
2401 // restart local groups until succeed
2402 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
2403   CmiLock(lockEverything);    
2404   if (startLocalGroupReductions(number) == 0)
2405     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
2406   CmiUnlock(lockEverything);    
2407 }
2408
2409 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
2410         /*
2411                 FAULT_EVAC
2412         */
2413         if(blocked){
2414                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
2415                 bufferedMsgs.enq(m);
2416                 return;
2417         }
2418         
2419         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
2420                 DEBR((AA "Contributor gives early node contribution-- for #%d\n" AB,m->redNo));
2421                 futureMsgs.enq(m);
2422         } else {// An ordinary contribution
2423                 DEBR((AA "Recv'd local node contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
2424                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
2425                 startReduction(m->redNo,CkMyNode());
2426                 msgs.enq(m);
2427                 nContrib++;
2428                 finishReduction();
2429         }
2430 }
2431
2432 //Handle a message from one element for the reduction
2433 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
2434 {
2435   interrupt = 1;
2436   CmiLock(lockEverything);
2437   doAddContribution(m);
2438   CmiUnlock(lockEverything);
2439   interrupt = 0;
2440 }
2441
2442 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
2443         CmiLock(lockEverything);   
2444         /*
2445                 FAULT_EVAC
2446         */
2447         if(blocked){
2448                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
2449                 bufferedMsgs.enq(m);
2450                 CmiUnlock(lockEverything);   
2451                 return;
2452         }
2453         
2454         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
2455                 DEBR((AA "Latemigrant gives early node contribution-- for #%d\n" AB,m->redNo));
2456 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
2457                 futureLateMigrantMsgs.enq(m);
2458         } else {// An ordinary contribution
2459                 DEBR((AA "Recv'd late migrant contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
2460 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
2461                 msgs.enq(m);
2462                 finishReduction();
2463         }
2464         CmiUnlock(lockEverything);   
2465 }
2466
2467
2468
2469
2470
2471 /** check if the nodegroup reduction is finished at this node. In that case send it
2472 up the reduction tree **/
2473
2474 void CkNodeReductionMgr::finishReduction(void)
2475 {
2476   DEBR((AA "in Nodegrp finishReduction %d treeKids %d \n" AB,inProgress,treeKids()));
2477   /***Check if reduction is finished in the next few ifs***/
2478   if ((!inProgress) || creating){
2479         DEBR((AA "Either not in Progress or creating\n" AB));
2480         return;
2481   }
2482
2483   bool partialReduction = false;
2484
2485   if (nContrib<(lcount)){
2486     if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
2487       partialReduction = true;
2488     }
2489     else {
2490       DEBR((AA "Nodegrp Need more local messages %d %d\n" AB,nContrib,(lcount)));
2491       return;//Need more local messages
2492     }
2493   }
2494   if (nRemote<treeKids()){
2495     if (msgs.length() > 1 && CkReduction::reducerTable[msgs.peek()->reducer].streamable) {
2496       partialReduction = true;
2497     }
2498     else {
2499       DEBR((AA "Nodegrp Need more Remote messages %d %d\n" AB,nRemote,treeKids()));
2500       return;//Need more remote messages
2501     }
2502   }
2503   if (nRemote>treeKids()){
2504
2505           interrupt = 0;
2506            CkAbort("Nodegrp Excess remote reduction message received!\n");
2507   }
2508
2509   DEBR((AA "Reducing node data...\n" AB));
2510
2511   /**reduce all messages received at this node **/
2512   CkReductionMsg *result=reduceMessages();
2513
2514   if (partialReduction) {
2515     msgs.enq(result);
2516     return;
2517   }
2518
2519   if (hasParent())
2520   {//Pass data up tree to parent
2521         if(CmiNodeAlive(CkMyNode()) || killed == 0){
2522         DEBR((AA "Passing reduced data up to parent node %d. \n" AB,treeParent()));
2523         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
2524                 /*
2525                         FAULT_EVAC
2526                 */
2527                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
2528             thisProxy[treeParent()].RecvMsg(result);
2529         }
2530
2531   }
2532   else
2533   {
2534                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2535                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
2536                         msgs.enq(result);
2537                         return;
2538                 }
2539                 result->gcount += additionalGCount;
2540           /** if the reduction is finished and I am the root of the reduction tree
2541           then call the reductionhandler and other stuff ***/
2542                 
2543
2544                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2545     CkSetRefNum(result, result->getUserFlag());
2546     if (!result->callback.isInvalid()){
2547       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2548             result->callback.send(result);
2549     }
2550     else if (storedCallback!=NULL){
2551       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2552             storedCallback->send(result);
2553     }
2554     else{
2555                 DEBR((AA "Invalid Callback \n" AB));
2556             CkAbort("No reduction client!\n"
2557                     "You must register a client with either SetReductionClient or during contribute.\n");
2558                 }
2559   }
2560
2561   // DEBR((AA "Reduction %d finished in group!\n" AB,redNo));
2562   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2563   redNo++;
2564         updateTree();
2565   int i;
2566   inProgress=false;
2567   startRequested=false;
2568   nRemote=nContrib=0;
2569
2570   //Look through the future queue for messages we can now handle
2571   int n=futureMsgs.length();
2572
2573   for (i=0;i<n;i++)
2574   {
2575     interrupt = 1;
2576
2577     CkReductionMsg *m=futureMsgs.deq();
2578
2579     interrupt = 0;
2580     if (m!=NULL){ //One of these addContributions may have finished us.
2581       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2582       doAddContribution(m);//<- if *still* early, puts it back in the queue
2583     }
2584   }
2585
2586   interrupt = 1;
2587
2588   n=futureRemoteMsgs.length();
2589
2590   interrupt = 0;
2591   for (i=0;i<n;i++)
2592   {
2593     interrupt = 1;
2594
2595     CkReductionMsg *m=futureRemoteMsgs.deq();
2596
2597     interrupt = 0;
2598     if (m!=NULL)
2599       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2600   }
2601   
2602   n = futureLateMigrantMsgs.length();
2603   for(i=0;i<n;i++){
2604     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2605     if(m != NULL){
2606       if(m->redNo == redNo){
2607         msgs.enq(m);
2608       }else{
2609         futureLateMigrantMsgs.enq(m);
2610       }
2611     }
2612   }
2613 }
2614
2615 //////////// Reduction Manager Utilities /////////////
2616
2617 void CkNodeReductionMgr::init_BinaryTree(){
2618         parent = (CkMyNode()-1)/TREE_WID;
2619         int firstkid = CkMyNode()*TREE_WID+1;
2620         numKids=CkNumNodes()-firstkid;
2621   if (numKids>TREE_WID) numKids=TREE_WID;
2622   if (numKids<0) numKids=0;
2623
2624         for(int i=0;i<numKids;i++){
2625                 kids.push_back(firstkid+i);
2626                 newKids.push_back(firstkid+i);
2627         }
2628 }
2629
2630 void CkNodeReductionMgr::init_BinomialTree(){
2631         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2632         /*upperSize = (unsigned )pow((double)2,depth);*/
2633         upperSize = (unsigned) 1 << depth;
2634         label = upperSize-CkMyNode()-1;
2635         int p=label;
2636         int count=0;
2637         while( p > 0){
2638                 if(p % 2 == 0)
2639                         break;
2640                 else{
2641                         p = p/2;
2642                         count++;
2643                 }
2644         }
2645         /*parent = label + rint(pow((double)2,count));*/
2646         parent = label + (1<<count);
2647         parent = upperSize -1 -parent;
2648         int temp;
2649         if(count != 0){
2650                 numKids = 0;
2651                 for(int i=0;i<count;i++){
2652                         /*temp = label - rint(pow((double)2,i));*/
2653                         temp = label - (1<<i);
2654                         temp = upperSize-1-temp;
2655                         if(temp <= CkNumNodes()-1){
2656                 //              kids[numKids] = temp;
2657                                 kids.push_back(temp);
2658                                 numKids++;
2659                         }
2660                 }
2661         }else{
2662                 numKids = 0;
2663         //      kids = NULL;
2664         }
2665 }
2666
2667
2668 int CkNodeReductionMgr::treeRoot(void)
2669 {
2670   return 0;
2671 }
2672 bool CkNodeReductionMgr::hasParent(void) //Root Node
2673 {
2674   return (bool)(CkMyNode()!=treeRoot());
2675 }
2676 int CkNodeReductionMgr::treeParent(void) //My parent Node
2677 {
2678   return parent;
2679 }
2680
2681 int CkNodeReductionMgr::firstKid(void) //My first child Node
2682 {
2683   return CkMyNode()*TREE_WID+1;
2684 }
2685 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2686 {
2687 #ifdef BINOMIAL_TREE
2688         return numKids;
2689 #else
2690 /*  int nKids=CkNumNodes()-firstKid();
2691   if (nKids>TREE_WID) nKids=TREE_WID;
2692   if (nKids<0) nKids=0;
2693   return nKids;*/
2694         return numKids;
2695 #endif
2696 }
2697
2698 //Combine (& free) the current message vector msgs.
2699 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2700 {
2701 #if CMK_BIGSIM_CHARM
2702   _TRACE_BG_END_EXECUTE(1);
2703   void* _bgParentLog = NULL;
2704   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2705 #endif
2706   CkReductionMsg *ret=NULL;
2707
2708   //Look through the vector for a valid reducer, swapping out placeholder messages
2709   CkReduction::reducerType r=CkReduction::invalid;
2710   int msgs_gcount=0;//Reduced gcount
2711   int msgs_nSources=0;//Reduced nSources
2712   CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
2713   CkCallback msgs_callback;
2714   CkCallback msgs_secondaryCallback;
2715   int i;
2716   int nMsgs=0;
2717   CkReductionMsg *m;
2718   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2719   bool isMigratableContributor;
2720         
2721
2722   while(NULL!=(m=msgs.deq()))
2723   {
2724     DEBR((AA "***** gcount=%d; sourceFlag=%d ismigratable %d \n" AB,m->gcount,m->nSources(),m->isMigratableContributor()));       
2725     msgs_gcount+=m->gcount;
2726     if (m->sourceFlag!=0)
2727     { //This is a real message from an element, not just a placeholder
2728       msgs_nSources+=m->nSources();
2729 #if CMK_BIGSIM_CHARM
2730       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2731 #endif
2732
2733       if (nMsgs == 0 || m->reducer != CkReduction::random) {
2734         msgArr[nMsgs++]=m;
2735         r=m->reducer;
2736         if (!m->callback.isInvalid()){
2737 #if CMK_ERROR_CHECKING
2738           if(nMsgs > 1 && !(msgs_callback == m->callback))
2739             CkAbort("mis-matched client callbacks in reduction messages\n");
2740 #endif  
2741           msgs_callback=m->callback;
2742         }
2743         if(!m->secondaryCallback.isInvalid()){
2744           msgs_secondaryCallback = m->secondaryCallback;
2745         }
2746         if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
2747           msgs_userFlag=m->userFlag;
2748         isMigratableContributor= m->isMigratableContributor();
2749       }
2750       else {
2751 #if CMK_ERROR_CHECKING
2752         if(!(msgs_callback == m->callback))
2753           CkAbort("mis-matched client callbacks in reduction messages\n");
2754 #endif  
2755         delete m;
2756       }
2757     }
2758     else
2759     { //This is just a placeholder message-- replace it
2760       delete m;
2761     }
2762   }
2763
2764   if (nMsgs==0||r==CkReduction::invalid)
2765   //No valid reducer in the whole vector
2766     ret=CkReductionMsg::buildNew(0,NULL);
2767   else
2768   {//Use the reducer to reduce the messages
2769     if(nMsgs == 1){
2770       ret = msgArr[0];
2771     }else{
2772       if (msgArr[0]->reducer == CkReduction::random) {
2773         // nMsgs > 1 indicates that reduction type is not random
2774         // this means any data with reducer type random was submitted
2775         // only so that counts would agree, and can be removed
2776         delete msgArr[0];
2777         msgArr[0] = msgArr[nMsgs - 1];
2778         nMsgs--;
2779       }
2780       CkReduction::reducerFn f=CkReduction::reducerTable[r].fn;
2781       ret=(*f)(nMsgs,msgArr);
2782     }
2783     ret->reducer=r;
2784   }
2785
2786         
2787 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2788 #if CRITICAL_PATH_DEBUG > 3
2789         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2790 #endif
2791         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2792         path.updateMax(UsrToEnv(ret));
2793         // Combine the critical paths from all the reduction messages into the header for the new result
2794         for (i=0;i<nMsgs;i++){
2795           if (msgArr[i]!=ret){
2796             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2797             path.updateMax(UsrToEnv(msgArr[i]));
2798           } else {
2799             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2800           }
2801         }
2802 #if CRITICAL_PATH_DEBUG > 3
2803         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2804 #endif
2805
2806 #endif
2807
2808
2809         //Go back through the vector, deleting old messages
2810   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2811   delete [] msgArr;
2812   //Set the message counts
2813   ret->redNo=redNo;
2814   ret->gcount=msgs_gcount;
2815   ret->userFlag=msgs_userFlag;
2816   ret->callback=msgs_callback;
2817   ret->secondaryCallback = msgs_secondaryCallback;
2818   ret->sourceFlag=msgs_nSources;
2819   ret->setMigratableContributor(isMigratableContributor);
2820   DEBR((AA "Node Reduced gcount=%d; sourceFlag=%d\n" AB,ret->gcount,ret->sourceFlag));
2821 #if CMK_BIGSIM_CHARM
2822   _TRACE_BG_TLINE_END(&ret->log);
2823 #endif
2824
2825   return ret;
2826 }
2827
2828 void CkNodeReductionMgr::pup(PUP::er &p)
2829 {
2830 //We do not store the client function pointer or the client function parameter,
2831 //it is the responsibility of the programmer to correctly restore these
2832   IrrGroup::pup(p);
2833   p(redNo);
2834   p(inProgress); p(creating); p(startRequested);
2835   p(lcount);
2836   p(nContrib); p(nRemote);
2837   p(interrupt);
2838   p|msgs;
2839   p|futureMsgs;
2840   p|futureRemoteMsgs;
2841   p|futureLateMigrantMsgs;
2842   p|parent;
2843   p|additionalGCount;
2844   p|newAdditionalGCount;
2845   if(p.isUnpacking()) {
2846     gcount=CkNumNodes();
2847     thisProxy = thisgroup;
2848     lockEverything = CmiCreateLock();
2849 #ifdef BINOMIAL_TREE
2850     init_BinomialTree();
2851 #else
2852     init_BinaryTree();
2853 #endif          
2854   }
2855   p | blocked;
2856   p | maxModificationRedNo;
2857
2858 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2859   int isnull = (storedCallback == NULL);
2860   p | isnull;
2861   if (!isnull) {
2862     if (p.isUnpacking()) {
2863       storedCallback = new CkCallback;
2864     }
2865     p|*storedCallback;
2866   }
2867 #endif
2868
2869 }
2870
2871 /*
2872         FAULT_EVAC
2873         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2874         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2875         reduction tree. 
2876 */
2877 void CkNodeReductionMgr::evacuate(){
2878         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2879         if(treeKids() == 0){
2880         /*
2881                 if the node going down is a leaf
2882         */
2883                 oldleaf=true;
2884                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2885                 /*
2886                         Need to ask parent for the reduction number that it has seen. 
2887                         Since it is a leaf, the tree does not need to be rewired. 
2888                         We reuse the oldparent type of tree modification message to get 
2889                         the parent to block and tell us about the highest reduction number it has seen.
2890                         
2891                 */
2892                 int data[2];
2893                 data[0]=CkMyNode();
2894                 data[1]=getTotalGCount()+additionalGCount;
2895                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2896                 newParent = treeParent();
2897         }else{
2898                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2899                 oldleaf= false;
2900         /*
2901                 It is not a leaf. It needs to rewire the tree around itself.
2902                 It also needs to decide on a reduction No after which the new tree will be used
2903                 Till it decides on the new tree and the redNo at which it becomes valid,
2904                 all received messages will be buffered
2905         */
2906                 newParent = kids[0];
2907                 for(int i=numKids-1;i>=0;i--){
2908                         newKids.remove(i);
2909                 }
2910                 /*
2911                         Ask everybody for the highest reduction number they have seen and
2912                         also tell them about the new tree
2913                 */
2914                 /*
2915                         Tell parent about its new child;
2916                 */
2917                 int oldParentData[2];
2918                 oldParentData[0] = CkMyNode();
2919                 oldParentData[1] = newParent;
2920                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2921
2922                 /*
2923                         Tell the other children about their new parent
2924                 */
2925                 int childrenData=newParent;
2926                 for(int i=1;i<numKids;i++){
2927                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2928                 }
2929                 
2930                 /*
2931                         Tell newParent (1st child) about its new children,
2932                         the current node and its children except the newParent
2933                 */
2934                 int *newParentData = new int[numKids+2];
2935                 for(int i=1;i<numKids;i++){
2936                         newParentData[i] = kids[i];
2937                 }
2938                 newParentData[0] = CkMyNode();
2939                 newParentData[numKids] = parent;
2940                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2941                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2942         }
2943         readyDeletion = false;
2944         blocked = true;
2945         numModificationReplies = 0;
2946         tempModificationRedNo = findMaxRedNo();
2947 }
2948
2949 /*
2950         Depending on the code, use the data to change the tree
2951         1. OLDPARENT : replace the old child with a new one
2952         2. OLDCHILDREN: replace the parent
2953         3. NEWPARENT:  add the children and change the parent
2954         4. LEAFPARENT: delete the old child
2955 */
2956
2957 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2958         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2959         int sender;
2960         newKids = kids;
2961         readyDeletion = false;
2962         newAdditionalGCount = additionalGCount;
2963         switch(code){
2964                 case OLDPARENT: 
2965                         for(int i=0;i<numKids;i++){
2966                                 if(newKids[i] == data[0]){
2967                                         newKids[i] = data[1];
2968                                         break;
2969                                 }
2970                         }
2971                         sender = data[0];
2972                         newParent = parent;
2973                         break;
2974                 case OLDCHILDREN:
2975                         newParent = data[0];
2976                         sender = parent;
2977                         break;
2978                 case NEWPARENT:
2979                         for(int i=0;i<size-2;i++){
2980                                 newKids.push_back(data[i]);
2981                         }
2982                         newParent = data[size-2];
2983                         newAdditionalGCount += data[size-1];
2984                         sender = parent;
2985                         break;
2986                 case LEAFPARENT:
2987                         for(int i=0;i<numKids;i++){
2988                                 if(newKids[i] == data[0]){
2989                                         newKids.remove(i);
2990                                         break;
2991                                 }
2992                         }
2993                         sender = data[0];
2994                         newParent = parent;
2995                         newAdditionalGCount += data[1];
2996                         break;
2997         };
2998         blocked = true;
2999         int maxRedNo = findMaxRedNo();
3000         
3001         thisProxy[sender].collectMaxRedNo(maxRedNo);
3002 }
3003
3004 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
3005         /*
3006                 Find out the maximum redNo that has been seen by 
3007                 the affected nodes
3008         */
3009         numModificationReplies++;
3010         if(maxRedNo > tempModificationRedNo){
3011                 tempModificationRedNo = maxRedNo;
3012         }
3013         if(numModificationReplies == numKids+1){
3014                 maxModificationRedNo = tempModificationRedNo;
3015                 /*
3016                         when all the affected nodes have replied, tell them the maximum.
3017                         Unblock yourself. deal with the buffered messages local and remote
3018                 */
3019                 if(maxModificationRedNo == -1){
3020                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
3021                 }else{
3022                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
3023                 }
3024                 thisProxy[parent].unblockNode(maxModificationRedNo);
3025                 for(int i=0;i<numKids;i++){
3026                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
3027                 }
3028                 blocked = false;
3029                 updateTree();
3030                 clearBlockedMsgs();
3031         }
3032 }
3033
3034 void CkNodeReductionMgr::unblockNode(int maxRedNo){
3035         maxModificationRedNo = maxRedNo;
3036         updateTree();
3037         blocked = false;
3038         clearBlockedMsgs();
3039 }
3040
3041
3042 void CkNodeReductionMgr::clearBlockedMsgs(){
3043         int len = bufferedMsgs.length();
3044         for(int i=0;i<len;i++){
3045                 CkReductionMsg *m = bufferedMsgs.deq();
3046                 doAddContribution(m);
3047         }
3048         len = bufferedRemoteMsgs.length();
3049         for(int i=0;i<len;i++){
3050                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
3051                 doRecvMsg(m);
3052         }
3053
3054 }
3055 /*
3056         if the reduction number exceeds the maxModificationRedNo, change the tree
3057         to become the new one
3058 */
3059
3060 void CkNodeReductionMgr::updateTree(){
3061         if(redNo > maxModificationRedNo){
3062                 parent = newParent;
3063                 kids = newKids;
3064                 maxModificationRedNo = INT_MAX;
3065                 numKids = kids.size();
3066                 readyDeletion = true;
3067                 additionalGCount = newAdditionalGCount;
3068                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
3069                 for(int i=0;i<(int)(newKids.size());i++){
3070                         DEBREVAC(("%d ",newKids[i]));
3071                 }
3072                 DEBREVAC(("\n"));
3073         //      startReduction(redNo,CkMyNode());
3074         }else{
3075                 if(maxModificationRedNo != INT_MAX){
3076                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
3077                         startReduction(redNo,CkMyNode());
3078                         finishReduction();
3079                 }       
3080         }
3081 }
3082
3083
3084 void CkNodeReductionMgr::doneEvacuate(){
3085         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
3086 /*      if(oldleaf){
3087                 
3088                         It used to be a leaf
3089                         Then as soon as future messages have been emptied you can 
3090                         send the parent a message telling them that they are not going
3091                         to receive anymore messages from this child
3092                 
3093                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
3094                 while(futureMsgs.length() != 0){
3095                         int n = futureMsgs.length();
3096                         for(int i=0;i<n;i++){
3097                                 CkReductionMsg *m = futureMsgs.deq();
3098                                 if(isPresent(m->redNo)){
3099                                         msgs.enq(m);
3100                                 }else{
3101                                         futureMsgs.enq(m);
3102                                 }
3103                         }
3104                         CkReductionMsg *result = reduceMessages();
3105                         thisProxy[treeParent()].RecvMsg(result);
3106                         redNo++;
3107                 }
3108                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
3109                 thisProxy[treeParent()].DeleteChild(CkMyNode());
3110         }else{*/
3111                 if(readyDeletion){
3112                         thisProxy[treeParent()].DeleteChild(CkMyNode());
3113                 }else{
3114                         thisProxy[newParent].DeleteNewChild(CkMyNode());
3115                 }
3116 //      }
3117 }
3118
3119 void CkNodeReductionMgr::DeleteChild(int deletedChild){
3120         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
3121         for(int i=0;i<numKids;i++){
3122                 if(kids[i] == deletedChild){
3123                         kids.remove(i);
3124                         break;
3125                 }
3126         }
3127         numKids = kids.length();
3128         finishReduction();
3129 }
3130
3131 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
3132         for(int i=0;i<(int)(newKids.length());i++){
3133                 if(newKids[i] == deletedChild){
3134                         newKids.remove(i);
3135                         break;
3136                 }
3137         }
3138         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
3139         for(int i=0;i<(int)(newKids.size());i++){
3140                 DEBREVAC(("%d ",newKids[i]));
3141         }
3142         DEBREVAC(("\n"));
3143         finishReduction();
3144 }
3145
3146 int CkNodeReductionMgr::findMaxRedNo(){
3147         int max = redNo;
3148         for(int i=0;i<futureRemoteMsgs.length();i++){
3149                 if(futureRemoteMsgs[i]->redNo  > max){
3150                         max = futureRemoteMsgs[i]->redNo;
3151                 }
3152         }
3153         /*
3154                 if redNo is max (that is no future message) and the current reduction has not started
3155                 then tree can be changed before the reduction redNo can be started
3156         */ 
3157         if(redNo == max && msgs.length() == 0){
3158                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
3159                 max--;
3160         }
3161         return max;
3162 }
3163
3164 // initnode call. check the size of reduction table
3165 void CkReductionMgr::sanitycheck()
3166 {
3167 #if CMK_ERROR_CHECKING
3168   int count = 0;
3169   while (CkReduction::reducerTable[count].fn != NULL) count++;
3170   CmiAssert(CkReduction::nReducers == count);
3171 #endif
3172 }
3173
3174 #include "CkReduction.def.h"