Finally, a first version of parallel recovery with scalable collectives.
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207 #if defined(_FAULT_CAUSAL_)
208         numImmigrantRecObjs = 0;
209 #endif
210   disableNotifyChildrenStart = CmiFalse;
211   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
212 }
213
214 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
215 {
216   redNo=0;
217   completedRedNo = -1;
218   inProgress=CmiFalse;
219   creating=CmiFalse;
220   startRequested=CmiFalse;
221   gcount=lcount=0;
222   nContrib=nRemote=0;
223   maxStartRequest=0;
224   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
225 #if defined(_FAULT_CAUSAL_)
226         numImmigrantRecObjs = 0;
227 #endif
228
229 }
230
231 void CkReductionMgr::flushStates(int isgroup)
232 {
233   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
234   redNo=0;
235   completedRedNo = -1;
236   inProgress=CmiFalse;
237   creating=CmiFalse;
238   if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
239   startRequested=CmiFalse;
240   nContrib=nRemote=0;
241   maxStartRequest=0;
242
243   while (!msgs.isEmpty()) { delete msgs.deq(); }
244   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
245   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
246   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
247
248   adjVec.length()=0;
249
250 #if ! GROUP_LEVEL_REDUCTION
251   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
252 #endif
253 }
254
255 //////////// Reduction Manager Client API /////////////
256
257 //Add the given client function.  Overwrites any previous client.
258 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
259 {
260   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
261
262   if (CkMyPe()!=0)
263           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
264   storedCallback=*cb;
265 #if ! GROUP_LEVEL_REDUCTION
266   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
267   nodeProxy.ckSetReductionClient(callback);
268 #endif
269 }
270
271 ///////////////////////////// Contributor ////////////////////////
272 //Contributors keep a copy of this structure:
273
274 /*Contributor migration support:
275 */
276 void contributorInfo::pup(PUP::er &p)
277 {
278   p(redNo);
279 }
280
281 ////////////////////// Contributor list maintainance: /////////////////
282 //These just set and clear the "creating" flag to prevent
283 // reductions from finishing early because not all elements
284 // have been created.
285 void CkReductionMgr::creatingContributors(void)
286 {
287   DEBR((AA"Creating contributors...\n"AB));
288   creating=CmiTrue;
289 }
290 void CkReductionMgr::doneCreatingContributors(void)
291 {
292   DEBR((AA"Done creating contributors...\n"AB));
293   creating=CmiFalse;
294   if (startRequested) startReduction(redNo,CkMyPe());
295   finishReduction();
296 }
297
298 //A new contributor will be created
299 void CkReductionMgr::contributorStamped(contributorInfo *ci)
300 {
301   DEBR((AA"Contributor %p stamped\n"AB,ci));
302   //There is another contributor
303   gcount++;
304   if (inProgress)
305   {
306     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
307     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
308   } else
309     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
310 }
311
312 //A new contributor was actually created
313 void CkReductionMgr::contributorCreated(contributorInfo *ci)
314 {
315   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
316   //We've got another contributor
317   lcount++;
318   //He may not need to contribute to some of our reductions:
319   for (int r=redNo;r<ci->redNo;r++)
320     adj(r).lcount--;//He won't be contributing to r here
321 }
322
323 /*Don't expect any more contributions from this one.
324 This is rather horrifying because we now have to make
325 sure the global element count accurately reflects all the
326 contributions the element made before it died-- these may stretch
327 far into the future.  The adj() vector is what saves us here.
328 */
329 void CkReductionMgr::contributorDied(contributorInfo *ci)
330 {
331 #if CMK_MEM_CHECKPOINT
332   // ignore from listener if it is during restart from crash
333   if (CkInRestarting()) return;
334 #endif
335   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
336   //We lost a contributor
337   gcount--;
338
339   if (ci->redNo<redNo)
340   {//Must have been migrating during reductions-- root is waiting for his
341   // contribution, which will never come.
342     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
343     for (int r=ci->redNo;r<redNo;r++)
344       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
345   }
346
347   //Add to the global count for all his future messages (wherever they are)
348   int r;
349   for (r=redNo;r<ci->redNo;r++)
350   {//He already contributed to this reduction, but won't show up in global count.
351     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
352     adj(r).gcount++;
353   }
354
355   lcount--;
356   //He's already contributed to several reductions here
357   for (r=redNo;r<ci->redNo;r++)
358     adj(r).lcount++;//He'll be contributing to r here
359
360   finishReduction();
361 }
362
363 //Migrating away (note that global count doesn't change)
364 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
365 {
366   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
367   lcount--;//We lost a local
368   //He's already contributed to several reductions here
369   for (int r=redNo;r<ci->redNo;r++)
370     adj(r).lcount++;//He'll be contributing to r here
371
372   finishReduction();
373 }
374
375 //Migrating in (note that global count doesn't change)
376 void CkReductionMgr::contributorArriving(contributorInfo *ci)
377 {
378   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
379   lcount++;//We gained a local
380 #if CMK_MEM_CHECKPOINT
381   // ignore from listener if it is during restart from crash
382   // because the ci may be old.
383   if (CkInRestarting()) return;
384 #endif
385   //He has already contributed (elsewhere) to several reductions:
386   for (int r=redNo;r<ci->redNo;r++)
387     adj(r).lcount--;//He won't be contributing to r here
388
389 }
390
391 //Contribute-- the given msg can contain any data.  The reducerType
392 // field of the message must be valid.
393 // Each contributor must contribute exactly once to the each reduction.
394 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
395 {
396 #if CMK_BIGSIM_CHARM
397   _TRACE_BG_TLINE_END(&(m->log));
398 #endif
399   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
400   //m->ci=ci;
401   m->redNo=ci->redNo++;
402   m->sourceFlag=-1;//A single contribution
403   m->gcount=0;
404
405 #if defined(_FAULT_CAUSAL_)
406
407         // if object is an immigrant recovery object, we send the contribution to the source PE
408         if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
409                 
410                 // turning on the message-logging bypass flag
411                 envelope *env = UsrToEnv(m);
412                 env->flags = env->flags | CK_BYPASS_DET_MLOG;
413         thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
414                 return;
415         }
416
417     Chare *oldObj = CpvAccess(_currentObj);
418     CpvAccess(_currentObj) = this;
419
420         // adding contribution
421         addContribution(m);
422
423     CpvAccess(_currentObj) = oldObj;
424 #else
425   addContribution(m);
426 #endif
427 }
428
429 #if defined(_FAULT_CAUSAL_)
430 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
431         //if(CkMyPe() == 2) CkPrintf("[%d] ---> Contributing Via Message\n",CkMyPe());
432         
433         // turning off bypassing flag
434         envelope *env = UsrToEnv(m);
435         env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
436
437         // adding contribution
438     addContribution(m);
439 }
440 #else
441 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
442 #endif
443
444 //////////// Reduction Manager Remote Entry Points /////////////
445 //Sent down the reduction tree (used by barren PEs)
446 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
447 {
448  if(CkMyPe()==0){
449         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
450         //delete m;
451         //return;
452  }
453  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
454  int srcPE = (UsrToEnv(m))->getSrcPe();
455 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
456   if (isPresent(m->num) && !inProgress)
457   {
458     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
459     startReduction(m->num,srcPE);
460     finishReduction();
461   } else if (isFuture(m->num)){
462 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
463           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
464           if(maxStartRequest < m->num){
465                   maxStartRequest = m->num;
466           }
467  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
468           
469     }
470   else //is Past
471     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
472   delete m;
473 #else
474     if(redNo == 0){
475         if(lcount == 0){
476             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
477             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
478             dummy->fromPE = CmiMyPe();
479             dummy->sourceProcessorCount = 0;
480             dummy->redNo = 0;
481             thisProxy[0].contributeViaMessage(dummy);
482         }
483     }
484 #endif
485 }
486
487 //Sent to root of reduction tree with reduction contribution
488 // of migrants that missed the main reduction.
489 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
490 {
491 #if GROUP_LEVEL_REDUCTION
492 #if CMK_BIGSIM_CHARM
493   _TRACE_BG_TLINE_END(&(m->log));
494 #endif
495   addContribution(m);
496 #else
497   m->secondaryCallback = m->callback;
498   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
499   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
500   nodeMgr->LateMigrantMsg(m);
501 /*      int len = finalMsgs.length();
502         finalMsgs.enq(m);
503 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
504         endArrayReduction();*/
505 #endif
506 }
507
508 //A late migrating contributor will never contribute to this reduction
509 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
510 {
511   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
512   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
513  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
514   adj(m->num).gcount--;//He won't be contributing to this one.
515   finishReduction();
516   delete m;
517 }
518
519 //////////// Reduction Manager State /////////////
520 void CkReductionMgr::startReduction(int number,int srcPE)
521 {
522   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
523   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
524   if (inProgress){
525         DEBR((AA"This reduction is already in progress\n"AB));
526         return;//This reduction already started
527   }
528   if (creating) //Don't start yet-- we're creating elements
529   {
530     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
531     startRequested=CmiTrue;
532     return;
533   }
534
535 //If none of these cases, we need to start the reduction--
536   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
537   inProgress=CmiTrue;
538  
539
540         /*
541                 FAULT_EVAC
542         */
543   if(!CmiNodeAlive(CkMyPe())){
544         return;
545   }
546
547   if(disableNotifyChildrenStart) return;
548  
549 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_) 
550   if(CmiMyPe() == 0 && redNo == 0){
551             for(int j=0;j<CkNumPes();j++){
552                 if(j != CkMyPe() && j != srcPE){
553                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
554                 }
555             }
556             if(lcount == 0){
557                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
558                 dummy->fromPE = CmiMyPe();
559                 dummy->sourceProcessorCount = 0;
560                 dummy->redNo = 0;
561                 thisProxy[0].contributeViaMessage(dummy);
562             }
563     }   else{
564         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
565     }
566
567
568 #else
569   //Sent start requests to our kids (in case they don't already know)
570 #if GROUP_LEVEL_REDUCTION
571   for (int k=0;k<treeKids();k++)
572   {
573     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
574     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
575   }
576 #else
577   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
578 #endif
579 #endif
580         
581         /*
582   int temp;
583   //making it a broadcast done only by PE 0
584   if(!hasParent()){
585                 temp = completedRedNo+1;
586                 for(int i=temp;i<=number;i++){
587                         for(int j=0;j<CkNumPes();j++){
588                                 if(j != CkMyPe() && j != srcPE){
589                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
590                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
591                                         }
592                                 }
593                         }
594                 }       
595         }       else{
596                 temp = number;
597         }*/
598 /*  if(!hasParent()){
599                 temp = completedRedNo+1;
600         }       else{
601                 temp = number;
602         }
603         for(int i=temp;i<=number;i++){
604         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
605                 if(hasParent()){
606           // kick-start your parent too ...
607                         if(treeParent() != srcPE){
608                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
609 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
610     CpvAccess(_currentObj) = oldObj;
611 #endif
612                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
613                                 }       
614                         }       
615                 }
616           for (int k=0;k<treeKids();k++)
617           {
618                         if(firstKid()+k != srcPE){
619                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
620                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
621                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
622                                 }       
623                         }       
624         }
625         }
626         */
627 }       
628
629 /*Handle a message from one element for the reduction*/
630 void CkReductionMgr::addContribution(CkReductionMsg *m)
631 {
632   if (isPast(m->redNo))
633   {
634 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
635         CmiAbort("this version should not have late migrations");
636 #else
637         //We've moved on-- forward late contribution straight to root
638     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
639         // if (!hasParent()) //Root moved on too soon-- should never happen
640         //   CkAbort("Late reduction contribution received at root!\n");
641     thisProxy[0].LateMigrantMsg(m);
642 #endif
643   }
644   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
645     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
646     futureMsgs.enq(m);
647   } else {// An ordinary contribution
648     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
649    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
650     startReduction(m->redNo,CkMyPe());
651     msgs.enq(m);
652     nContrib++;
653     finishReduction();
654   }
655 }
656
657 /**function checks if it has got all contributions that it is supposed to
658 get at this processor. If it is done it sends the reduced result to the local
659 nodegroup */
660 void CkReductionMgr::finishReduction(void)
661 {
662   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
663   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
664   if ((!inProgress) || creating){
665         DEBR((AA"Either not in Progress or creating\n"AB));
666         return;
667   }
668   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
669 #if (defined(_FAULT_CAUSAL_))
670         if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
671         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
672                 return;//Need more local messages
673         }
674 #else
675   if (nContrib<(lcount+adj(redNo).lcount)){
676          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
677          return;//Need more local messages
678   }
679 #endif
680
681 #if GROUP_LEVEL_REDUCTION
682   if (nRemote<treeKids())  return;//Need more remote messages
683         
684 #endif
685  
686   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
687   CkReductionMsg *result=reduceMessages();
688   result->redNo=redNo;
689 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
690
691 #if GROUP_LEVEL_REDUCTION
692   if (hasParent())
693   {//Pass data up tree to parent
694     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
695     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
696 #if (defined(_FAULT_CAUSAL_))
697     result->gcount+=gcount+adj(redNo).gcount;
698 #else
699     result->gcount+=gcount+adj(redNo).gcount;
700 #endif
701     thisProxy[treeParent()].RecvMsg(result);
702   }
703   else 
704   {//We are root-- pass data to client
705     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
706     int totalElements=result->gcount+gcount+adj(redNo).gcount;
707     if (totalElements>result->nSources()) 
708     {
709       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
710       msgs.enq(result);
711       return; // Wait for migrants to contribute
712     } else if (totalElements<result->nSources()) {
713       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
714       CkAbort("ERROR! Too many contributions at root!\n");
715     }
716     DEBR((AA"Passing result to client function\n"AB));
717     CkSetRefNum(result, result->getUserFlag());
718     if (!result->callback.isInvalid())
719             result->callback.send(result);
720     else if (!storedCallback.isInvalid())
721             storedCallback.send(result);
722     else
723             CkAbort("No reduction client!\n"
724                     "You must register a client with either SetReductionClient or during contribute.\n");
725   }
726
727 #else
728   result->gcount+=gcount+adj(redNo).gcount;
729
730   result->secondaryCallback = result->callback;
731   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
732         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
733
734   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
735
736  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
737   
738   // Find our node reduction manager, and pass reduction to him:
739   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
740   nodeMgr->contributeArrayReduction(result);
741 #endif
742 #else                // _FAULT_MLOG_
743   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
744
745     CkSetRefNum(result, result->getUserFlag());
746     if (!result->callback.isInvalid())
747         result->callback.send(result);
748     else if (!storedCallback.isInvalid())
749         storedCallback.send(result);
750     else{
751       DEBR(("No reduction client for group %d \n",thisgroup.idx));
752         CkAbort("No reduction client!\n"
753             "You must register a client with either SetReductionClient or during contribute.\n");
754     }
755
756     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
757
758 #endif               // _FAULT_MLOG_
759
760   //House Keeping Operations will have to check later what needs to be changed
761   redNo++;
762   //Shift the count adjustment vector down one slot (to match new redNo)
763   int i;
764 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_) && !GROUP_LEVEL_REDUCTION
765     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
766   if(CkMyPe()!=0)
767 #endif
768   {
769         completedRedNo++;
770         for (i=1;i<(int)(adjVec.length());i++){
771            adjVec[i-1]=adjVec[i];
772         }
773         adjVec.length()--;  
774   }
775
776   inProgress=CmiFalse;
777   startRequested=CmiFalse;
778   nRemote=nContrib=0;
779
780   //Look through the future queue for messages we can now handle
781   int n=futureMsgs.length();
782   for (i=0;i<n;i++)
783   {
784     CkReductionMsg *m=futureMsgs.deq();
785     if (m!=NULL) //One of these addContributions may have finished us.
786       addContribution(m);//<- if *still* early, puts it back in the queue
787   }
788 #if GROUP_LEVEL_REDUCTION
789   n=futureRemoteMsgs.length();
790   for (i=0;i<n;i++)
791   {
792     CkReductionMsg *m=futureRemoteMsgs.deq();
793     if (m!=NULL) {
794       RecvMsg(m);//<- if *still* early, puts it back in the queue
795     }
796   }
797 #endif
798
799 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
800   if(maxStartRequest >= redNo){
801           startReduction(redNo,CkMyPe());
802           finishReduction();
803   }
804  
805 #endif
806
807 }
808
809 //Sent up the reduction tree with reduced data
810   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
811 {
812 #if GROUP_LEVEL_REDUCTION
813 #if CMK_BIGSIM_CHARM
814   _TRACE_BG_TLINE_END(&m->log);
815 #endif
816   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
817     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
818     startReduction(m->redNo, CkMyPe());
819     msgs.enq(m);
820     nRemote++;
821     finishReduction();
822   }
823   else if (isFuture(m->redNo)) {
824     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
825     futureRemoteMsgs.enq(m);
826   } 
827   else CkAbort("Recv'd late remote contribution!\n");
828 #endif
829 }
830
831 //////////// Reduction Manager Utilities /////////////
832
833 //Return the countAdjustment struct for the given redNo:
834 countAdjustment &CkReductionMgr::adj(int number)
835 {
836   number-=completedRedNo;
837   number--;
838   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
839   //Pad the adjustment vector with zeros until it's at least number long
840   while ((int)(adjVec.length())<=number)
841     adjVec.push_back(countAdjustment());
842   return adjVec[number];
843 }
844
845 //Combine (& free) the current message vector msgs.
846 CkReductionMsg *CkReductionMgr::reduceMessages(void)
847 {
848 #if CMK_BIGSIM_CHARM
849   _TRACE_BG_END_EXECUTE(1);
850   void* _bgParentLog = NULL;
851   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
852 #endif
853   CkReductionMsg *ret=NULL;
854
855   //Look through the vector for a valid reducer, swapping out placeholder messages
856   CkReduction::reducerType r=CkReduction::invalid;
857   int msgs_gcount=0;//Reduced gcount
858   int msgs_nSources=0;//Reduced nSources
859   int msgs_userFlag=-1;
860   CkCallback msgs_callback;
861   int i;
862   int nMsgs=0;
863   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
864   CkReductionMsg *m;
865   bool isMigratableContributor;
866
867   // Copy message queue into msgArr, skipping placeholders:
868   while (NULL!=(m=msgs.deq()))
869   {
870     msgs_gcount+=m->gcount;
871     if (m->sourceFlag!=0)
872     { //This is a real message from an element, not just a placeholder
873       msgArr[nMsgs++]=m;
874       msgs_nSources+=m->nSources();
875       r=m->reducer;
876       if (!m->callback.isInvalid())
877         msgs_callback=m->callback;
878       if (m->userFlag!=-1)
879         msgs_userFlag=m->userFlag;
880                         
881         isMigratableContributor=m->isMigratableContributor();
882 #if CMK_BIGSIM_CHARM
883         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
884 #endif
885     }
886     else
887     { //This is just a placeholder message-- forget it
888       delete m;
889     }
890   }
891
892   if (nMsgs==0||r==CkReduction::invalid)
893   //No valid reducer in the whole vector
894     ret=CkReductionMsg::buildNew(0,NULL);
895   else
896   {//Use the reducer to reduce the messages
897                 //if there is only one msg to be reduced just return that message
898     if(nMsgs == 1){
899         ret = msgArr[0];        
900     }else{
901         CkReduction::reducerFn f=CkReduction::reducerTable[r];
902         ret=(*f)(nMsgs,msgArr);
903     }
904     ret->reducer=r;
905   }
906
907
908
909 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
910
911 #if CRITICAL_PATH_DEBUG > 3
912   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
913 #endif
914
915   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
916   path.updateMax(UsrToEnv(ret));
917   // Combine the critical paths from all the reduction messages
918   for (i=0;i<nMsgs;i++){
919     if (msgArr[i]!=ret){
920       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
921       path.updateMax(UsrToEnv(msgArr[i]));
922     }
923   }
924   
925
926 #if CRITICAL_PATH_DEBUG > 3
927   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
928 #endif
929   
930   PathHistoryTableEntry tableEntry(path);
931   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
932   
933 #endif
934
935         //Go back through the vector, deleting old messages
936   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
937   delete [] msgArr;
938
939   //Set the message counts
940   ret->redNo=redNo;
941   ret->gcount=msgs_gcount;
942   ret->userFlag=msgs_userFlag;
943   ret->callback=msgs_callback;
944   ret->sourceFlag=msgs_nSources;
945         ret->setMigratableContributor(isMigratableContributor);
946   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
947
948   return ret;
949 }
950
951
952 //Checkpointing utilities
953 //pack-unpack method for CkReductionMsg
954 //if packing pack the message and then unpack and return it
955 //if unpacking allocate memory for it read it off disk and then unapck
956 //and return it
957 void CkReductionMgr::pup(PUP::er &p)
958 {
959 //We do not store the client function pointer or the client function parameter,
960 //it is the responsibility of the programmer to correctly restore these
961   CkGroupInitCallback::pup(p);
962   p(redNo);
963   p(completedRedNo);
964   p(inProgress); p(creating); p(startRequested);
965   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
966   p|msgs;
967   p|futureMsgs;
968   p|futureRemoteMsgs;
969   p|finalMsgs;
970   p|adjVec;
971   p|nodeProxy;
972   p|storedCallback;
973     // handle CkReductionClientBundle
974   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
975   {
976     CkReductionClientBundle *bd;
977     if (p.isUnpacking()) 
978       bd = new CkReductionClientBundle;
979     else
980       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
981     p|*bd;
982     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
983   }
984
985   // subtle --- Gengbin
986   // Group : CkReductionMgr
987   // CkArray: CkReductionMgr
988   // lcount/gcount in Group is set in Group constructor
989   // lcount/gcount in CkArray is not, it is set when array elements are created
990   // we can not pup because inserting array elems will add the counters again
991 //  p|lcount;
992 //  p|gcount;
993 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
994 //  p|lcount;
995 //  //  p|gcount;
996 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
997 #endif
998   if(p.isUnpacking()){
999     thisProxy = thisgroup;
1000     maxStartRequest=0;
1001 #if GROUP_LEVEL_REDUCTION
1002 #ifdef BINOMIAL_TREE
1003     init_BinomialTree();
1004 #else
1005     init_BinaryTree();
1006 #endif
1007 #endif
1008   }
1009
1010   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1011 }
1012
1013
1014 //Callback for doing Reduction through NodeGroups added by Sayantan
1015
1016 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1017
1018         int total = m->gcount+adj(m->redNo).gcount;
1019         finalMsgs.enq(m);
1020         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1021         adj(m->redNo).mainRecvd = 1;
1022         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1023         endArrayReduction();
1024 }
1025
1026 void CkReductionMgr :: endArrayReduction(){
1027         CkReductionMsg *ret=NULL;
1028         int nMsgs=finalMsgs.length();
1029         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1030         //Look through the vector for a valid reducer, swapping out placeholder messages
1031         //CkPrintf("Length of Final Message %d \n",nMsgs);
1032         CkReduction::reducerType r=CkReduction::invalid;
1033         int msgs_gcount=0;//Reduced gcount
1034         int msgs_nSources=0;//Reduced nSources
1035         int msgs_userFlag=-1;
1036         CkCallback msgs_callback;
1037         CkCallback msgs_secondaryCallback;
1038         CkVec<CkReductionMsg *> tempMsgs;
1039         int i;
1040         int numMsgs = 0;
1041         for (i=0;i<nMsgs;i++)
1042         {
1043                 CkReductionMsg *m=finalMsgs.deq();
1044                 if(m->redNo == completedRedNo +1){
1045                         msgs_gcount+=m->gcount;
1046                         if (m->sourceFlag!=0)
1047                         { //This is a real message from an element, not just a placeholder
1048                                 msgs_nSources+=m->nSources();
1049                                 r=m->reducer;
1050                                 if (!m->callback.isInvalid())
1051                                 msgs_callback=m->callback;
1052                                 if(!m->secondaryCallback.isInvalid())
1053                                         msgs_secondaryCallback = m->secondaryCallback;
1054                                 if (m->userFlag!=-1)
1055                                         msgs_userFlag=m->userFlag;
1056                                 tempMsgs.push_back(m);
1057                         }
1058                 }else{
1059                         finalMsgs.enq(m);
1060                 }
1061
1062         }
1063         numMsgs = tempMsgs.length();
1064
1065         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1066
1067         if(numMsgs == 0){
1068                 return;
1069         }
1070         if(adj(completedRedNo+1).mainRecvd == 0){
1071                 for(i=0;i<numMsgs;i++){
1072                         finalMsgs.enq(tempMsgs[i]);
1073                 }
1074                 return;
1075         }
1076
1077 /*
1078         NOT NEEDED ANYMORE DONE at nodegroup level
1079         if(msgs_gcount  > msgs_nSources){
1080                 for(i=0;i<numMsgs;i++){
1081                         finalMsgs.enq(tempMsgs[i]);
1082                 }
1083                 return;
1084         }*/
1085
1086         if (nMsgs==0||r==CkReduction::invalid)
1087                 //No valid reducer in the whole vector
1088                 ret=CkReductionMsg::buildNew(0,NULL);
1089         else{//Use the reducer to reduce the messages
1090                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1091                 // has to be corrected elements from above need to be put into a temporary vector
1092                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1093                 ret=(*f)(numMsgs,msgArr);
1094                 ret->reducer=r;
1095
1096         }
1097
1098         
1099 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1100
1101 #if CRITICAL_PATH_DEBUG > 3
1102         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1103 #endif
1104
1105         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1106         path.updateMax(UsrToEnv(ret));
1107         // Combine the critical paths from all the reduction messages into the header for the new result
1108         for (i=0;i<numMsgs;i++){
1109           if (tempMsgs[i]!=ret){
1110             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1111             path.updateMax(UsrToEnv(tempMsgs[i]));
1112           } else {
1113             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1114           }
1115         }
1116         // Also consider the path which got us into this entry method
1117
1118 #if CRITICAL_PATH_DEBUG > 3
1119         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1120 #endif
1121
1122 #endif
1123   
1124
1125
1126
1127
1128         for(i = 0;i<numMsgs;i++){
1129                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1130         }
1131
1132         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1133         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1134
1135         ret->redNo=completedRedNo+1;
1136         ret->gcount=msgs_gcount;
1137         ret->userFlag=msgs_userFlag;
1138         ret->callback=msgs_callback;
1139         ret->secondaryCallback = msgs_secondaryCallback;
1140         ret->sourceFlag=msgs_nSources;
1141
1142         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1143
1144         CkSetRefNum(ret, ret->getUserFlag());
1145         if (!ret->secondaryCallback.isInvalid())
1146             ret->secondaryCallback.send(ret);
1147     else if (!storedCallback.isInvalid())
1148             storedCallback.send(ret);
1149     else{
1150       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1151             CkAbort("No reduction client!\n"
1152                     "You must register a client with either SetReductionClient or during contribute.\n");
1153     }
1154         completedRedNo++;
1155
1156         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1157
1158         for (i=1;i<(int)(adjVec.length());i++)
1159                 adjVec[i-1]=adjVec[i];
1160         adjVec.length()--;
1161         endArrayReduction();
1162 }
1163
1164 #if GROUP_LEVEL_REDUCTION
1165
1166 void CkReductionMgr::init_BinaryTree(){
1167         parent = (CkMyPe()-1)/TREE_WID;
1168         int firstkid = CkMyPe()*TREE_WID+1;
1169         numKids=CkNumPes()-firstkid;
1170         if (numKids>TREE_WID) numKids=TREE_WID;
1171         if (numKids<0) numKids=0;
1172
1173         for(int i=0;i<numKids;i++){
1174                 kids.push_back(firstkid+i);
1175                 newKids.push_back(firstkid+i);
1176         }
1177 }
1178
1179 void CkReductionMgr::init_BinomialTree(){
1180         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1181         /*upperSize = (unsigned )pow((double)2,depth);*/
1182         upperSize = (unsigned) 1 << depth;
1183         label = upperSize-CkMyPe()-1;
1184         int p=label;
1185         int count=0;
1186         while( p > 0){
1187                 if(p % 2 == 0)
1188                         break;
1189                 else{
1190                         p = p/2;
1191                         count++;
1192                 }
1193         }
1194         /*parent = label + rint(pow((double)2,count));*/
1195         parent = label + (1<<count);
1196         parent = upperSize -1 -parent;
1197         int temp;
1198         if(count != 0){
1199                 numKids = 0;
1200                 for(int i=0;i<count;i++){
1201                         /*temp = label - rint(pow((double)2,i));*/
1202                         temp = label - (1<<i);
1203                         temp = upperSize-1-temp;
1204                         if(temp <= CkNumPes()-1){
1205                                 kids.push_back(temp);
1206                                 numKids++;
1207                         }
1208                 }
1209         }else{
1210                 numKids = 0;
1211         //      kids = NULL;
1212         }
1213 }
1214
1215
1216 int CkReductionMgr::treeRoot(void)
1217 {
1218   return 0;
1219 }
1220 CmiBool CkReductionMgr::hasParent(void) //Root Node
1221 {
1222   return (CmiBool)(CkMyPe()!=treeRoot());
1223 }
1224 int CkReductionMgr::treeParent(void) //My parent Node
1225 {
1226   return parent;
1227 }
1228
1229 int CkReductionMgr::firstKid(void) //My first child Node
1230 {
1231   return CkMyPe()*TREE_WID+1;
1232 }
1233 int CkReductionMgr::treeKids(void)//Number of children in tree
1234 {
1235   return numKids;
1236 }
1237
1238 #endif
1239
1240 /////////////////////////////////////////////////////////////////////////
1241
1242 ////////////////////////////////
1243 //CkReductionMsg support
1244
1245 //ReductionMessage default private constructor-- does nothing
1246 CkReductionMsg::CkReductionMsg(){}
1247 CkReductionMsg::~CkReductionMsg(){}
1248
1249 //This define gives the distance from the start of the CkReductionMsg
1250 // object to the start of the user data area (just below last object field)
1251 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1252
1253 //"Constructor"-- builds and returns a new CkReductionMsg.
1254 //  the "data" array you specify will be copied into this object.
1255 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1256     CkReduction::reducerType reducer, CkReductionMsg *buf)
1257 {
1258   int len[1];
1259   len[0]=NdataSize;
1260   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1261   
1262   ret->dataSize=NdataSize;
1263   if (srcData!=NULL && !buf)
1264     memcpy(ret->data,srcData,NdataSize);
1265   ret->userFlag=-1;
1266   ret->reducer=reducer;
1267   //ret->ci=NULL;
1268   ret->sourceFlag=-1000;
1269   ret->gcount=0;
1270   ret->migratableContributor = true;
1271 #if CMK_BIGSIM_CHARM
1272   ret->log = NULL;
1273 #endif
1274   return ret;
1275 }
1276
1277 // Charm kernel message runtime support:
1278 void *
1279 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1280 {
1281   int totalsize=ARM_DATASTART+(*sz);
1282   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1283   CkReductionMsg *ret = (CkReductionMsg *)
1284     CkAllocMsg(msgnum,totalsize,priobits);
1285   ret->data=(void *)(&ret->dataStorage);
1286   return (void *) ret;
1287 }
1288
1289 void *
1290 CkReductionMsg::pack(CkReductionMsg* in)
1291 {
1292   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1293   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1294   in->data = NULL;
1295   return (void*) in;
1296 }
1297
1298 CkReductionMsg* CkReductionMsg::unpack(void *in)
1299 {
1300   CkReductionMsg *ret = (CkReductionMsg *)in;
1301   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1302   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1303   ret->data=(void *)(&ret->dataStorage);
1304   return ret;
1305 }
1306
1307
1308 /////////////////////////////////////////////////////////////////////////////////////
1309 ///////////////// Builtin Reducer Functions //////////////
1310 /* A simple reducer, like sum_int, looks like this:
1311 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1312 {
1313   int i,ret=0;
1314   for (i=0;i<nMsg;i++)
1315     ret+=*(int *)(msg[i]->getData());
1316   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1317 }
1318
1319 To keep the code small and easy to change, the implementations below
1320 are built with preprocessor macros.
1321 */
1322
1323 //////////////// simple reducers ///////////////////
1324 /*A define used to quickly and tersely construct simple reductions.
1325 The basic idea is to use the first message's data array as
1326 (pre-initialized!) scratch space for folding in the other messages.
1327  */
1328
1329 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1330 {
1331         CkAbort("Called the invalid reducer type 0.  This probably\n"
1332                 "means you forgot to initialize your custom reducer index.\n");
1333         return NULL;
1334 }
1335
1336 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1337 {
1338   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1339 }
1340
1341 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1342 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1343 {\
1344   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1345   int m,i;\
1346   int nElem=msg[0]->getLength()/sizeof(dataType);\
1347   dataType *ret=(dataType *)(msg[0]->getData());\
1348   for (m=1;m<nMsg;m++)\
1349   {\
1350     dataType *value=(dataType *)(msg[m]->getData());\
1351     for (i=0;i<nElem;i++)\
1352     {\
1353       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1354       loop\
1355     }\
1356   }\
1357   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1358   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1359 }
1360
1361 //Use this macro for reductions that have the same type for all inputs
1362 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1363   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1364   SIMPLE_REDUCTION(nameBase##_long,CmiInt8,"%d",loop) \
1365   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1366   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1367
1368
1369 //Compute the sum the numbers passed by each element.
1370 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1371
1372 //Compute the product of the numbers passed by each element.
1373 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1374
1375 //Compute the largest number passed by any element.
1376 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1377
1378 //Compute the smallest integer passed by any element.
1379 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1380
1381
1382 //Compute the logical AND of the integers passed by each element.
1383 // The resulting integer will be zero if any source integer is zero; else 1.
1384 SIMPLE_REDUCTION(logical_and,int,"%d",
1385         if (value[i]==0)
1386      ret[i]=0;
1387   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1388 )
1389
1390 //Compute the logical OR of the integers passed by each element.
1391 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1392 SIMPLE_REDUCTION(logical_or,int,"%d",
1393   if (value[i]!=0)
1394            ret[i]=1;
1395   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1396 )
1397
1398 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1399 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1400
1401 //Select one random message to pass on
1402 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1403   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1404 }
1405
1406 /////////////// concat ////////////////
1407 /*
1408 This reducer simply appends the data it recieves from each element,
1409 without any housekeeping data to separate them.
1410 */
1411 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1412 {
1413   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1414   //Figure out how big a message we'll need
1415   int i,retSize=0;
1416   for (i=0;i<nMsg;i++)
1417       retSize+=msg[i]->getSize();
1418
1419   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1420
1421   //Allocate a new message
1422   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1423
1424   //Copy the source message data into the return message
1425   char *cur=(char *)(ret->getData());
1426   for (i=0;i<nMsg;i++) {
1427     int messageBytes=msg[i]->getSize();
1428     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1429     cur+=messageBytes;
1430   }
1431   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1432   return ret;
1433 }
1434
1435 /////////////// set ////////////////
1436 /*
1437 This reducer appends the data it recieves from each element
1438 along with some housekeeping data indicating contribution boundaries.
1439 The message data is thus a list of reduction_set_element structures
1440 terminated by a dummy reduction_set_element with a sourceElement of -1.
1441 */
1442
1443 //This rounds an integer up to the nearest multiple of sizeof(double)
1444 static const int alignSize=sizeof(double);
1445 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1446
1447 //This gives the size (in bytes) of a reduction_set_element
1448 static int SET_SIZE(int dataSize)
1449 {return SET_ALIGN(sizeof(int)+dataSize);}
1450
1451 //This returns a pointer to the next reduction_set_element in the list
1452 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1453 {
1454   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1455   return (CkReduction::setElement *)next;
1456 }
1457
1458 //Combine the data passed by each element into an list of reduction_set_elements.
1459 // Each element may contribute arbitrary data (with arbitrary length).
1460 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1461 {
1462   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1463   //Figure out how big a message we'll need
1464   int i,retSize=0;
1465   for (i=0;i<nMsg;i++) {
1466     if (!msg[i]->isFromUser())
1467     //This message is composite-- it will just be copied over (less terminating -1)
1468       retSize+=(msg[i]->getSize()-sizeof(int));
1469     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1470       retSize+=SET_SIZE(msg[i]->getSize());
1471   }
1472   retSize+=sizeof(int);//Leave room for terminating -1.
1473
1474   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1475
1476   //Allocate a new message
1477   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1478
1479   //Copy the source message data into the return message
1480   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1481   for (i=0;i<nMsg;i++)
1482     if (!msg[i]->isFromUser())
1483     {//This message is composite-- just copy it over (less terminating -1)
1484                         int messageBytes=msg[i]->getSize()-sizeof(int);
1485                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1486                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1487                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1488     }
1489     else //This is a message from an element-- wrap it in a reduction_set_element
1490     {
1491       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1492       cur->dataSize=msg[i]->getSize();
1493       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1494       cur=SET_NEXT(cur);
1495     }
1496   cur->dataSize=-1;//Add a terminating -1.
1497   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1498   return ret;
1499 }
1500
1501 //Utility routine: get the next reduction_set_element in the list
1502 // if there is one, or return NULL if there are none.
1503 //To get all the elements, just keep feeding this procedure's output back to
1504 // its input until it returns NULL.
1505 CkReduction::setElement *CkReduction::setElement::next(void)
1506 {
1507   CkReduction::setElement *n=SET_NEXT(this);
1508   if (n->dataSize==-1)
1509     return NULL;//This is the end of the list
1510   else
1511     return n;//This is just another element
1512 }
1513
1514 /////////////////// Reduction Function Table /////////////////////
1515 CkReduction::CkReduction() {} //Dummy private constructor
1516
1517 //Add the given reducer to the list.  Returns the new reducer's
1518 // reducerType.  Must be called in the same order on every node.
1519 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1520 {
1521   reducerTable[nReducers]=fn;
1522   return (reducerType)nReducers++;
1523 }
1524
1525 /*Reducer table: maps reducerTypes to reducerFns.
1526 It's indexed by reducerType, so the order in this table
1527 must *exactly* match the reducerType enum declaration.
1528 The names don't have to match, but it helps.
1529 */
1530 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1531
1532 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1533     ::invalid_reducer,
1534     ::nop,
1535   //Compute the sum the numbers passed by each element.
1536     ::sum_int,::sum_long,::sum_float,::sum_double,
1537
1538   //Compute the product the numbers passed by each element.
1539     ::product_int,::product_long,::product_float,::product_double,
1540
1541   //Compute the largest number passed by any element.
1542     ::max_int,::max_long,::max_float,::max_double,
1543
1544   //Compute the smallest number passed by any element.
1545     ::min_int,::min_long,::min_float,::min_double,
1546
1547   //Compute the logical AND of the integers passed by each element.
1548   // The resulting integer will be zero if any source integer is zero.
1549     ::logical_and,
1550
1551   //Compute the logical OR of the integers passed by each element.
1552   // The resulting integer will be 1 if any source integer is nonzero.
1553     ::logical_or,
1554
1555     // Compute the logical bitvector AND of the integers passed by each element.
1556     ::bitvec_and,
1557
1558     // Compute the logical bitvector OR of the integers passed by each element.
1559     ::bitvec_or,
1560
1561     // Select one of the messages at random to pass on
1562     ::random,
1563
1564   //Concatenate the (arbitrary) data passed by each element
1565     ::concat,
1566
1567   //Combine the data passed by each element into an list of setElements.
1568   // Each element may contribute arbitrary data (with arbitrary length).
1569     ::set
1570 };
1571
1572
1573
1574
1575
1576
1577
1578
1579 /********** Code added by Sayantan *********************/
1580 /** Locking is a big problem in the nodegroup code for smp.
1581  So a few assumptions have had to be made. There is one lock
1582  called lockEverything. It protects all the data structures 
1583  of the nodegroup reduction mgr. I tried grabbing it separately 
1584  for each datastructure, modifying it and then releasing it and
1585  then grabbing it again, for the next change.
1586  That doesn't really help because the interleaved execution of 
1587  different threads makes the state of the reduction manager 
1588  inconsistent. 
1589  
1590  1. Grab lockEverything before calling finishreduction or startReduction
1591     or doRecvMsg
1592  2. lockEverything is grabbed only in entry methods reductionStarting
1593     or RecvMesg or  addcontribution.
1594  ****/
1595  
1596 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1597 NodeGroup::NodeGroup(void) {
1598   __nodelock=CmiCreateLock();
1599 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1600     mlogData->objID.type = TypeNodeGroup;
1601     mlogData->objID.data.group.onPE = CkMyNode();
1602 #endif
1603
1604 }
1605 NodeGroup::~NodeGroup() {
1606   CmiDestroyLock(__nodelock);
1607 }
1608 void NodeGroup::pup(PUP::er &p)
1609 {
1610   CkNodeReductionMgr::pup(p);
1611   p|reductionInfo;
1612 }
1613
1614 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1615
1616 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1617   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1618  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1619   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1620  }
1621
1622 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1623                                     ((CkNodeReductionMgr *)this),
1624                                     reductionInfo,false)
1625
1626 /* this contribute also adds up the count across all messages it receives.
1627   Useful for summing up number of array elements who have contributed ****/ 
1628 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1629         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1630
1631
1632
1633 //#define BINOMIAL_TREE
1634
1635 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1636   : thisProxy(thisgroup)
1637 {
1638 #ifdef BINOMIAL_TREE
1639   init_BinomialTree();
1640 #else
1641   init_BinaryTree();
1642 #endif
1643   storedCallback=NULL;
1644   redNo=0;
1645   inProgress=CmiFalse;
1646   
1647   startRequested=CmiFalse;
1648   gcount=CkNumNodes();
1649   lcount=1;
1650   nContrib=nRemote=0;
1651   lockEverything = CmiCreateLock();
1652
1653
1654   creating=CmiFalse;
1655   interrupt = 0;
1656   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1657         /*
1658                 FAULT_EVAC
1659         */
1660         blocked = false;
1661         maxModificationRedNo = INT_MAX;
1662         killed=0;
1663         additionalGCount = newAdditionalGCount = 0;
1664 }
1665
1666 void CkNodeReductionMgr::flushStates()
1667 {
1668  if(CkMyRank() == 0){
1669   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1670   redNo=0;
1671   inProgress=CmiFalse;
1672
1673   startRequested=CmiFalse;
1674   gcount=CkNumNodes();
1675   lcount=1;
1676   nContrib=nRemote=0;
1677
1678   creating=CmiFalse;
1679   interrupt = 0;
1680   while (!msgs.isEmpty()) { delete msgs.deq(); }
1681   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1682   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1683   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1684   }
1685 }
1686
1687 //////////// Reduction Manager Client API /////////////
1688
1689 //Add the given client function.  Overwrites any previous client.
1690 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1691 {
1692   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1693   if(cb->isInvalid()){
1694         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1695   }else{
1696         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1697   }
1698
1699   if (CkMyNode()!=0)
1700           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1701   delete storedCallback;
1702   storedCallback=cb;
1703 }
1704
1705
1706
1707 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1708 {
1709 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1710     Chare *oldObj =CpvAccess(_currentObj);
1711     CpvAccess(_currentObj) = this;
1712 #endif
1713
1714   //m->ci=ci;
1715   m->redNo=ci->redNo++;
1716   m->sourceFlag=-1;//A single contribution
1717   m->gcount=0;
1718   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1719   addContribution(m);
1720
1721 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1722     CpvAccess(_currentObj) = oldObj;
1723 #endif
1724 }
1725
1726
1727 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1728 {
1729 #if CMK_BIGSIM_CHARM
1730   _TRACE_BG_TLINE_END(&m->log);
1731 #endif
1732 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1733     Chare *oldObj =CpvAccess(_currentObj);
1734     CpvAccess(_currentObj) = this;
1735 #endif
1736   //m->ci=ci;
1737   m->redNo=ci->redNo++;
1738   m->gcount=count;
1739   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1740   addContribution(m);
1741   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1742
1743 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1744     CpvAccess(_currentObj) = oldObj;
1745 #endif
1746 }
1747
1748
1749 //////////// Reduction Manager Remote Entry Points /////////////
1750
1751 //Sent down the reduction tree (used by barren PEs)
1752 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1753 {
1754   CmiLock(lockEverything);
1755   if(blocked){
1756         delete m;
1757         CmiUnlock(lockEverything);
1758         return ;
1759   }
1760   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1761   if (isPresent(m->num) && !inProgress)
1762   {
1763     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1764     startReduction(m->num,srcNode);
1765     finishReduction();
1766   } else if (isFuture(m->num)){
1767         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1768   }
1769   else //is Past
1770     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1771   CmiUnlock(lockEverything);
1772   delete m;
1773
1774 }
1775
1776
1777 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1778         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1779         /*
1780                 FAULT_EVAC
1781         */
1782         if(blocked){
1783                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1784                 bufferedRemoteMsgs.enq(m);
1785                 return;
1786         }
1787         
1788         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1789             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1790             startReduction(m->redNo,CkMyNode());
1791             msgs.enq(m);
1792             nRemote++;
1793             finishReduction();
1794         }
1795         else {
1796             if (isFuture(m->redNo)) {
1797                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1798                     futureRemoteMsgs.enq(m);
1799             }else{
1800                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1801                    CkAbort("Recv'd late remote contribution!\n");
1802             }
1803         }
1804         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1805 }
1806
1807 //Sent up the reduction tree with reduced data
1808 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1809 {
1810 #if CMK_BIGSIM_CHARM
1811   _TRACE_BG_TLINE_END(&m->log);
1812 #endif
1813 #ifndef CMK_CPV_IS_SMP
1814 #if CMK_IMMEDIATE_MSG
1815         if(interrupt == 1){
1816                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1817                 CpvAccess(_qd)->process(-1);
1818                 CmiDelayImmediate();
1819                 return;
1820         }
1821 #endif  
1822 #endif
1823    interrupt = 1;       
1824    CmiLock(lockEverything);   
1825    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1826    doRecvMsg(m);
1827    CmiUnlock(lockEverything);    
1828    interrupt = 0;
1829    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1830 }
1831
1832 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1833 {
1834         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1835         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1836         if (inProgress){
1837                 DEBR((AA"This Node reduction is already in progress\n"AB));
1838                 return;//This reduction already started
1839         }
1840         if (creating) //Don't start yet-- we're creating elements
1841         {
1842                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1843                 startRequested=CmiTrue;
1844                 return;
1845         }
1846         
1847         //If none of these cases, we need to start the reduction--
1848         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1849         inProgress=CmiTrue;
1850
1851         if(!_isNotifyChildInRed) return;
1852
1853         //Sent start requests to our kids (in case they don't already know)
1854         
1855         for (int k=0;k<treeKids();k++)
1856         {
1857 #ifdef BINOMIAL_TREE
1858                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1859                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1860 #else
1861                 if(kids[k] != srcNode){
1862                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1863                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1864                 }       
1865 #endif
1866         }
1867
1868         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1869         // in case, nodegroup does not has the attached red group,
1870         // it has to restart groups again
1871         startLocalGroupReductions(number);
1872 /*
1873         if (startLocalGroupReductions(number) == 0)
1874           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1875 */
1876 }
1877
1878 // restart local groups until succeed
1879 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1880   CmiLock(lockEverything);    
1881   if (startLocalGroupReductions(number) == 0)
1882     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1883   CmiUnlock(lockEverything);    
1884 }
1885
1886 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1887         /*
1888                 FAULT_EVAC
1889         */
1890         if(blocked){
1891                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1892                 bufferedMsgs.enq(m);
1893                 return;
1894         }
1895         
1896         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1897                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1898                 futureMsgs.enq(m);
1899         } else {// An ordinary contribution
1900                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1901                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1902                 startReduction(m->redNo,CkMyNode());
1903                 msgs.enq(m);
1904                 nContrib++;
1905                 finishReduction();
1906         }
1907 }
1908
1909 //Handle a message from one element for the reduction
1910 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1911 {
1912   interrupt = 1;
1913   CmiLock(lockEverything);
1914   doAddContribution(m);
1915   CmiUnlock(lockEverything);
1916   interrupt = 0;
1917 }
1918
1919 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1920         if(blocked){
1921                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1922                 bufferedMsgs.enq(m);
1923                 return;
1924         }
1925         
1926         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1927                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1928 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1929                 futureLateMigrantMsgs.enq(m);
1930         } else {// An ordinary contribution
1931                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1932 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1933                 msgs.enq(m);
1934                 finishReduction();
1935         }
1936 }
1937
1938
1939
1940
1941
1942 /** check if the nodegroup reduction is finished at this node. In that case send it
1943 up the reduction tree **/
1944
1945 void CkNodeReductionMgr::finishReduction(void)
1946 {
1947   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1948   /***Check if reduction is finished in the next few ifs***/
1949   if ((!inProgress) || creating){
1950         DEBR((AA"Either not in Progress or creating\n"AB));
1951         return;
1952   }
1953
1954   if (nContrib<(lcount)){
1955         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1956
1957          return;//Need more local messages
1958   }
1959   if (nRemote<treeKids()){
1960         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1961
1962         return;//Need more remote messages
1963   }
1964   if (nRemote>treeKids()){
1965
1966           interrupt = 0;
1967            CkAbort("Nodegrp Excess remote reduction message received!\n");
1968   }
1969
1970   DEBR((AA"Reducing node data...\n"AB));
1971
1972   /**reduce all messages received at this node **/
1973   CkReductionMsg *result=reduceMessages();
1974
1975   if (hasParent())
1976   {//Pass data up tree to parent
1977         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1978         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1979         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1980                 /*
1981                         FAULT_EVAC
1982                 */
1983                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1984             thisProxy[treeParent()].RecvMsg(result);
1985         }
1986
1987   }
1988   else
1989   {
1990                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1991                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
1992                         msgs.enq(result);
1993                         return;
1994                 }
1995                 result->gcount += additionalGCount;
1996           /** if the reduction is finished and I am the root of the reduction tree
1997           then call the reductionhandler and other stuff ***/
1998                 
1999
2000                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2001     CkSetRefNum(result, result->getUserFlag());
2002     if (!result->callback.isInvalid()){
2003       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2004             result->callback.send(result);
2005     }
2006     else if (storedCallback!=NULL){
2007       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2008             storedCallback->send(result);
2009     }
2010     else{
2011                 DEBR((AA"Invalid Callback \n"AB));
2012             CkAbort("No reduction client!\n"
2013                     "You must register a client with either SetReductionClient or during contribute.\n");
2014                 }
2015   }
2016
2017   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2018   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2019   redNo++;
2020         updateTree();
2021   int i;
2022   inProgress=CmiFalse;
2023   startRequested=CmiFalse;
2024   nRemote=nContrib=0;
2025
2026   //Look through the future queue for messages we can now handle
2027   int n=futureMsgs.length();
2028
2029   for (i=0;i<n;i++)
2030   {
2031     interrupt = 1;
2032
2033     CkReductionMsg *m=futureMsgs.deq();
2034
2035     interrupt = 0;
2036     if (m!=NULL){ //One of these addContributions may have finished us.
2037       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2038       doAddContribution(m);//<- if *still* early, puts it back in the queue
2039     }
2040   }
2041
2042   interrupt = 1;
2043
2044   n=futureRemoteMsgs.length();
2045
2046   interrupt = 0;
2047   for (i=0;i<n;i++)
2048   {
2049     interrupt = 1;
2050
2051     CkReductionMsg *m=futureRemoteMsgs.deq();
2052
2053     interrupt = 0;
2054     if (m!=NULL)
2055       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2056   }
2057   
2058   n = futureLateMigrantMsgs.length();
2059   for(i=0;i<n;i++){
2060     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2061     if(m != NULL){
2062       if(m->redNo == redNo){
2063         msgs.enq(m);
2064       }else{
2065         futureLateMigrantMsgs.enq(m);
2066       }
2067     }
2068   }
2069 }
2070
2071 //////////// Reduction Manager Utilities /////////////
2072
2073 void CkNodeReductionMgr::init_BinaryTree(){
2074         parent = (CkMyNode()-1)/TREE_WID;
2075         int firstkid = CkMyNode()*TREE_WID+1;
2076         numKids=CkNumNodes()-firstkid;
2077   if (numKids>TREE_WID) numKids=TREE_WID;
2078   if (numKids<0) numKids=0;
2079
2080         for(int i=0;i<numKids;i++){
2081                 kids.push_back(firstkid+i);
2082                 newKids.push_back(firstkid+i);
2083         }
2084 }
2085
2086 void CkNodeReductionMgr::init_BinomialTree(){
2087         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2088         /*upperSize = (unsigned )pow((double)2,depth);*/
2089         upperSize = (unsigned) 1 << depth;
2090         label = upperSize-CkMyNode()-1;
2091         int p=label;
2092         int count=0;
2093         while( p > 0){
2094                 if(p % 2 == 0)
2095                         break;
2096                 else{
2097                         p = p/2;
2098                         count++;
2099                 }
2100         }
2101         /*parent = label + rint(pow((double)2,count));*/
2102         parent = label + (1<<count);
2103         parent = upperSize -1 -parent;
2104         int temp;
2105         if(count != 0){
2106                 numKids = 0;
2107                 for(int i=0;i<count;i++){
2108                         /*temp = label - rint(pow((double)2,i));*/
2109                         temp = label - (1<<i);
2110                         temp = upperSize-1-temp;
2111                         if(temp <= CkNumNodes()-1){
2112                 //              kids[numKids] = temp;
2113                                 kids.push_back(temp);
2114                                 numKids++;
2115                         }
2116                 }
2117         }else{
2118                 numKids = 0;
2119         //      kids = NULL;
2120         }
2121 }
2122
2123
2124 int CkNodeReductionMgr::treeRoot(void)
2125 {
2126   return 0;
2127 }
2128 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2129 {
2130   return (CmiBool)(CkMyNode()!=treeRoot());
2131 }
2132 int CkNodeReductionMgr::treeParent(void) //My parent Node
2133 {
2134   return parent;
2135 }
2136
2137 int CkNodeReductionMgr::firstKid(void) //My first child Node
2138 {
2139   return CkMyNode()*TREE_WID+1;
2140 }
2141 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2142 {
2143 #ifdef BINOMIAL_TREE
2144         return numKids;
2145 #else
2146 /*  int nKids=CkNumNodes()-firstKid();
2147   if (nKids>TREE_WID) nKids=TREE_WID;
2148   if (nKids<0) nKids=0;
2149   return nKids;*/
2150         return numKids;
2151 #endif
2152 }
2153
2154 //Combine (& free) the current message vector msgs.
2155 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2156 {
2157 #if CMK_BIGSIM_CHARM
2158   _TRACE_BG_END_EXECUTE(1);
2159   void* _bgParentLog = NULL;
2160   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2161 #endif
2162   CkReductionMsg *ret=NULL;
2163
2164   //Look through the vector for a valid reducer, swapping out placeholder messages
2165   CkReduction::reducerType r=CkReduction::invalid;
2166   int msgs_gcount=0;//Reduced gcount
2167   int msgs_nSources=0;//Reduced nSources
2168   int msgs_userFlag=-1;
2169   CkCallback msgs_callback;
2170   CkCallback msgs_secondaryCallback;
2171   int i;
2172   int nMsgs=0;
2173   CkReductionMsg *m;
2174   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2175   bool isMigratableContributor;
2176         
2177
2178   while(NULL!=(m=msgs.deq()))
2179   {
2180     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2181     msgs_gcount+=m->gcount;
2182     if (m->sourceFlag!=0)
2183     { //This is a real message from an element, not just a placeholder
2184       msgArr[nMsgs++]=m;
2185       msgs_nSources+=m->nSources();
2186       r=m->reducer;
2187       if (!m->callback.isInvalid())
2188         msgs_callback=m->callback;
2189       if(!m->secondaryCallback.isInvalid()){
2190         msgs_secondaryCallback = m->secondaryCallback;
2191       }
2192       if (m->userFlag!=-1)
2193         msgs_userFlag=m->userFlag;
2194
2195         isMigratableContributor= m->isMigratableContributor();
2196 #if CMK_BIGSIM_CHARM
2197       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2198 #endif
2199                                 
2200     }
2201     else
2202     { //This is just a placeholder message-- replace it
2203       delete m;
2204     }
2205   }
2206
2207   if (nMsgs==0||r==CkReduction::invalid)
2208   //No valid reducer in the whole vector
2209     ret=CkReductionMsg::buildNew(0,NULL);
2210   else
2211   {//Use the reducer to reduce the messages
2212     if(nMsgs == 1){
2213         ret = msgArr[0];
2214     }else{
2215         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2216         ret=(*f)(nMsgs,msgArr);
2217     }
2218     ret->reducer=r;
2219   }
2220
2221         
2222 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2223 #if CRITICAL_PATH_DEBUG > 3
2224         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2225 #endif
2226         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2227         path.updateMax(UsrToEnv(ret));
2228         // Combine the critical paths from all the reduction messages into the header for the new result
2229         for (i=0;i<nMsgs;i++){
2230           if (msgArr[i]!=ret){
2231             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2232             path.updateMax(UsrToEnv(msgArr[i]));
2233           } else {
2234             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2235           }
2236         }
2237 #if CRITICAL_PATH_DEBUG > 3
2238         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2239 #endif
2240
2241 #endif
2242
2243
2244         //Go back through the vector, deleting old messages
2245   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2246   delete [] msgArr;
2247   //Set the message counts
2248   ret->redNo=redNo;
2249   ret->gcount=msgs_gcount;
2250   ret->userFlag=msgs_userFlag;
2251   ret->callback=msgs_callback;
2252   ret->secondaryCallback = msgs_secondaryCallback;
2253   ret->sourceFlag=msgs_nSources;
2254   ret->setMigratableContributor(isMigratableContributor);
2255   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2256 #if CMK_BIGSIM_CHARM
2257   _TRACE_BG_TLINE_END(&ret->log);
2258 #endif
2259
2260   return ret;
2261 }
2262
2263 void CkNodeReductionMgr::pup(PUP::er &p)
2264 {
2265 //We do not store the client function pointer or the client function parameter,
2266 //it is the responsibility of the programmer to correctly restore these
2267   IrrGroup::pup(p);
2268   p(redNo);
2269   p(inProgress); p(creating); p(startRequested);
2270   p(lcount);
2271   p(nContrib); p(nRemote);
2272   p(interrupt);
2273   p|msgs;
2274   p|futureMsgs;
2275   p|futureRemoteMsgs;
2276   p|futureLateMigrantMsgs;
2277   p|parent;
2278   p|additionalGCount;
2279   p|newAdditionalGCount;
2280   if(p.isUnpacking()) {
2281     gcount=CkNumNodes();
2282     thisProxy = thisgroup;
2283     lockEverything = CmiCreateLock();
2284 #ifdef BINOMIAL_TREE
2285     init_BinomialTree();
2286 #else
2287     init_BinaryTree();
2288 #endif          
2289   }
2290   p | blocked;
2291   p | maxModificationRedNo;
2292
2293 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2294   int isnull = (storedCallback == NULL);
2295   p | isnull;
2296   if (!isnull) {
2297     if (p.isUnpacking()) {
2298       storedCallback = new CkCallback;
2299     }
2300     p|*storedCallback;
2301   }
2302 #endif
2303
2304 }
2305
2306 /*
2307         FAULT_EVAC
2308         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2309         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2310         reduction tree. 
2311 */
2312 void CkNodeReductionMgr::evacuate(){
2313         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2314         if(treeKids() == 0){
2315         /*
2316                 if the node going down is a leaf
2317         */
2318                 oldleaf=true;
2319                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2320                 /*
2321                         Need to ask parent for the reduction number that it has seen. 
2322                         Since it is a leaf, the tree does not need to be rewired. 
2323                         We reuse the oldparent type of tree modification message to get 
2324                         the parent to block and tell us about the highest reduction number it has seen.
2325                         
2326                 */
2327                 int data[2];
2328                 data[0]=CkMyNode();
2329                 data[1]=getTotalGCount()+additionalGCount;
2330                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2331                 newParent = treeParent();
2332         }else{
2333                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2334                 oldleaf= false;
2335         /*
2336                 It is not a leaf. It needs to rewire the tree around itself.
2337                 It also needs to decide on a reduction No after which the new tree will be used
2338                 Till it decides on the new tree and the redNo at which it becomes valid,
2339                 all received messages will be buffered
2340         */
2341                 newParent = kids[0];
2342                 for(int i=numKids-1;i>=0;i--){
2343                         newKids.remove(i);
2344                 }
2345                 /*
2346                         Ask everybody for the highest reduction number they have seen and
2347                         also tell them about the new tree
2348                 */
2349                 /*
2350                         Tell parent about its new child;
2351                 */
2352                 int oldParentData[2];
2353                 oldParentData[0] = CkMyNode();
2354                 oldParentData[1] = newParent;
2355                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2356
2357                 /*
2358                         Tell the other children about their new parent
2359                 */
2360                 int childrenData=newParent;
2361                 for(int i=1;i<numKids;i++){
2362                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2363                 }
2364                 
2365                 /*
2366                         Tell newParent (1st child) about its new children,
2367                         the current node and its children except the newParent
2368                 */
2369                 int *newParentData = new int[numKids+2];
2370                 for(int i=1;i<numKids;i++){
2371                         newParentData[i] = kids[i];
2372                 }
2373                 newParentData[0] = CkMyNode();
2374                 newParentData[numKids] = parent;
2375                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2376                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2377         }
2378         readyDeletion = false;
2379         blocked = true;
2380         numModificationReplies = 0;
2381         tempModificationRedNo = findMaxRedNo();
2382 }
2383
2384 /*
2385         Depending on the code, use the data to change the tree
2386         1. OLDPARENT : replace the old child with a new one
2387         2. OLDCHILDREN: replace the parent
2388         3. NEWPARENT:  add the children and change the parent
2389         4. LEAFPARENT: delete the old child
2390 */
2391
2392 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2393         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2394         int sender;
2395         newKids = kids;
2396         readyDeletion = false;
2397         newAdditionalGCount = additionalGCount;
2398         switch(code){
2399                 case OLDPARENT: 
2400                         for(int i=0;i<numKids;i++){
2401                                 if(newKids[i] == data[0]){
2402                                         newKids[i] = data[1];
2403                                         break;
2404                                 }
2405                         }
2406                         sender = data[0];
2407                         newParent = parent;
2408                         break;
2409                 case OLDCHILDREN:
2410                         newParent = data[0];
2411                         sender = parent;
2412                         break;
2413                 case NEWPARENT:
2414                         for(int i=0;i<size-2;i++){
2415                                 newKids.push_back(data[i]);
2416                         }
2417                         newParent = data[size-2];
2418                         newAdditionalGCount += data[size-1];
2419                         sender = parent;
2420                         break;
2421                 case LEAFPARENT:
2422                         for(int i=0;i<numKids;i++){
2423                                 if(newKids[i] == data[0]){
2424                                         newKids.remove(i);
2425                                         break;
2426                                 }
2427                         }
2428                         sender = data[0];
2429                         newParent = parent;
2430                         newAdditionalGCount += data[1];
2431                         break;
2432         };
2433         blocked = true;
2434         int maxRedNo = findMaxRedNo();
2435         
2436         thisProxy[sender].collectMaxRedNo(maxRedNo);
2437 }
2438
2439 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2440         /*
2441                 Find out the maximum redNo that has been seen by 
2442                 the affected nodes
2443         */
2444         numModificationReplies++;
2445         if(maxRedNo > tempModificationRedNo){
2446                 tempModificationRedNo = maxRedNo;
2447         }
2448         if(numModificationReplies == numKids+1){
2449                 maxModificationRedNo = tempModificationRedNo;
2450                 /*
2451                         when all the affected nodes have replied, tell them the maximum.
2452                         Unblock yourself. deal with the buffered messages local and remote
2453                 */
2454                 if(maxModificationRedNo == -1){
2455                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2456                 }else{
2457                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2458                 }
2459                 thisProxy[parent].unblockNode(maxModificationRedNo);
2460                 for(int i=0;i<numKids;i++){
2461                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2462                 }
2463                 blocked = false;
2464                 updateTree();
2465                 clearBlockedMsgs();
2466         }
2467 }
2468
2469 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2470         maxModificationRedNo = maxRedNo;
2471         updateTree();
2472         blocked = false;
2473         clearBlockedMsgs();
2474 }
2475
2476
2477 void CkNodeReductionMgr::clearBlockedMsgs(){
2478         int len = bufferedMsgs.length();
2479         for(int i=0;i<len;i++){
2480                 CkReductionMsg *m = bufferedMsgs.deq();
2481                 doAddContribution(m);
2482         }
2483         len = bufferedRemoteMsgs.length();
2484         for(int i=0;i<len;i++){
2485                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2486                 doRecvMsg(m);
2487         }
2488
2489 }
2490 /*
2491         if the reduction number exceeds the maxModificationRedNo, change the tree
2492         to become the new one
2493 */
2494
2495 void CkNodeReductionMgr::updateTree(){
2496         if(redNo > maxModificationRedNo){
2497                 parent = newParent;
2498                 kids = newKids;
2499                 maxModificationRedNo = INT_MAX;
2500                 numKids = kids.size();
2501                 readyDeletion = true;
2502                 additionalGCount = newAdditionalGCount;
2503                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2504                 for(int i=0;i<(int)(newKids.size());i++){
2505                         DEBREVAC(("%d ",newKids[i]));
2506                 }
2507                 DEBREVAC(("\n"));
2508         //      startReduction(redNo,CkMyNode());
2509         }else{
2510                 if(maxModificationRedNo != INT_MAX){
2511                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2512                         startReduction(redNo,CkMyNode());
2513                         finishReduction();
2514                 }       
2515         }
2516 }
2517
2518
2519 void CkNodeReductionMgr::doneEvacuate(){
2520         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2521 /*      if(oldleaf){
2522                 
2523                         It used to be a leaf
2524                         Then as soon as future messages have been emptied you can 
2525                         send the parent a message telling them that they are not going
2526                         to receive anymore messages from this child
2527                 
2528                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2529                 while(futureMsgs.length() != 0){
2530                         int n = futureMsgs.length();
2531                         for(int i=0;i<n;i++){
2532                                 CkReductionMsg *m = futureMsgs.deq();
2533                                 if(isPresent(m->redNo)){
2534                                         msgs.enq(m);
2535                                 }else{
2536                                         futureMsgs.enq(m);
2537                                 }
2538                         }
2539                         CkReductionMsg *result = reduceMessages();
2540                         thisProxy[treeParent()].RecvMsg(result);
2541                         redNo++;
2542                 }
2543                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2544                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2545         }else{*/
2546                 if(readyDeletion){
2547                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2548                 }else{
2549                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2550                 }
2551 //      }
2552 }
2553
2554 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2555         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2556         for(int i=0;i<numKids;i++){
2557                 if(kids[i] == deletedChild){
2558                         kids.remove(i);
2559                         break;
2560                 }
2561         }
2562         numKids = kids.length();
2563         finishReduction();
2564 }
2565
2566 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2567         for(int i=0;i<(int)(newKids.length());i++){
2568                 if(newKids[i] == deletedChild){
2569                         newKids.remove(i);
2570                         break;
2571                 }
2572         }
2573         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2574         for(int i=0;i<(int)(newKids.size());i++){
2575                 DEBREVAC(("%d ",newKids[i]));
2576         }
2577         DEBREVAC(("\n"));
2578         finishReduction();
2579 }
2580
2581 int CkNodeReductionMgr::findMaxRedNo(){
2582         int max = redNo;
2583         for(int i=0;i<futureRemoteMsgs.length();i++){
2584                 if(futureRemoteMsgs[i]->redNo  > max){
2585                         max = futureRemoteMsgs[i]->redNo;
2586                 }
2587         }
2588         /*
2589                 if redNo is max (that is no future message) and the current reduction has not started
2590                 then tree can be changed before the reduction redNo can be started
2591         */ 
2592         if(redNo == max && msgs.length() == 0){
2593                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2594                 max--;
2595         }
2596         return max;
2597 }
2598
2599 // initnode call. check the size of reduction table
2600 void CkReductionMgr::sanitycheck()
2601 {
2602 #if CMK_ERROR_CHECKING
2603   int count = 0;
2604   while (CkReduction::reducerTable[count] != NULL) count++;
2605   CmiAssert(CkReduction::nReducers == count);
2606 #endif
2607 }
2608
2609 #include "CkReduction.def.h"