d9d928b797b5d37f0babaa9187b278fdd0d786c4
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207 #if defined(_FAULT_CAUSAL_)
208         numImmigrantRecObjs = 0;
209 #endif
210   disableNotifyChildrenStart = CmiFalse;
211   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
212 }
213
214 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
215 {
216   redNo=0;
217   completedRedNo = -1;
218   inProgress=CmiFalse;
219   creating=CmiFalse;
220   startRequested=CmiFalse;
221   gcount=lcount=0;
222   nContrib=nRemote=0;
223   maxStartRequest=0;
224   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
225 #if defined(_FAULT_CAUSAL_)
226         numImmigrantRecObjs = 0;
227 #endif
228
229 }
230
231 void CkReductionMgr::flushStates(int isgroup)
232 {
233   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
234   redNo=0;
235   completedRedNo = -1;
236   inProgress=CmiFalse;
237   creating=CmiFalse;
238   if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
239   startRequested=CmiFalse;
240   nContrib=nRemote=0;
241   maxStartRequest=0;
242
243   while (!msgs.isEmpty()) { delete msgs.deq(); }
244   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
245   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
246   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
247
248   adjVec.length()=0;
249
250 #if ! GROUP_LEVEL_REDUCTION
251   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
252 #endif
253 }
254
255 //////////// Reduction Manager Client API /////////////
256
257 //Add the given client function.  Overwrites any previous client.
258 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
259 {
260   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
261
262   if (CkMyPe()!=0)
263           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
264   storedCallback=*cb;
265 #if ! GROUP_LEVEL_REDUCTION
266   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
267   nodeProxy.ckSetReductionClient(callback);
268 #endif
269 }
270
271 ///////////////////////////// Contributor ////////////////////////
272 //Contributors keep a copy of this structure:
273
274 /*Contributor migration support:
275 */
276 void contributorInfo::pup(PUP::er &p)
277 {
278   p(redNo);
279 }
280
281 ////////////////////// Contributor list maintainance: /////////////////
282 //These just set and clear the "creating" flag to prevent
283 // reductions from finishing early because not all elements
284 // have been created.
285 void CkReductionMgr::creatingContributors(void)
286 {
287   DEBR((AA"Creating contributors...\n"AB));
288   creating=CmiTrue;
289 }
290 void CkReductionMgr::doneCreatingContributors(void)
291 {
292   DEBR((AA"Done creating contributors...\n"AB));
293   creating=CmiFalse;
294   if (startRequested) startReduction(redNo,CkMyPe());
295   finishReduction();
296 }
297
298 //A new contributor will be created
299 void CkReductionMgr::contributorStamped(contributorInfo *ci)
300 {
301   DEBR((AA"Contributor %p stamped\n"AB,ci));
302   //There is another contributor
303   gcount++;
304   if (inProgress)
305   {
306     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
307     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
308   } else
309     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
310 }
311
312 //A new contributor was actually created
313 void CkReductionMgr::contributorCreated(contributorInfo *ci)
314 {
315   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
316   //We've got another contributor
317   lcount++;
318   //He may not need to contribute to some of our reductions:
319   for (int r=redNo;r<ci->redNo;r++)
320     adj(r).lcount--;//He won't be contributing to r here
321 }
322
323 /*Don't expect any more contributions from this one.
324 This is rather horrifying because we now have to make
325 sure the global element count accurately reflects all the
326 contributions the element made before it died-- these may stretch
327 far into the future.  The adj() vector is what saves us here.
328 */
329 void CkReductionMgr::contributorDied(contributorInfo *ci)
330 {
331 #if CMK_MEM_CHECKPOINT
332   // ignore from listener if it is during restart from crash
333   if (CkInRestarting()) return;
334 #endif
335   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
336   //We lost a contributor
337   gcount--;
338
339   if (ci->redNo<redNo)
340   {//Must have been migrating during reductions-- root is waiting for his
341   // contribution, which will never come.
342     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
343     for (int r=ci->redNo;r<redNo;r++)
344       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
345   }
346
347   //Add to the global count for all his future messages (wherever they are)
348   int r;
349   for (r=redNo;r<ci->redNo;r++)
350   {//He already contributed to this reduction, but won't show up in global count.
351     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
352     adj(r).gcount++;
353   }
354
355   lcount--;
356   //He's already contributed to several reductions here
357   for (r=redNo;r<ci->redNo;r++)
358     adj(r).lcount++;//He'll be contributing to r here
359
360   finishReduction();
361 }
362
363 //Migrating away (note that global count doesn't change)
364 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
365 {
366   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
367   lcount--;//We lost a local
368   //He's already contributed to several reductions here
369   for (int r=redNo;r<ci->redNo;r++)
370     adj(r).lcount++;//He'll be contributing to r here
371
372   finishReduction();
373 }
374
375 //Migrating in (note that global count doesn't change)
376 void CkReductionMgr::contributorArriving(contributorInfo *ci)
377 {
378   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
379   lcount++;//We gained a local
380 #if CMK_MEM_CHECKPOINT
381   // ignore from listener if it is during restart from crash
382   // because the ci may be old.
383   if (CkInRestarting()) return;
384 #endif
385   //He has already contributed (elsewhere) to several reductions:
386   for (int r=redNo;r<ci->redNo;r++)
387     adj(r).lcount--;//He won't be contributing to r here
388
389 }
390
391 //Contribute-- the given msg can contain any data.  The reducerType
392 // field of the message must be valid.
393 // Each contributor must contribute exactly once to the each reduction.
394 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
395 {
396 #if CMK_BIGSIM_CHARM
397   _TRACE_BG_TLINE_END(&(m->log));
398 #endif
399   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
400   //m->ci=ci;
401   m->redNo=ci->redNo++;
402   m->sourceFlag=-1;//A single contribution
403   m->gcount=0;
404
405 #if defined(_FAULT_CAUSAL_)
406
407         // if object is an immigrant recovery object, we send the contribution to the source PE
408         if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
409                 
410                 // turning on the message-logging bypass flag
411                 envelope *env = UsrToEnv(m);
412                 env->flags = env->flags | CK_BYPASS_DET_MLOG;
413         thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
414                 return;
415         }
416
417     Chare *oldObj = CpvAccess(_currentObj);
418     CpvAccess(_currentObj) = this;
419
420         // adding contribution
421         addContribution(m);
422
423     CpvAccess(_currentObj) = oldObj;
424 #else
425   addContribution(m);
426 #endif
427 }
428
429 #if defined(_FAULT_CAUSAL_)
430 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
431         //if(CkMyPe() == 2) CkPrintf("[%d] ---> Contributing Via Message\n",CkMyPe());
432         
433         // turning off bypassing flag
434         envelope *env = UsrToEnv(m);
435         env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
436
437         // adding contribution
438     addContribution(m);
439 }
440 #else
441 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
442 #endif
443
444 //////////// Reduction Manager Remote Entry Points /////////////
445 //Sent down the reduction tree (used by barren PEs)
446 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
447 {
448  if(CkMyPe()==0){
449         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
450         //delete m;
451         //return;
452  }
453  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
454  int srcPE = (UsrToEnv(m))->getSrcPe();
455 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
456   if (isPresent(m->num) && !inProgress)
457   {
458     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
459     startReduction(m->num,srcPE);
460     finishReduction();
461   } else if (isFuture(m->num)){
462 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
463           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
464           if(maxStartRequest < m->num){
465                   maxStartRequest = m->num;
466           }
467  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
468           
469     }
470   else //is Past
471     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
472   delete m;
473 #else
474     if(redNo == 0){
475         if(lcount == 0){
476             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
477             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
478             dummy->fromPE = CmiMyPe();
479             dummy->sourceProcessorCount = 0;
480             dummy->redNo = 0;
481             thisProxy[0].contributeViaMessage(dummy);
482         }
483     }
484 #endif
485 }
486
487 //Sent to root of reduction tree with reduction contribution
488 // of migrants that missed the main reduction.
489 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
490 {
491 #if GROUP_LEVEL_REDUCTION
492 #if CMK_BIGSIM_CHARM
493   _TRACE_BG_TLINE_END(&(m->log));
494 #endif
495   addContribution(m);
496 #else
497   m->secondaryCallback = m->callback;
498   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
499   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
500   nodeMgr->LateMigrantMsg(m);
501 /*      int len = finalMsgs.length();
502         finalMsgs.enq(m);
503 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
504         endArrayReduction();*/
505 #endif
506 }
507
508 //A late migrating contributor will never contribute to this reduction
509 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
510 {
511   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
512   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
513  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
514   adj(m->num).gcount--;//He won't be contributing to this one.
515   finishReduction();
516   delete m;
517 }
518
519 //////////// Reduction Manager State /////////////
520 void CkReductionMgr::startReduction(int number,int srcPE)
521 {
522   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
523   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
524   if (inProgress){
525         DEBR((AA"This reduction is already in progress\n"AB));
526         return;//This reduction already started
527   }
528   if (creating) //Don't start yet-- we're creating elements
529   {
530     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
531     startRequested=CmiTrue;
532     return;
533   }
534
535 //If none of these cases, we need to start the reduction--
536   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
537   inProgress=CmiTrue;
538  
539
540         /*
541                 FAULT_EVAC
542         */
543   if(!CmiNodeAlive(CkMyPe())){
544         return;
545   }
546
547   if(disableNotifyChildrenStart) return;
548  
549 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_) 
550   if(CmiMyPe() == 0 && redNo == 0){
551             for(int j=0;j<CkNumPes();j++){
552                 if(j != CkMyPe() && j != srcPE){
553                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
554                 }
555             }
556             if(lcount == 0){
557                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
558                 dummy->fromPE = CmiMyPe();
559                 dummy->sourceProcessorCount = 0;
560                 dummy->redNo = 0;
561                 thisProxy[0].contributeViaMessage(dummy);
562             }
563     }   else{
564         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
565     }
566
567
568 #else
569   //Sent start requests to our kids (in case they don't already know)
570 #if GROUP_LEVEL_REDUCTION
571   for (int k=0;k<treeKids();k++)
572   {
573     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
574     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
575   }
576 #else
577   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
578 #endif
579 #endif
580         
581         /*
582   int temp;
583   //making it a broadcast done only by PE 0
584   if(!hasParent()){
585                 temp = completedRedNo+1;
586                 for(int i=temp;i<=number;i++){
587                         for(int j=0;j<CkNumPes();j++){
588                                 if(j != CkMyPe() && j != srcPE){
589                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
590                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
591                                         }
592                                 }
593                         }
594                 }       
595         }       else{
596                 temp = number;
597         }*/
598 /*  if(!hasParent()){
599                 temp = completedRedNo+1;
600         }       else{
601                 temp = number;
602         }
603         for(int i=temp;i<=number;i++){
604         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
605                 if(hasParent()){
606           // kick-start your parent too ...
607                         if(treeParent() != srcPE){
608                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
609 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
610     CpvAccess(_currentObj) = oldObj;
611 #endif
612                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
613                                 }       
614                         }       
615                 }
616           for (int k=0;k<treeKids();k++)
617           {
618                         if(firstKid()+k != srcPE){
619                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
620                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
621                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
622                                 }       
623                         }       
624         }
625         }
626         */
627 }       
628
629 /*Handle a message from one element for the reduction*/
630 void CkReductionMgr::addContribution(CkReductionMsg *m)
631 {
632   if (isPast(m->redNo))
633   {
634 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
635         CmiAbort("this version should not have late migrations");
636 #else
637         //We've moved on-- forward late contribution straight to root
638     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
639         // if (!hasParent()) //Root moved on too soon-- should never happen
640         //   CkAbort("Late reduction contribution received at root!\n");
641     thisProxy[0].LateMigrantMsg(m);
642 #endif
643   }
644   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
645     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
646     futureMsgs.enq(m);
647   } else {// An ordinary contribution
648     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
649    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
650     startReduction(m->redNo,CkMyPe());
651     msgs.enq(m);
652     nContrib++;
653     finishReduction();
654   }
655 }
656
657 /**function checks if it has got all contributions that it is supposed to
658 get at this processor. If it is done it sends the reduced result to the local
659 nodegroup */
660 void CkReductionMgr::finishReduction(void)
661 {
662   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
663   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
664   if ((!inProgress) || creating){
665         DEBR((AA"Either not in Progress or creating\n"AB));
666         return;
667   }
668   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
669 #if (defined(_FAULT_CAUSAL_))
670         if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
671         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
672                 return;//Need more local messages
673         }
674 #else
675   if (nContrib<(lcount+adj(redNo).lcount)){
676          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
677          return;//Need more local messages
678   }
679 #endif
680
681 #if GROUP_LEVEL_REDUCTION
682   if (nRemote<treeKids()) return;//Need more remote messages
683 #endif
684  
685   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
686   CkReductionMsg *result=reduceMessages();
687   result->redNo=redNo;
688 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
689
690 #if GROUP_LEVEL_REDUCTION
691   if (hasParent())
692   {//Pass data up tree to parent
693     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
694     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
695 #if (defined(_FAULT_CAUSAL_))
696     result->gcount+=gcount+adj(redNo).gcount-numImmigrantRecObjs;
697 #else
698     result->gcount+=gcount+adj(redNo).gcount;
699 #endif
700     thisProxy[treeParent()].RecvMsg(result);
701   }
702   else 
703   {//We are root-- pass data to client
704     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
705     int totalElements=result->gcount+gcount+adj(redNo).gcount;
706     if (totalElements>result->nSources()) 
707     {
708       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
709       msgs.enq(result);
710       return; // Wait for migrants to contribute
711     } else if (totalElements<result->nSources()) {
712       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
713       CkAbort("ERROR! Too many contributions at root!\n");
714     }
715     DEBR((AA"Passing result to client function\n"AB));
716     CkSetRefNum(result, result->getUserFlag());
717     if (!result->callback.isInvalid())
718             result->callback.send(result);
719     else if (!storedCallback.isInvalid())
720             storedCallback.send(result);
721     else
722             CkAbort("No reduction client!\n"
723                     "You must register a client with either SetReductionClient or during contribute.\n");
724   }
725
726 #else
727   result->gcount+=gcount+adj(redNo).gcount;
728
729   result->secondaryCallback = result->callback;
730   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
731         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
732
733   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
734
735  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
736   
737   // Find our node reduction manager, and pass reduction to him:
738   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
739   nodeMgr->contributeArrayReduction(result);
740 #endif
741 #else                // _FAULT_MLOG_
742   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
743
744     CkSetRefNum(result, result->getUserFlag());
745     if (!result->callback.isInvalid())
746         result->callback.send(result);
747     else if (!storedCallback.isInvalid())
748         storedCallback.send(result);
749     else{
750       DEBR(("No reduction client for group %d \n",thisgroup.idx));
751         CkAbort("No reduction client!\n"
752             "You must register a client with either SetReductionClient or during contribute.\n");
753     }
754
755     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
756
757 #endif               // _FAULT_MLOG_
758
759   //House Keeping Operations will have to check later what needs to be changed
760   redNo++;
761   //Shift the count adjustment vector down one slot (to match new redNo)
762   int i;
763 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_) && !GROUP_LEVEL_REDUCTION
764     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
765   if(CkMyPe()!=0)
766 #endif
767   {
768         completedRedNo++;
769         for (i=1;i<(int)(adjVec.length());i++){
770            adjVec[i-1]=adjVec[i];
771         }
772         adjVec.length()--;  
773   }
774
775   inProgress=CmiFalse;
776   startRequested=CmiFalse;
777   nRemote=nContrib=0;
778
779   //Look through the future queue for messages we can now handle
780   int n=futureMsgs.length();
781   for (i=0;i<n;i++)
782   {
783     CkReductionMsg *m=futureMsgs.deq();
784     if (m!=NULL) //One of these addContributions may have finished us.
785       addContribution(m);//<- if *still* early, puts it back in the queue
786   }
787 #if GROUP_LEVEL_REDUCTION
788   n=futureRemoteMsgs.length();
789   for (i=0;i<n;i++)
790   {
791     CkReductionMsg *m=futureRemoteMsgs.deq();
792     if (m!=NULL) {
793       RecvMsg(m);//<- if *still* early, puts it back in the queue
794     }
795   }
796 #endif
797
798 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
799   if(maxStartRequest >= redNo){
800           startReduction(redNo,CkMyPe());
801           finishReduction();
802   }
803  
804 #endif
805
806 }
807
808 //Sent up the reduction tree with reduced data
809   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
810 {
811 #if GROUP_LEVEL_REDUCTION
812 #if CMK_BIGSIM_CHARM
813   _TRACE_BG_TLINE_END(&m->log);
814 #endif
815   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
816     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
817     startReduction(m->redNo, CkMyPe());
818     msgs.enq(m);
819     nRemote++;
820     finishReduction();
821   }
822   else if (isFuture(m->redNo)) {
823     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
824     futureRemoteMsgs.enq(m);
825   } 
826   else CkAbort("Recv'd late remote contribution!\n");
827 #endif
828 }
829
830 //////////// Reduction Manager Utilities /////////////
831
832 //Return the countAdjustment struct for the given redNo:
833 countAdjustment &CkReductionMgr::adj(int number)
834 {
835   number-=completedRedNo;
836   number--;
837   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
838   //Pad the adjustment vector with zeros until it's at least number long
839   while ((int)(adjVec.length())<=number)
840     adjVec.push_back(countAdjustment());
841   return adjVec[number];
842 }
843
844 //Combine (& free) the current message vector msgs.
845 CkReductionMsg *CkReductionMgr::reduceMessages(void)
846 {
847 #if CMK_BIGSIM_CHARM
848   _TRACE_BG_END_EXECUTE(1);
849   void* _bgParentLog = NULL;
850   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
851 #endif
852   CkReductionMsg *ret=NULL;
853
854   //Look through the vector for a valid reducer, swapping out placeholder messages
855   CkReduction::reducerType r=CkReduction::invalid;
856   int msgs_gcount=0;//Reduced gcount
857   int msgs_nSources=0;//Reduced nSources
858   int msgs_userFlag=-1;
859   CkCallback msgs_callback;
860   int i;
861   int nMsgs=0;
862   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
863   CkReductionMsg *m;
864   bool isMigratableContributor;
865
866   // Copy message queue into msgArr, skipping placeholders:
867   while (NULL!=(m=msgs.deq()))
868   {
869     msgs_gcount+=m->gcount;
870     if (m->sourceFlag!=0)
871     { //This is a real message from an element, not just a placeholder
872       msgArr[nMsgs++]=m;
873       msgs_nSources+=m->nSources();
874       r=m->reducer;
875       if (!m->callback.isInvalid())
876         msgs_callback=m->callback;
877       if (m->userFlag!=-1)
878         msgs_userFlag=m->userFlag;
879                         
880         isMigratableContributor=m->isMigratableContributor();
881 #if CMK_BIGSIM_CHARM
882         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
883 #endif
884     }
885     else
886     { //This is just a placeholder message-- forget it
887       delete m;
888     }
889   }
890
891   if (nMsgs==0||r==CkReduction::invalid)
892   //No valid reducer in the whole vector
893     ret=CkReductionMsg::buildNew(0,NULL);
894   else
895   {//Use the reducer to reduce the messages
896                 //if there is only one msg to be reduced just return that message
897     if(nMsgs == 1){
898         ret = msgArr[0];        
899     }else{
900         CkReduction::reducerFn f=CkReduction::reducerTable[r];
901         ret=(*f)(nMsgs,msgArr);
902     }
903     ret->reducer=r;
904   }
905
906
907
908 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
909
910 #if CRITICAL_PATH_DEBUG > 3
911   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
912 #endif
913
914   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
915   path.updateMax(UsrToEnv(ret));
916   // Combine the critical paths from all the reduction messages
917   for (i=0;i<nMsgs;i++){
918     if (msgArr[i]!=ret){
919       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
920       path.updateMax(UsrToEnv(msgArr[i]));
921     }
922   }
923   
924
925 #if CRITICAL_PATH_DEBUG > 3
926   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
927 #endif
928   
929   PathHistoryTableEntry tableEntry(path);
930   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
931   
932 #endif
933
934         //Go back through the vector, deleting old messages
935   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
936   delete [] msgArr;
937
938   //Set the message counts
939   ret->redNo=redNo;
940   ret->gcount=msgs_gcount;
941   ret->userFlag=msgs_userFlag;
942   ret->callback=msgs_callback;
943   ret->sourceFlag=msgs_nSources;
944         ret->setMigratableContributor(isMigratableContributor);
945   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
946
947   return ret;
948 }
949
950
951 //Checkpointing utilities
952 //pack-unpack method for CkReductionMsg
953 //if packing pack the message and then unpack and return it
954 //if unpacking allocate memory for it read it off disk and then unapck
955 //and return it
956 void CkReductionMgr::pup(PUP::er &p)
957 {
958 //We do not store the client function pointer or the client function parameter,
959 //it is the responsibility of the programmer to correctly restore these
960   CkGroupInitCallback::pup(p);
961   p(redNo);
962   p(completedRedNo);
963   p(inProgress); p(creating); p(startRequested);
964   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
965   p|msgs;
966   p|futureMsgs;
967   p|futureRemoteMsgs;
968   p|finalMsgs;
969   p|adjVec;
970   p|nodeProxy;
971   p|storedCallback;
972     // handle CkReductionClientBundle
973   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
974   {
975     CkReductionClientBundle *bd;
976     if (p.isUnpacking()) 
977       bd = new CkReductionClientBundle;
978     else
979       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
980     p|*bd;
981     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
982   }
983
984   // subtle --- Gengbin
985   // Group : CkReductionMgr
986   // CkArray: CkReductionMgr
987   // lcount/gcount in Group is set in Group constructor
988   // lcount/gcount in CkArray is not, it is set when array elements are created
989   // we can not pup because inserting array elems will add the counters again
990 //  p|lcount;
991 //  p|gcount;
992 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
993 //  p|lcount;
994 //  //  p|gcount;
995 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
996 #endif
997   if(p.isUnpacking()){
998     thisProxy = thisgroup;
999     maxStartRequest=0;
1000 #if GROUP_LEVEL_REDUCTION
1001 #ifdef BINOMIAL_TREE
1002     init_BinomialTree();
1003 #else
1004     init_BinaryTree();
1005 #endif
1006 #endif
1007   }
1008
1009   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1010 }
1011
1012
1013 //Callback for doing Reduction through NodeGroups added by Sayantan
1014
1015 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1016
1017         int total = m->gcount+adj(m->redNo).gcount;
1018         finalMsgs.enq(m);
1019         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1020         adj(m->redNo).mainRecvd = 1;
1021         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1022         endArrayReduction();
1023 }
1024
1025 void CkReductionMgr :: endArrayReduction(){
1026         CkReductionMsg *ret=NULL;
1027         int nMsgs=finalMsgs.length();
1028         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1029         //Look through the vector for a valid reducer, swapping out placeholder messages
1030         //CkPrintf("Length of Final Message %d \n",nMsgs);
1031         CkReduction::reducerType r=CkReduction::invalid;
1032         int msgs_gcount=0;//Reduced gcount
1033         int msgs_nSources=0;//Reduced nSources
1034         int msgs_userFlag=-1;
1035         CkCallback msgs_callback;
1036         CkCallback msgs_secondaryCallback;
1037         CkVec<CkReductionMsg *> tempMsgs;
1038         int i;
1039         int numMsgs = 0;
1040         for (i=0;i<nMsgs;i++)
1041         {
1042                 CkReductionMsg *m=finalMsgs.deq();
1043                 if(m->redNo == completedRedNo +1){
1044                         msgs_gcount+=m->gcount;
1045                         if (m->sourceFlag!=0)
1046                         { //This is a real message from an element, not just a placeholder
1047                                 msgs_nSources+=m->nSources();
1048                                 r=m->reducer;
1049                                 if (!m->callback.isInvalid())
1050                                 msgs_callback=m->callback;
1051                                 if(!m->secondaryCallback.isInvalid())
1052                                         msgs_secondaryCallback = m->secondaryCallback;
1053                                 if (m->userFlag!=-1)
1054                                         msgs_userFlag=m->userFlag;
1055                                 tempMsgs.push_back(m);
1056                         }
1057                 }else{
1058                         finalMsgs.enq(m);
1059                 }
1060
1061         }
1062         numMsgs = tempMsgs.length();
1063
1064         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1065
1066         if(numMsgs == 0){
1067                 return;
1068         }
1069         if(adj(completedRedNo+1).mainRecvd == 0){
1070                 for(i=0;i<numMsgs;i++){
1071                         finalMsgs.enq(tempMsgs[i]);
1072                 }
1073                 return;
1074         }
1075
1076 /*
1077         NOT NEEDED ANYMORE DONE at nodegroup level
1078         if(msgs_gcount  > msgs_nSources){
1079                 for(i=0;i<numMsgs;i++){
1080                         finalMsgs.enq(tempMsgs[i]);
1081                 }
1082                 return;
1083         }*/
1084
1085         if (nMsgs==0||r==CkReduction::invalid)
1086                 //No valid reducer in the whole vector
1087                 ret=CkReductionMsg::buildNew(0,NULL);
1088         else{//Use the reducer to reduce the messages
1089                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1090                 // has to be corrected elements from above need to be put into a temporary vector
1091                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1092                 ret=(*f)(numMsgs,msgArr);
1093                 ret->reducer=r;
1094
1095         }
1096
1097         
1098 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1099
1100 #if CRITICAL_PATH_DEBUG > 3
1101         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1102 #endif
1103
1104         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1105         path.updateMax(UsrToEnv(ret));
1106         // Combine the critical paths from all the reduction messages into the header for the new result
1107         for (i=0;i<numMsgs;i++){
1108           if (tempMsgs[i]!=ret){
1109             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1110             path.updateMax(UsrToEnv(tempMsgs[i]));
1111           } else {
1112             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1113           }
1114         }
1115         // Also consider the path which got us into this entry method
1116
1117 #if CRITICAL_PATH_DEBUG > 3
1118         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1119 #endif
1120
1121 #endif
1122   
1123
1124
1125
1126
1127         for(i = 0;i<numMsgs;i++){
1128                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1129         }
1130
1131         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1132         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1133
1134         ret->redNo=completedRedNo+1;
1135         ret->gcount=msgs_gcount;
1136         ret->userFlag=msgs_userFlag;
1137         ret->callback=msgs_callback;
1138         ret->secondaryCallback = msgs_secondaryCallback;
1139         ret->sourceFlag=msgs_nSources;
1140
1141         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1142
1143         CkSetRefNum(ret, ret->getUserFlag());
1144         if (!ret->secondaryCallback.isInvalid())
1145             ret->secondaryCallback.send(ret);
1146     else if (!storedCallback.isInvalid())
1147             storedCallback.send(ret);
1148     else{
1149       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1150             CkAbort("No reduction client!\n"
1151                     "You must register a client with either SetReductionClient or during contribute.\n");
1152     }
1153         completedRedNo++;
1154
1155         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1156
1157         for (i=1;i<(int)(adjVec.length());i++)
1158                 adjVec[i-1]=adjVec[i];
1159         adjVec.length()--;
1160         endArrayReduction();
1161 }
1162
1163 #if GROUP_LEVEL_REDUCTION
1164
1165 void CkReductionMgr::init_BinaryTree(){
1166         parent = (CkMyPe()-1)/TREE_WID;
1167         int firstkid = CkMyPe()*TREE_WID+1;
1168         numKids=CkNumPes()-firstkid;
1169         if (numKids>TREE_WID) numKids=TREE_WID;
1170         if (numKids<0) numKids=0;
1171
1172         for(int i=0;i<numKids;i++){
1173                 kids.push_back(firstkid+i);
1174                 newKids.push_back(firstkid+i);
1175         }
1176 }
1177
1178 void CkReductionMgr::init_BinomialTree(){
1179         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1180         /*upperSize = (unsigned )pow((double)2,depth);*/
1181         upperSize = (unsigned) 1 << depth;
1182         label = upperSize-CkMyPe()-1;
1183         int p=label;
1184         int count=0;
1185         while( p > 0){
1186                 if(p % 2 == 0)
1187                         break;
1188                 else{
1189                         p = p/2;
1190                         count++;
1191                 }
1192         }
1193         /*parent = label + rint(pow((double)2,count));*/
1194         parent = label + (1<<count);
1195         parent = upperSize -1 -parent;
1196         int temp;
1197         if(count != 0){
1198                 numKids = 0;
1199                 for(int i=0;i<count;i++){
1200                         /*temp = label - rint(pow((double)2,i));*/
1201                         temp = label - (1<<i);
1202                         temp = upperSize-1-temp;
1203                         if(temp <= CkNumPes()-1){
1204                                 kids.push_back(temp);
1205                                 numKids++;
1206                         }
1207                 }
1208         }else{
1209                 numKids = 0;
1210         //      kids = NULL;
1211         }
1212 }
1213
1214
1215 int CkReductionMgr::treeRoot(void)
1216 {
1217   return 0;
1218 }
1219 CmiBool CkReductionMgr::hasParent(void) //Root Node
1220 {
1221   return (CmiBool)(CkMyPe()!=treeRoot());
1222 }
1223 int CkReductionMgr::treeParent(void) //My parent Node
1224 {
1225   return parent;
1226 }
1227
1228 int CkReductionMgr::firstKid(void) //My first child Node
1229 {
1230   return CkMyPe()*TREE_WID+1;
1231 }
1232 int CkReductionMgr::treeKids(void)//Number of children in tree
1233 {
1234   return numKids;
1235 }
1236
1237 #endif
1238
1239 /////////////////////////////////////////////////////////////////////////
1240
1241 ////////////////////////////////
1242 //CkReductionMsg support
1243
1244 //ReductionMessage default private constructor-- does nothing
1245 CkReductionMsg::CkReductionMsg(){}
1246 CkReductionMsg::~CkReductionMsg(){}
1247
1248 //This define gives the distance from the start of the CkReductionMsg
1249 // object to the start of the user data area (just below last object field)
1250 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1251
1252 //"Constructor"-- builds and returns a new CkReductionMsg.
1253 //  the "data" array you specify will be copied into this object.
1254 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1255     CkReduction::reducerType reducer, CkReductionMsg *buf)
1256 {
1257   int len[1];
1258   len[0]=NdataSize;
1259   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1260   
1261   ret->dataSize=NdataSize;
1262   if (srcData!=NULL && !buf)
1263     memcpy(ret->data,srcData,NdataSize);
1264   ret->userFlag=-1;
1265   ret->reducer=reducer;
1266   //ret->ci=NULL;
1267   ret->sourceFlag=-1000;
1268   ret->gcount=0;
1269   ret->migratableContributor = true;
1270 #if CMK_BIGSIM_CHARM
1271   ret->log = NULL;
1272 #endif
1273   return ret;
1274 }
1275
1276 // Charm kernel message runtime support:
1277 void *
1278 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1279 {
1280   int totalsize=ARM_DATASTART+(*sz);
1281   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1282   CkReductionMsg *ret = (CkReductionMsg *)
1283     CkAllocMsg(msgnum,totalsize,priobits);
1284   ret->data=(void *)(&ret->dataStorage);
1285   return (void *) ret;
1286 }
1287
1288 void *
1289 CkReductionMsg::pack(CkReductionMsg* in)
1290 {
1291   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1292   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1293   in->data = NULL;
1294   return (void*) in;
1295 }
1296
1297 CkReductionMsg* CkReductionMsg::unpack(void *in)
1298 {
1299   CkReductionMsg *ret = (CkReductionMsg *)in;
1300   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1301   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1302   ret->data=(void *)(&ret->dataStorage);
1303   return ret;
1304 }
1305
1306
1307 /////////////////////////////////////////////////////////////////////////////////////
1308 ///////////////// Builtin Reducer Functions //////////////
1309 /* A simple reducer, like sum_int, looks like this:
1310 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1311 {
1312   int i,ret=0;
1313   for (i=0;i<nMsg;i++)
1314     ret+=*(int *)(msg[i]->getData());
1315   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1316 }
1317
1318 To keep the code small and easy to change, the implementations below
1319 are built with preprocessor macros.
1320 */
1321
1322 //////////////// simple reducers ///////////////////
1323 /*A define used to quickly and tersely construct simple reductions.
1324 The basic idea is to use the first message's data array as
1325 (pre-initialized!) scratch space for folding in the other messages.
1326  */
1327
1328 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1329 {
1330         CkAbort("Called the invalid reducer type 0.  This probably\n"
1331                 "means you forgot to initialize your custom reducer index.\n");
1332         return NULL;
1333 }
1334
1335 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1336 {
1337   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1338 }
1339
1340 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1341 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1342 {\
1343   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1344   int m,i;\
1345   int nElem=msg[0]->getLength()/sizeof(dataType);\
1346   dataType *ret=(dataType *)(msg[0]->getData());\
1347   for (m=1;m<nMsg;m++)\
1348   {\
1349     dataType *value=(dataType *)(msg[m]->getData());\
1350     for (i=0;i<nElem;i++)\
1351     {\
1352       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1353       loop\
1354     }\
1355   }\
1356   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1357   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1358 }
1359
1360 //Use this macro for reductions that have the same type for all inputs
1361 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1362   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1363   SIMPLE_REDUCTION(nameBase##_long,CmiInt8,"%d",loop) \
1364   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1365   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1366
1367
1368 //Compute the sum the numbers passed by each element.
1369 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1370
1371 //Compute the product of the numbers passed by each element.
1372 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1373
1374 //Compute the largest number passed by any element.
1375 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1376
1377 //Compute the smallest integer passed by any element.
1378 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1379
1380
1381 //Compute the logical AND of the integers passed by each element.
1382 // The resulting integer will be zero if any source integer is zero; else 1.
1383 SIMPLE_REDUCTION(logical_and,int,"%d",
1384         if (value[i]==0)
1385      ret[i]=0;
1386   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1387 )
1388
1389 //Compute the logical OR of the integers passed by each element.
1390 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1391 SIMPLE_REDUCTION(logical_or,int,"%d",
1392   if (value[i]!=0)
1393            ret[i]=1;
1394   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1395 )
1396
1397 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1398 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1399
1400 //Select one random message to pass on
1401 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1402   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1403 }
1404
1405 /////////////// concat ////////////////
1406 /*
1407 This reducer simply appends the data it recieves from each element,
1408 without any housekeeping data to separate them.
1409 */
1410 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1411 {
1412   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1413   //Figure out how big a message we'll need
1414   int i,retSize=0;
1415   for (i=0;i<nMsg;i++)
1416       retSize+=msg[i]->getSize();
1417
1418   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1419
1420   //Allocate a new message
1421   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1422
1423   //Copy the source message data into the return message
1424   char *cur=(char *)(ret->getData());
1425   for (i=0;i<nMsg;i++) {
1426     int messageBytes=msg[i]->getSize();
1427     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1428     cur+=messageBytes;
1429   }
1430   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1431   return ret;
1432 }
1433
1434 /////////////// set ////////////////
1435 /*
1436 This reducer appends the data it recieves from each element
1437 along with some housekeeping data indicating contribution boundaries.
1438 The message data is thus a list of reduction_set_element structures
1439 terminated by a dummy reduction_set_element with a sourceElement of -1.
1440 */
1441
1442 //This rounds an integer up to the nearest multiple of sizeof(double)
1443 static const int alignSize=sizeof(double);
1444 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1445
1446 //This gives the size (in bytes) of a reduction_set_element
1447 static int SET_SIZE(int dataSize)
1448 {return SET_ALIGN(sizeof(int)+dataSize);}
1449
1450 //This returns a pointer to the next reduction_set_element in the list
1451 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1452 {
1453   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1454   return (CkReduction::setElement *)next;
1455 }
1456
1457 //Combine the data passed by each element into an list of reduction_set_elements.
1458 // Each element may contribute arbitrary data (with arbitrary length).
1459 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1460 {
1461   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1462   //Figure out how big a message we'll need
1463   int i,retSize=0;
1464   for (i=0;i<nMsg;i++) {
1465     if (!msg[i]->isFromUser())
1466     //This message is composite-- it will just be copied over (less terminating -1)
1467       retSize+=(msg[i]->getSize()-sizeof(int));
1468     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1469       retSize+=SET_SIZE(msg[i]->getSize());
1470   }
1471   retSize+=sizeof(int);//Leave room for terminating -1.
1472
1473   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1474
1475   //Allocate a new message
1476   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1477
1478   //Copy the source message data into the return message
1479   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1480   for (i=0;i<nMsg;i++)
1481     if (!msg[i]->isFromUser())
1482     {//This message is composite-- just copy it over (less terminating -1)
1483                         int messageBytes=msg[i]->getSize()-sizeof(int);
1484                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1485                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1486                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1487     }
1488     else //This is a message from an element-- wrap it in a reduction_set_element
1489     {
1490       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1491       cur->dataSize=msg[i]->getSize();
1492       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1493       cur=SET_NEXT(cur);
1494     }
1495   cur->dataSize=-1;//Add a terminating -1.
1496   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1497   return ret;
1498 }
1499
1500 //Utility routine: get the next reduction_set_element in the list
1501 // if there is one, or return NULL if there are none.
1502 //To get all the elements, just keep feeding this procedure's output back to
1503 // its input until it returns NULL.
1504 CkReduction::setElement *CkReduction::setElement::next(void)
1505 {
1506   CkReduction::setElement *n=SET_NEXT(this);
1507   if (n->dataSize==-1)
1508     return NULL;//This is the end of the list
1509   else
1510     return n;//This is just another element
1511 }
1512
1513 /////////////////// Reduction Function Table /////////////////////
1514 CkReduction::CkReduction() {} //Dummy private constructor
1515
1516 //Add the given reducer to the list.  Returns the new reducer's
1517 // reducerType.  Must be called in the same order on every node.
1518 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1519 {
1520   reducerTable[nReducers]=fn;
1521   return (reducerType)nReducers++;
1522 }
1523
1524 /*Reducer table: maps reducerTypes to reducerFns.
1525 It's indexed by reducerType, so the order in this table
1526 must *exactly* match the reducerType enum declaration.
1527 The names don't have to match, but it helps.
1528 */
1529 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1530
1531 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1532     ::invalid_reducer,
1533     ::nop,
1534   //Compute the sum the numbers passed by each element.
1535     ::sum_int,::sum_long,::sum_float,::sum_double,
1536
1537   //Compute the product the numbers passed by each element.
1538     ::product_int,::product_long,::product_float,::product_double,
1539
1540   //Compute the largest number passed by any element.
1541     ::max_int,::max_long,::max_float,::max_double,
1542
1543   //Compute the smallest number passed by any element.
1544     ::min_int,::min_long,::min_float,::min_double,
1545
1546   //Compute the logical AND of the integers passed by each element.
1547   // The resulting integer will be zero if any source integer is zero.
1548     ::logical_and,
1549
1550   //Compute the logical OR of the integers passed by each element.
1551   // The resulting integer will be 1 if any source integer is nonzero.
1552     ::logical_or,
1553
1554     // Compute the logical bitvector AND of the integers passed by each element.
1555     ::bitvec_and,
1556
1557     // Compute the logical bitvector OR of the integers passed by each element.
1558     ::bitvec_or,
1559
1560     // Select one of the messages at random to pass on
1561     ::random,
1562
1563   //Concatenate the (arbitrary) data passed by each element
1564     ::concat,
1565
1566   //Combine the data passed by each element into an list of setElements.
1567   // Each element may contribute arbitrary data (with arbitrary length).
1568     ::set
1569 };
1570
1571
1572
1573
1574
1575
1576
1577
1578 /********** Code added by Sayantan *********************/
1579 /** Locking is a big problem in the nodegroup code for smp.
1580  So a few assumptions have had to be made. There is one lock
1581  called lockEverything. It protects all the data structures 
1582  of the nodegroup reduction mgr. I tried grabbing it separately 
1583  for each datastructure, modifying it and then releasing it and
1584  then grabbing it again, for the next change.
1585  That doesn't really help because the interleaved execution of 
1586  different threads makes the state of the reduction manager 
1587  inconsistent. 
1588  
1589  1. Grab lockEverything before calling finishreduction or startReduction
1590     or doRecvMsg
1591  2. lockEverything is grabbed only in entry methods reductionStarting
1592     or RecvMesg or  addcontribution.
1593  ****/
1594  
1595 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1596 NodeGroup::NodeGroup(void) {
1597   __nodelock=CmiCreateLock();
1598 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1599     mlogData->objID.type = TypeNodeGroup;
1600     mlogData->objID.data.group.onPE = CkMyNode();
1601 #endif
1602
1603 }
1604 NodeGroup::~NodeGroup() {
1605   CmiDestroyLock(__nodelock);
1606 }
1607 void NodeGroup::pup(PUP::er &p)
1608 {
1609   CkNodeReductionMgr::pup(p);
1610   p|reductionInfo;
1611 }
1612
1613 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1614
1615 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1616   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1617  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1618   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1619  }
1620
1621 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1622                                     ((CkNodeReductionMgr *)this),
1623                                     reductionInfo,false)
1624
1625 /* this contribute also adds up the count across all messages it receives.
1626   Useful for summing up number of array elements who have contributed ****/ 
1627 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1628         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1629
1630
1631
1632 //#define BINOMIAL_TREE
1633
1634 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1635   : thisProxy(thisgroup)
1636 {
1637 #ifdef BINOMIAL_TREE
1638   init_BinomialTree();
1639 #else
1640   init_BinaryTree();
1641 #endif
1642   storedCallback=NULL;
1643   redNo=0;
1644   inProgress=CmiFalse;
1645   
1646   startRequested=CmiFalse;
1647   gcount=CkNumNodes();
1648   lcount=1;
1649   nContrib=nRemote=0;
1650   lockEverything = CmiCreateLock();
1651
1652
1653   creating=CmiFalse;
1654   interrupt = 0;
1655   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1656         /*
1657                 FAULT_EVAC
1658         */
1659         blocked = false;
1660         maxModificationRedNo = INT_MAX;
1661         killed=0;
1662         additionalGCount = newAdditionalGCount = 0;
1663 }
1664
1665 void CkNodeReductionMgr::flushStates()
1666 {
1667  if(CkMyRank() == 0){
1668   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1669   redNo=0;
1670   inProgress=CmiFalse;
1671
1672   startRequested=CmiFalse;
1673   gcount=CkNumNodes();
1674   lcount=1;
1675   nContrib=nRemote=0;
1676
1677   creating=CmiFalse;
1678   interrupt = 0;
1679   while (!msgs.isEmpty()) { delete msgs.deq(); }
1680   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1681   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1682   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1683   }
1684 }
1685
1686 //////////// Reduction Manager Client API /////////////
1687
1688 //Add the given client function.  Overwrites any previous client.
1689 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1690 {
1691   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1692   if(cb->isInvalid()){
1693         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1694   }else{
1695         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1696   }
1697
1698   if (CkMyNode()!=0)
1699           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1700   delete storedCallback;
1701   storedCallback=cb;
1702 }
1703
1704
1705
1706 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1707 {
1708 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1709     Chare *oldObj =CpvAccess(_currentObj);
1710     CpvAccess(_currentObj) = this;
1711 #endif
1712
1713   //m->ci=ci;
1714   m->redNo=ci->redNo++;
1715   m->sourceFlag=-1;//A single contribution
1716   m->gcount=0;
1717   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1718   addContribution(m);
1719
1720 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1721     CpvAccess(_currentObj) = oldObj;
1722 #endif
1723 }
1724
1725
1726 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1727 {
1728 #if CMK_BIGSIM_CHARM
1729   _TRACE_BG_TLINE_END(&m->log);
1730 #endif
1731 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1732     Chare *oldObj =CpvAccess(_currentObj);
1733     CpvAccess(_currentObj) = this;
1734 #endif
1735   //m->ci=ci;
1736   m->redNo=ci->redNo++;
1737   m->gcount=count;
1738   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1739   addContribution(m);
1740   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1741
1742 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1743     CpvAccess(_currentObj) = oldObj;
1744 #endif
1745 }
1746
1747
1748 //////////// Reduction Manager Remote Entry Points /////////////
1749
1750 //Sent down the reduction tree (used by barren PEs)
1751 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1752 {
1753   CmiLock(lockEverything);
1754   if(blocked){
1755         delete m;
1756         CmiUnlock(lockEverything);
1757         return ;
1758   }
1759   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1760   if (isPresent(m->num) && !inProgress)
1761   {
1762     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1763     startReduction(m->num,srcNode);
1764     finishReduction();
1765   } else if (isFuture(m->num)){
1766         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1767   }
1768   else //is Past
1769     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1770   CmiUnlock(lockEverything);
1771   delete m;
1772
1773 }
1774
1775
1776 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1777         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1778         /*
1779                 FAULT_EVAC
1780         */
1781         if(blocked){
1782                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1783                 bufferedRemoteMsgs.enq(m);
1784                 return;
1785         }
1786         
1787         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1788             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1789             startReduction(m->redNo,CkMyNode());
1790             msgs.enq(m);
1791             nRemote++;
1792             finishReduction();
1793         }
1794         else {
1795             if (isFuture(m->redNo)) {
1796                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1797                     futureRemoteMsgs.enq(m);
1798             }else{
1799                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1800                    CkAbort("Recv'd late remote contribution!\n");
1801             }
1802         }
1803         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1804 }
1805
1806 //Sent up the reduction tree with reduced data
1807 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1808 {
1809 #if CMK_BIGSIM_CHARM
1810   _TRACE_BG_TLINE_END(&m->log);
1811 #endif
1812 #ifndef CMK_CPV_IS_SMP
1813 #if CMK_IMMEDIATE_MSG
1814         if(interrupt == 1){
1815                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1816                 CpvAccess(_qd)->process(-1);
1817                 CmiDelayImmediate();
1818                 return;
1819         }
1820 #endif  
1821 #endif
1822    interrupt = 1;       
1823    CmiLock(lockEverything);   
1824    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1825    doRecvMsg(m);
1826    CmiUnlock(lockEverything);    
1827    interrupt = 0;
1828    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1829 }
1830
1831 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1832 {
1833         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1834         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1835         if (inProgress){
1836                 DEBR((AA"This Node reduction is already in progress\n"AB));
1837                 return;//This reduction already started
1838         }
1839         if (creating) //Don't start yet-- we're creating elements
1840         {
1841                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1842                 startRequested=CmiTrue;
1843                 return;
1844         }
1845         
1846         //If none of these cases, we need to start the reduction--
1847         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1848         inProgress=CmiTrue;
1849
1850         if(!_isNotifyChildInRed) return;
1851
1852         //Sent start requests to our kids (in case they don't already know)
1853         
1854         for (int k=0;k<treeKids();k++)
1855         {
1856 #ifdef BINOMIAL_TREE
1857                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1858                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1859 #else
1860                 if(kids[k] != srcNode){
1861                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1862                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1863                 }       
1864 #endif
1865         }
1866
1867         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1868         // in case, nodegroup does not has the attached red group,
1869         // it has to restart groups again
1870         startLocalGroupReductions(number);
1871 /*
1872         if (startLocalGroupReductions(number) == 0)
1873           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1874 */
1875 }
1876
1877 // restart local groups until succeed
1878 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1879   CmiLock(lockEverything);    
1880   if (startLocalGroupReductions(number) == 0)
1881     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1882   CmiUnlock(lockEverything);    
1883 }
1884
1885 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1886         /*
1887                 FAULT_EVAC
1888         */
1889         if(blocked){
1890                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1891                 bufferedMsgs.enq(m);
1892                 return;
1893         }
1894         
1895         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1896                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1897                 futureMsgs.enq(m);
1898         } else {// An ordinary contribution
1899                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1900                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1901                 startReduction(m->redNo,CkMyNode());
1902                 msgs.enq(m);
1903                 nContrib++;
1904                 finishReduction();
1905         }
1906 }
1907
1908 //Handle a message from one element for the reduction
1909 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1910 {
1911   interrupt = 1;
1912   CmiLock(lockEverything);
1913   doAddContribution(m);
1914   CmiUnlock(lockEverything);
1915   interrupt = 0;
1916 }
1917
1918 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1919         if(blocked){
1920                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1921                 bufferedMsgs.enq(m);
1922                 return;
1923         }
1924         
1925         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1926                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1927 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1928                 futureLateMigrantMsgs.enq(m);
1929         } else {// An ordinary contribution
1930                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1931 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1932                 msgs.enq(m);
1933                 finishReduction();
1934         }
1935 }
1936
1937
1938
1939
1940
1941 /** check if the nodegroup reduction is finished at this node. In that case send it
1942 up the reduction tree **/
1943
1944 void CkNodeReductionMgr::finishReduction(void)
1945 {
1946   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1947   /***Check if reduction is finished in the next few ifs***/
1948   if ((!inProgress) || creating){
1949         DEBR((AA"Either not in Progress or creating\n"AB));
1950         return;
1951   }
1952
1953   if (nContrib<(lcount)){
1954         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1955
1956          return;//Need more local messages
1957   }
1958   if (nRemote<treeKids()){
1959         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1960
1961         return;//Need more remote messages
1962   }
1963   if (nRemote>treeKids()){
1964
1965           interrupt = 0;
1966            CkAbort("Nodegrp Excess remote reduction message received!\n");
1967   }
1968
1969   DEBR((AA"Reducing node data...\n"AB));
1970
1971   /**reduce all messages received at this node **/
1972   CkReductionMsg *result=reduceMessages();
1973
1974   if (hasParent())
1975   {//Pass data up tree to parent
1976         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1977         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1978         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1979                 /*
1980                         FAULT_EVAC
1981                 */
1982                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1983             thisProxy[treeParent()].RecvMsg(result);
1984         }
1985
1986   }
1987   else
1988   {
1989                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1990                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
1991                         msgs.enq(result);
1992                         return;
1993                 }
1994                 result->gcount += additionalGCount;
1995           /** if the reduction is finished and I am the root of the reduction tree
1996           then call the reductionhandler and other stuff ***/
1997                 
1998
1999                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2000     CkSetRefNum(result, result->getUserFlag());
2001     if (!result->callback.isInvalid()){
2002       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2003             result->callback.send(result);
2004     }
2005     else if (storedCallback!=NULL){
2006       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2007             storedCallback->send(result);
2008     }
2009     else{
2010                 DEBR((AA"Invalid Callback \n"AB));
2011             CkAbort("No reduction client!\n"
2012                     "You must register a client with either SetReductionClient or during contribute.\n");
2013                 }
2014   }
2015
2016   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2017   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2018   redNo++;
2019         updateTree();
2020   int i;
2021   inProgress=CmiFalse;
2022   startRequested=CmiFalse;
2023   nRemote=nContrib=0;
2024
2025   //Look through the future queue for messages we can now handle
2026   int n=futureMsgs.length();
2027
2028   for (i=0;i<n;i++)
2029   {
2030     interrupt = 1;
2031
2032     CkReductionMsg *m=futureMsgs.deq();
2033
2034     interrupt = 0;
2035     if (m!=NULL){ //One of these addContributions may have finished us.
2036       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2037       doAddContribution(m);//<- if *still* early, puts it back in the queue
2038     }
2039   }
2040
2041   interrupt = 1;
2042
2043   n=futureRemoteMsgs.length();
2044
2045   interrupt = 0;
2046   for (i=0;i<n;i++)
2047   {
2048     interrupt = 1;
2049
2050     CkReductionMsg *m=futureRemoteMsgs.deq();
2051
2052     interrupt = 0;
2053     if (m!=NULL)
2054       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2055   }
2056   
2057   n = futureLateMigrantMsgs.length();
2058   for(i=0;i<n;i++){
2059     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2060     if(m != NULL){
2061       if(m->redNo == redNo){
2062         msgs.enq(m);
2063       }else{
2064         futureLateMigrantMsgs.enq(m);
2065       }
2066     }
2067   }
2068 }
2069
2070 //////////// Reduction Manager Utilities /////////////
2071
2072 void CkNodeReductionMgr::init_BinaryTree(){
2073         parent = (CkMyNode()-1)/TREE_WID;
2074         int firstkid = CkMyNode()*TREE_WID+1;
2075         numKids=CkNumNodes()-firstkid;
2076   if (numKids>TREE_WID) numKids=TREE_WID;
2077   if (numKids<0) numKids=0;
2078
2079         for(int i=0;i<numKids;i++){
2080                 kids.push_back(firstkid+i);
2081                 newKids.push_back(firstkid+i);
2082         }
2083 }
2084
2085 void CkNodeReductionMgr::init_BinomialTree(){
2086         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2087         /*upperSize = (unsigned )pow((double)2,depth);*/
2088         upperSize = (unsigned) 1 << depth;
2089         label = upperSize-CkMyNode()-1;
2090         int p=label;
2091         int count=0;
2092         while( p > 0){
2093                 if(p % 2 == 0)
2094                         break;
2095                 else{
2096                         p = p/2;
2097                         count++;
2098                 }
2099         }
2100         /*parent = label + rint(pow((double)2,count));*/
2101         parent = label + (1<<count);
2102         parent = upperSize -1 -parent;
2103         int temp;
2104         if(count != 0){
2105                 numKids = 0;
2106                 for(int i=0;i<count;i++){
2107                         /*temp = label - rint(pow((double)2,i));*/
2108                         temp = label - (1<<i);
2109                         temp = upperSize-1-temp;
2110                         if(temp <= CkNumNodes()-1){
2111                 //              kids[numKids] = temp;
2112                                 kids.push_back(temp);
2113                                 numKids++;
2114                         }
2115                 }
2116         }else{
2117                 numKids = 0;
2118         //      kids = NULL;
2119         }
2120 }
2121
2122
2123 int CkNodeReductionMgr::treeRoot(void)
2124 {
2125   return 0;
2126 }
2127 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2128 {
2129   return (CmiBool)(CkMyNode()!=treeRoot());
2130 }
2131 int CkNodeReductionMgr::treeParent(void) //My parent Node
2132 {
2133   return parent;
2134 }
2135
2136 int CkNodeReductionMgr::firstKid(void) //My first child Node
2137 {
2138   return CkMyNode()*TREE_WID+1;
2139 }
2140 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2141 {
2142 #ifdef BINOMIAL_TREE
2143         return numKids;
2144 #else
2145 /*  int nKids=CkNumNodes()-firstKid();
2146   if (nKids>TREE_WID) nKids=TREE_WID;
2147   if (nKids<0) nKids=0;
2148   return nKids;*/
2149         return numKids;
2150 #endif
2151 }
2152
2153 //Combine (& free) the current message vector msgs.
2154 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2155 {
2156 #if CMK_BIGSIM_CHARM
2157   _TRACE_BG_END_EXECUTE(1);
2158   void* _bgParentLog = NULL;
2159   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2160 #endif
2161   CkReductionMsg *ret=NULL;
2162
2163   //Look through the vector for a valid reducer, swapping out placeholder messages
2164   CkReduction::reducerType r=CkReduction::invalid;
2165   int msgs_gcount=0;//Reduced gcount
2166   int msgs_nSources=0;//Reduced nSources
2167   int msgs_userFlag=-1;
2168   CkCallback msgs_callback;
2169   CkCallback msgs_secondaryCallback;
2170   int i;
2171   int nMsgs=0;
2172   CkReductionMsg *m;
2173   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2174   bool isMigratableContributor;
2175         
2176
2177   while(NULL!=(m=msgs.deq()))
2178   {
2179     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2180     msgs_gcount+=m->gcount;
2181     if (m->sourceFlag!=0)
2182     { //This is a real message from an element, not just a placeholder
2183       msgArr[nMsgs++]=m;
2184       msgs_nSources+=m->nSources();
2185       r=m->reducer;
2186       if (!m->callback.isInvalid())
2187         msgs_callback=m->callback;
2188       if(!m->secondaryCallback.isInvalid()){
2189         msgs_secondaryCallback = m->secondaryCallback;
2190       }
2191       if (m->userFlag!=-1)
2192         msgs_userFlag=m->userFlag;
2193
2194         isMigratableContributor= m->isMigratableContributor();
2195 #if CMK_BIGSIM_CHARM
2196       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2197 #endif
2198                                 
2199     }
2200     else
2201     { //This is just a placeholder message-- replace it
2202       delete m;
2203     }
2204   }
2205
2206   if (nMsgs==0||r==CkReduction::invalid)
2207   //No valid reducer in the whole vector
2208     ret=CkReductionMsg::buildNew(0,NULL);
2209   else
2210   {//Use the reducer to reduce the messages
2211     if(nMsgs == 1){
2212         ret = msgArr[0];
2213     }else{
2214         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2215         ret=(*f)(nMsgs,msgArr);
2216     }
2217     ret->reducer=r;
2218   }
2219
2220         
2221 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2222 #if CRITICAL_PATH_DEBUG > 3
2223         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2224 #endif
2225         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2226         path.updateMax(UsrToEnv(ret));
2227         // Combine the critical paths from all the reduction messages into the header for the new result
2228         for (i=0;i<nMsgs;i++){
2229           if (msgArr[i]!=ret){
2230             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2231             path.updateMax(UsrToEnv(msgArr[i]));
2232           } else {
2233             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2234           }
2235         }
2236 #if CRITICAL_PATH_DEBUG > 3
2237         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2238 #endif
2239
2240 #endif
2241
2242
2243         //Go back through the vector, deleting old messages
2244   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2245   delete [] msgArr;
2246   //Set the message counts
2247   ret->redNo=redNo;
2248   ret->gcount=msgs_gcount;
2249   ret->userFlag=msgs_userFlag;
2250   ret->callback=msgs_callback;
2251   ret->secondaryCallback = msgs_secondaryCallback;
2252   ret->sourceFlag=msgs_nSources;
2253   ret->setMigratableContributor(isMigratableContributor);
2254   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2255 #if CMK_BIGSIM_CHARM
2256   _TRACE_BG_TLINE_END(&ret->log);
2257 #endif
2258
2259   return ret;
2260 }
2261
2262 void CkNodeReductionMgr::pup(PUP::er &p)
2263 {
2264 //We do not store the client function pointer or the client function parameter,
2265 //it is the responsibility of the programmer to correctly restore these
2266   IrrGroup::pup(p);
2267   p(redNo);
2268   p(inProgress); p(creating); p(startRequested);
2269   p(lcount);
2270   p(nContrib); p(nRemote);
2271   p(interrupt);
2272   p|msgs;
2273   p|futureMsgs;
2274   p|futureRemoteMsgs;
2275   p|futureLateMigrantMsgs;
2276   p|parent;
2277   p|additionalGCount;
2278   p|newAdditionalGCount;
2279   if(p.isUnpacking()) {
2280     gcount=CkNumNodes();
2281     thisProxy = thisgroup;
2282     lockEverything = CmiCreateLock();
2283 #ifdef BINOMIAL_TREE
2284     init_BinomialTree();
2285 #else
2286     init_BinaryTree();
2287 #endif          
2288   }
2289   p | blocked;
2290   p | maxModificationRedNo;
2291
2292 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2293   int isnull = (storedCallback == NULL);
2294   p | isnull;
2295   if (!isnull) {
2296     if (p.isUnpacking()) {
2297       storedCallback = new CkCallback;
2298     }
2299     p|*storedCallback;
2300   }
2301 #endif
2302
2303 }
2304
2305 /*
2306         FAULT_EVAC
2307         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2308         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2309         reduction tree. 
2310 */
2311 void CkNodeReductionMgr::evacuate(){
2312         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2313         if(treeKids() == 0){
2314         /*
2315                 if the node going down is a leaf
2316         */
2317                 oldleaf=true;
2318                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2319                 /*
2320                         Need to ask parent for the reduction number that it has seen. 
2321                         Since it is a leaf, the tree does not need to be rewired. 
2322                         We reuse the oldparent type of tree modification message to get 
2323                         the parent to block and tell us about the highest reduction number it has seen.
2324                         
2325                 */
2326                 int data[2];
2327                 data[0]=CkMyNode();
2328                 data[1]=getTotalGCount()+additionalGCount;
2329                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2330                 newParent = treeParent();
2331         }else{
2332                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2333                 oldleaf= false;
2334         /*
2335                 It is not a leaf. It needs to rewire the tree around itself.
2336                 It also needs to decide on a reduction No after which the new tree will be used
2337                 Till it decides on the new tree and the redNo at which it becomes valid,
2338                 all received messages will be buffered
2339         */
2340                 newParent = kids[0];
2341                 for(int i=numKids-1;i>=0;i--){
2342                         newKids.remove(i);
2343                 }
2344                 /*
2345                         Ask everybody for the highest reduction number they have seen and
2346                         also tell them about the new tree
2347                 */
2348                 /*
2349                         Tell parent about its new child;
2350                 */
2351                 int oldParentData[2];
2352                 oldParentData[0] = CkMyNode();
2353                 oldParentData[1] = newParent;
2354                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2355
2356                 /*
2357                         Tell the other children about their new parent
2358                 */
2359                 int childrenData=newParent;
2360                 for(int i=1;i<numKids;i++){
2361                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2362                 }
2363                 
2364                 /*
2365                         Tell newParent (1st child) about its new children,
2366                         the current node and its children except the newParent
2367                 */
2368                 int *newParentData = new int[numKids+2];
2369                 for(int i=1;i<numKids;i++){
2370                         newParentData[i] = kids[i];
2371                 }
2372                 newParentData[0] = CkMyNode();
2373                 newParentData[numKids] = parent;
2374                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2375                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2376         }
2377         readyDeletion = false;
2378         blocked = true;
2379         numModificationReplies = 0;
2380         tempModificationRedNo = findMaxRedNo();
2381 }
2382
2383 /*
2384         Depending on the code, use the data to change the tree
2385         1. OLDPARENT : replace the old child with a new one
2386         2. OLDCHILDREN: replace the parent
2387         3. NEWPARENT:  add the children and change the parent
2388         4. LEAFPARENT: delete the old child
2389 */
2390
2391 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2392         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2393         int sender;
2394         newKids = kids;
2395         readyDeletion = false;
2396         newAdditionalGCount = additionalGCount;
2397         switch(code){
2398                 case OLDPARENT: 
2399                         for(int i=0;i<numKids;i++){
2400                                 if(newKids[i] == data[0]){
2401                                         newKids[i] = data[1];
2402                                         break;
2403                                 }
2404                         }
2405                         sender = data[0];
2406                         newParent = parent;
2407                         break;
2408                 case OLDCHILDREN:
2409                         newParent = data[0];
2410                         sender = parent;
2411                         break;
2412                 case NEWPARENT:
2413                         for(int i=0;i<size-2;i++){
2414                                 newKids.push_back(data[i]);
2415                         }
2416                         newParent = data[size-2];
2417                         newAdditionalGCount += data[size-1];
2418                         sender = parent;
2419                         break;
2420                 case LEAFPARENT:
2421                         for(int i=0;i<numKids;i++){
2422                                 if(newKids[i] == data[0]){
2423                                         newKids.remove(i);
2424                                         break;
2425                                 }
2426                         }
2427                         sender = data[0];
2428                         newParent = parent;
2429                         newAdditionalGCount += data[1];
2430                         break;
2431         };
2432         blocked = true;
2433         int maxRedNo = findMaxRedNo();
2434         
2435         thisProxy[sender].collectMaxRedNo(maxRedNo);
2436 }
2437
2438 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2439         /*
2440                 Find out the maximum redNo that has been seen by 
2441                 the affected nodes
2442         */
2443         numModificationReplies++;
2444         if(maxRedNo > tempModificationRedNo){
2445                 tempModificationRedNo = maxRedNo;
2446         }
2447         if(numModificationReplies == numKids+1){
2448                 maxModificationRedNo = tempModificationRedNo;
2449                 /*
2450                         when all the affected nodes have replied, tell them the maximum.
2451                         Unblock yourself. deal with the buffered messages local and remote
2452                 */
2453                 if(maxModificationRedNo == -1){
2454                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2455                 }else{
2456                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2457                 }
2458                 thisProxy[parent].unblockNode(maxModificationRedNo);
2459                 for(int i=0;i<numKids;i++){
2460                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2461                 }
2462                 blocked = false;
2463                 updateTree();
2464                 clearBlockedMsgs();
2465         }
2466 }
2467
2468 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2469         maxModificationRedNo = maxRedNo;
2470         updateTree();
2471         blocked = false;
2472         clearBlockedMsgs();
2473 }
2474
2475
2476 void CkNodeReductionMgr::clearBlockedMsgs(){
2477         int len = bufferedMsgs.length();
2478         for(int i=0;i<len;i++){
2479                 CkReductionMsg *m = bufferedMsgs.deq();
2480                 doAddContribution(m);
2481         }
2482         len = bufferedRemoteMsgs.length();
2483         for(int i=0;i<len;i++){
2484                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2485                 doRecvMsg(m);
2486         }
2487
2488 }
2489 /*
2490         if the reduction number exceeds the maxModificationRedNo, change the tree
2491         to become the new one
2492 */
2493
2494 void CkNodeReductionMgr::updateTree(){
2495         if(redNo > maxModificationRedNo){
2496                 parent = newParent;
2497                 kids = newKids;
2498                 maxModificationRedNo = INT_MAX;
2499                 numKids = kids.size();
2500                 readyDeletion = true;
2501                 additionalGCount = newAdditionalGCount;
2502                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2503                 for(int i=0;i<(int)(newKids.size());i++){
2504                         DEBREVAC(("%d ",newKids[i]));
2505                 }
2506                 DEBREVAC(("\n"));
2507         //      startReduction(redNo,CkMyNode());
2508         }else{
2509                 if(maxModificationRedNo != INT_MAX){
2510                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2511                         startReduction(redNo,CkMyNode());
2512                         finishReduction();
2513                 }       
2514         }
2515 }
2516
2517
2518 void CkNodeReductionMgr::doneEvacuate(){
2519         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2520 /*      if(oldleaf){
2521                 
2522                         It used to be a leaf
2523                         Then as soon as future messages have been emptied you can 
2524                         send the parent a message telling them that they are not going
2525                         to receive anymore messages from this child
2526                 
2527                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2528                 while(futureMsgs.length() != 0){
2529                         int n = futureMsgs.length();
2530                         for(int i=0;i<n;i++){
2531                                 CkReductionMsg *m = futureMsgs.deq();
2532                                 if(isPresent(m->redNo)){
2533                                         msgs.enq(m);
2534                                 }else{
2535                                         futureMsgs.enq(m);
2536                                 }
2537                         }
2538                         CkReductionMsg *result = reduceMessages();
2539                         thisProxy[treeParent()].RecvMsg(result);
2540                         redNo++;
2541                 }
2542                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2543                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2544         }else{*/
2545                 if(readyDeletion){
2546                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2547                 }else{
2548                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2549                 }
2550 //      }
2551 }
2552
2553 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2554         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2555         for(int i=0;i<numKids;i++){
2556                 if(kids[i] == deletedChild){
2557                         kids.remove(i);
2558                         break;
2559                 }
2560         }
2561         numKids = kids.length();
2562         finishReduction();
2563 }
2564
2565 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2566         for(int i=0;i<(int)(newKids.length());i++){
2567                 if(newKids[i] == deletedChild){
2568                         newKids.remove(i);
2569                         break;
2570                 }
2571         }
2572         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2573         for(int i=0;i<(int)(newKids.size());i++){
2574                 DEBREVAC(("%d ",newKids[i]));
2575         }
2576         DEBREVAC(("\n"));
2577         finishReduction();
2578 }
2579
2580 int CkNodeReductionMgr::findMaxRedNo(){
2581         int max = redNo;
2582         for(int i=0;i<futureRemoteMsgs.length();i++){
2583                 if(futureRemoteMsgs[i]->redNo  > max){
2584                         max = futureRemoteMsgs[i]->redNo;
2585                 }
2586         }
2587         /*
2588                 if redNo is max (that is no future message) and the current reduction has not started
2589                 then tree can be changed before the reduction redNo can be started
2590         */ 
2591         if(redNo == max && msgs.length() == 0){
2592                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2593                 max--;
2594         }
2595         return max;
2596 }
2597
2598 // initnode call. check the size of reduction table
2599 void CkReductionMgr::sanitycheck()
2600 {
2601 #if CMK_ERROR_CHECKING
2602   int count = 0;
2603   while (CkReduction::reducerTable[count] != NULL) count++;
2604   CmiAssert(CkReduction::nReducers == count);
2605 #endif
2606 }
2607
2608 #include "CkReduction.def.h"