8c01c56a176019118b113ad1d6bebf7174cd62e9
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196         numImmigrantRecObjs = 0;
197         numEmigrantRecObjs = 0;
198 #endif
199   disableNotifyChildrenStart = CmiFalse;
200   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
201 }
202
203 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
204 {
205   redNo=0;
206   completedRedNo = -1;
207   inProgress=CmiFalse;
208   creating=CmiFalse;
209   startRequested=CmiFalse;
210   gcount=lcount=0;
211   nContrib=nRemote=0;
212   maxStartRequest=0;
213   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
214 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
215         numImmigrantRecObjs = 0;
216         numEmigrantRecObjs = 0;
217 #endif
218
219 }
220
221 void CkReductionMgr::flushStates(int isgroup)
222 {
223   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
224   redNo=0;
225   completedRedNo = -1;
226   inProgress=CmiFalse;
227   creating=CmiFalse;
228   if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
229   startRequested=CmiFalse;
230   nContrib=nRemote=0;
231   maxStartRequest=0;
232
233   while (!msgs.isEmpty()) { delete msgs.deq(); }
234   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
235   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
236   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
237
238   adjVec.length()=0;
239
240 #if ! GROUP_LEVEL_REDUCTION
241   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
242 #endif
243 }
244
245 //////////// Reduction Manager Client API /////////////
246
247 //Add the given client function.  Overwrites any previous client.
248 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
249 {
250   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
251
252   if (CkMyPe()!=0)
253           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
254   storedCallback=*cb;
255 #if ! GROUP_LEVEL_REDUCTION
256   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
257   nodeProxy.ckSetReductionClient(callback);
258 #endif
259 }
260
261 ///////////////////////////// Contributor ////////////////////////
262 //Contributors keep a copy of this structure:
263
264 /*Contributor migration support:
265 */
266 void contributorInfo::pup(PUP::er &p)
267 {
268   p(redNo);
269 }
270
271 ////////////////////// Contributor list maintainance: /////////////////
272 //These just set and clear the "creating" flag to prevent
273 // reductions from finishing early because not all elements
274 // have been created.
275 void CkReductionMgr::creatingContributors(void)
276 {
277   DEBR((AA"Creating contributors...\n"AB));
278   creating=CmiTrue;
279 }
280 void CkReductionMgr::doneCreatingContributors(void)
281 {
282   DEBR((AA"Done creating contributors...\n"AB));
283   creating=CmiFalse;
284   if (startRequested) startReduction(redNo,CkMyPe());
285   finishReduction();
286 }
287
288 //A new contributor will be created
289 void CkReductionMgr::contributorStamped(contributorInfo *ci)
290 {
291   DEBR((AA"Contributor %p stamped\n"AB,ci));
292   //There is another contributor
293   gcount++;
294   if (inProgress)
295   {
296     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
297     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
298   } else
299     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
300 }
301
302 //A new contributor was actually created
303 void CkReductionMgr::contributorCreated(contributorInfo *ci)
304 {
305   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
306   //We've got another contributor
307   lcount++;
308   //He may not need to contribute to some of our reductions:
309   for (int r=redNo;r<ci->redNo;r++)
310     adj(r).lcount--;//He won't be contributing to r here
311 }
312
313 /*Don't expect any more contributions from this one.
314 This is rather horrifying because we now have to make
315 sure the global element count accurately reflects all the
316 contributions the element made before it died-- these may stretch
317 far into the future.  The adj() vector is what saves us here.
318 */
319 void CkReductionMgr::contributorDied(contributorInfo *ci)
320 {
321 #if CMK_MEM_CHECKPOINT
322   // ignore from listener if it is during restart from crash
323   if (CkInRestarting()) return;
324 #endif
325   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
326   //We lost a contributor
327   gcount--;
328
329   if (ci->redNo<redNo)
330   {//Must have been migrating during reductions-- root is waiting for his
331   // contribution, which will never come.
332     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
333     for (int r=ci->redNo;r<redNo;r++)
334       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
335   }
336
337   //Add to the global count for all his future messages (wherever they are)
338   int r;
339   for (r=redNo;r<ci->redNo;r++)
340   {//He already contributed to this reduction, but won't show up in global count.
341     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
342     adj(r).gcount++;
343   }
344
345   lcount--;
346   //He's already contributed to several reductions here
347   for (r=redNo;r<ci->redNo;r++)
348     adj(r).lcount++;//He'll be contributing to r here
349
350   finishReduction();
351 }
352
353 //Migrating away (note that global count doesn't change)
354 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
355 {
356   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
357   lcount--;//We lost a local
358   //He's already contributed to several reductions here
359   for (int r=redNo;r<ci->redNo;r++)
360     adj(r).lcount++;//He'll be contributing to r here
361
362   finishReduction();
363 }
364
365 //Migrating in (note that global count doesn't change)
366 void CkReductionMgr::contributorArriving(contributorInfo *ci)
367 {
368   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
369   lcount++;//We gained a local
370 #if CMK_MEM_CHECKPOINT
371   // ignore from listener if it is during restart from crash
372   // because the ci may be old.
373   if (CkInRestarting()) return;
374 #endif
375   //He has already contributed (elsewhere) to several reductions:
376   for (int r=redNo;r<ci->redNo;r++)
377     adj(r).lcount--;//He won't be contributing to r here
378
379 }
380
381 //Contribute-- the given msg can contain any data.  The reducerType
382 // field of the message must be valid.
383 // Each contributor must contribute exactly once to the each reduction.
384 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
385 {
386 #if CMK_BIGSIM_CHARM
387   _TRACE_BG_TLINE_END(&(m->log));
388 #endif
389   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
390   //m->ci=ci;
391   m->redNo=ci->redNo++;
392   m->sourceFlag=-1;//A single contribution
393   m->gcount=0;
394
395 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
396
397         // if object is an immigrant recovery object, we send the contribution to the source PE
398         if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
399                 
400                 // turning on the message-logging bypass flag
401                 envelope *env = UsrToEnv(m);
402                 env->flags = env->flags | CK_BYPASS_DET_MLOG;
403         thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
404                 return;
405         }
406
407     Chare *oldObj = CpvAccess(_currentObj);
408     CpvAccess(_currentObj) = this;
409
410         // adding contribution
411         addContribution(m);
412
413     CpvAccess(_currentObj) = oldObj;
414 #else
415   addContribution(m);
416 #endif
417 }
418
419 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
420 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
421         //if(CkMyPe() == 2) CkPrintf("[%d] ---> Contributing Via Message\n",CkMyPe());
422         
423         // turning off bypassing flag
424         envelope *env = UsrToEnv(m);
425         env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
426
427         // adding contribution
428     addContribution(m);
429 }
430 #else
431 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
432 #endif
433
434 //////////// Reduction Manager Remote Entry Points /////////////
435 //Sent down the reduction tree (used by barren PEs)
436 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
437 {
438  if(CkMyPe()==0){
439         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
440         //delete m;
441         //return;
442  }
443  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
444  int srcPE = (UsrToEnv(m))->getSrcPe();
445   if (isPresent(m->num) && !inProgress)
446   {
447     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
448     startReduction(m->num,srcPE);
449     finishReduction();
450   } else if (isFuture(m->num)){
451 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
452           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
453           if(maxStartRequest < m->num){
454                   maxStartRequest = m->num;
455           }
456  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
457           
458     }
459   else //is Past
460     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
461   delete m;
462 }
463
464 //Sent to root of reduction tree with reduction contribution
465 // of migrants that missed the main reduction.
466 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
467 {
468 #if GROUP_LEVEL_REDUCTION
469 #if CMK_BIGSIM_CHARM
470   _TRACE_BG_TLINE_END(&(m->log));
471 #endif
472   addContribution(m);
473 #else
474   m->secondaryCallback = m->callback;
475   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
476   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
477   nodeMgr->LateMigrantMsg(m);
478 /*      int len = finalMsgs.length();
479         finalMsgs.enq(m);
480 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
481         endArrayReduction();*/
482 #endif
483 }
484
485 //A late migrating contributor will never contribute to this reduction
486 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
487 {
488   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
489   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
490  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
491   adj(m->num).gcount--;//He won't be contributing to this one.
492   finishReduction();
493   delete m;
494 }
495
496 //////////// Reduction Manager State /////////////
497 void CkReductionMgr::startReduction(int number,int srcPE)
498 {
499   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
500   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
501   if (inProgress){
502         DEBR((AA"This reduction is already in progress\n"AB));
503         return;//This reduction already started
504   }
505   if (creating) //Don't start yet-- we're creating elements
506   {
507     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
508     startRequested=CmiTrue;
509     return;
510   }
511
512 //If none of these cases, we need to start the reduction--
513   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
514   inProgress=CmiTrue;
515  
516
517         /*
518                 FAULT_EVAC
519         */
520   if(!CmiNodeAlive(CkMyPe())){
521         return;
522   }
523
524   if(disableNotifyChildrenStart) return;
525  
526   //Sent start requests to our kids (in case they don't already know)
527 #if GROUP_LEVEL_REDUCTION
528   for (int k=0;k<treeKids();k++)
529   {
530     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
531     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
532   }
533 #else
534   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
535 #endif
536         
537         /*
538   int temp;
539   //making it a broadcast done only by PE 0
540   if(!hasParent()){
541                 temp = completedRedNo+1;
542                 for(int i=temp;i<=number;i++){
543                         for(int j=0;j<CkNumPes();j++){
544                                 if(j != CkMyPe() && j != srcPE){
545                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
546                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
547                                         }
548                                 }
549                         }
550                 }       
551         }       else{
552                 temp = number;
553         }*/
554 /*  if(!hasParent()){
555                 temp = completedRedNo+1;
556         }       else{
557                 temp = number;
558         }
559         for(int i=temp;i<=number;i++){
560         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
561                 if(hasParent()){
562           // kick-start your parent too ...
563                         if(treeParent() != srcPE){
564                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
565 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
566     CpvAccess(_currentObj) = oldObj;
567 #endif
568                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
569                                 }       
570                         }       
571                 }
572           for (int k=0;k<treeKids();k++)
573           {
574                         if(firstKid()+k != srcPE){
575                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
576                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
577                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
578                                 }       
579                         }       
580         }
581         }
582         */
583 }       
584
585 /*Handle a message from one element for the reduction*/
586 void CkReductionMgr::addContribution(CkReductionMsg *m)
587 {
588   if (isPast(m->redNo))
589   {
590 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
591         CmiAbort("this version should not have late migrations");
592 #else
593         //We've moved on-- forward late contribution straight to root
594     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
595         // if (!hasParent()) //Root moved on too soon-- should never happen
596         //   CkAbort("Late reduction contribution received at root!\n");
597     thisProxy[0].LateMigrantMsg(m);
598 #endif
599   }
600   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
601     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
602     futureMsgs.enq(m);
603   } else {// An ordinary contribution
604     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
605    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
606     startReduction(m->redNo,CkMyPe());
607     msgs.enq(m);
608     nContrib++;
609     finishReduction();
610   }
611 }
612
613 /**function checks if it has got all contributions that it is supposed to
614 get at this processor. If it is done it sends the reduced result to the local
615 nodegroup */
616 void CkReductionMgr::finishReduction(void)
617 {
618   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
619   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
620   if ((!inProgress) || creating){
621         DEBR((AA"Either not in Progress or creating\n"AB));
622         return;
623   }
624   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
625 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
626         if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
627         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
628                 return;//Need more local messages
629         }
630 #else
631   if (nContrib<(lcount+adj(redNo).lcount)){
632          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
633          return;//Need more local messages
634   }
635 #endif
636
637 #if GROUP_LEVEL_REDUCTION
638   if (nRemote<treeKids())  return;//Need more remote messages
639         
640 #endif
641  
642   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
643   CkReductionMsg *result=reduceMessages();
644   result->redNo=redNo;
645
646 #if GROUP_LEVEL_REDUCTION
647   if (hasParent())
648   {//Pass data up tree to parent
649     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
650     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
651 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
652     result->gcount+=gcount+adj(redNo).gcount;
653 #else
654     result->gcount+=gcount+adj(redNo).gcount;
655 #endif
656     thisProxy[treeParent()].RecvMsg(result);
657   }
658   else 
659   {//We are root-- pass data to client
660     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
661     int totalElements=result->gcount+gcount+adj(redNo).gcount;
662     if (totalElements>result->nSources()) 
663     {
664       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
665       msgs.enq(result);
666       return; // Wait for migrants to contribute
667     } else if (totalElements<result->nSources()) {
668       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
669       CkAbort("ERROR! Too many contributions at root!\n");
670     }
671     DEBR((AA"Passing result to client function\n"AB));
672     CkSetRefNum(result, result->getUserFlag());
673     if (!result->callback.isInvalid())
674             result->callback.send(result);
675     else if (!storedCallback.isInvalid())
676             storedCallback.send(result);
677     else
678             CkAbort("No reduction client!\n"
679                     "You must register a client with either SetReductionClient or during contribute.\n");
680   }
681
682 #else
683   result->gcount+=gcount+adj(redNo).gcount;
684
685   result->secondaryCallback = result->callback;
686   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
687         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
688
689   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
690
691  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
692   
693   // Find our node reduction manager, and pass reduction to him:
694   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
695   nodeMgr->contributeArrayReduction(result);
696 #endif
697
698   //House Keeping Operations will have to check later what needs to be changed
699   redNo++;
700   //Shift the count adjustment vector down one slot (to match new redNo)
701   int i;
702 #if !GROUP_LEVEL_REDUCTION
703     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
704   if(CkMyPe()!=0)
705 #endif
706   {
707         completedRedNo++;
708         for (i=1;i<(int)(adjVec.length());i++){
709            adjVec[i-1]=adjVec[i];
710         }
711         adjVec.length()--;  
712   }
713
714   inProgress=CmiFalse;
715   startRequested=CmiFalse;
716   nRemote=nContrib=0;
717
718   //Look through the future queue for messages we can now handle
719   int n=futureMsgs.length();
720   for (i=0;i<n;i++)
721   {
722     CkReductionMsg *m=futureMsgs.deq();
723     if (m!=NULL) //One of these addContributions may have finished us.
724       addContribution(m);//<- if *still* early, puts it back in the queue
725   }
726 #if GROUP_LEVEL_REDUCTION
727   n=futureRemoteMsgs.length();
728   for (i=0;i<n;i++)
729   {
730     CkReductionMsg *m=futureRemoteMsgs.deq();
731     if (m!=NULL) {
732       RecvMsg(m);//<- if *still* early, puts it back in the queue
733     }
734   }
735 #endif
736
737   if(maxStartRequest >= redNo){
738           startReduction(redNo,CkMyPe());
739           finishReduction();
740   }
741  
742
743 }
744
745 //Sent up the reduction tree with reduced data
746   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
747 {
748 #if GROUP_LEVEL_REDUCTION
749 #if CMK_BIGSIM_CHARM
750   _TRACE_BG_TLINE_END(&m->log);
751 #endif
752   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
753     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
754     startReduction(m->redNo, CkMyPe());
755     msgs.enq(m);
756     nRemote++;
757     finishReduction();
758   }
759   else if (isFuture(m->redNo)) {
760     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
761     futureRemoteMsgs.enq(m);
762   } 
763   else CkAbort("Recv'd late remote contribution!\n");
764 #endif
765 }
766
767 //////////// Reduction Manager Utilities /////////////
768
769 //Return the countAdjustment struct for the given redNo:
770 countAdjustment &CkReductionMgr::adj(int number)
771 {
772   number-=completedRedNo;
773   number--;
774   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
775   //Pad the adjustment vector with zeros until it's at least number long
776   while ((int)(adjVec.length())<=number)
777     adjVec.push_back(countAdjustment());
778   return adjVec[number];
779 }
780
781 //Combine (& free) the current message vector msgs.
782 CkReductionMsg *CkReductionMgr::reduceMessages(void)
783 {
784 #if CMK_BIGSIM_CHARM
785   _TRACE_BG_END_EXECUTE(1);
786   void* _bgParentLog = NULL;
787   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
788 #endif
789   CkReductionMsg *ret=NULL;
790
791   //Look through the vector for a valid reducer, swapping out placeholder messages
792   CkReduction::reducerType r=CkReduction::invalid;
793   int msgs_gcount=0;//Reduced gcount
794   int msgs_nSources=0;//Reduced nSources
795   int msgs_userFlag=-1;
796   CkCallback msgs_callback;
797   int i;
798   int nMsgs=0;
799   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
800   CkReductionMsg *m;
801   bool isMigratableContributor;
802
803   // Copy message queue into msgArr, skipping placeholders:
804   while (NULL!=(m=msgs.deq()))
805   {
806     msgs_gcount+=m->gcount;
807     if (m->sourceFlag!=0)
808     { //This is a real message from an element, not just a placeholder
809       msgArr[nMsgs++]=m;
810       msgs_nSources+=m->nSources();
811       r=m->reducer;
812       if (!m->callback.isInvalid())
813         msgs_callback=m->callback;
814       if (m->userFlag!=-1)
815         msgs_userFlag=m->userFlag;
816                         
817         isMigratableContributor=m->isMigratableContributor();
818 #if CMK_BIGSIM_CHARM
819         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
820 #endif
821     }
822     else
823     { //This is just a placeholder message-- forget it
824       delete m;
825     }
826   }
827
828   if (nMsgs==0||r==CkReduction::invalid)
829   //No valid reducer in the whole vector
830     ret=CkReductionMsg::buildNew(0,NULL);
831   else
832   {//Use the reducer to reduce the messages
833                 //if there is only one msg to be reduced just return that message
834     if(nMsgs == 1){
835         ret = msgArr[0];        
836     }else{
837         CkReduction::reducerFn f=CkReduction::reducerTable[r];
838         ret=(*f)(nMsgs,msgArr);
839     }
840     ret->reducer=r;
841   }
842
843
844
845 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
846
847 #if CRITICAL_PATH_DEBUG > 3
848   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
849 #endif
850
851   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
852   path.updateMax(UsrToEnv(ret));
853   // Combine the critical paths from all the reduction messages
854   for (i=0;i<nMsgs;i++){
855     if (msgArr[i]!=ret){
856       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
857       path.updateMax(UsrToEnv(msgArr[i]));
858     }
859   }
860   
861
862 #if CRITICAL_PATH_DEBUG > 3
863   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
864 #endif
865   
866   PathHistoryTableEntry tableEntry(path);
867   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
868   
869 #endif
870
871         //Go back through the vector, deleting old messages
872   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
873   delete [] msgArr;
874
875   //Set the message counts
876   ret->redNo=redNo;
877   ret->gcount=msgs_gcount;
878   ret->userFlag=msgs_userFlag;
879   ret->callback=msgs_callback;
880   ret->sourceFlag=msgs_nSources;
881         ret->setMigratableContributor(isMigratableContributor);
882   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
883
884   return ret;
885 }
886
887
888 //Checkpointing utilities
889 //pack-unpack method for CkReductionMsg
890 //if packing pack the message and then unpack and return it
891 //if unpacking allocate memory for it read it off disk and then unapck
892 //and return it
893 void CkReductionMgr::pup(PUP::er &p)
894 {
895 //We do not store the client function pointer or the client function parameter,
896 //it is the responsibility of the programmer to correctly restore these
897   CkGroupInitCallback::pup(p);
898   p(redNo);
899   p(completedRedNo);
900   p(inProgress); p(creating); p(startRequested);
901   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
902   p|msgs;
903   p|futureMsgs;
904   p|futureRemoteMsgs;
905   p|finalMsgs;
906   p|adjVec;
907   p|nodeProxy;
908   p|storedCallback;
909     // handle CkReductionClientBundle
910   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
911   {
912     CkReductionClientBundle *bd;
913     if (p.isUnpacking()) 
914       bd = new CkReductionClientBundle;
915     else
916       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
917     p|*bd;
918     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
919   }
920
921   // subtle --- Gengbin
922   // Group : CkReductionMgr
923   // CkArray: CkReductionMgr
924   // lcount/gcount in Group is set in Group constructor
925   // lcount/gcount in CkArray is not, it is set when array elements are created
926   // we can not pup because inserting array elems will add the counters again
927 //  p|lcount;
928 //  p|gcount;
929 //  p|lcount;
930 //  //  p|gcount;
931 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
932   if(p.isUnpacking()){
933     thisProxy = thisgroup;
934     maxStartRequest=0;
935 #if GROUP_LEVEL_REDUCTION
936 #ifdef BINOMIAL_TREE
937     init_BinomialTree();
938 #else
939     init_BinaryTree();
940 #endif
941 #endif
942   }
943
944   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
945 }
946
947
948 //Callback for doing Reduction through NodeGroups added by Sayantan
949
950 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
951
952         int total = m->gcount+adj(m->redNo).gcount;
953         finalMsgs.enq(m);
954         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
955         adj(m->redNo).mainRecvd = 1;
956         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
957         endArrayReduction();
958 }
959
960 void CkReductionMgr :: endArrayReduction(){
961         CkReductionMsg *ret=NULL;
962         int nMsgs=finalMsgs.length();
963         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
964         //Look through the vector for a valid reducer, swapping out placeholder messages
965         //CkPrintf("Length of Final Message %d \n",nMsgs);
966         CkReduction::reducerType r=CkReduction::invalid;
967         int msgs_gcount=0;//Reduced gcount
968         int msgs_nSources=0;//Reduced nSources
969         int msgs_userFlag=-1;
970         CkCallback msgs_callback;
971         CkCallback msgs_secondaryCallback;
972         CkVec<CkReductionMsg *> tempMsgs;
973         int i;
974         int numMsgs = 0;
975         for (i=0;i<nMsgs;i++)
976         {
977                 CkReductionMsg *m=finalMsgs.deq();
978                 if(m->redNo == completedRedNo +1){
979                         msgs_gcount+=m->gcount;
980                         if (m->sourceFlag!=0)
981                         { //This is a real message from an element, not just a placeholder
982                                 msgs_nSources+=m->nSources();
983                                 r=m->reducer;
984                                 if (!m->callback.isInvalid())
985                                 msgs_callback=m->callback;
986                                 if(!m->secondaryCallback.isInvalid())
987                                         msgs_secondaryCallback = m->secondaryCallback;
988                                 if (m->userFlag!=-1)
989                                         msgs_userFlag=m->userFlag;
990                                 tempMsgs.push_back(m);
991                         }
992                 }else{
993                         finalMsgs.enq(m);
994                 }
995
996         }
997         numMsgs = tempMsgs.length();
998
999         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1000
1001         if(numMsgs == 0){
1002                 return;
1003         }
1004         if(adj(completedRedNo+1).mainRecvd == 0){
1005                 for(i=0;i<numMsgs;i++){
1006                         finalMsgs.enq(tempMsgs[i]);
1007                 }
1008                 return;
1009         }
1010
1011 /*
1012         NOT NEEDED ANYMORE DONE at nodegroup level
1013         if(msgs_gcount  > msgs_nSources){
1014                 for(i=0;i<numMsgs;i++){
1015                         finalMsgs.enq(tempMsgs[i]);
1016                 }
1017                 return;
1018         }*/
1019
1020         if (nMsgs==0||r==CkReduction::invalid)
1021                 //No valid reducer in the whole vector
1022                 ret=CkReductionMsg::buildNew(0,NULL);
1023         else{//Use the reducer to reduce the messages
1024                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1025                 // has to be corrected elements from above need to be put into a temporary vector
1026                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1027                 ret=(*f)(numMsgs,msgArr);
1028                 ret->reducer=r;
1029
1030         }
1031
1032         
1033 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1034
1035 #if CRITICAL_PATH_DEBUG > 3
1036         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1037 #endif
1038
1039         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1040         path.updateMax(UsrToEnv(ret));
1041         // Combine the critical paths from all the reduction messages into the header for the new result
1042         for (i=0;i<numMsgs;i++){
1043           if (tempMsgs[i]!=ret){
1044             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1045             path.updateMax(UsrToEnv(tempMsgs[i]));
1046           } else {
1047             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1048           }
1049         }
1050         // Also consider the path which got us into this entry method
1051
1052 #if CRITICAL_PATH_DEBUG > 3
1053         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1054 #endif
1055
1056 #endif
1057   
1058
1059
1060
1061
1062         for(i = 0;i<numMsgs;i++){
1063                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1064         }
1065
1066         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1067         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1068
1069         ret->redNo=completedRedNo+1;
1070         ret->gcount=msgs_gcount;
1071         ret->userFlag=msgs_userFlag;
1072         ret->callback=msgs_callback;
1073         ret->secondaryCallback = msgs_secondaryCallback;
1074         ret->sourceFlag=msgs_nSources;
1075
1076         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1077
1078         CkSetRefNum(ret, ret->getUserFlag());
1079         if (!ret->secondaryCallback.isInvalid())
1080             ret->secondaryCallback.send(ret);
1081     else if (!storedCallback.isInvalid())
1082             storedCallback.send(ret);
1083     else{
1084       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1085             CkAbort("No reduction client!\n"
1086                     "You must register a client with either SetReductionClient or during contribute.\n");
1087     }
1088         completedRedNo++;
1089
1090         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1091
1092         for (i=1;i<(int)(adjVec.length());i++)
1093                 adjVec[i-1]=adjVec[i];
1094         adjVec.length()--;
1095         endArrayReduction();
1096 }
1097
1098 #if GROUP_LEVEL_REDUCTION
1099
1100 void CkReductionMgr::init_BinaryTree(){
1101         parent = (CkMyPe()-1)/TREE_WID;
1102         int firstkid = CkMyPe()*TREE_WID+1;
1103         numKids=CkNumPes()-firstkid;
1104         if (numKids>TREE_WID) numKids=TREE_WID;
1105         if (numKids<0) numKids=0;
1106
1107         for(int i=0;i<numKids;i++){
1108                 kids.push_back(firstkid+i);
1109                 newKids.push_back(firstkid+i);
1110         }
1111 }
1112
1113 void CkReductionMgr::init_BinomialTree(){
1114         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1115         /*upperSize = (unsigned )pow((double)2,depth);*/
1116         upperSize = (unsigned) 1 << depth;
1117         label = upperSize-CkMyPe()-1;
1118         int p=label;
1119         int count=0;
1120         while( p > 0){
1121                 if(p % 2 == 0)
1122                         break;
1123                 else{
1124                         p = p/2;
1125                         count++;
1126                 }
1127         }
1128         /*parent = label + rint(pow((double)2,count));*/
1129         parent = label + (1<<count);
1130         parent = upperSize -1 -parent;
1131         int temp;
1132         if(count != 0){
1133                 numKids = 0;
1134                 for(int i=0;i<count;i++){
1135                         /*temp = label - rint(pow((double)2,i));*/
1136                         temp = label - (1<<i);
1137                         temp = upperSize-1-temp;
1138                         if(temp <= CkNumPes()-1){
1139                                 kids.push_back(temp);
1140                                 numKids++;
1141                         }
1142                 }
1143         }else{
1144                 numKids = 0;
1145         //      kids = NULL;
1146         }
1147 }
1148
1149
1150 int CkReductionMgr::treeRoot(void)
1151 {
1152   return 0;
1153 }
1154 CmiBool CkReductionMgr::hasParent(void) //Root Node
1155 {
1156   return (CmiBool)(CkMyPe()!=treeRoot());
1157 }
1158 int CkReductionMgr::treeParent(void) //My parent Node
1159 {
1160   return parent;
1161 }
1162
1163 int CkReductionMgr::firstKid(void) //My first child Node
1164 {
1165   return CkMyPe()*TREE_WID+1;
1166 }
1167 int CkReductionMgr::treeKids(void)//Number of children in tree
1168 {
1169   return numKids;
1170 }
1171
1172 #endif
1173
1174 /////////////////////////////////////////////////////////////////////////
1175
1176 ////////////////////////////////
1177 //CkReductionMsg support
1178
1179 //ReductionMessage default private constructor-- does nothing
1180 CkReductionMsg::CkReductionMsg(){}
1181 CkReductionMsg::~CkReductionMsg(){}
1182
1183 //This define gives the distance from the start of the CkReductionMsg
1184 // object to the start of the user data area (just below last object field)
1185 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1186
1187 //"Constructor"-- builds and returns a new CkReductionMsg.
1188 //  the "data" array you specify will be copied into this object.
1189 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1190     CkReduction::reducerType reducer, CkReductionMsg *buf)
1191 {
1192   int len[1];
1193   len[0]=NdataSize;
1194   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1195   
1196   ret->dataSize=NdataSize;
1197   if (srcData!=NULL && !buf)
1198     memcpy(ret->data,srcData,NdataSize);
1199   ret->userFlag=-1;
1200   ret->reducer=reducer;
1201   //ret->ci=NULL;
1202   ret->sourceFlag=-1000;
1203   ret->gcount=0;
1204   ret->migratableContributor = true;
1205 #if CMK_BIGSIM_CHARM
1206   ret->log = NULL;
1207 #endif
1208   return ret;
1209 }
1210
1211 // Charm kernel message runtime support:
1212 void *
1213 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1214 {
1215   int totalsize=ARM_DATASTART+(*sz);
1216   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1217   CkReductionMsg *ret = (CkReductionMsg *)
1218     CkAllocMsg(msgnum,totalsize,priobits);
1219   ret->data=(void *)(&ret->dataStorage);
1220   return (void *) ret;
1221 }
1222
1223 void *
1224 CkReductionMsg::pack(CkReductionMsg* in)
1225 {
1226   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1227   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1228   in->data = NULL;
1229   return (void*) in;
1230 }
1231
1232 CkReductionMsg* CkReductionMsg::unpack(void *in)
1233 {
1234   CkReductionMsg *ret = (CkReductionMsg *)in;
1235   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1236   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1237   ret->data=(void *)(&ret->dataStorage);
1238   return ret;
1239 }
1240
1241
1242 /////////////////////////////////////////////////////////////////////////////////////
1243 ///////////////// Builtin Reducer Functions //////////////
1244 /* A simple reducer, like sum_int, looks like this:
1245 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1246 {
1247   int i,ret=0;
1248   for (i=0;i<nMsg;i++)
1249     ret+=*(int *)(msg[i]->getData());
1250   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1251 }
1252
1253 To keep the code small and easy to change, the implementations below
1254 are built with preprocessor macros.
1255 */
1256
1257 //////////////// simple reducers ///////////////////
1258 /*A define used to quickly and tersely construct simple reductions.
1259 The basic idea is to use the first message's data array as
1260 (pre-initialized!) scratch space for folding in the other messages.
1261  */
1262
1263 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1264 {
1265         CkAbort("Called the invalid reducer type 0.  This probably\n"
1266                 "means you forgot to initialize your custom reducer index.\n");
1267         return NULL;
1268 }
1269
1270 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1271 {
1272   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1273 }
1274
1275 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1276 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1277 {\
1278   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1279   int m,i;\
1280   int nElem=msg[0]->getLength()/sizeof(dataType);\
1281   dataType *ret=(dataType *)(msg[0]->getData());\
1282   for (m=1;m<nMsg;m++)\
1283   {\
1284     dataType *value=(dataType *)(msg[m]->getData());\
1285     for (i=0;i<nElem;i++)\
1286     {\
1287       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1288       loop\
1289     }\
1290   }\
1291   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1292   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1293 }
1294
1295 //Use this macro for reductions that have the same type for all inputs
1296 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1297   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1298   SIMPLE_REDUCTION(nameBase##_long,CmiInt8,"%d",loop) \
1299   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1300   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1301
1302
1303 //Compute the sum the numbers passed by each element.
1304 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1305
1306 //Compute the product of the numbers passed by each element.
1307 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1308
1309 //Compute the largest number passed by any element.
1310 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1311
1312 //Compute the smallest integer passed by any element.
1313 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1314
1315
1316 //Compute the logical AND of the integers passed by each element.
1317 // The resulting integer will be zero if any source integer is zero; else 1.
1318 SIMPLE_REDUCTION(logical_and,int,"%d",
1319         if (value[i]==0)
1320      ret[i]=0;
1321   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1322 )
1323
1324 //Compute the logical OR of the integers passed by each element.
1325 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1326 SIMPLE_REDUCTION(logical_or,int,"%d",
1327   if (value[i]!=0)
1328            ret[i]=1;
1329   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1330 )
1331
1332 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1333 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1334
1335 //Select one random message to pass on
1336 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1337   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1338 }
1339
1340 /////////////// concat ////////////////
1341 /*
1342 This reducer simply appends the data it recieves from each element,
1343 without any housekeeping data to separate them.
1344 */
1345 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1346 {
1347   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1348   //Figure out how big a message we'll need
1349   int i,retSize=0;
1350   for (i=0;i<nMsg;i++)
1351       retSize+=msg[i]->getSize();
1352
1353   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1354
1355   //Allocate a new message
1356   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1357
1358   //Copy the source message data into the return message
1359   char *cur=(char *)(ret->getData());
1360   for (i=0;i<nMsg;i++) {
1361     int messageBytes=msg[i]->getSize();
1362     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1363     cur+=messageBytes;
1364   }
1365   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1366   return ret;
1367 }
1368
1369 /////////////// set ////////////////
1370 /*
1371 This reducer appends the data it recieves from each element
1372 along with some housekeeping data indicating contribution boundaries.
1373 The message data is thus a list of reduction_set_element structures
1374 terminated by a dummy reduction_set_element with a sourceElement of -1.
1375 */
1376
1377 //This rounds an integer up to the nearest multiple of sizeof(double)
1378 static const int alignSize=sizeof(double);
1379 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1380
1381 //This gives the size (in bytes) of a reduction_set_element
1382 static int SET_SIZE(int dataSize)
1383 {return SET_ALIGN(sizeof(int)+dataSize);}
1384
1385 //This returns a pointer to the next reduction_set_element in the list
1386 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1387 {
1388   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1389   return (CkReduction::setElement *)next;
1390 }
1391
1392 //Combine the data passed by each element into an list of reduction_set_elements.
1393 // Each element may contribute arbitrary data (with arbitrary length).
1394 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1395 {
1396   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1397   //Figure out how big a message we'll need
1398   int i,retSize=0;
1399   for (i=0;i<nMsg;i++) {
1400     if (!msg[i]->isFromUser())
1401     //This message is composite-- it will just be copied over (less terminating -1)
1402       retSize+=(msg[i]->getSize()-sizeof(int));
1403     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1404       retSize+=SET_SIZE(msg[i]->getSize());
1405   }
1406   retSize+=sizeof(int);//Leave room for terminating -1.
1407
1408   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1409
1410   //Allocate a new message
1411   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1412
1413   //Copy the source message data into the return message
1414   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1415   for (i=0;i<nMsg;i++)
1416     if (!msg[i]->isFromUser())
1417     {//This message is composite-- just copy it over (less terminating -1)
1418                         int messageBytes=msg[i]->getSize()-sizeof(int);
1419                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1420                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1421                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1422     }
1423     else //This is a message from an element-- wrap it in a reduction_set_element
1424     {
1425       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1426       cur->dataSize=msg[i]->getSize();
1427       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1428       cur=SET_NEXT(cur);
1429     }
1430   cur->dataSize=-1;//Add a terminating -1.
1431   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1432   return ret;
1433 }
1434
1435 //Utility routine: get the next reduction_set_element in the list
1436 // if there is one, or return NULL if there are none.
1437 //To get all the elements, just keep feeding this procedure's output back to
1438 // its input until it returns NULL.
1439 CkReduction::setElement *CkReduction::setElement::next(void)
1440 {
1441   CkReduction::setElement *n=SET_NEXT(this);
1442   if (n->dataSize==-1)
1443     return NULL;//This is the end of the list
1444   else
1445     return n;//This is just another element
1446 }
1447
1448 /////////////////// Reduction Function Table /////////////////////
1449 CkReduction::CkReduction() {} //Dummy private constructor
1450
1451 //Add the given reducer to the list.  Returns the new reducer's
1452 // reducerType.  Must be called in the same order on every node.
1453 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1454 {
1455   reducerTable[nReducers]=fn;
1456   return (reducerType)nReducers++;
1457 }
1458
1459 /*Reducer table: maps reducerTypes to reducerFns.
1460 It's indexed by reducerType, so the order in this table
1461 must *exactly* match the reducerType enum declaration.
1462 The names don't have to match, but it helps.
1463 */
1464 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1465
1466 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1467     ::invalid_reducer,
1468     ::nop,
1469   //Compute the sum the numbers passed by each element.
1470     ::sum_int,::sum_long,::sum_float,::sum_double,
1471
1472   //Compute the product the numbers passed by each element.
1473     ::product_int,::product_long,::product_float,::product_double,
1474
1475   //Compute the largest number passed by any element.
1476     ::max_int,::max_long,::max_float,::max_double,
1477
1478   //Compute the smallest number passed by any element.
1479     ::min_int,::min_long,::min_float,::min_double,
1480
1481   //Compute the logical AND of the integers passed by each element.
1482   // The resulting integer will be zero if any source integer is zero.
1483     ::logical_and,
1484
1485   //Compute the logical OR of the integers passed by each element.
1486   // The resulting integer will be 1 if any source integer is nonzero.
1487     ::logical_or,
1488
1489     // Compute the logical bitvector AND of the integers passed by each element.
1490     ::bitvec_and,
1491
1492     // Compute the logical bitvector OR of the integers passed by each element.
1493     ::bitvec_or,
1494
1495     // Select one of the messages at random to pass on
1496     ::random,
1497
1498   //Concatenate the (arbitrary) data passed by each element
1499     ::concat,
1500
1501   //Combine the data passed by each element into an list of setElements.
1502   // Each element may contribute arbitrary data (with arbitrary length).
1503     ::set
1504 };
1505
1506
1507
1508
1509
1510
1511
1512
1513 /********** Code added by Sayantan *********************/
1514 /** Locking is a big problem in the nodegroup code for smp.
1515  So a few assumptions have had to be made. There is one lock
1516  called lockEverything. It protects all the data structures 
1517  of the nodegroup reduction mgr. I tried grabbing it separately 
1518  for each datastructure, modifying it and then releasing it and
1519  then grabbing it again, for the next change.
1520  That doesn't really help because the interleaved execution of 
1521  different threads makes the state of the reduction manager 
1522  inconsistent. 
1523  
1524  1. Grab lockEverything before calling finishreduction or startReduction
1525     or doRecvMsg
1526  2. lockEverything is grabbed only in entry methods reductionStarting
1527     or RecvMesg or  addcontribution.
1528  ****/
1529  
1530 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1531 NodeGroup::NodeGroup(void) {
1532   __nodelock=CmiCreateLock();
1533 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1534     mlogData->objID.type = TypeNodeGroup;
1535     mlogData->objID.data.group.onPE = CkMyNode();
1536 #endif
1537
1538 }
1539 NodeGroup::~NodeGroup() {
1540   CmiDestroyLock(__nodelock);
1541 }
1542 void NodeGroup::pup(PUP::er &p)
1543 {
1544   CkNodeReductionMgr::pup(p);
1545   p|reductionInfo;
1546 }
1547
1548 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1549
1550 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1551   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1552  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1553   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1554  }
1555
1556 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1557                                     ((CkNodeReductionMgr *)this),
1558                                     reductionInfo,false)
1559
1560 /* this contribute also adds up the count across all messages it receives.
1561   Useful for summing up number of array elements who have contributed ****/ 
1562 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1563         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1564
1565
1566
1567 //#define BINOMIAL_TREE
1568
1569 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1570   : thisProxy(thisgroup)
1571 {
1572 #ifdef BINOMIAL_TREE
1573   init_BinomialTree();
1574 #else
1575   init_BinaryTree();
1576 #endif
1577   storedCallback=NULL;
1578   redNo=0;
1579   inProgress=CmiFalse;
1580   
1581   startRequested=CmiFalse;
1582   gcount=CkNumNodes();
1583   lcount=1;
1584   nContrib=nRemote=0;
1585   lockEverything = CmiCreateLock();
1586
1587
1588   creating=CmiFalse;
1589   interrupt = 0;
1590   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1591         /*
1592                 FAULT_EVAC
1593         */
1594         blocked = false;
1595         maxModificationRedNo = INT_MAX;
1596         killed=0;
1597         additionalGCount = newAdditionalGCount = 0;
1598 }
1599
1600 void CkNodeReductionMgr::flushStates()
1601 {
1602  if(CkMyRank() == 0){
1603   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1604   redNo=0;
1605   inProgress=CmiFalse;
1606
1607   startRequested=CmiFalse;
1608   gcount=CkNumNodes();
1609   lcount=1;
1610   nContrib=nRemote=0;
1611
1612   creating=CmiFalse;
1613   interrupt = 0;
1614   while (!msgs.isEmpty()) { delete msgs.deq(); }
1615   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1616   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1617   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1618   }
1619 }
1620
1621 //////////// Reduction Manager Client API /////////////
1622
1623 //Add the given client function.  Overwrites any previous client.
1624 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1625 {
1626   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1627   if(cb->isInvalid()){
1628         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1629   }else{
1630         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1631   }
1632
1633   if (CkMyNode()!=0)
1634           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1635   delete storedCallback;
1636   storedCallback=cb;
1637 }
1638
1639
1640
1641 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1642 {
1643 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1644     Chare *oldObj =CpvAccess(_currentObj);
1645     CpvAccess(_currentObj) = this;
1646 #endif
1647
1648   //m->ci=ci;
1649   m->redNo=ci->redNo++;
1650   m->sourceFlag=-1;//A single contribution
1651   m->gcount=0;
1652   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1653   addContribution(m);
1654
1655 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1656     CpvAccess(_currentObj) = oldObj;
1657 #endif
1658 }
1659
1660
1661 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1662 {
1663 #if CMK_BIGSIM_CHARM
1664   _TRACE_BG_TLINE_END(&m->log);
1665 #endif
1666 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1667     Chare *oldObj =CpvAccess(_currentObj);
1668     CpvAccess(_currentObj) = this;
1669 #endif
1670   //m->ci=ci;
1671   m->redNo=ci->redNo++;
1672   m->gcount=count;
1673   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1674   addContribution(m);
1675   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1676
1677 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1678     CpvAccess(_currentObj) = oldObj;
1679 #endif
1680 }
1681
1682
1683 //////////// Reduction Manager Remote Entry Points /////////////
1684
1685 //Sent down the reduction tree (used by barren PEs)
1686 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1687 {
1688   CmiLock(lockEverything);
1689         /*
1690                 FAULT_EVAC
1691         */
1692   if(blocked){
1693         delete m;
1694         CmiUnlock(lockEverything);
1695         return ;
1696   }
1697   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1698   if (isPresent(m->num) && !inProgress)
1699   {
1700     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1701     startReduction(m->num,srcNode);
1702     finishReduction();
1703   } else if (isFuture(m->num)){
1704         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1705   }
1706   else //is Past
1707     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1708   CmiUnlock(lockEverything);
1709   delete m;
1710
1711 }
1712
1713
1714 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1715         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1716         /*
1717                 FAULT_EVAC
1718         */
1719         if(blocked){
1720                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1721                 bufferedRemoteMsgs.enq(m);
1722                 return;
1723         }
1724         
1725         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1726             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1727             startReduction(m->redNo,CkMyNode());
1728             msgs.enq(m);
1729             nRemote++;
1730             finishReduction();
1731         }
1732         else {
1733             if (isFuture(m->redNo)) {
1734                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1735                     futureRemoteMsgs.enq(m);
1736             }else{
1737                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1738                    CkAbort("Recv'd late remote contribution!\n");
1739             }
1740         }
1741         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1742 }
1743
1744 //Sent up the reduction tree with reduced data
1745 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1746 {
1747 #if CMK_BIGSIM_CHARM
1748   _TRACE_BG_TLINE_END(&m->log);
1749 #endif
1750 #ifndef CMK_CPV_IS_SMP
1751 #if CMK_IMMEDIATE_MSG
1752         if(interrupt == 1){
1753                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1754                 CpvAccess(_qd)->process(-1);
1755                 CmiDelayImmediate();
1756                 return;
1757         }
1758 #endif  
1759 #endif
1760    interrupt = 1;       
1761    CmiLock(lockEverything);   
1762    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1763    doRecvMsg(m);
1764    CmiUnlock(lockEverything);    
1765    interrupt = 0;
1766    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1767 }
1768
1769 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1770 {
1771         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1772         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1773         if (inProgress){
1774                 DEBR((AA"This Node reduction is already in progress\n"AB));
1775                 return;//This reduction already started
1776         }
1777         if (creating) //Don't start yet-- we're creating elements
1778         {
1779                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1780                 startRequested=CmiTrue;
1781                 return;
1782         }
1783         
1784         //If none of these cases, we need to start the reduction--
1785         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1786         inProgress=CmiTrue;
1787
1788         if(!_isNotifyChildInRed) return;
1789
1790         //Sent start requests to our kids (in case they don't already know)
1791         
1792         for (int k=0;k<treeKids();k++)
1793         {
1794 #ifdef BINOMIAL_TREE
1795                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1796                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1797 #else
1798                 if(kids[k] != srcNode){
1799                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1800                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1801                 }       
1802 #endif
1803         }
1804
1805         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1806         // in case, nodegroup does not has the attached red group,
1807         // it has to restart groups again
1808         startLocalGroupReductions(number);
1809 /*
1810         if (startLocalGroupReductions(number) == 0)
1811           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1812 */
1813 }
1814
1815 // restart local groups until succeed
1816 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1817   CmiLock(lockEverything);    
1818   if (startLocalGroupReductions(number) == 0)
1819     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1820   CmiUnlock(lockEverything);    
1821 }
1822
1823 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1824         /*
1825                 FAULT_EVAC
1826         */
1827         if(blocked){
1828                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1829                 bufferedMsgs.enq(m);
1830                 return;
1831         }
1832         
1833         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1834                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1835                 futureMsgs.enq(m);
1836         } else {// An ordinary contribution
1837                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1838                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1839                 startReduction(m->redNo,CkMyNode());
1840                 msgs.enq(m);
1841                 nContrib++;
1842                 finishReduction();
1843         }
1844 }
1845
1846 //Handle a message from one element for the reduction
1847 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1848 {
1849   interrupt = 1;
1850   CmiLock(lockEverything);
1851   doAddContribution(m);
1852   CmiUnlock(lockEverything);
1853   interrupt = 0;
1854 }
1855
1856 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1857         CmiLock(lockEverything);   
1858         /*
1859                 FAULT_EVAC
1860         */
1861         if(blocked){
1862                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1863                 bufferedMsgs.enq(m);
1864                 CmiUnlock(lockEverything);   
1865                 return;
1866         }
1867         
1868         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1869                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1870 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1871                 futureLateMigrantMsgs.enq(m);
1872         } else {// An ordinary contribution
1873                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1874 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1875                 msgs.enq(m);
1876                 finishReduction();
1877         }
1878         CmiUnlock(lockEverything);   
1879 }
1880
1881
1882
1883
1884
1885 /** check if the nodegroup reduction is finished at this node. In that case send it
1886 up the reduction tree **/
1887
1888 void CkNodeReductionMgr::finishReduction(void)
1889 {
1890   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1891   /***Check if reduction is finished in the next few ifs***/
1892   if ((!inProgress) || creating){
1893         DEBR((AA"Either not in Progress or creating\n"AB));
1894         return;
1895   }
1896
1897   if (nContrib<(lcount)){
1898         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1899
1900          return;//Need more local messages
1901   }
1902   if (nRemote<treeKids()){
1903         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1904
1905         return;//Need more remote messages
1906   }
1907   if (nRemote>treeKids()){
1908
1909           interrupt = 0;
1910            CkAbort("Nodegrp Excess remote reduction message received!\n");
1911   }
1912
1913   DEBR((AA"Reducing node data...\n"AB));
1914
1915   /**reduce all messages received at this node **/
1916   CkReductionMsg *result=reduceMessages();
1917
1918   if (hasParent())
1919   {//Pass data up tree to parent
1920         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1921         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1922         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1923                 /*
1924                         FAULT_EVAC
1925                 */
1926                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1927             thisProxy[treeParent()].RecvMsg(result);
1928         }
1929
1930   }
1931   else
1932   {
1933                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1934                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
1935                         msgs.enq(result);
1936                         return;
1937                 }
1938                 result->gcount += additionalGCount;
1939           /** if the reduction is finished and I am the root of the reduction tree
1940           then call the reductionhandler and other stuff ***/
1941                 
1942
1943                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
1944     CkSetRefNum(result, result->getUserFlag());
1945     if (!result->callback.isInvalid()){
1946       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
1947             result->callback.send(result);
1948     }
1949     else if (storedCallback!=NULL){
1950       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
1951             storedCallback->send(result);
1952     }
1953     else{
1954                 DEBR((AA"Invalid Callback \n"AB));
1955             CkAbort("No reduction client!\n"
1956                     "You must register a client with either SetReductionClient or during contribute.\n");
1957                 }
1958   }
1959
1960   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
1961   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
1962   redNo++;
1963         updateTree();
1964   int i;
1965   inProgress=CmiFalse;
1966   startRequested=CmiFalse;
1967   nRemote=nContrib=0;
1968
1969   //Look through the future queue for messages we can now handle
1970   int n=futureMsgs.length();
1971
1972   for (i=0;i<n;i++)
1973   {
1974     interrupt = 1;
1975
1976     CkReductionMsg *m=futureMsgs.deq();
1977
1978     interrupt = 0;
1979     if (m!=NULL){ //One of these addContributions may have finished us.
1980       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
1981       doAddContribution(m);//<- if *still* early, puts it back in the queue
1982     }
1983   }
1984
1985   interrupt = 1;
1986
1987   n=futureRemoteMsgs.length();
1988
1989   interrupt = 0;
1990   for (i=0;i<n;i++)
1991   {
1992     interrupt = 1;
1993
1994     CkReductionMsg *m=futureRemoteMsgs.deq();
1995
1996     interrupt = 0;
1997     if (m!=NULL)
1998       doRecvMsg(m);//<- if *still* early, puts it back in the queue
1999   }
2000   
2001   n = futureLateMigrantMsgs.length();
2002   for(i=0;i<n;i++){
2003     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2004     if(m != NULL){
2005       if(m->redNo == redNo){
2006         msgs.enq(m);
2007       }else{
2008         futureLateMigrantMsgs.enq(m);
2009       }
2010     }
2011   }
2012 }
2013
2014 //////////// Reduction Manager Utilities /////////////
2015
2016 void CkNodeReductionMgr::init_BinaryTree(){
2017         parent = (CkMyNode()-1)/TREE_WID;
2018         int firstkid = CkMyNode()*TREE_WID+1;
2019         numKids=CkNumNodes()-firstkid;
2020   if (numKids>TREE_WID) numKids=TREE_WID;
2021   if (numKids<0) numKids=0;
2022
2023         for(int i=0;i<numKids;i++){
2024                 kids.push_back(firstkid+i);
2025                 newKids.push_back(firstkid+i);
2026         }
2027 }
2028
2029 void CkNodeReductionMgr::init_BinomialTree(){
2030         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2031         /*upperSize = (unsigned )pow((double)2,depth);*/
2032         upperSize = (unsigned) 1 << depth;
2033         label = upperSize-CkMyNode()-1;
2034         int p=label;
2035         int count=0;
2036         while( p > 0){
2037                 if(p % 2 == 0)
2038                         break;
2039                 else{
2040                         p = p/2;
2041                         count++;
2042                 }
2043         }
2044         /*parent = label + rint(pow((double)2,count));*/
2045         parent = label + (1<<count);
2046         parent = upperSize -1 -parent;
2047         int temp;
2048         if(count != 0){
2049                 numKids = 0;
2050                 for(int i=0;i<count;i++){
2051                         /*temp = label - rint(pow((double)2,i));*/
2052                         temp = label - (1<<i);
2053                         temp = upperSize-1-temp;
2054                         if(temp <= CkNumNodes()-1){
2055                 //              kids[numKids] = temp;
2056                                 kids.push_back(temp);
2057                                 numKids++;
2058                         }
2059                 }
2060         }else{
2061                 numKids = 0;
2062         //      kids = NULL;
2063         }
2064 }
2065
2066
2067 int CkNodeReductionMgr::treeRoot(void)
2068 {
2069   return 0;
2070 }
2071 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2072 {
2073   return (CmiBool)(CkMyNode()!=treeRoot());
2074 }
2075 int CkNodeReductionMgr::treeParent(void) //My parent Node
2076 {
2077   return parent;
2078 }
2079
2080 int CkNodeReductionMgr::firstKid(void) //My first child Node
2081 {
2082   return CkMyNode()*TREE_WID+1;
2083 }
2084 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2085 {
2086 #ifdef BINOMIAL_TREE
2087         return numKids;
2088 #else
2089 /*  int nKids=CkNumNodes()-firstKid();
2090   if (nKids>TREE_WID) nKids=TREE_WID;
2091   if (nKids<0) nKids=0;
2092   return nKids;*/
2093         return numKids;
2094 #endif
2095 }
2096
2097 //Combine (& free) the current message vector msgs.
2098 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2099 {
2100 #if CMK_BIGSIM_CHARM
2101   _TRACE_BG_END_EXECUTE(1);
2102   void* _bgParentLog = NULL;
2103   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2104 #endif
2105   CkReductionMsg *ret=NULL;
2106
2107   //Look through the vector for a valid reducer, swapping out placeholder messages
2108   CkReduction::reducerType r=CkReduction::invalid;
2109   int msgs_gcount=0;//Reduced gcount
2110   int msgs_nSources=0;//Reduced nSources
2111   int msgs_userFlag=-1;
2112   CkCallback msgs_callback;
2113   CkCallback msgs_secondaryCallback;
2114   int i;
2115   int nMsgs=0;
2116   CkReductionMsg *m;
2117   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2118   bool isMigratableContributor;
2119         
2120
2121   while(NULL!=(m=msgs.deq()))
2122   {
2123     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2124     msgs_gcount+=m->gcount;
2125     if (m->sourceFlag!=0)
2126     { //This is a real message from an element, not just a placeholder
2127       msgArr[nMsgs++]=m;
2128       msgs_nSources+=m->nSources();
2129       r=m->reducer;
2130       if (!m->callback.isInvalid())
2131         msgs_callback=m->callback;
2132       if(!m->secondaryCallback.isInvalid()){
2133         msgs_secondaryCallback = m->secondaryCallback;
2134       }
2135       if (m->userFlag!=-1)
2136         msgs_userFlag=m->userFlag;
2137
2138         isMigratableContributor= m->isMigratableContributor();
2139 #if CMK_BIGSIM_CHARM
2140       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2141 #endif
2142                                 
2143     }
2144     else
2145     { //This is just a placeholder message-- replace it
2146       delete m;
2147     }
2148   }
2149
2150   if (nMsgs==0||r==CkReduction::invalid)
2151   //No valid reducer in the whole vector
2152     ret=CkReductionMsg::buildNew(0,NULL);
2153   else
2154   {//Use the reducer to reduce the messages
2155     if(nMsgs == 1){
2156         ret = msgArr[0];
2157     }else{
2158         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2159         ret=(*f)(nMsgs,msgArr);
2160     }
2161     ret->reducer=r;
2162   }
2163
2164         
2165 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2166 #if CRITICAL_PATH_DEBUG > 3
2167         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2168 #endif
2169         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2170         path.updateMax(UsrToEnv(ret));
2171         // Combine the critical paths from all the reduction messages into the header for the new result
2172         for (i=0;i<nMsgs;i++){
2173           if (msgArr[i]!=ret){
2174             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2175             path.updateMax(UsrToEnv(msgArr[i]));
2176           } else {
2177             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2178           }
2179         }
2180 #if CRITICAL_PATH_DEBUG > 3
2181         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2182 #endif
2183
2184 #endif
2185
2186
2187         //Go back through the vector, deleting old messages
2188   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2189   delete [] msgArr;
2190   //Set the message counts
2191   ret->redNo=redNo;
2192   ret->gcount=msgs_gcount;
2193   ret->userFlag=msgs_userFlag;
2194   ret->callback=msgs_callback;
2195   ret->secondaryCallback = msgs_secondaryCallback;
2196   ret->sourceFlag=msgs_nSources;
2197   ret->setMigratableContributor(isMigratableContributor);
2198   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2199 #if CMK_BIGSIM_CHARM
2200   _TRACE_BG_TLINE_END(&ret->log);
2201 #endif
2202
2203   return ret;
2204 }
2205
2206 void CkNodeReductionMgr::pup(PUP::er &p)
2207 {
2208 //We do not store the client function pointer or the client function parameter,
2209 //it is the responsibility of the programmer to correctly restore these
2210   IrrGroup::pup(p);
2211   p(redNo);
2212   p(inProgress); p(creating); p(startRequested);
2213   p(lcount);
2214   p(nContrib); p(nRemote);
2215   p(interrupt);
2216   p|msgs;
2217   p|futureMsgs;
2218   p|futureRemoteMsgs;
2219   p|futureLateMigrantMsgs;
2220   p|parent;
2221   p|additionalGCount;
2222   p|newAdditionalGCount;
2223   if(p.isUnpacking()) {
2224     gcount=CkNumNodes();
2225     thisProxy = thisgroup;
2226     lockEverything = CmiCreateLock();
2227 #ifdef BINOMIAL_TREE
2228     init_BinomialTree();
2229 #else
2230     init_BinaryTree();
2231 #endif          
2232   }
2233   p | blocked;
2234   p | maxModificationRedNo;
2235
2236 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2237   int isnull = (storedCallback == NULL);
2238   p | isnull;
2239   if (!isnull) {
2240     if (p.isUnpacking()) {
2241       storedCallback = new CkCallback;
2242     }
2243     p|*storedCallback;
2244   }
2245 #endif
2246
2247 }
2248
2249 /*
2250         FAULT_EVAC
2251         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2252         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2253         reduction tree. 
2254 */
2255 void CkNodeReductionMgr::evacuate(){
2256         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2257         if(treeKids() == 0){
2258         /*
2259                 if the node going down is a leaf
2260         */
2261                 oldleaf=true;
2262                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2263                 /*
2264                         Need to ask parent for the reduction number that it has seen. 
2265                         Since it is a leaf, the tree does not need to be rewired. 
2266                         We reuse the oldparent type of tree modification message to get 
2267                         the parent to block and tell us about the highest reduction number it has seen.
2268                         
2269                 */
2270                 int data[2];
2271                 data[0]=CkMyNode();
2272                 data[1]=getTotalGCount()+additionalGCount;
2273                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2274                 newParent = treeParent();
2275         }else{
2276                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2277                 oldleaf= false;
2278         /*
2279                 It is not a leaf. It needs to rewire the tree around itself.
2280                 It also needs to decide on a reduction No after which the new tree will be used
2281                 Till it decides on the new tree and the redNo at which it becomes valid,
2282                 all received messages will be buffered
2283         */
2284                 newParent = kids[0];
2285                 for(int i=numKids-1;i>=0;i--){
2286                         newKids.remove(i);
2287                 }
2288                 /*
2289                         Ask everybody for the highest reduction number they have seen and
2290                         also tell them about the new tree
2291                 */
2292                 /*
2293                         Tell parent about its new child;
2294                 */
2295                 int oldParentData[2];
2296                 oldParentData[0] = CkMyNode();
2297                 oldParentData[1] = newParent;
2298                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2299
2300                 /*
2301                         Tell the other children about their new parent
2302                 */
2303                 int childrenData=newParent;
2304                 for(int i=1;i<numKids;i++){
2305                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2306                 }
2307                 
2308                 /*
2309                         Tell newParent (1st child) about its new children,
2310                         the current node and its children except the newParent
2311                 */
2312                 int *newParentData = new int[numKids+2];
2313                 for(int i=1;i<numKids;i++){
2314                         newParentData[i] = kids[i];
2315                 }
2316                 newParentData[0] = CkMyNode();
2317                 newParentData[numKids] = parent;
2318                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2319                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2320         }
2321         readyDeletion = false;
2322         blocked = true;
2323         numModificationReplies = 0;
2324         tempModificationRedNo = findMaxRedNo();
2325 }
2326
2327 /*
2328         Depending on the code, use the data to change the tree
2329         1. OLDPARENT : replace the old child with a new one
2330         2. OLDCHILDREN: replace the parent
2331         3. NEWPARENT:  add the children and change the parent
2332         4. LEAFPARENT: delete the old child
2333 */
2334
2335 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2336         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2337         int sender;
2338         newKids = kids;
2339         readyDeletion = false;
2340         newAdditionalGCount = additionalGCount;
2341         switch(code){
2342                 case OLDPARENT: 
2343                         for(int i=0;i<numKids;i++){
2344                                 if(newKids[i] == data[0]){
2345                                         newKids[i] = data[1];
2346                                         break;
2347                                 }
2348                         }
2349                         sender = data[0];
2350                         newParent = parent;
2351                         break;
2352                 case OLDCHILDREN:
2353                         newParent = data[0];
2354                         sender = parent;
2355                         break;
2356                 case NEWPARENT:
2357                         for(int i=0;i<size-2;i++){
2358                                 newKids.push_back(data[i]);
2359                         }
2360                         newParent = data[size-2];
2361                         newAdditionalGCount += data[size-1];
2362                         sender = parent;
2363                         break;
2364                 case LEAFPARENT:
2365                         for(int i=0;i<numKids;i++){
2366                                 if(newKids[i] == data[0]){
2367                                         newKids.remove(i);
2368                                         break;
2369                                 }
2370                         }
2371                         sender = data[0];
2372                         newParent = parent;
2373                         newAdditionalGCount += data[1];
2374                         break;
2375         };
2376         blocked = true;
2377         int maxRedNo = findMaxRedNo();
2378         
2379         thisProxy[sender].collectMaxRedNo(maxRedNo);
2380 }
2381
2382 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2383         /*
2384                 Find out the maximum redNo that has been seen by 
2385                 the affected nodes
2386         */
2387         numModificationReplies++;
2388         if(maxRedNo > tempModificationRedNo){
2389                 tempModificationRedNo = maxRedNo;
2390         }
2391         if(numModificationReplies == numKids+1){
2392                 maxModificationRedNo = tempModificationRedNo;
2393                 /*
2394                         when all the affected nodes have replied, tell them the maximum.
2395                         Unblock yourself. deal with the buffered messages local and remote
2396                 */
2397                 if(maxModificationRedNo == -1){
2398                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2399                 }else{
2400                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2401                 }
2402                 thisProxy[parent].unblockNode(maxModificationRedNo);
2403                 for(int i=0;i<numKids;i++){
2404                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2405                 }
2406                 blocked = false;
2407                 updateTree();
2408                 clearBlockedMsgs();
2409         }
2410 }
2411
2412 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2413         maxModificationRedNo = maxRedNo;
2414         updateTree();
2415         blocked = false;
2416         clearBlockedMsgs();
2417 }
2418
2419
2420 void CkNodeReductionMgr::clearBlockedMsgs(){
2421         int len = bufferedMsgs.length();
2422         for(int i=0;i<len;i++){
2423                 CkReductionMsg *m = bufferedMsgs.deq();
2424                 doAddContribution(m);
2425         }
2426         len = bufferedRemoteMsgs.length();
2427         for(int i=0;i<len;i++){
2428                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2429                 doRecvMsg(m);
2430         }
2431
2432 }
2433 /*
2434         if the reduction number exceeds the maxModificationRedNo, change the tree
2435         to become the new one
2436 */
2437
2438 void CkNodeReductionMgr::updateTree(){
2439         if(redNo > maxModificationRedNo){
2440                 parent = newParent;
2441                 kids = newKids;
2442                 maxModificationRedNo = INT_MAX;
2443                 numKids = kids.size();
2444                 readyDeletion = true;
2445                 additionalGCount = newAdditionalGCount;
2446                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2447                 for(int i=0;i<(int)(newKids.size());i++){
2448                         DEBREVAC(("%d ",newKids[i]));
2449                 }
2450                 DEBREVAC(("\n"));
2451         //      startReduction(redNo,CkMyNode());
2452         }else{
2453                 if(maxModificationRedNo != INT_MAX){
2454                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2455                         startReduction(redNo,CkMyNode());
2456                         finishReduction();
2457                 }       
2458         }
2459 }
2460
2461
2462 void CkNodeReductionMgr::doneEvacuate(){
2463         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2464 /*      if(oldleaf){
2465                 
2466                         It used to be a leaf
2467                         Then as soon as future messages have been emptied you can 
2468                         send the parent a message telling them that they are not going
2469                         to receive anymore messages from this child
2470                 
2471                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2472                 while(futureMsgs.length() != 0){
2473                         int n = futureMsgs.length();
2474                         for(int i=0;i<n;i++){
2475                                 CkReductionMsg *m = futureMsgs.deq();
2476                                 if(isPresent(m->redNo)){
2477                                         msgs.enq(m);
2478                                 }else{
2479                                         futureMsgs.enq(m);
2480                                 }
2481                         }
2482                         CkReductionMsg *result = reduceMessages();
2483                         thisProxy[treeParent()].RecvMsg(result);
2484                         redNo++;
2485                 }
2486                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2487                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2488         }else{*/
2489                 if(readyDeletion){
2490                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2491                 }else{
2492                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2493                 }
2494 //      }
2495 }
2496
2497 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2498         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2499         for(int i=0;i<numKids;i++){
2500                 if(kids[i] == deletedChild){
2501                         kids.remove(i);
2502                         break;
2503                 }
2504         }
2505         numKids = kids.length();
2506         finishReduction();
2507 }
2508
2509 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2510         for(int i=0;i<(int)(newKids.length());i++){
2511                 if(newKids[i] == deletedChild){
2512                         newKids.remove(i);
2513                         break;
2514                 }
2515         }
2516         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2517         for(int i=0;i<(int)(newKids.size());i++){
2518                 DEBREVAC(("%d ",newKids[i]));
2519         }
2520         DEBREVAC(("\n"));
2521         finishReduction();
2522 }
2523
2524 int CkNodeReductionMgr::findMaxRedNo(){
2525         int max = redNo;
2526         for(int i=0;i<futureRemoteMsgs.length();i++){
2527                 if(futureRemoteMsgs[i]->redNo  > max){
2528                         max = futureRemoteMsgs[i]->redNo;
2529                 }
2530         }
2531         /*
2532                 if redNo is max (that is no future message) and the current reduction has not started
2533                 then tree can be changed before the reduction redNo can be started
2534         */ 
2535         if(redNo == max && msgs.length() == 0){
2536                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2537                 max--;
2538         }
2539         return max;
2540 }
2541
2542 // initnode call. check the size of reduction table
2543 void CkReductionMgr::sanitycheck()
2544 {
2545 #if CMK_ERROR_CHECKING
2546   int count = 0;
2547   while (CkReduction::reducerTable[count] != NULL) count++;
2548   CmiAssert(CkReduction::nReducers == count);
2549 #endif
2550 }
2551
2552 #include "CkReduction.def.h"