Adding support for collectives and parallel recovery.
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207   disableNotifyChildrenStart = CmiFalse;
208   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
209 }
210
211 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
212 {
213   redNo=0;
214   completedRedNo = -1;
215   inProgress=CmiFalse;
216   creating=CmiFalse;
217   startRequested=CmiFalse;
218   gcount=lcount=0;
219   nContrib=nRemote=0;
220   maxStartRequest=0;
221   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
222 }
223
224 void CkReductionMgr::flushStates(int isgroup)
225 {
226   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
227   redNo=0;
228   completedRedNo = -1;
229   inProgress=CmiFalse;
230   creating=CmiFalse;
231   if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
232   startRequested=CmiFalse;
233   nContrib=nRemote=0;
234   maxStartRequest=0;
235
236   while (!msgs.isEmpty()) { delete msgs.deq(); }
237   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
238   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
239   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
240
241   adjVec.length()=0;
242
243 #if ! GROUP_LEVEL_REDUCTION
244   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
245 #endif
246 }
247
248 //////////// Reduction Manager Client API /////////////
249
250 //Add the given client function.  Overwrites any previous client.
251 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
252 {
253   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
254
255   if (CkMyPe()!=0)
256           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
257   storedCallback=*cb;
258 #if ! GROUP_LEVEL_REDUCTION
259   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
260   nodeProxy.ckSetReductionClient(callback);
261 #endif
262 }
263
264 ///////////////////////////// Contributor ////////////////////////
265 //Contributors keep a copy of this structure:
266
267 /*Contributor migration support:
268 */
269 void contributorInfo::pup(PUP::er &p)
270 {
271   p(redNo);
272 }
273
274 ////////////////////// Contributor list maintainance: /////////////////
275 //These just set and clear the "creating" flag to prevent
276 // reductions from finishing early because not all elements
277 // have been created.
278 void CkReductionMgr::creatingContributors(void)
279 {
280   DEBR((AA"Creating contributors...\n"AB));
281   creating=CmiTrue;
282 }
283 void CkReductionMgr::doneCreatingContributors(void)
284 {
285   DEBR((AA"Done creating contributors...\n"AB));
286   creating=CmiFalse;
287   if (startRequested) startReduction(redNo,CkMyPe());
288   finishReduction();
289 }
290
291 //A new contributor will be created
292 void CkReductionMgr::contributorStamped(contributorInfo *ci)
293 {
294   DEBR((AA"Contributor %p stamped\n"AB,ci));
295   //There is another contributor
296   gcount++;
297   if (inProgress)
298   {
299     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
300     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
301   } else
302     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
303 }
304
305 //A new contributor was actually created
306 void CkReductionMgr::contributorCreated(contributorInfo *ci)
307 {
308   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
309   //We've got another contributor
310   lcount++;
311   //He may not need to contribute to some of our reductions:
312   for (int r=redNo;r<ci->redNo;r++)
313     adj(r).lcount--;//He won't be contributing to r here
314 }
315
316 /*Don't expect any more contributions from this one.
317 This is rather horrifying because we now have to make
318 sure the global element count accurately reflects all the
319 contributions the element made before it died-- these may stretch
320 far into the future.  The adj() vector is what saves us here.
321 */
322 void CkReductionMgr::contributorDied(contributorInfo *ci)
323 {
324 #if CMK_MEM_CHECKPOINT
325   // ignore from listener if it is during restart from crash
326   if (CkInRestarting()) return;
327 #endif
328   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
329   //We lost a contributor
330   gcount--;
331
332   if (ci->redNo<redNo)
333   {//Must have been migrating during reductions-- root is waiting for his
334   // contribution, which will never come.
335     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
336     for (int r=ci->redNo;r<redNo;r++)
337       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
338   }
339
340   //Add to the global count for all his future messages (wherever they are)
341   int r;
342   for (r=redNo;r<ci->redNo;r++)
343   {//He already contributed to this reduction, but won't show up in global count.
344     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
345     adj(r).gcount++;
346   }
347
348   lcount--;
349   //He's already contributed to several reductions here
350   for (r=redNo;r<ci->redNo;r++)
351     adj(r).lcount++;//He'll be contributing to r here
352
353   finishReduction();
354 }
355
356 //Migrating away (note that global count doesn't change)
357 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
358 {
359   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
360   lcount--;//We lost a local
361   //He's already contributed to several reductions here
362   for (int r=redNo;r<ci->redNo;r++)
363     adj(r).lcount++;//He'll be contributing to r here
364
365   finishReduction();
366 }
367
368 //Migrating in (note that global count doesn't change)
369 void CkReductionMgr::contributorArriving(contributorInfo *ci)
370 {
371   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
372   lcount++;//We gained a local
373 #if CMK_MEM_CHECKPOINT
374   // ignore from listener if it is during restart from crash
375   // because the ci may be old.
376   if (CkInRestarting()) return;
377 #endif
378   //He has already contributed (elsewhere) to several reductions:
379   for (int r=redNo;r<ci->redNo;r++)
380     adj(r).lcount--;//He won't be contributing to r here
381
382 }
383
384 //Contribute-- the given msg can contain any data.  The reducerType
385 // field of the message must be valid.
386 // Each contributor must contribute exactly once to the each reduction.
387 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
388 {
389 #if CMK_BIGSIM_CHARM
390   _TRACE_BG_TLINE_END(&(m->log));
391 #endif
392   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
393   //m->ci=ci;
394   m->redNo=ci->redNo++;
395   m->sourceFlag=-1;//A single contribution
396   m->gcount=0;
397
398 #if defined(_FAULT_CAUSAL_)
399
400         // if object is an immigrant recovery object, we send the contribution to the source PE
401         if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
402                 //DELETE CkPrintf("[%d] Sending contribution via message %d\n",CkMyPe(),m->redNo);
403         thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
404                 return;
405         }
406
407     Chare *oldObj = CpvAccess(_currentObj);
408     CpvAccess(_currentObj) = this;
409
410         // adding contribution
411         //DELETE if(CkMyPe() == 4) CkPrintf("[%d] Adding local contribution %d\n",CkMyPe(),m->redNo);
412         addContribution(m);
413
414     CpvAccess(_currentObj) = oldObj;
415 #else
416   addContribution(m);
417 #endif
418 }
419
420 #if defined(_FAULT_CAUSAL_)
421 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
422
423         //DELETE CkPrintf("[%d] Receiving contribution via message %d %d\n",CkMyPe(),m->redNo,redNo);
424         
425         // adding contribution
426     addContribution(m);
427 }
428 #else
429 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
430 #endif
431
432 //////////// Reduction Manager Remote Entry Points /////////////
433 //Sent down the reduction tree (used by barren PEs)
434 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
435 {
436  if(CkMyPe()==0){
437         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
438         //delete m;
439         //return;
440  }
441  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
442  int srcPE = (UsrToEnv(m))->getSrcPe();
443 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
444   if (isPresent(m->num) && !inProgress)
445   {
446     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
447     startReduction(m->num,srcPE);
448     finishReduction();
449   } else if (isFuture(m->num)){
450 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
451           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
452           if(maxStartRequest < m->num){
453                   maxStartRequest = m->num;
454           }
455  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
456           
457     }
458   else //is Past
459     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
460   delete m;
461 #else
462     if(redNo == 0){
463         if(lcount == 0){
464             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
465             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
466             dummy->fromPE = CmiMyPe();
467             dummy->sourceProcessorCount = 0;
468             dummy->redNo = 0;
469             thisProxy[0].contributeViaMessage(dummy);
470         }
471     }
472 #endif
473 }
474
475 //Sent to root of reduction tree with reduction contribution
476 // of migrants that missed the main reduction.
477 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
478 {
479 #if GROUP_LEVEL_REDUCTION
480 #if CMK_BIGSIM_CHARM
481   _TRACE_BG_TLINE_END(&(m->log));
482 #endif
483   addContribution(m);
484 #else
485   m->secondaryCallback = m->callback;
486   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
487   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
488   nodeMgr->LateMigrantMsg(m);
489 /*      int len = finalMsgs.length();
490         finalMsgs.enq(m);
491 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
492         endArrayReduction();*/
493 #endif
494 }
495
496 //A late migrating contributor will never contribute to this reduction
497 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
498 {
499   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
500   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
501  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
502   adj(m->num).gcount--;//He won't be contributing to this one.
503   finishReduction();
504   delete m;
505 }
506
507 //////////// Reduction Manager State /////////////
508 void CkReductionMgr::startReduction(int number,int srcPE)
509 {
510   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
511   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
512   if (inProgress){
513         DEBR((AA"This reduction is already in progress\n"AB));
514         return;//This reduction already started
515   }
516   if (creating) //Don't start yet-- we're creating elements
517   {
518     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
519     startRequested=CmiTrue;
520     return;
521   }
522
523 //If none of these cases, we need to start the reduction--
524   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
525   inProgress=CmiTrue;
526  
527
528         /*
529                 FAULT_EVAC
530         */
531   if(!CmiNodeAlive(CkMyPe())){
532         return;
533   }
534
535   if(disableNotifyChildrenStart) return;
536  
537 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_) 
538   if(CmiMyPe() == 0 && redNo == 0){
539             for(int j=0;j<CkNumPes();j++){
540                 if(j != CkMyPe() && j != srcPE){
541                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
542                 }
543             }
544             if(lcount == 0){
545                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
546                 dummy->fromPE = CmiMyPe();
547                 dummy->sourceProcessorCount = 0;
548                 dummy->redNo = 0;
549                 thisProxy[0].contributeViaMessage(dummy);
550             }
551     }   else{
552         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
553     }
554
555
556 #else
557   //Sent start requests to our kids (in case they don't already know)
558 #if GROUP_LEVEL_REDUCTION
559   for (int k=0;k<treeKids();k++)
560   {
561     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
562     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
563   }
564 #else
565   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
566 #endif
567 #endif
568         
569         /*
570   int temp;
571   //making it a broadcast done only by PE 0
572   if(!hasParent()){
573                 temp = completedRedNo+1;
574                 for(int i=temp;i<=number;i++){
575                         for(int j=0;j<CkNumPes();j++){
576                                 if(j != CkMyPe() && j != srcPE){
577                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
578                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
579                                         }
580                                 }
581                         }
582                 }       
583         }       else{
584                 temp = number;
585         }*/
586 /*  if(!hasParent()){
587                 temp = completedRedNo+1;
588         }       else{
589                 temp = number;
590         }
591         for(int i=temp;i<=number;i++){
592         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
593                 if(hasParent()){
594           // kick-start your parent too ...
595                         if(treeParent() != srcPE){
596                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
597 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
598     CpvAccess(_currentObj) = oldObj;
599 #endif
600                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
601                                 }       
602                         }       
603                 }
604           for (int k=0;k<treeKids();k++)
605           {
606                         if(firstKid()+k != srcPE){
607                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
608                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
609                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
610                                 }       
611                         }       
612         }
613         }
614         */
615 }       
616
617 /*Handle a message from one element for the reduction*/
618 void CkReductionMgr::addContribution(CkReductionMsg *m)
619 {
620   if (isPast(m->redNo))
621   {
622 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
623         CmiAbort("this version should not have late migrations");
624 #else
625         //We've moved on-- forward late contribution straight to root
626     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
627         // if (!hasParent()) //Root moved on too soon-- should never happen
628         //   CkAbort("Late reduction contribution received at root!\n");
629     thisProxy[0].LateMigrantMsg(m);
630 #endif
631   }
632   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
633     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
634     futureMsgs.enq(m);
635   } else {// An ordinary contribution
636     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
637    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
638     startReduction(m->redNo,CkMyPe());
639     msgs.enq(m);
640     nContrib++;
641     finishReduction();
642   }
643 }
644
645 /**function checks if it has got all contributions that it is supposed to
646 get at this processor. If it is done it sends the reduced result to the local
647 nodegroup */
648 void CkReductionMgr::finishReduction(void)
649 {
650   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
651   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
652   if ((!inProgress) || creating){
653         DEBR((AA"Either not in Progress or creating\n"AB));
654         return;
655   }
656   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
657 #if (defined(_FAULT_CAUSAL_))
658         //DELETE CkPrintf("[%d] finishReduction %d %d %d %d\n",CkMyPe(),nContrib,(lcount+adj(redNo).lcount),nRemote,treeKids()); 
659         //CODING
660         if (nContrib<(lcount+adj(redNo).lcount)-CpvAccess(_numImmigrantRecObjs)){
661         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
662                 return;//Need more local messages
663         }
664 #else
665   if (nContrib<(lcount+adj(redNo).lcount)){
666          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
667          return;//Need more local messages
668   }
669 #endif
670
671 #if GROUP_LEVEL_REDUCTION
672   if (nRemote<treeKids()) return;//Need more remote messages
673 #endif
674  
675   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
676   CkReductionMsg *result=reduceMessages();
677   result->redNo=redNo;
678 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
679
680 #if GROUP_LEVEL_REDUCTION
681   if (hasParent())
682   {//Pass data up tree to parent
683     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
684     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
685     result->gcount+=gcount+adj(redNo).gcount;
686     thisProxy[treeParent()].RecvMsg(result);
687   }
688   else 
689   {//We are root-- pass data to client
690     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
691     int totalElements=result->gcount+gcount+adj(redNo).gcount;
692     if (totalElements>result->nSources()) 
693     {
694       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
695       msgs.enq(result);
696       return; // Wait for migrants to contribute
697     } else if (totalElements<result->nSources()) {
698       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
699       CkAbort("ERROR! Too many contributions at root!\n");
700     }
701     DEBR((AA"Passing result to client function\n"AB));
702     CkSetRefNum(result, result->getUserFlag());
703     if (!result->callback.isInvalid())
704             result->callback.send(result);
705     else if (!storedCallback.isInvalid())
706             storedCallback.send(result);
707     else
708             CkAbort("No reduction client!\n"
709                     "You must register a client with either SetReductionClient or during contribute.\n");
710   }
711
712 #else
713   result->gcount+=gcount+adj(redNo).gcount;
714
715   result->secondaryCallback = result->callback;
716   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
717         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
718
719   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
720
721  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
722   
723   // Find our node reduction manager, and pass reduction to him:
724   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
725   nodeMgr->contributeArrayReduction(result);
726 #endif
727 #else                // _FAULT_MLOG_
728   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
729
730     CkSetRefNum(result, result->getUserFlag());
731     if (!result->callback.isInvalid())
732         result->callback.send(result);
733     else if (!storedCallback.isInvalid())
734         storedCallback.send(result);
735     else{
736       DEBR(("No reduction client for group %d \n",thisgroup.idx));
737         CkAbort("No reduction client!\n"
738             "You must register a client with either SetReductionClient or during contribute.\n");
739     }
740
741     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
742
743 #endif               // _FAULT_MLOG_
744
745   //House Keeping Operations will have to check later what needs to be changed
746   redNo++;
747   //Shift the count adjustment vector down one slot (to match new redNo)
748   int i;
749 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_) && !GROUP_LEVEL_REDUCTION
750     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
751   if(CkMyPe()!=0)
752 #endif
753   {
754         completedRedNo++;
755         for (i=1;i<(int)(adjVec.length());i++){
756            adjVec[i-1]=adjVec[i];
757         }
758         adjVec.length()--;  
759   }
760
761   inProgress=CmiFalse;
762   startRequested=CmiFalse;
763   nRemote=nContrib=0;
764
765   //Look through the future queue for messages we can now handle
766   int n=futureMsgs.length();
767   for (i=0;i<n;i++)
768   {
769     CkReductionMsg *m=futureMsgs.deq();
770     if (m!=NULL) //One of these addContributions may have finished us.
771       addContribution(m);//<- if *still* early, puts it back in the queue
772   }
773 #if GROUP_LEVEL_REDUCTION
774   n=futureRemoteMsgs.length();
775   for (i=0;i<n;i++)
776   {
777     CkReductionMsg *m=futureRemoteMsgs.deq();
778     if (m!=NULL) {
779       RecvMsg(m);//<- if *still* early, puts it back in the queue
780     }
781   }
782 #endif
783
784 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
785   if(maxStartRequest >= redNo){
786           startReduction(redNo,CkMyPe());
787           finishReduction();
788   }
789  
790 #endif
791
792 }
793
794 //Sent up the reduction tree with reduced data
795   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
796 {
797 #if GROUP_LEVEL_REDUCTION
798 #if CMK_BIGSIM_CHARM
799   _TRACE_BG_TLINE_END(&m->log);
800 #endif
801   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
802     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
803     startReduction(m->redNo, CkMyPe());
804     msgs.enq(m);
805     nRemote++;
806     finishReduction();
807   }
808   else if (isFuture(m->redNo)) {
809     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
810     futureRemoteMsgs.enq(m);
811   } 
812   else CkAbort("Recv'd late remote contribution!\n");
813 #endif
814 }
815
816 //////////// Reduction Manager Utilities /////////////
817
818 //Return the countAdjustment struct for the given redNo:
819 countAdjustment &CkReductionMgr::adj(int number)
820 {
821   number-=completedRedNo;
822   number--;
823   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
824   //Pad the adjustment vector with zeros until it's at least number long
825   while ((int)(adjVec.length())<=number)
826     adjVec.push_back(countAdjustment());
827   return adjVec[number];
828 }
829
830 //Combine (& free) the current message vector msgs.
831 CkReductionMsg *CkReductionMgr::reduceMessages(void)
832 {
833 #if CMK_BIGSIM_CHARM
834   _TRACE_BG_END_EXECUTE(1);
835   void* _bgParentLog = NULL;
836   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
837 #endif
838   CkReductionMsg *ret=NULL;
839
840   //Look through the vector for a valid reducer, swapping out placeholder messages
841   CkReduction::reducerType r=CkReduction::invalid;
842   int msgs_gcount=0;//Reduced gcount
843   int msgs_nSources=0;//Reduced nSources
844   int msgs_userFlag=-1;
845   CkCallback msgs_callback;
846   int i;
847   int nMsgs=0;
848   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
849   CkReductionMsg *m;
850   bool isMigratableContributor;
851
852   // Copy message queue into msgArr, skipping placeholders:
853   while (NULL!=(m=msgs.deq()))
854   {
855     msgs_gcount+=m->gcount;
856     if (m->sourceFlag!=0)
857     { //This is a real message from an element, not just a placeholder
858       msgArr[nMsgs++]=m;
859       msgs_nSources+=m->nSources();
860       r=m->reducer;
861       if (!m->callback.isInvalid())
862         msgs_callback=m->callback;
863       if (m->userFlag!=-1)
864         msgs_userFlag=m->userFlag;
865                         
866         isMigratableContributor=m->isMigratableContributor();
867 #if CMK_BIGSIM_CHARM
868         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
869 #endif
870     }
871     else
872     { //This is just a placeholder message-- forget it
873       delete m;
874     }
875   }
876
877   if (nMsgs==0||r==CkReduction::invalid)
878   //No valid reducer in the whole vector
879     ret=CkReductionMsg::buildNew(0,NULL);
880   else
881   {//Use the reducer to reduce the messages
882                 //if there is only one msg to be reduced just return that message
883     if(nMsgs == 1){
884         ret = msgArr[0];        
885     }else{
886         CkReduction::reducerFn f=CkReduction::reducerTable[r];
887         ret=(*f)(nMsgs,msgArr);
888     }
889     ret->reducer=r;
890   }
891
892
893
894 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
895
896 #if CRITICAL_PATH_DEBUG > 3
897   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
898 #endif
899
900   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
901   path.updateMax(UsrToEnv(ret));
902   // Combine the critical paths from all the reduction messages
903   for (i=0;i<nMsgs;i++){
904     if (msgArr[i]!=ret){
905       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
906       path.updateMax(UsrToEnv(msgArr[i]));
907     }
908   }
909   
910
911 #if CRITICAL_PATH_DEBUG > 3
912   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
913 #endif
914   
915   PathHistoryTableEntry tableEntry(path);
916   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
917   
918 #endif
919
920         //Go back through the vector, deleting old messages
921   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
922   delete [] msgArr;
923
924   //Set the message counts
925   ret->redNo=redNo;
926   ret->gcount=msgs_gcount;
927   ret->userFlag=msgs_userFlag;
928   ret->callback=msgs_callback;
929   ret->sourceFlag=msgs_nSources;
930         ret->setMigratableContributor(isMigratableContributor);
931   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
932
933   return ret;
934 }
935
936
937 //Checkpointing utilities
938 //pack-unpack method for CkReductionMsg
939 //if packing pack the message and then unpack and return it
940 //if unpacking allocate memory for it read it off disk and then unapck
941 //and return it
942 void CkReductionMgr::pup(PUP::er &p)
943 {
944 //We do not store the client function pointer or the client function parameter,
945 //it is the responsibility of the programmer to correctly restore these
946   CkGroupInitCallback::pup(p);
947   p(redNo);
948   p(completedRedNo);
949   p(inProgress); p(creating); p(startRequested);
950   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
951   p|msgs;
952   p|futureMsgs;
953   p|futureRemoteMsgs;
954   p|finalMsgs;
955   p|adjVec;
956   p|nodeProxy;
957   p|storedCallback;
958     // handle CkReductionClientBundle
959   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
960   {
961     CkReductionClientBundle *bd;
962     if (p.isUnpacking()) 
963       bd = new CkReductionClientBundle;
964     else
965       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
966     p|*bd;
967     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
968   }
969
970   // subtle --- Gengbin
971   // Group : CkReductionMgr
972   // CkArray: CkReductionMgr
973   // lcount/gcount in Group is set in Group constructor
974   // lcount/gcount in CkArray is not, it is set when array elements are created
975   // we can not pup because inserting array elems will add the counters again
976 //  p|lcount;
977 //  p|gcount;
978 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
979 //  p|lcount;
980 //  //  p|gcount;
981 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
982 #endif
983   if(p.isUnpacking()){
984     thisProxy = thisgroup;
985     maxStartRequest=0;
986 #if GROUP_LEVEL_REDUCTION
987 #ifdef BINOMIAL_TREE
988     init_BinomialTree();
989 #else
990     init_BinaryTree();
991 #endif
992 #endif
993   }
994
995   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
996 }
997
998
999 //Callback for doing Reduction through NodeGroups added by Sayantan
1000
1001 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1002
1003         int total = m->gcount+adj(m->redNo).gcount;
1004         finalMsgs.enq(m);
1005         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1006         adj(m->redNo).mainRecvd = 1;
1007         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1008         endArrayReduction();
1009 }
1010
1011 void CkReductionMgr :: endArrayReduction(){
1012         CkReductionMsg *ret=NULL;
1013         int nMsgs=finalMsgs.length();
1014         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1015         //Look through the vector for a valid reducer, swapping out placeholder messages
1016         //CkPrintf("Length of Final Message %d \n",nMsgs);
1017         CkReduction::reducerType r=CkReduction::invalid;
1018         int msgs_gcount=0;//Reduced gcount
1019         int msgs_nSources=0;//Reduced nSources
1020         int msgs_userFlag=-1;
1021         CkCallback msgs_callback;
1022         CkCallback msgs_secondaryCallback;
1023         CkVec<CkReductionMsg *> tempMsgs;
1024         int i;
1025         int numMsgs = 0;
1026         for (i=0;i<nMsgs;i++)
1027         {
1028                 CkReductionMsg *m=finalMsgs.deq();
1029                 if(m->redNo == completedRedNo +1){
1030                         msgs_gcount+=m->gcount;
1031                         if (m->sourceFlag!=0)
1032                         { //This is a real message from an element, not just a placeholder
1033                                 msgs_nSources+=m->nSources();
1034                                 r=m->reducer;
1035                                 if (!m->callback.isInvalid())
1036                                 msgs_callback=m->callback;
1037                                 if(!m->secondaryCallback.isInvalid())
1038                                         msgs_secondaryCallback = m->secondaryCallback;
1039                                 if (m->userFlag!=-1)
1040                                         msgs_userFlag=m->userFlag;
1041                                 tempMsgs.push_back(m);
1042                         }
1043                 }else{
1044                         finalMsgs.enq(m);
1045                 }
1046
1047         }
1048         numMsgs = tempMsgs.length();
1049
1050         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1051
1052         if(numMsgs == 0){
1053                 return;
1054         }
1055         if(adj(completedRedNo+1).mainRecvd == 0){
1056                 for(i=0;i<numMsgs;i++){
1057                         finalMsgs.enq(tempMsgs[i]);
1058                 }
1059                 return;
1060         }
1061
1062 /*
1063         NOT NEEDED ANYMORE DONE at nodegroup level
1064         if(msgs_gcount  > msgs_nSources){
1065                 for(i=0;i<numMsgs;i++){
1066                         finalMsgs.enq(tempMsgs[i]);
1067                 }
1068                 return;
1069         }*/
1070
1071         if (nMsgs==0||r==CkReduction::invalid)
1072                 //No valid reducer in the whole vector
1073                 ret=CkReductionMsg::buildNew(0,NULL);
1074         else{//Use the reducer to reduce the messages
1075                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1076                 // has to be corrected elements from above need to be put into a temporary vector
1077                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1078                 ret=(*f)(numMsgs,msgArr);
1079                 ret->reducer=r;
1080
1081         }
1082
1083         
1084 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1085
1086 #if CRITICAL_PATH_DEBUG > 3
1087         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1088 #endif
1089
1090         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1091         path.updateMax(UsrToEnv(ret));
1092         // Combine the critical paths from all the reduction messages into the header for the new result
1093         for (i=0;i<numMsgs;i++){
1094           if (tempMsgs[i]!=ret){
1095             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1096             path.updateMax(UsrToEnv(tempMsgs[i]));
1097           } else {
1098             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1099           }
1100         }
1101         // Also consider the path which got us into this entry method
1102
1103 #if CRITICAL_PATH_DEBUG > 3
1104         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1105 #endif
1106
1107 #endif
1108   
1109
1110
1111
1112
1113         for(i = 0;i<numMsgs;i++){
1114                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1115         }
1116
1117         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1118         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1119
1120         ret->redNo=completedRedNo+1;
1121         ret->gcount=msgs_gcount;
1122         ret->userFlag=msgs_userFlag;
1123         ret->callback=msgs_callback;
1124         ret->secondaryCallback = msgs_secondaryCallback;
1125         ret->sourceFlag=msgs_nSources;
1126
1127         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1128
1129         CkSetRefNum(ret, ret->getUserFlag());
1130         if (!ret->secondaryCallback.isInvalid())
1131             ret->secondaryCallback.send(ret);
1132     else if (!storedCallback.isInvalid())
1133             storedCallback.send(ret);
1134     else{
1135       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1136             CkAbort("No reduction client!\n"
1137                     "You must register a client with either SetReductionClient or during contribute.\n");
1138     }
1139         completedRedNo++;
1140
1141         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1142
1143         for (i=1;i<(int)(adjVec.length());i++)
1144                 adjVec[i-1]=adjVec[i];
1145         adjVec.length()--;
1146         endArrayReduction();
1147 }
1148
1149 #if GROUP_LEVEL_REDUCTION
1150
1151 void CkReductionMgr::init_BinaryTree(){
1152         parent = (CkMyPe()-1)/TREE_WID;
1153         int firstkid = CkMyPe()*TREE_WID+1;
1154         numKids=CkNumPes()-firstkid;
1155         if (numKids>TREE_WID) numKids=TREE_WID;
1156         if (numKids<0) numKids=0;
1157
1158         for(int i=0;i<numKids;i++){
1159                 kids.push_back(firstkid+i);
1160                 newKids.push_back(firstkid+i);
1161         }
1162 }
1163
1164 void CkReductionMgr::init_BinomialTree(){
1165         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1166         /*upperSize = (unsigned )pow((double)2,depth);*/
1167         upperSize = (unsigned) 1 << depth;
1168         label = upperSize-CkMyPe()-1;
1169         int p=label;
1170         int count=0;
1171         while( p > 0){
1172                 if(p % 2 == 0)
1173                         break;
1174                 else{
1175                         p = p/2;
1176                         count++;
1177                 }
1178         }
1179         /*parent = label + rint(pow((double)2,count));*/
1180         parent = label + (1<<count);
1181         parent = upperSize -1 -parent;
1182         int temp;
1183         if(count != 0){
1184                 numKids = 0;
1185                 for(int i=0;i<count;i++){
1186                         /*temp = label - rint(pow((double)2,i));*/
1187                         temp = label - (1<<i);
1188                         temp = upperSize-1-temp;
1189                         if(temp <= CkNumPes()-1){
1190                                 kids.push_back(temp);
1191                                 numKids++;
1192                         }
1193                 }
1194         }else{
1195                 numKids = 0;
1196         //      kids = NULL;
1197         }
1198 }
1199
1200
1201 int CkReductionMgr::treeRoot(void)
1202 {
1203   return 0;
1204 }
1205 CmiBool CkReductionMgr::hasParent(void) //Root Node
1206 {
1207   return (CmiBool)(CkMyPe()!=treeRoot());
1208 }
1209 int CkReductionMgr::treeParent(void) //My parent Node
1210 {
1211   return parent;
1212 }
1213
1214 int CkReductionMgr::firstKid(void) //My first child Node
1215 {
1216   return CkMyPe()*TREE_WID+1;
1217 }
1218 int CkReductionMgr::treeKids(void)//Number of children in tree
1219 {
1220   return numKids;
1221 }
1222
1223 #endif
1224
1225 /////////////////////////////////////////////////////////////////////////
1226
1227 ////////////////////////////////
1228 //CkReductionMsg support
1229
1230 //ReductionMessage default private constructor-- does nothing
1231 CkReductionMsg::CkReductionMsg(){}
1232 CkReductionMsg::~CkReductionMsg(){}
1233
1234 //This define gives the distance from the start of the CkReductionMsg
1235 // object to the start of the user data area (just below last object field)
1236 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1237
1238 //"Constructor"-- builds and returns a new CkReductionMsg.
1239 //  the "data" array you specify will be copied into this object.
1240 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1241     CkReduction::reducerType reducer, CkReductionMsg *buf)
1242 {
1243   int len[1];
1244   len[0]=NdataSize;
1245   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1246   
1247   ret->dataSize=NdataSize;
1248   if (srcData!=NULL && !buf)
1249     memcpy(ret->data,srcData,NdataSize);
1250   ret->userFlag=-1;
1251   ret->reducer=reducer;
1252   //ret->ci=NULL;
1253   ret->sourceFlag=-1000;
1254   ret->gcount=0;
1255   ret->migratableContributor = true;
1256 #if CMK_BIGSIM_CHARM
1257   ret->log = NULL;
1258 #endif
1259   return ret;
1260 }
1261
1262 // Charm kernel message runtime support:
1263 void *
1264 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1265 {
1266   int totalsize=ARM_DATASTART+(*sz);
1267   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1268   CkReductionMsg *ret = (CkReductionMsg *)
1269     CkAllocMsg(msgnum,totalsize,priobits);
1270   ret->data=(void *)(&ret->dataStorage);
1271   return (void *) ret;
1272 }
1273
1274 void *
1275 CkReductionMsg::pack(CkReductionMsg* in)
1276 {
1277   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1278   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1279   in->data = NULL;
1280   return (void*) in;
1281 }
1282
1283 CkReductionMsg* CkReductionMsg::unpack(void *in)
1284 {
1285   CkReductionMsg *ret = (CkReductionMsg *)in;
1286   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1287   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1288   ret->data=(void *)(&ret->dataStorage);
1289   return ret;
1290 }
1291
1292
1293 /////////////////////////////////////////////////////////////////////////////////////
1294 ///////////////// Builtin Reducer Functions //////////////
1295 /* A simple reducer, like sum_int, looks like this:
1296 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1297 {
1298   int i,ret=0;
1299   for (i=0;i<nMsg;i++)
1300     ret+=*(int *)(msg[i]->getData());
1301   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1302 }
1303
1304 To keep the code small and easy to change, the implementations below
1305 are built with preprocessor macros.
1306 */
1307
1308 //////////////// simple reducers ///////////////////
1309 /*A define used to quickly and tersely construct simple reductions.
1310 The basic idea is to use the first message's data array as
1311 (pre-initialized!) scratch space for folding in the other messages.
1312  */
1313
1314 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1315 {
1316         CkAbort("Called the invalid reducer type 0.  This probably\n"
1317                 "means you forgot to initialize your custom reducer index.\n");
1318         return NULL;
1319 }
1320
1321 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1322 {
1323   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1324 }
1325
1326 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1327 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1328 {\
1329   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1330   int m,i;\
1331   int nElem=msg[0]->getLength()/sizeof(dataType);\
1332   dataType *ret=(dataType *)(msg[0]->getData());\
1333   for (m=1;m<nMsg;m++)\
1334   {\
1335     dataType *value=(dataType *)(msg[m]->getData());\
1336     for (i=0;i<nElem;i++)\
1337     {\
1338       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1339       loop\
1340     }\
1341   }\
1342   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1343   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1344 }
1345
1346 //Use this macro for reductions that have the same type for all inputs
1347 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1348   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1349   SIMPLE_REDUCTION(nameBase##_long,CmiInt8,"%d",loop) \
1350   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1351   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1352
1353
1354 //Compute the sum the numbers passed by each element.
1355 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1356
1357 //Compute the product of the numbers passed by each element.
1358 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1359
1360 //Compute the largest number passed by any element.
1361 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1362
1363 //Compute the smallest integer passed by any element.
1364 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1365
1366
1367 //Compute the logical AND of the integers passed by each element.
1368 // The resulting integer will be zero if any source integer is zero; else 1.
1369 SIMPLE_REDUCTION(logical_and,int,"%d",
1370         if (value[i]==0)
1371      ret[i]=0;
1372   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1373 )
1374
1375 //Compute the logical OR of the integers passed by each element.
1376 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1377 SIMPLE_REDUCTION(logical_or,int,"%d",
1378   if (value[i]!=0)
1379            ret[i]=1;
1380   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1381 )
1382
1383 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1384 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1385
1386 //Select one random message to pass on
1387 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1388   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1389 }
1390
1391 /////////////// concat ////////////////
1392 /*
1393 This reducer simply appends the data it recieves from each element,
1394 without any housekeeping data to separate them.
1395 */
1396 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1397 {
1398   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1399   //Figure out how big a message we'll need
1400   int i,retSize=0;
1401   for (i=0;i<nMsg;i++)
1402       retSize+=msg[i]->getSize();
1403
1404   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1405
1406   //Allocate a new message
1407   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1408
1409   //Copy the source message data into the return message
1410   char *cur=(char *)(ret->getData());
1411   for (i=0;i<nMsg;i++) {
1412     int messageBytes=msg[i]->getSize();
1413     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1414     cur+=messageBytes;
1415   }
1416   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1417   return ret;
1418 }
1419
1420 /////////////// set ////////////////
1421 /*
1422 This reducer appends the data it recieves from each element
1423 along with some housekeeping data indicating contribution boundaries.
1424 The message data is thus a list of reduction_set_element structures
1425 terminated by a dummy reduction_set_element with a sourceElement of -1.
1426 */
1427
1428 //This rounds an integer up to the nearest multiple of sizeof(double)
1429 static const int alignSize=sizeof(double);
1430 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1431
1432 //This gives the size (in bytes) of a reduction_set_element
1433 static int SET_SIZE(int dataSize)
1434 {return SET_ALIGN(sizeof(int)+dataSize);}
1435
1436 //This returns a pointer to the next reduction_set_element in the list
1437 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1438 {
1439   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1440   return (CkReduction::setElement *)next;
1441 }
1442
1443 //Combine the data passed by each element into an list of reduction_set_elements.
1444 // Each element may contribute arbitrary data (with arbitrary length).
1445 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1446 {
1447   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1448   //Figure out how big a message we'll need
1449   int i,retSize=0;
1450   for (i=0;i<nMsg;i++) {
1451     if (!msg[i]->isFromUser())
1452     //This message is composite-- it will just be copied over (less terminating -1)
1453       retSize+=(msg[i]->getSize()-sizeof(int));
1454     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1455       retSize+=SET_SIZE(msg[i]->getSize());
1456   }
1457   retSize+=sizeof(int);//Leave room for terminating -1.
1458
1459   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1460
1461   //Allocate a new message
1462   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1463
1464   //Copy the source message data into the return message
1465   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1466   for (i=0;i<nMsg;i++)
1467     if (!msg[i]->isFromUser())
1468     {//This message is composite-- just copy it over (less terminating -1)
1469                         int messageBytes=msg[i]->getSize()-sizeof(int);
1470                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1471                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1472                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1473     }
1474     else //This is a message from an element-- wrap it in a reduction_set_element
1475     {
1476       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1477       cur->dataSize=msg[i]->getSize();
1478       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1479       cur=SET_NEXT(cur);
1480     }
1481   cur->dataSize=-1;//Add a terminating -1.
1482   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1483   return ret;
1484 }
1485
1486 //Utility routine: get the next reduction_set_element in the list
1487 // if there is one, or return NULL if there are none.
1488 //To get all the elements, just keep feeding this procedure's output back to
1489 // its input until it returns NULL.
1490 CkReduction::setElement *CkReduction::setElement::next(void)
1491 {
1492   CkReduction::setElement *n=SET_NEXT(this);
1493   if (n->dataSize==-1)
1494     return NULL;//This is the end of the list
1495   else
1496     return n;//This is just another element
1497 }
1498
1499 /////////////////// Reduction Function Table /////////////////////
1500 CkReduction::CkReduction() {} //Dummy private constructor
1501
1502 //Add the given reducer to the list.  Returns the new reducer's
1503 // reducerType.  Must be called in the same order on every node.
1504 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1505 {
1506   reducerTable[nReducers]=fn;
1507   return (reducerType)nReducers++;
1508 }
1509
1510 /*Reducer table: maps reducerTypes to reducerFns.
1511 It's indexed by reducerType, so the order in this table
1512 must *exactly* match the reducerType enum declaration.
1513 The names don't have to match, but it helps.
1514 */
1515 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1516
1517 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1518     ::invalid_reducer,
1519     ::nop,
1520   //Compute the sum the numbers passed by each element.
1521     ::sum_int,::sum_long,::sum_float,::sum_double,
1522
1523   //Compute the product the numbers passed by each element.
1524     ::product_int,::product_long,::product_float,::product_double,
1525
1526   //Compute the largest number passed by any element.
1527     ::max_int,::max_long,::max_float,::max_double,
1528
1529   //Compute the smallest number passed by any element.
1530     ::min_int,::min_long,::min_float,::min_double,
1531
1532   //Compute the logical AND of the integers passed by each element.
1533   // The resulting integer will be zero if any source integer is zero.
1534     ::logical_and,
1535
1536   //Compute the logical OR of the integers passed by each element.
1537   // The resulting integer will be 1 if any source integer is nonzero.
1538     ::logical_or,
1539
1540     // Compute the logical bitvector AND of the integers passed by each element.
1541     ::bitvec_and,
1542
1543     // Compute the logical bitvector OR of the integers passed by each element.
1544     ::bitvec_or,
1545
1546     // Select one of the messages at random to pass on
1547     ::random,
1548
1549   //Concatenate the (arbitrary) data passed by each element
1550     ::concat,
1551
1552   //Combine the data passed by each element into an list of setElements.
1553   // Each element may contribute arbitrary data (with arbitrary length).
1554     ::set
1555 };
1556
1557
1558
1559
1560
1561
1562
1563
1564 /********** Code added by Sayantan *********************/
1565 /** Locking is a big problem in the nodegroup code for smp.
1566  So a few assumptions have had to be made. There is one lock
1567  called lockEverything. It protects all the data structures 
1568  of the nodegroup reduction mgr. I tried grabbing it separately 
1569  for each datastructure, modifying it and then releasing it and
1570  then grabbing it again, for the next change.
1571  That doesn't really help because the interleaved execution of 
1572  different threads makes the state of the reduction manager 
1573  inconsistent. 
1574  
1575  1. Grab lockEverything before calling finishreduction or startReduction
1576     or doRecvMsg
1577  2. lockEverything is grabbed only in entry methods reductionStarting
1578     or RecvMesg or  addcontribution.
1579  ****/
1580  
1581 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1582 NodeGroup::NodeGroup(void) {
1583   __nodelock=CmiCreateLock();
1584 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1585     mlogData->objID.type = TypeNodeGroup;
1586     mlogData->objID.data.group.onPE = CkMyNode();
1587 #endif
1588
1589 }
1590 NodeGroup::~NodeGroup() {
1591   CmiDestroyLock(__nodelock);
1592 }
1593 void NodeGroup::pup(PUP::er &p)
1594 {
1595   CkNodeReductionMgr::pup(p);
1596   p|reductionInfo;
1597 }
1598
1599 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1600
1601 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1602   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1603  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1604   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1605  }
1606
1607 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1608                                     ((CkNodeReductionMgr *)this),
1609                                     reductionInfo,false)
1610
1611 /* this contribute also adds up the count across all messages it receives.
1612   Useful for summing up number of array elements who have contributed ****/ 
1613 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1614         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1615
1616
1617
1618 //#define BINOMIAL_TREE
1619
1620 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1621   : thisProxy(thisgroup)
1622 {
1623 #ifdef BINOMIAL_TREE
1624   init_BinomialTree();
1625 #else
1626   init_BinaryTree();
1627 #endif
1628   storedCallback=NULL;
1629   redNo=0;
1630   inProgress=CmiFalse;
1631   
1632   startRequested=CmiFalse;
1633   gcount=CkNumNodes();
1634   lcount=1;
1635   nContrib=nRemote=0;
1636   lockEverything = CmiCreateLock();
1637
1638
1639   creating=CmiFalse;
1640   interrupt = 0;
1641   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1642         /*
1643                 FAULT_EVAC
1644         */
1645         blocked = false;
1646         maxModificationRedNo = INT_MAX;
1647         killed=0;
1648         additionalGCount = newAdditionalGCount = 0;
1649 }
1650
1651 void CkNodeReductionMgr::flushStates()
1652 {
1653  if(CkMyRank() == 0){
1654   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1655   redNo=0;
1656   inProgress=CmiFalse;
1657
1658   startRequested=CmiFalse;
1659   gcount=CkNumNodes();
1660   lcount=1;
1661   nContrib=nRemote=0;
1662
1663   creating=CmiFalse;
1664   interrupt = 0;
1665   while (!msgs.isEmpty()) { delete msgs.deq(); }
1666   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1667   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1668   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1669   }
1670 }
1671
1672 //////////// Reduction Manager Client API /////////////
1673
1674 //Add the given client function.  Overwrites any previous client.
1675 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1676 {
1677   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1678   if(cb->isInvalid()){
1679         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1680   }else{
1681         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1682   }
1683
1684   if (CkMyNode()!=0)
1685           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1686   delete storedCallback;
1687   storedCallback=cb;
1688 }
1689
1690
1691
1692 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1693 {
1694 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1695     Chare *oldObj =CpvAccess(_currentObj);
1696     CpvAccess(_currentObj) = this;
1697 #endif
1698
1699   //m->ci=ci;
1700   m->redNo=ci->redNo++;
1701   m->sourceFlag=-1;//A single contribution
1702   m->gcount=0;
1703   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1704   addContribution(m);
1705
1706 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1707     CpvAccess(_currentObj) = oldObj;
1708 #endif
1709 }
1710
1711
1712 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1713 {
1714 #if CMK_BIGSIM_CHARM
1715   _TRACE_BG_TLINE_END(&m->log);
1716 #endif
1717 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1718     Chare *oldObj =CpvAccess(_currentObj);
1719     CpvAccess(_currentObj) = this;
1720 #endif
1721   //m->ci=ci;
1722   m->redNo=ci->redNo++;
1723   m->gcount=count;
1724   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1725   addContribution(m);
1726   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1727
1728 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1729     CpvAccess(_currentObj) = oldObj;
1730 #endif
1731 }
1732
1733
1734 //////////// Reduction Manager Remote Entry Points /////////////
1735
1736 //Sent down the reduction tree (used by barren PEs)
1737 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1738 {
1739   CmiLock(lockEverything);
1740   if(blocked){
1741         delete m;
1742         CmiUnlock(lockEverything);
1743         return ;
1744   }
1745   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1746   if (isPresent(m->num) && !inProgress)
1747   {
1748     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1749     startReduction(m->num,srcNode);
1750     finishReduction();
1751   } else if (isFuture(m->num)){
1752         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1753   }
1754   else //is Past
1755     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1756   CmiUnlock(lockEverything);
1757   delete m;
1758
1759 }
1760
1761
1762 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1763         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1764         /*
1765                 FAULT_EVAC
1766         */
1767         if(blocked){
1768                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1769                 bufferedRemoteMsgs.enq(m);
1770                 return;
1771         }
1772         
1773         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1774             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1775             startReduction(m->redNo,CkMyNode());
1776             msgs.enq(m);
1777             nRemote++;
1778             finishReduction();
1779         }
1780         else {
1781             if (isFuture(m->redNo)) {
1782                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1783                     futureRemoteMsgs.enq(m);
1784             }else{
1785                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1786                    CkAbort("Recv'd late remote contribution!\n");
1787             }
1788         }
1789         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1790 }
1791
1792 //Sent up the reduction tree with reduced data
1793 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1794 {
1795 #if CMK_BIGSIM_CHARM
1796   _TRACE_BG_TLINE_END(&m->log);
1797 #endif
1798 #ifndef CMK_CPV_IS_SMP
1799 #if CMK_IMMEDIATE_MSG
1800         if(interrupt == 1){
1801                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1802                 CpvAccess(_qd)->process(-1);
1803                 CmiDelayImmediate();
1804                 return;
1805         }
1806 #endif  
1807 #endif
1808    interrupt = 1;       
1809    CmiLock(lockEverything);   
1810    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1811    doRecvMsg(m);
1812    CmiUnlock(lockEverything);    
1813    interrupt = 0;
1814    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1815 }
1816
1817 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1818 {
1819         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1820         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1821         if (inProgress){
1822                 DEBR((AA"This Node reduction is already in progress\n"AB));
1823                 return;//This reduction already started
1824         }
1825         if (creating) //Don't start yet-- we're creating elements
1826         {
1827                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1828                 startRequested=CmiTrue;
1829                 return;
1830         }
1831         
1832         //If none of these cases, we need to start the reduction--
1833         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1834         inProgress=CmiTrue;
1835
1836         if(!_isNotifyChildInRed) return;
1837
1838         //Sent start requests to our kids (in case they don't already know)
1839         
1840         for (int k=0;k<treeKids();k++)
1841         {
1842 #ifdef BINOMIAL_TREE
1843                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1844                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1845 #else
1846                 if(kids[k] != srcNode){
1847                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1848                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1849                 }       
1850 #endif
1851         }
1852
1853         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1854         // in case, nodegroup does not has the attached red group,
1855         // it has to restart groups again
1856         startLocalGroupReductions(number);
1857 /*
1858         if (startLocalGroupReductions(number) == 0)
1859           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1860 */
1861 }
1862
1863 // restart local groups until succeed
1864 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1865   CmiLock(lockEverything);    
1866   if (startLocalGroupReductions(number) == 0)
1867     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1868   CmiUnlock(lockEverything);    
1869 }
1870
1871 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1872         /*
1873                 FAULT_EVAC
1874         */
1875         if(blocked){
1876                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1877                 bufferedMsgs.enq(m);
1878                 return;
1879         }
1880         
1881         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1882                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1883                 futureMsgs.enq(m);
1884         } else {// An ordinary contribution
1885                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1886                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1887                 startReduction(m->redNo,CkMyNode());
1888                 msgs.enq(m);
1889                 nContrib++;
1890                 finishReduction();
1891         }
1892 }
1893
1894 //Handle a message from one element for the reduction
1895 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1896 {
1897   interrupt = 1;
1898   CmiLock(lockEverything);
1899   doAddContribution(m);
1900   CmiUnlock(lockEverything);
1901   interrupt = 0;
1902 }
1903
1904 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1905         if(blocked){
1906                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1907                 bufferedMsgs.enq(m);
1908                 return;
1909         }
1910         
1911         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1912                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1913 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1914                 futureLateMigrantMsgs.enq(m);
1915         } else {// An ordinary contribution
1916                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1917 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1918                 msgs.enq(m);
1919                 finishReduction();
1920         }
1921 }
1922
1923
1924
1925
1926
1927 /** check if the nodegroup reduction is finished at this node. In that case send it
1928 up the reduction tree **/
1929
1930 void CkNodeReductionMgr::finishReduction(void)
1931 {
1932   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1933   /***Check if reduction is finished in the next few ifs***/
1934   if ((!inProgress) || creating){
1935         DEBR((AA"Either not in Progress or creating\n"AB));
1936         return;
1937   }
1938
1939   if (nContrib<(lcount)){
1940         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1941
1942          return;//Need more local messages
1943   }
1944   if (nRemote<treeKids()){
1945         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1946
1947         return;//Need more remote messages
1948   }
1949   if (nRemote>treeKids()){
1950
1951           interrupt = 0;
1952            CkAbort("Nodegrp Excess remote reduction message received!\n");
1953   }
1954
1955   DEBR((AA"Reducing node data...\n"AB));
1956
1957   /**reduce all messages received at this node **/
1958   CkReductionMsg *result=reduceMessages();
1959
1960   if (hasParent())
1961   {//Pass data up tree to parent
1962         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1963         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1964         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1965                 /*
1966                         FAULT_EVAC
1967                 */
1968                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1969             thisProxy[treeParent()].RecvMsg(result);
1970         }
1971
1972   }
1973   else
1974   {
1975                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1976                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
1977                         msgs.enq(result);
1978                         return;
1979                 }
1980                 result->gcount += additionalGCount;
1981           /** if the reduction is finished and I am the root of the reduction tree
1982           then call the reductionhandler and other stuff ***/
1983                 
1984
1985                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
1986     CkSetRefNum(result, result->getUserFlag());
1987     if (!result->callback.isInvalid()){
1988       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
1989             result->callback.send(result);
1990     }
1991     else if (storedCallback!=NULL){
1992       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
1993             storedCallback->send(result);
1994     }
1995     else{
1996                 DEBR((AA"Invalid Callback \n"AB));
1997             CkAbort("No reduction client!\n"
1998                     "You must register a client with either SetReductionClient or during contribute.\n");
1999                 }
2000   }
2001
2002   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2003   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2004   redNo++;
2005         updateTree();
2006   int i;
2007   inProgress=CmiFalse;
2008   startRequested=CmiFalse;
2009   nRemote=nContrib=0;
2010
2011   //Look through the future queue for messages we can now handle
2012   int n=futureMsgs.length();
2013
2014   for (i=0;i<n;i++)
2015   {
2016     interrupt = 1;
2017
2018     CkReductionMsg *m=futureMsgs.deq();
2019
2020     interrupt = 0;
2021     if (m!=NULL){ //One of these addContributions may have finished us.
2022       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2023       doAddContribution(m);//<- if *still* early, puts it back in the queue
2024     }
2025   }
2026
2027   interrupt = 1;
2028
2029   n=futureRemoteMsgs.length();
2030
2031   interrupt = 0;
2032   for (i=0;i<n;i++)
2033   {
2034     interrupt = 1;
2035
2036     CkReductionMsg *m=futureRemoteMsgs.deq();
2037
2038     interrupt = 0;
2039     if (m!=NULL)
2040       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2041   }
2042   
2043   n = futureLateMigrantMsgs.length();
2044   for(i=0;i<n;i++){
2045     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2046     if(m != NULL){
2047       if(m->redNo == redNo){
2048         msgs.enq(m);
2049       }else{
2050         futureLateMigrantMsgs.enq(m);
2051       }
2052     }
2053   }
2054 }
2055
2056 //////////// Reduction Manager Utilities /////////////
2057
2058 void CkNodeReductionMgr::init_BinaryTree(){
2059         parent = (CkMyNode()-1)/TREE_WID;
2060         int firstkid = CkMyNode()*TREE_WID+1;
2061         numKids=CkNumNodes()-firstkid;
2062   if (numKids>TREE_WID) numKids=TREE_WID;
2063   if (numKids<0) numKids=0;
2064
2065         for(int i=0;i<numKids;i++){
2066                 kids.push_back(firstkid+i);
2067                 newKids.push_back(firstkid+i);
2068         }
2069 }
2070
2071 void CkNodeReductionMgr::init_BinomialTree(){
2072         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2073         /*upperSize = (unsigned )pow((double)2,depth);*/
2074         upperSize = (unsigned) 1 << depth;
2075         label = upperSize-CkMyNode()-1;
2076         int p=label;
2077         int count=0;
2078         while( p > 0){
2079                 if(p % 2 == 0)
2080                         break;
2081                 else{
2082                         p = p/2;
2083                         count++;
2084                 }
2085         }
2086         /*parent = label + rint(pow((double)2,count));*/
2087         parent = label + (1<<count);
2088         parent = upperSize -1 -parent;
2089         int temp;
2090         if(count != 0){
2091                 numKids = 0;
2092                 for(int i=0;i<count;i++){
2093                         /*temp = label - rint(pow((double)2,i));*/
2094                         temp = label - (1<<i);
2095                         temp = upperSize-1-temp;
2096                         if(temp <= CkNumNodes()-1){
2097                 //              kids[numKids] = temp;
2098                                 kids.push_back(temp);
2099                                 numKids++;
2100                         }
2101                 }
2102         }else{
2103                 numKids = 0;
2104         //      kids = NULL;
2105         }
2106 }
2107
2108
2109 int CkNodeReductionMgr::treeRoot(void)
2110 {
2111   return 0;
2112 }
2113 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2114 {
2115   return (CmiBool)(CkMyNode()!=treeRoot());
2116 }
2117 int CkNodeReductionMgr::treeParent(void) //My parent Node
2118 {
2119   return parent;
2120 }
2121
2122 int CkNodeReductionMgr::firstKid(void) //My first child Node
2123 {
2124   return CkMyNode()*TREE_WID+1;
2125 }
2126 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2127 {
2128 #ifdef BINOMIAL_TREE
2129         return numKids;
2130 #else
2131 /*  int nKids=CkNumNodes()-firstKid();
2132   if (nKids>TREE_WID) nKids=TREE_WID;
2133   if (nKids<0) nKids=0;
2134   return nKids;*/
2135         return numKids;
2136 #endif
2137 }
2138
2139 //Combine (& free) the current message vector msgs.
2140 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2141 {
2142 #if CMK_BIGSIM_CHARM
2143   _TRACE_BG_END_EXECUTE(1);
2144   void* _bgParentLog = NULL;
2145   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2146 #endif
2147   CkReductionMsg *ret=NULL;
2148
2149   //Look through the vector for a valid reducer, swapping out placeholder messages
2150   CkReduction::reducerType r=CkReduction::invalid;
2151   int msgs_gcount=0;//Reduced gcount
2152   int msgs_nSources=0;//Reduced nSources
2153   int msgs_userFlag=-1;
2154   CkCallback msgs_callback;
2155   CkCallback msgs_secondaryCallback;
2156   int i;
2157   int nMsgs=0;
2158   CkReductionMsg *m;
2159   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2160   bool isMigratableContributor;
2161         
2162
2163   while(NULL!=(m=msgs.deq()))
2164   {
2165     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2166     msgs_gcount+=m->gcount;
2167     if (m->sourceFlag!=0)
2168     { //This is a real message from an element, not just a placeholder
2169       msgArr[nMsgs++]=m;
2170       msgs_nSources+=m->nSources();
2171       r=m->reducer;
2172       if (!m->callback.isInvalid())
2173         msgs_callback=m->callback;
2174       if(!m->secondaryCallback.isInvalid()){
2175         msgs_secondaryCallback = m->secondaryCallback;
2176       }
2177       if (m->userFlag!=-1)
2178         msgs_userFlag=m->userFlag;
2179
2180         isMigratableContributor= m->isMigratableContributor();
2181 #if CMK_BIGSIM_CHARM
2182       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2183 #endif
2184                                 
2185     }
2186     else
2187     { //This is just a placeholder message-- replace it
2188       delete m;
2189     }
2190   }
2191
2192   if (nMsgs==0||r==CkReduction::invalid)
2193   //No valid reducer in the whole vector
2194     ret=CkReductionMsg::buildNew(0,NULL);
2195   else
2196   {//Use the reducer to reduce the messages
2197     if(nMsgs == 1){
2198         ret = msgArr[0];
2199     }else{
2200         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2201         ret=(*f)(nMsgs,msgArr);
2202     }
2203     ret->reducer=r;
2204   }
2205
2206         
2207 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2208 #if CRITICAL_PATH_DEBUG > 3
2209         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2210 #endif
2211         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2212         path.updateMax(UsrToEnv(ret));
2213         // Combine the critical paths from all the reduction messages into the header for the new result
2214         for (i=0;i<nMsgs;i++){
2215           if (msgArr[i]!=ret){
2216             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2217             path.updateMax(UsrToEnv(msgArr[i]));
2218           } else {
2219             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2220           }
2221         }
2222 #if CRITICAL_PATH_DEBUG > 3
2223         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2224 #endif
2225
2226 #endif
2227
2228
2229         //Go back through the vector, deleting old messages
2230   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2231   delete [] msgArr;
2232   //Set the message counts
2233   ret->redNo=redNo;
2234   ret->gcount=msgs_gcount;
2235   ret->userFlag=msgs_userFlag;
2236   ret->callback=msgs_callback;
2237   ret->secondaryCallback = msgs_secondaryCallback;
2238   ret->sourceFlag=msgs_nSources;
2239   ret->setMigratableContributor(isMigratableContributor);
2240   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2241 #if CMK_BIGSIM_CHARM
2242   _TRACE_BG_TLINE_END(&ret->log);
2243 #endif
2244
2245   return ret;
2246 }
2247
2248 void CkNodeReductionMgr::pup(PUP::er &p)
2249 {
2250 //We do not store the client function pointer or the client function parameter,
2251 //it is the responsibility of the programmer to correctly restore these
2252   IrrGroup::pup(p);
2253   p(redNo);
2254   p(inProgress); p(creating); p(startRequested);
2255   p(lcount);
2256   p(nContrib); p(nRemote);
2257   p(interrupt);
2258   p|msgs;
2259   p|futureMsgs;
2260   p|futureRemoteMsgs;
2261   p|futureLateMigrantMsgs;
2262   p|parent;
2263   p|additionalGCount;
2264   p|newAdditionalGCount;
2265   if(p.isUnpacking()) {
2266     gcount=CkNumNodes();
2267     thisProxy = thisgroup;
2268     lockEverything = CmiCreateLock();
2269 #ifdef BINOMIAL_TREE
2270     init_BinomialTree();
2271 #else
2272     init_BinaryTree();
2273 #endif          
2274   }
2275   p | blocked;
2276   p | maxModificationRedNo;
2277
2278 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2279   int isnull = (storedCallback == NULL);
2280   p | isnull;
2281   if (!isnull) {
2282     if (p.isUnpacking()) {
2283       storedCallback = new CkCallback;
2284     }
2285     p|*storedCallback;
2286   }
2287 #endif
2288
2289 }
2290
2291 /*
2292         FAULT_EVAC
2293         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2294         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2295         reduction tree. 
2296 */
2297 void CkNodeReductionMgr::evacuate(){
2298         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2299         if(treeKids() == 0){
2300         /*
2301                 if the node going down is a leaf
2302         */
2303                 oldleaf=true;
2304                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2305                 /*
2306                         Need to ask parent for the reduction number that it has seen. 
2307                         Since it is a leaf, the tree does not need to be rewired. 
2308                         We reuse the oldparent type of tree modification message to get 
2309                         the parent to block and tell us about the highest reduction number it has seen.
2310                         
2311                 */
2312                 int data[2];
2313                 data[0]=CkMyNode();
2314                 data[1]=getTotalGCount()+additionalGCount;
2315                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2316                 newParent = treeParent();
2317         }else{
2318                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2319                 oldleaf= false;
2320         /*
2321                 It is not a leaf. It needs to rewire the tree around itself.
2322                 It also needs to decide on a reduction No after which the new tree will be used
2323                 Till it decides on the new tree and the redNo at which it becomes valid,
2324                 all received messages will be buffered
2325         */
2326                 newParent = kids[0];
2327                 for(int i=numKids-1;i>=0;i--){
2328                         newKids.remove(i);
2329                 }
2330                 /*
2331                         Ask everybody for the highest reduction number they have seen and
2332                         also tell them about the new tree
2333                 */
2334                 /*
2335                         Tell parent about its new child;
2336                 */
2337                 int oldParentData[2];
2338                 oldParentData[0] = CkMyNode();
2339                 oldParentData[1] = newParent;
2340                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2341
2342                 /*
2343                         Tell the other children about their new parent
2344                 */
2345                 int childrenData=newParent;
2346                 for(int i=1;i<numKids;i++){
2347                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2348                 }
2349                 
2350                 /*
2351                         Tell newParent (1st child) about its new children,
2352                         the current node and its children except the newParent
2353                 */
2354                 int *newParentData = new int[numKids+2];
2355                 for(int i=1;i<numKids;i++){
2356                         newParentData[i] = kids[i];
2357                 }
2358                 newParentData[0] = CkMyNode();
2359                 newParentData[numKids] = parent;
2360                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2361                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2362         }
2363         readyDeletion = false;
2364         blocked = true;
2365         numModificationReplies = 0;
2366         tempModificationRedNo = findMaxRedNo();
2367 }
2368
2369 /*
2370         Depending on the code, use the data to change the tree
2371         1. OLDPARENT : replace the old child with a new one
2372         2. OLDCHILDREN: replace the parent
2373         3. NEWPARENT:  add the children and change the parent
2374         4. LEAFPARENT: delete the old child
2375 */
2376
2377 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2378         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2379         int sender;
2380         newKids = kids;
2381         readyDeletion = false;
2382         newAdditionalGCount = additionalGCount;
2383         switch(code){
2384                 case OLDPARENT: 
2385                         for(int i=0;i<numKids;i++){
2386                                 if(newKids[i] == data[0]){
2387                                         newKids[i] = data[1];
2388                                         break;
2389                                 }
2390                         }
2391                         sender = data[0];
2392                         newParent = parent;
2393                         break;
2394                 case OLDCHILDREN:
2395                         newParent = data[0];
2396                         sender = parent;
2397                         break;
2398                 case NEWPARENT:
2399                         for(int i=0;i<size-2;i++){
2400                                 newKids.push_back(data[i]);
2401                         }
2402                         newParent = data[size-2];
2403                         newAdditionalGCount += data[size-1];
2404                         sender = parent;
2405                         break;
2406                 case LEAFPARENT:
2407                         for(int i=0;i<numKids;i++){
2408                                 if(newKids[i] == data[0]){
2409                                         newKids.remove(i);
2410                                         break;
2411                                 }
2412                         }
2413                         sender = data[0];
2414                         newParent = parent;
2415                         newAdditionalGCount += data[1];
2416                         break;
2417         };
2418         blocked = true;
2419         int maxRedNo = findMaxRedNo();
2420         
2421         thisProxy[sender].collectMaxRedNo(maxRedNo);
2422 }
2423
2424 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2425         /*
2426                 Find out the maximum redNo that has been seen by 
2427                 the affected nodes
2428         */
2429         numModificationReplies++;
2430         if(maxRedNo > tempModificationRedNo){
2431                 tempModificationRedNo = maxRedNo;
2432         }
2433         if(numModificationReplies == numKids+1){
2434                 maxModificationRedNo = tempModificationRedNo;
2435                 /*
2436                         when all the affected nodes have replied, tell them the maximum.
2437                         Unblock yourself. deal with the buffered messages local and remote
2438                 */
2439                 if(maxModificationRedNo == -1){
2440                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2441                 }else{
2442                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2443                 }
2444                 thisProxy[parent].unblockNode(maxModificationRedNo);
2445                 for(int i=0;i<numKids;i++){
2446                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2447                 }
2448                 blocked = false;
2449                 updateTree();
2450                 clearBlockedMsgs();
2451         }
2452 }
2453
2454 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2455         maxModificationRedNo = maxRedNo;
2456         updateTree();
2457         blocked = false;
2458         clearBlockedMsgs();
2459 }
2460
2461
2462 void CkNodeReductionMgr::clearBlockedMsgs(){
2463         int len = bufferedMsgs.length();
2464         for(int i=0;i<len;i++){
2465                 CkReductionMsg *m = bufferedMsgs.deq();
2466                 doAddContribution(m);
2467         }
2468         len = bufferedRemoteMsgs.length();
2469         for(int i=0;i<len;i++){
2470                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2471                 doRecvMsg(m);
2472         }
2473
2474 }
2475 /*
2476         if the reduction number exceeds the maxModificationRedNo, change the tree
2477         to become the new one
2478 */
2479
2480 void CkNodeReductionMgr::updateTree(){
2481         if(redNo > maxModificationRedNo){
2482                 parent = newParent;
2483                 kids = newKids;
2484                 maxModificationRedNo = INT_MAX;
2485                 numKids = kids.size();
2486                 readyDeletion = true;
2487                 additionalGCount = newAdditionalGCount;
2488                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2489                 for(int i=0;i<(int)(newKids.size());i++){
2490                         DEBREVAC(("%d ",newKids[i]));
2491                 }
2492                 DEBREVAC(("\n"));
2493         //      startReduction(redNo,CkMyNode());
2494         }else{
2495                 if(maxModificationRedNo != INT_MAX){
2496                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2497                         startReduction(redNo,CkMyNode());
2498                         finishReduction();
2499                 }       
2500         }
2501 }
2502
2503
2504 void CkNodeReductionMgr::doneEvacuate(){
2505         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2506 /*      if(oldleaf){
2507                 
2508                         It used to be a leaf
2509                         Then as soon as future messages have been emptied you can 
2510                         send the parent a message telling them that they are not going
2511                         to receive anymore messages from this child
2512                 
2513                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2514                 while(futureMsgs.length() != 0){
2515                         int n = futureMsgs.length();
2516                         for(int i=0;i<n;i++){
2517                                 CkReductionMsg *m = futureMsgs.deq();
2518                                 if(isPresent(m->redNo)){
2519                                         msgs.enq(m);
2520                                 }else{
2521                                         futureMsgs.enq(m);
2522                                 }
2523                         }
2524                         CkReductionMsg *result = reduceMessages();
2525                         thisProxy[treeParent()].RecvMsg(result);
2526                         redNo++;
2527                 }
2528                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2529                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2530         }else{*/
2531                 if(readyDeletion){
2532                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2533                 }else{
2534                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2535                 }
2536 //      }
2537 }
2538
2539 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2540         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2541         for(int i=0;i<numKids;i++){
2542                 if(kids[i] == deletedChild){
2543                         kids.remove(i);
2544                         break;
2545                 }
2546         }
2547         numKids = kids.length();
2548         finishReduction();
2549 }
2550
2551 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2552         for(int i=0;i<(int)(newKids.length());i++){
2553                 if(newKids[i] == deletedChild){
2554                         newKids.remove(i);
2555                         break;
2556                 }
2557         }
2558         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2559         for(int i=0;i<(int)(newKids.size());i++){
2560                 DEBREVAC(("%d ",newKids[i]));
2561         }
2562         DEBREVAC(("\n"));
2563         finishReduction();
2564 }
2565
2566 int CkNodeReductionMgr::findMaxRedNo(){
2567         int max = redNo;
2568         for(int i=0;i<futureRemoteMsgs.length();i++){
2569                 if(futureRemoteMsgs[i]->redNo  > max){
2570                         max = futureRemoteMsgs[i]->redNo;
2571                 }
2572         }
2573         /*
2574                 if redNo is max (that is no future message) and the current reduction has not started
2575                 then tree can be changed before the reduction redNo can be started
2576         */ 
2577         if(redNo == max && msgs.length() == 0){
2578                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2579                 max--;
2580         }
2581         return max;
2582 }
2583
2584 // initnode call. check the size of reduction table
2585 void CkReductionMgr::sanitycheck()
2586 {
2587 #if CMK_ERROR_CHECKING
2588   int count = 0;
2589   while (CkReduction::reducerTable[count] != NULL) count++;
2590   CmiAssert(CkReduction::nReducers == count);
2591 #endif
2592 }
2593
2594 #include "CkReduction.def.h"