committed a wrong file in previous commit, committing the right file now
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207   disableNotifyChildrenStart = CmiFalse;
208   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
209 }
210
211 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
212 {
213   redNo=0;
214   completedRedNo = -1;
215   inProgress=CmiFalse;
216   creating=CmiFalse;
217   startRequested=CmiFalse;
218   gcount=lcount=0;
219   nContrib=nRemote=0;
220   maxStartRequest=0;
221   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
222 }
223
224 void CkReductionMgr::flushStates(int isgroup)
225 {
226   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
227   redNo=0;
228   completedRedNo = -1;
229   inProgress=CmiFalse;
230   creating=CmiFalse;
231   if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
232   startRequested=CmiFalse;
233   nContrib=nRemote=0;
234   maxStartRequest=0;
235
236   while (!msgs.isEmpty()) { delete msgs.deq(); }
237   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
238   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
239   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
240
241   adjVec.length()=0;
242
243 #if ! GROUP_LEVEL_REDUCTION
244   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
245 #endif
246 }
247
248 //////////// Reduction Manager Client API /////////////
249
250 //Add the given client function.  Overwrites any previous client.
251 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
252 {
253   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
254
255   if (CkMyPe()!=0)
256           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
257   storedCallback=*cb;
258 #if ! GROUP_LEVEL_REDUCTION
259   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
260   nodeProxy.ckSetReductionClient(callback);
261 #endif
262 }
263
264 ///////////////////////////// Contributor ////////////////////////
265 //Contributors keep a copy of this structure:
266
267 /*Contributor migration support:
268 */
269 void contributorInfo::pup(PUP::er &p)
270 {
271   p(redNo);
272 }
273
274 ////////////////////// Contributor list maintainance: /////////////////
275 //These just set and clear the "creating" flag to prevent
276 // reductions from finishing early because not all elements
277 // have been created.
278 void CkReductionMgr::creatingContributors(void)
279 {
280   DEBR((AA"Creating contributors...\n"AB));
281   creating=CmiTrue;
282 }
283 void CkReductionMgr::doneCreatingContributors(void)
284 {
285   DEBR((AA"Done creating contributors...\n"AB));
286   creating=CmiFalse;
287   if (startRequested) startReduction(redNo,CkMyPe());
288   finishReduction();
289 }
290
291 //A new contributor will be created
292 void CkReductionMgr::contributorStamped(contributorInfo *ci)
293 {
294   DEBR((AA"Contributor %p stamped\n"AB,ci));
295   //There is another contributor
296   gcount++;
297   if (inProgress)
298   {
299     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
300     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
301   } else
302     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
303 }
304
305 //A new contributor was actually created
306 void CkReductionMgr::contributorCreated(contributorInfo *ci)
307 {
308   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
309   //We've got another contributor
310   lcount++;
311   //He may not need to contribute to some of our reductions:
312   for (int r=redNo;r<ci->redNo;r++)
313     adj(r).lcount--;//He won't be contributing to r here
314 }
315
316 /*Don't expect any more contributions from this one.
317 This is rather horrifying because we now have to make
318 sure the global element count accurately reflects all the
319 contributions the element made before it died-- these may stretch
320 far into the future.  The adj() vector is what saves us here.
321 */
322 void CkReductionMgr::contributorDied(contributorInfo *ci)
323 {
324 #if CMK_MEM_CHECKPOINT
325   // ignore from listener if it is during restart from crash
326   if (CkInRestarting()) return;
327 #endif
328   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
329   //We lost a contributor
330   gcount--;
331
332   if (ci->redNo<redNo)
333   {//Must have been migrating during reductions-- root is waiting for his
334   // contribution, which will never come.
335     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
336     for (int r=ci->redNo;r<redNo;r++)
337       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
338   }
339
340   //Add to the global count for all his future messages (wherever they are)
341   int r;
342   for (r=redNo;r<ci->redNo;r++)
343   {//He already contributed to this reduction, but won't show up in global count.
344     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
345     adj(r).gcount++;
346   }
347
348   lcount--;
349   //He's already contributed to several reductions here
350   for (r=redNo;r<ci->redNo;r++)
351     adj(r).lcount++;//He'll be contributing to r here
352
353   finishReduction();
354 }
355
356 //Migrating away (note that global count doesn't change)
357 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
358 {
359   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
360   lcount--;//We lost a local
361   //He's already contributed to several reductions here
362   for (int r=redNo;r<ci->redNo;r++)
363     adj(r).lcount++;//He'll be contributing to r here
364
365   finishReduction();
366 }
367
368 //Migrating in (note that global count doesn't change)
369 void CkReductionMgr::contributorArriving(contributorInfo *ci)
370 {
371   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
372   lcount++;//We gained a local
373 #if CMK_MEM_CHECKPOINT
374   // ignore from listener if it is during restart from crash
375   // because the ci may be old.
376   if (CkInRestarting()) return;
377 #endif
378   //He has already contributed (elsewhere) to several reductions:
379   for (int r=redNo;r<ci->redNo;r++)
380     adj(r).lcount--;//He won't be contributing to r here
381
382 }
383
384 //Contribute-- the given msg can contain any data.  The reducerType
385 // field of the message must be valid.
386 // Each contributor must contribute exactly once to the each reduction.
387 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
388 {
389 #if CMK_BIGSIM_CHARM
390   _TRACE_BG_TLINE_END(&(m->log));
391 #endif
392   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
393   //m->ci=ci;
394   m->redNo=ci->redNo++;
395   m->sourceFlag=-1;//A single contribution
396   m->gcount=0;
397
398 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
399     if(lcount == 0){
400         m->sourceProcessorCount = 1;
401     }else{
402         m->sourceProcessorCount = lcount;
403     }
404     m->fromPE = CmiMyPe();
405     Chare *oldObj =CpvAccess(_currentObj);
406     char currentObjName[100];
407     DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
408     thisProxy[0].contributeViaMessage(m);
409 #else
410   addContribution(m);
411 #endif
412 }
413
414 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
415 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
416     CmiAssert(CmiMyPe() == 0);
417     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
418     if(redNo == 0){
419         if(perProcessorCounts[m->fromPE] == -1){
420             processorCount++;
421             perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
422             DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
423         }else{
424             if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
425                 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
426                 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
427             }
428         }
429         if(processorCount == CmiNumPes()){
430             totalCount = 0;
431             for(int i=0;i<CmiNumPes();i++){
432                 CmiAssert(perProcessorCounts[i] != -1);
433                 totalCount += perProcessorCounts[i];
434             }
435             DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
436         }
437         if(m->sourceProcessorCount == 0){
438             if(processorCount == CmiNumPes()){
439                 finishReduction();
440             }
441             return;
442         }
443     }
444     addContribution(m);
445 }
446 #else
447 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
448 #endif
449
450 //////////// Reduction Manager Remote Entry Points /////////////
451 //Sent down the reduction tree (used by barren PEs)
452 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
453 {
454  if(CkMyPe()==0){
455         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
456         //delete m;
457         //return;
458  }
459  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
460  int srcPE = (UsrToEnv(m))->getSrcPe();
461 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_)) 
462   if (isPresent(m->num) && !inProgress)
463   {
464     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
465     startReduction(m->num,srcPE);
466     finishReduction();
467   } else if (isFuture(m->num)){
468 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
469           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
470           if(maxStartRequest < m->num){
471                   maxStartRequest = m->num;
472           }
473  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
474           
475     }
476   else //is Past
477     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
478   delete m;
479 #else
480     if(redNo == 0){
481         if(lcount == 0){
482             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
483             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
484             dummy->fromPE = CmiMyPe();
485             dummy->sourceProcessorCount = 0;
486             dummy->redNo = 0;
487             thisProxy[0].contributeViaMessage(dummy);
488         }
489     }
490 #endif
491 }
492
493 //Sent to root of reduction tree with reduction contribution
494 // of migrants that missed the main reduction.
495 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
496 {
497 #if GROUP_LEVEL_REDUCTION
498 #if CMK_BIGSIM_CHARM
499   _TRACE_BG_TLINE_END(&(m->log));
500 #endif
501   addContribution(m);
502 #else
503   m->secondaryCallback = m->callback;
504   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
505   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
506   nodeMgr->LateMigrantMsg(m);
507 /*      int len = finalMsgs.length();
508         finalMsgs.enq(m);
509 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
510         endArrayReduction();*/
511 #endif
512 }
513
514 //A late migrating contributor will never contribute to this reduction
515 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
516 {
517   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
518   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
519  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
520   adj(m->num).gcount--;//He won't be contributing to this one.
521   finishReduction();
522   delete m;
523 }
524
525 //////////// Reduction Manager State /////////////
526 void CkReductionMgr::startReduction(int number,int srcPE)
527 {
528   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
529   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
530   if (inProgress){
531         DEBR((AA"This reduction is already in progress\n"AB));
532         return;//This reduction already started
533   }
534   if (creating) //Don't start yet-- we're creating elements
535   {
536     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
537     startRequested=CmiTrue;
538     return;
539   }
540
541 //If none of these cases, we need to start the reduction--
542   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
543   inProgress=CmiTrue;
544  
545
546         /*
547                 FAULT_EVAC
548         */
549   if(!CmiNodeAlive(CkMyPe())){
550         return;
551   }
552
553   if(disableNotifyChildrenStart) return;
554   
555 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
556   if(CmiMyPe() == 0 && redNo == 0){
557             for(int j=0;j<CkNumPes();j++){
558                 if(j != CkMyPe() && j != srcPE){
559                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
560                 }
561             }
562             if(lcount == 0){
563                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
564                 dummy->fromPE = CmiMyPe();
565                 dummy->sourceProcessorCount = 0;
566                 dummy->redNo = 0;
567                 thisProxy[0].contributeViaMessage(dummy);
568             }
569     }   else{
570         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
571     }
572
573
574 #else
575   //Sent start requests to our kids (in case they don't already know)
576 #if GROUP_LEVEL_REDUCTION
577   for (int k=0;k<treeKids();k++)
578   {
579     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
580     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
581   }
582 #else
583   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
584 #endif
585 #endif
586         
587         /*
588   int temp;
589   //making it a broadcast done only by PE 0
590   if(!hasParent()){
591                 temp = completedRedNo+1;
592                 for(int i=temp;i<=number;i++){
593                         for(int j=0;j<CkNumPes();j++){
594                                 if(j != CkMyPe() && j != srcPE){
595                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
596                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
597                                         }
598                                 }
599                         }
600                 }       
601         }       else{
602                 temp = number;
603         }*/
604 /*  if(!hasParent()){
605                 temp = completedRedNo+1;
606         }       else{
607                 temp = number;
608         }
609         for(int i=temp;i<=number;i++){
610         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
611                 if(hasParent()){
612           // kick-start your parent too ...
613                         if(treeParent() != srcPE){
614                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
615 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
616     CpvAccess(_currentObj) = oldObj;
617 #endif
618                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
619                                 }       
620                         }       
621                 }
622           for (int k=0;k<treeKids();k++)
623           {
624                         if(firstKid()+k != srcPE){
625                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
626                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
627                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
628                                 }       
629                         }       
630         }
631         }
632         */
633 }       
634
635 /*Handle a message from one element for the reduction*/
636 void CkReductionMgr::addContribution(CkReductionMsg *m)
637 {
638   if (isPast(m->redNo))
639   {
640 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
641         CmiAbort("this version should not have late migrations");
642 #else
643         //We've moved on-- forward late contribution straight to root
644     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
645         // if (!hasParent()) //Root moved on too soon-- should never happen
646         //   CkAbort("Late reduction contribution received at root!\n");
647     thisProxy[0].LateMigrantMsg(m);
648 #endif
649   }
650   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
651     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
652     futureMsgs.enq(m);
653   } else {// An ordinary contribution
654     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
655    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
656     startReduction(m->redNo,CkMyPe());
657     msgs.enq(m);
658     nContrib++;
659     finishReduction();
660   }
661 }
662
663 /**function checks if it has got all contributions that it is supposed to
664 get at this processor. If it is done it sends the reduced result to the local
665 nodegroup */
666 void CkReductionMgr::finishReduction(void)
667 {
668   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
669   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
670   if ((!inProgress) || creating){
671         DEBR((AA"Either not in Progress or creating\n"AB));
672         return;
673   }
674   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
675 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
676
677   if (nContrib<(lcount+adj(redNo).lcount)){
678          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
679          return;//Need more local messages
680   }
681 #if GROUP_LEVEL_REDUCTION
682   if (nRemote<treeKids()) return;//Need more remote messages
683 #endif
684  
685 #else
686
687     if(CkMyPe() != 0){
688         if(redNo != 0){
689             return;
690         }else{
691             CmiAssert(lcount == 0);
692             CkReductionMsg *dummy = reduceMessages();
693             dummy->fromPE = CmiMyPe();
694             dummy->sourceProcessorCount = 0;
695             thisProxy[0].contributeViaMessage(dummy);
696             return;
697         }
698     }
699     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
700         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
701          return;//Need more local messages
702   }else{
703         DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
704     }
705
706
707 #endif
708
709   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
710   CkReductionMsg *result=reduceMessages();
711   result->redNo=redNo;
712 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
713
714 #if GROUP_LEVEL_REDUCTION
715   if (hasParent())
716   {//Pass data up tree to parent
717     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
718     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
719     result->gcount+=gcount+adj(redNo).gcount;
720     thisProxy[treeParent()].RecvMsg(result);
721     delete result;
722   }
723   else 
724   {//We are root-- pass data to client
725     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
726     int totalElements=result->gcount+gcount+adj(redNo).gcount;
727     if (totalElements>result->nSources()) 
728     {
729       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
730       msgs.enq(result);
731       return; // Wait for migrants to contribute
732     } else if (totalElements<result->nSources()) {
733       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
734       CkAbort("ERROR! Too many contributions at root!\n");
735     }
736     DEBR((AA"Passing result to client function\n"AB));
737     CkSetRefNum(result, result->getUserFlag());
738     if (!result->callback.isInvalid())
739             result->callback.send(result);
740     else if (!storedCallback.isInvalid())
741             storedCallback.send(result);
742     else
743             CkAbort("No reduction client!\n"
744                     "You must register a client with either SetReductionClient or during contribute.\n");
745   }
746
747 #else
748   result->gcount+=gcount+adj(redNo).gcount;
749
750   result->secondaryCallback = result->callback;
751   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
752         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
753
754   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
755
756  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
757   
758   // Find our node reduction manager, and pass reduction to him:
759   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
760   nodeMgr->contributeArrayReduction(result);
761 #endif
762 #else                // _FAULT_MLOG_
763   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
764
765     CkSetRefNum(result, result->getUserFlag());
766     if (!result->callback.isInvalid())
767         result->callback.send(result);
768     else if (!storedCallback.isInvalid())
769         storedCallback.send(result);
770     else{
771       DEBR(("No reduction client for group %d \n",thisgroup.idx));
772         CkAbort("No reduction client!\n"
773             "You must register a client with either SetReductionClient or during contribute.\n");
774     }
775
776     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
777
778 #endif               // _FAULT_MLOG_
779
780   //House Keeping Operations will have to check later what needs to be changed
781   redNo++;
782   //Shift the count adjustment vector down one slot (to match new redNo)
783   int i;
784 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
785         if(CkMyPe()!=0){
786 #else
787     {
788 #endif
789 //      int i;
790         completedRedNo++;
791         for (i=1;i<(int)(adjVec.length());i++){
792            adjVec[i-1]=adjVec[i];
793         }
794         adjVec.length()--;  
795   }
796
797   inProgress=CmiFalse;
798   startRequested=CmiFalse;
799   nRemote=nContrib=0;
800
801   //Look through the future queue for messages we can now handle
802   int n=futureMsgs.length();
803   for (i=0;i<n;i++)
804   {
805     CkReductionMsg *m=futureMsgs.deq();
806     if (m!=NULL) //One of these addContributions may have finished us.
807       addContribution(m);//<- if *still* early, puts it back in the queue
808   }
809 #if GROUP_LEVEL_REDUCTION
810   n=futureRemoteMsgs.length();
811   for (i=0;i<n;i++)
812   {
813     CkReductionMsg *m=futureRemoteMsgs.deq();
814     if (m!=NULL) {
815       RecvMsg(m);//<- if *still* early, puts it back in the queue
816     }
817   }
818 #endif
819
820 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
821   if(maxStartRequest >= redNo){
822           startReduction(redNo,CkMyPe());
823           finishReduction();
824   }
825  
826 #endif
827 }
828
829 //Sent up the reduction tree with reduced data
830   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
831 {
832 #if GROUP_LEVEL_REDUCTION
833 #if CMK_BIGSIM_CHARM
834   _TRACE_BG_TLINE_END(&m->log);
835 #endif
836   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
837     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
838     startReduction(m->redNo, CkMyPe());
839     msgs.enq(m);
840     nRemote++;
841     finishReduction();
842   }
843   else if (isFuture(m->redNo)) {
844     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
845     futureRemoteMsgs.enq(m);
846   } 
847   else CkAbort("Recv'd late remote contribution!\n");
848 #endif
849 }
850
851 //////////// Reduction Manager Utilities /////////////
852
853 //Return the countAdjustment struct for the given redNo:
854 countAdjustment &CkReductionMgr::adj(int number)
855 {
856   number-=completedRedNo;
857   number--;
858   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
859   //Pad the adjustment vector with zeros until it's at least number long
860   while ((int)(adjVec.length())<=number)
861     adjVec.push_back(countAdjustment());
862   return adjVec[number];
863 }
864
865 //Combine (& free) the current message vector msgs.
866 CkReductionMsg *CkReductionMgr::reduceMessages(void)
867 {
868 #if CMK_BIGSIM_CHARM
869   _TRACE_BG_END_EXECUTE(1);
870   void* _bgParentLog = NULL;
871   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
872 #endif
873   CkReductionMsg *ret=NULL;
874
875   //Look through the vector for a valid reducer, swapping out placeholder messages
876   CkReduction::reducerType r=CkReduction::invalid;
877   int msgs_gcount=0;//Reduced gcount
878   int msgs_nSources=0;//Reduced nSources
879   int msgs_userFlag=-1;
880   CkCallback msgs_callback;
881   int i;
882   int nMsgs=0;
883   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
884   CkReductionMsg *m;
885   bool isMigratableContributor;
886
887   // Copy message queue into msgArr, skipping placeholders:
888   while (NULL!=(m=msgs.deq()))
889   {
890     msgs_gcount+=m->gcount;
891     if (m->sourceFlag!=0)
892     { //This is a real message from an element, not just a placeholder
893       msgArr[nMsgs++]=m;
894       msgs_nSources+=m->nSources();
895       r=m->reducer;
896       if (!m->callback.isInvalid())
897         msgs_callback=m->callback;
898       if (m->userFlag!=-1)
899         msgs_userFlag=m->userFlag;
900                         
901         isMigratableContributor=m->isMigratableContributor();
902 #if CMK_BIGSIM_CHARM
903         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
904 #endif
905     }
906     else
907     { //This is just a placeholder message-- forget it
908       delete m;
909     }
910   }
911
912   if (nMsgs==0||r==CkReduction::invalid)
913   //No valid reducer in the whole vector
914     ret=CkReductionMsg::buildNew(0,NULL);
915   else
916   {//Use the reducer to reduce the messages
917                 //if there is only one msg to be reduced just return that message
918     if(nMsgs == 1){
919         ret = msgArr[0];        
920     }else{
921         CkReduction::reducerFn f=CkReduction::reducerTable[r];
922         ret=(*f)(nMsgs,msgArr);
923     }
924     ret->reducer=r;
925   }
926
927
928
929 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
930
931 #if CRITICAL_PATH_DEBUG > 3
932   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
933 #endif
934
935   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
936   path.updateMax(UsrToEnv(ret));
937   // Combine the critical paths from all the reduction messages
938   for (i=0;i<nMsgs;i++){
939     if (msgArr[i]!=ret){
940       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
941       path.updateMax(UsrToEnv(msgArr[i]));
942     }
943   }
944   
945
946 #if CRITICAL_PATH_DEBUG > 3
947   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
948 #endif
949   
950   PathHistoryTableEntry tableEntry(path);
951   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
952   
953 #endif
954
955         //Go back through the vector, deleting old messages
956   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
957         delete [] msgArr;
958
959   //Set the message counts
960   ret->redNo=redNo;
961   ret->gcount=msgs_gcount;
962   ret->userFlag=msgs_userFlag;
963   ret->callback=msgs_callback;
964   ret->sourceFlag=msgs_nSources;
965         ret->setMigratableContributor(isMigratableContributor);
966   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
967
968   return ret;
969 }
970
971
972 //Checkpointing utilities
973 //pack-unpack method for CkReductionMsg
974 //if packing pack the message and then unpack and return it
975 //if unpacking allocate memory for it read it off disk and then unapck
976 //and return it
977 void CkReductionMgr::pup(PUP::er &p)
978 {
979 //We do not store the client function pointer or the client function parameter,
980 //it is the responsibility of the programmer to correctly restore these
981   CkGroupInitCallback::pup(p);
982   p(redNo);
983   p(completedRedNo);
984   p(inProgress); p(creating); p(startRequested);
985   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
986   p|msgs;
987   p|futureMsgs;
988   p|futureRemoteMsgs;
989   p|finalMsgs;
990   p|adjVec;
991   p|nodeProxy;
992   p|storedCallback;
993     // handle CkReductionClientBundle
994   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
995   {
996     CkReductionClientBundle *bd;
997     if (p.isUnpacking()) 
998       bd = new CkReductionClientBundle;
999     else
1000       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
1001     p|*bd;
1002     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1003   }
1004
1005   // subtle --- Gengbin
1006   // Group : CkReductionMgr
1007   // CkArray: CkReductionMgr
1008   // lcount/gcount in Group is set in Group constructor
1009   // lcount/gcount in CkArray is not, it is set when array elements are created
1010   // we can not pup because inserting array elems will add the counters again
1011 //  p|lcount;
1012 //  p|gcount;
1013 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1014 //  p|lcount;
1015 //  //  p|gcount;
1016 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1017 #endif
1018   if(p.isUnpacking()){
1019     thisProxy = thisgroup;
1020     maxStartRequest=0;
1021 #if GROUP_LEVEL_REDUCTION
1022 #ifdef BINOMIAL_TREE
1023     init_BinomialTree();
1024 #else
1025     init_BinaryTree();
1026 #endif
1027 #endif
1028   }
1029
1030   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1031 }
1032
1033
1034 //Callback for doing Reduction through NodeGroups added by Sayantan
1035
1036 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1037
1038         int total = m->gcount+adj(m->redNo).gcount;
1039         finalMsgs.enq(m);
1040         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1041         adj(m->redNo).mainRecvd = 1;
1042         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1043         endArrayReduction();
1044 }
1045
1046 void CkReductionMgr :: endArrayReduction(){
1047         CkReductionMsg *ret=NULL;
1048         int nMsgs=finalMsgs.length();
1049         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1050         //Look through the vector for a valid reducer, swapping out placeholder messages
1051         //CkPrintf("Length of Final Message %d \n",nMsgs);
1052         CkReduction::reducerType r=CkReduction::invalid;
1053         int msgs_gcount=0;//Reduced gcount
1054         int msgs_nSources=0;//Reduced nSources
1055         int msgs_userFlag=-1;
1056         CkCallback msgs_callback;
1057         CkCallback msgs_secondaryCallback;
1058         CkVec<CkReductionMsg *> tempMsgs;
1059         int i;
1060         int numMsgs = 0;
1061         for (i=0;i<nMsgs;i++)
1062         {
1063                 CkReductionMsg *m=finalMsgs.deq();
1064                 if(m->redNo == completedRedNo +1){
1065                         msgs_gcount+=m->gcount;
1066                         if (m->sourceFlag!=0)
1067                         { //This is a real message from an element, not just a placeholder
1068                                 msgs_nSources+=m->nSources();
1069                                 r=m->reducer;
1070                                 if (!m->callback.isInvalid())
1071                                 msgs_callback=m->callback;
1072                                 if(!m->secondaryCallback.isInvalid())
1073                                         msgs_secondaryCallback = m->secondaryCallback;
1074                                 if (m->userFlag!=-1)
1075                                         msgs_userFlag=m->userFlag;
1076                                 tempMsgs.push_back(m);
1077                         }
1078                 }else{
1079                         finalMsgs.enq(m);
1080                 }
1081
1082         }
1083         numMsgs = tempMsgs.length();
1084
1085         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1086
1087         if(numMsgs == 0){
1088                 return;
1089         }
1090         if(adj(completedRedNo+1).mainRecvd == 0){
1091                 for(i=0;i<numMsgs;i++){
1092                         finalMsgs.enq(tempMsgs[i]);
1093                 }
1094                 return;
1095         }
1096
1097 /*
1098         NOT NEEDED ANYMORE DONE at nodegroup level
1099         if(msgs_gcount  > msgs_nSources){
1100                 for(i=0;i<numMsgs;i++){
1101                         finalMsgs.enq(tempMsgs[i]);
1102                 }
1103                 return;
1104         }*/
1105
1106         if (nMsgs==0||r==CkReduction::invalid)
1107                 //No valid reducer in the whole vector
1108                 ret=CkReductionMsg::buildNew(0,NULL);
1109         else{//Use the reducer to reduce the messages
1110                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1111                 // has to be corrected elements from above need to be put into a temporary vector
1112                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1113                 ret=(*f)(numMsgs,msgArr);
1114                 ret->reducer=r;
1115
1116         }
1117
1118         
1119 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1120
1121 #if CRITICAL_PATH_DEBUG > 3
1122         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1123 #endif
1124
1125         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1126         path.updateMax(UsrToEnv(ret));
1127         // Combine the critical paths from all the reduction messages into the header for the new result
1128         for (i=0;i<numMsgs;i++){
1129           if (tempMsgs[i]!=ret){
1130             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1131             path.updateMax(UsrToEnv(tempMsgs[i]));
1132           } else {
1133             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1134           }
1135         }
1136         // Also consider the path which got us into this entry method
1137
1138 #if CRITICAL_PATH_DEBUG > 3
1139         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1140 #endif
1141
1142 #endif
1143   
1144
1145
1146
1147
1148         for(i = 0;i<numMsgs;i++){
1149                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1150         }
1151
1152         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1153         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1154
1155         ret->redNo=completedRedNo+1;
1156         ret->gcount=msgs_gcount;
1157         ret->userFlag=msgs_userFlag;
1158         ret->callback=msgs_callback;
1159         ret->secondaryCallback = msgs_secondaryCallback;
1160         ret->sourceFlag=msgs_nSources;
1161
1162         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1163
1164         CkSetRefNum(ret, ret->getUserFlag());
1165         if (!ret->secondaryCallback.isInvalid())
1166             ret->secondaryCallback.send(ret);
1167     else if (!storedCallback.isInvalid())
1168             storedCallback.send(ret);
1169     else{
1170       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1171             CkAbort("No reduction client!\n"
1172                     "You must register a client with either SetReductionClient or during contribute.\n");
1173     }
1174         completedRedNo++;
1175
1176         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1177
1178         for (i=1;i<(int)(adjVec.length());i++)
1179                 adjVec[i-1]=adjVec[i];
1180         adjVec.length()--;
1181         endArrayReduction();
1182 }
1183
1184 #if GROUP_LEVEL_REDUCTION
1185
1186 void CkReductionMgr::init_BinaryTree(){
1187         parent = (CkMyPe()-1)/TREE_WID;
1188         int firstkid = CkMyPe()*TREE_WID+1;
1189         numKids=CkNumPes()-firstkid;
1190         if (numKids>TREE_WID) numKids=TREE_WID;
1191         if (numKids<0) numKids=0;
1192
1193         for(int i=0;i<numKids;i++){
1194                 kids.push_back(firstkid+i);
1195                 newKids.push_back(firstkid+i);
1196         }
1197 }
1198
1199 void CkReductionMgr::init_BinomialTree(){
1200         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1201         /*upperSize = (unsigned )pow((double)2,depth);*/
1202         upperSize = (unsigned) 1 << depth;
1203         label = upperSize-CkMyPe()-1;
1204         int p=label;
1205         int count=0;
1206         while( p > 0){
1207                 if(p % 2 == 0)
1208                         break;
1209                 else{
1210                         p = p/2;
1211                         count++;
1212                 }
1213         }
1214         /*parent = label + rint(pow((double)2,count));*/
1215         parent = label + (1<<count);
1216         parent = upperSize -1 -parent;
1217         int temp;
1218         if(count != 0){
1219                 numKids = 0;
1220                 for(int i=0;i<count;i++){
1221                         /*temp = label - rint(pow((double)2,i));*/
1222                         temp = label - (1<<i);
1223                         temp = upperSize-1-temp;
1224                         if(temp <= CkNumPes()-1){
1225                                 kids.push_back(temp);
1226                                 numKids++;
1227                         }
1228                 }
1229         }else{
1230                 numKids = 0;
1231         //      kids = NULL;
1232         }
1233 }
1234
1235
1236 int CkReductionMgr::treeRoot(void)
1237 {
1238   return 0;
1239 }
1240 CmiBool CkReductionMgr::hasParent(void) //Root Node
1241 {
1242   return (CmiBool)(CkMyPe()!=treeRoot());
1243 }
1244 int CkReductionMgr::treeParent(void) //My parent Node
1245 {
1246   return parent;
1247 }
1248
1249 int CkReductionMgr::firstKid(void) //My first child Node
1250 {
1251   return CkMyPe()*TREE_WID+1;
1252 }
1253 int CkReductionMgr::treeKids(void)//Number of children in tree
1254 {
1255   return numKids;
1256 }
1257
1258 #endif
1259
1260 /////////////////////////////////////////////////////////////////////////
1261
1262 ////////////////////////////////
1263 //CkReductionMsg support
1264
1265 //ReductionMessage default private constructor-- does nothing
1266 CkReductionMsg::CkReductionMsg(){}
1267 CkReductionMsg::~CkReductionMsg(){}
1268
1269 //This define gives the distance from the start of the CkReductionMsg
1270 // object to the start of the user data area (just below last object field)
1271 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1272
1273 //"Constructor"-- builds and returns a new CkReductionMsg.
1274 //  the "data" array you specify will be copied into this object.
1275 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1276     CkReduction::reducerType reducer, CkReductionMsg *buf)
1277 {
1278   int len[1];
1279   len[0]=NdataSize;
1280   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1281   
1282   ret->dataSize=NdataSize;
1283   if (srcData!=NULL && !buf)
1284     memcpy(ret->data,srcData,NdataSize);
1285   ret->userFlag=-1;
1286   ret->reducer=reducer;
1287   //ret->ci=NULL;
1288   ret->sourceFlag=-1000;
1289   ret->gcount=0;
1290   ret->migratableContributor = true;
1291 #if CMK_BIGSIM_CHARM
1292   ret->log = NULL;
1293 #endif
1294   return ret;
1295 }
1296
1297 // Charm kernel message runtime support:
1298 void *
1299 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1300 {
1301   int totalsize=ARM_DATASTART+(*sz);
1302   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1303   CkReductionMsg *ret = (CkReductionMsg *)
1304     CkAllocMsg(msgnum,totalsize,priobits);
1305   ret->data=(void *)(&ret->dataStorage);
1306   return (void *) ret;
1307 }
1308
1309 void *
1310 CkReductionMsg::pack(CkReductionMsg* in)
1311 {
1312   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1313   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1314   in->data = NULL;
1315   return (void*) in;
1316 }
1317
1318 CkReductionMsg* CkReductionMsg::unpack(void *in)
1319 {
1320   CkReductionMsg *ret = (CkReductionMsg *)in;
1321   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1322   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1323   ret->data=(void *)(&ret->dataStorage);
1324   return ret;
1325 }
1326
1327
1328 /////////////////////////////////////////////////////////////////////////////////////
1329 ///////////////// Builtin Reducer Functions //////////////
1330 /* A simple reducer, like sum_int, looks like this:
1331 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1332 {
1333   int i,ret=0;
1334   for (i=0;i<nMsg;i++)
1335     ret+=*(int *)(msg[i]->getData());
1336   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1337 }
1338
1339 To keep the code small and easy to change, the implementations below
1340 are built with preprocessor macros.
1341 */
1342
1343 //////////////// simple reducers ///////////////////
1344 /*A define used to quickly and tersely construct simple reductions.
1345 The basic idea is to use the first message's data array as
1346 (pre-initialized!) scratch space for folding in the other messages.
1347  */
1348
1349 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1350 {
1351         CkAbort("Called the invalid reducer type 0.  This probably\n"
1352                 "means you forgot to initialize your custom reducer index.\n");
1353         return NULL;
1354 }
1355
1356 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1357 {
1358   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1359 }
1360
1361 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1362 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1363 {\
1364   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1365   int m,i;\
1366   int nElem=msg[0]->getLength()/sizeof(dataType);\
1367   dataType *ret=(dataType *)(msg[0]->getData());\
1368   for (m=1;m<nMsg;m++)\
1369   {\
1370     dataType *value=(dataType *)(msg[m]->getData());\
1371     for (i=0;i<nElem;i++)\
1372     {\
1373       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1374       loop\
1375     }\
1376   }\
1377   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1378   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1379 }
1380
1381 //Use this macro for reductions that have the same type for all inputs
1382 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1383   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1384   SIMPLE_REDUCTION(nameBase##_long,CmiInt8,"%d",loop) \
1385   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1386   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1387
1388
1389 //Compute the sum the numbers passed by each element.
1390 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1391
1392 //Compute the product of the numbers passed by each element.
1393 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1394
1395 //Compute the largest number passed by any element.
1396 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1397
1398 //Compute the smallest integer passed by any element.
1399 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1400
1401
1402 //Compute the logical AND of the integers passed by each element.
1403 // The resulting integer will be zero if any source integer is zero; else 1.
1404 SIMPLE_REDUCTION(logical_and,int,"%d",
1405         if (value[i]==0)
1406      ret[i]=0;
1407   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1408 )
1409
1410 //Compute the logical OR of the integers passed by each element.
1411 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1412 SIMPLE_REDUCTION(logical_or,int,"%d",
1413   if (value[i]!=0)
1414            ret[i]=1;
1415   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1416 )
1417
1418 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1419 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1420
1421 //Select one random message to pass on
1422 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1423   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1424 }
1425
1426 /////////////// concat ////////////////
1427 /*
1428 This reducer simply appends the data it recieves from each element,
1429 without any housekeeping data to separate them.
1430 */
1431 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1432 {
1433   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1434   //Figure out how big a message we'll need
1435   int i,retSize=0;
1436   for (i=0;i<nMsg;i++)
1437       retSize+=msg[i]->getSize();
1438
1439   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1440
1441   //Allocate a new message
1442   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1443
1444   //Copy the source message data into the return message
1445   char *cur=(char *)(ret->getData());
1446   for (i=0;i<nMsg;i++) {
1447     int messageBytes=msg[i]->getSize();
1448     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1449     cur+=messageBytes;
1450   }
1451   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1452   return ret;
1453 }
1454
1455 /////////////// set ////////////////
1456 /*
1457 This reducer appends the data it recieves from each element
1458 along with some housekeeping data indicating contribution boundaries.
1459 The message data is thus a list of reduction_set_element structures
1460 terminated by a dummy reduction_set_element with a sourceElement of -1.
1461 */
1462
1463 //This rounds an integer up to the nearest multiple of sizeof(double)
1464 static const int alignSize=sizeof(double);
1465 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1466
1467 //This gives the size (in bytes) of a reduction_set_element
1468 static int SET_SIZE(int dataSize)
1469 {return SET_ALIGN(sizeof(int)+dataSize);}
1470
1471 //This returns a pointer to the next reduction_set_element in the list
1472 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1473 {
1474   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1475   return (CkReduction::setElement *)next;
1476 }
1477
1478 //Combine the data passed by each element into an list of reduction_set_elements.
1479 // Each element may contribute arbitrary data (with arbitrary length).
1480 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1481 {
1482   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1483   //Figure out how big a message we'll need
1484   int i,retSize=0;
1485   for (i=0;i<nMsg;i++) {
1486     if (!msg[i]->isFromUser())
1487     //This message is composite-- it will just be copied over (less terminating -1)
1488       retSize+=(msg[i]->getSize()-sizeof(int));
1489     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1490       retSize+=SET_SIZE(msg[i]->getSize());
1491   }
1492   retSize+=sizeof(int);//Leave room for terminating -1.
1493
1494   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1495
1496   //Allocate a new message
1497   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1498
1499   //Copy the source message data into the return message
1500   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1501   for (i=0;i<nMsg;i++)
1502     if (!msg[i]->isFromUser())
1503     {//This message is composite-- just copy it over (less terminating -1)
1504                         int messageBytes=msg[i]->getSize()-sizeof(int);
1505                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1506                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1507                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1508     }
1509     else //This is a message from an element-- wrap it in a reduction_set_element
1510     {
1511       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1512       cur->dataSize=msg[i]->getSize();
1513       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1514       cur=SET_NEXT(cur);
1515     }
1516   cur->dataSize=-1;//Add a terminating -1.
1517   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1518   return ret;
1519 }
1520
1521 //Utility routine: get the next reduction_set_element in the list
1522 // if there is one, or return NULL if there are none.
1523 //To get all the elements, just keep feeding this procedure's output back to
1524 // its input until it returns NULL.
1525 CkReduction::setElement *CkReduction::setElement::next(void)
1526 {
1527   CkReduction::setElement *n=SET_NEXT(this);
1528   if (n->dataSize==-1)
1529     return NULL;//This is the end of the list
1530   else
1531     return n;//This is just another element
1532 }
1533
1534 /////////////////// Reduction Function Table /////////////////////
1535 CkReduction::CkReduction() {} //Dummy private constructor
1536
1537 //Add the given reducer to the list.  Returns the new reducer's
1538 // reducerType.  Must be called in the same order on every node.
1539 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1540 {
1541   reducerTable[nReducers]=fn;
1542   return (reducerType)nReducers++;
1543 }
1544
1545 /*Reducer table: maps reducerTypes to reducerFns.
1546 It's indexed by reducerType, so the order in this table
1547 must *exactly* match the reducerType enum declaration.
1548 The names don't have to match, but it helps.
1549 */
1550 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1551
1552 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1553     ::invalid_reducer,
1554     ::nop,
1555   //Compute the sum the numbers passed by each element.
1556     ::sum_int,::sum_long,::sum_float,::sum_double,
1557
1558   //Compute the product the numbers passed by each element.
1559     ::product_int,::product_long,::product_float,::product_double,
1560
1561   //Compute the largest number passed by any element.
1562     ::max_int,::max_long,::max_float,::max_double,
1563
1564   //Compute the smallest number passed by any element.
1565     ::min_int,::min_long,::min_float,::min_double,
1566
1567   //Compute the logical AND of the integers passed by each element.
1568   // The resulting integer will be zero if any source integer is zero.
1569     ::logical_and,
1570
1571   //Compute the logical OR of the integers passed by each element.
1572   // The resulting integer will be 1 if any source integer is nonzero.
1573     ::logical_or,
1574
1575     // Compute the logical bitvector AND of the integers passed by each element.
1576     ::bitvec_and,
1577
1578     // Compute the logical bitvector OR of the integers passed by each element.
1579     ::bitvec_or,
1580
1581     // Select one of the messages at random to pass on
1582     ::random,
1583
1584   //Concatenate the (arbitrary) data passed by each element
1585     ::concat,
1586
1587   //Combine the data passed by each element into an list of setElements.
1588   // Each element may contribute arbitrary data (with arbitrary length).
1589     ::set
1590 };
1591
1592
1593
1594
1595
1596
1597
1598
1599 /********** Code added by Sayantan *********************/
1600 /** Locking is a big problem in the nodegroup code for smp.
1601  So a few assumptions have had to be made. There is one lock
1602  called lockEverything. It protects all the data structures 
1603  of the nodegroup reduction mgr. I tried grabbing it separately 
1604  for each datastructure, modifying it and then releasing it and
1605  then grabbing it again, for the next change.
1606  That doesn't really help because the interleaved execution of 
1607  different threads makes the state of the reduction manager 
1608  inconsistent. 
1609  
1610  1. Grab lockEverything before calling finishreduction or startReduction
1611     or doRecvMsg
1612  2. lockEverything is grabbed only in entry methods reductionStarting
1613     or RecvMesg or  addcontribution.
1614  ****/
1615  
1616 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1617 NodeGroup::NodeGroup(void) {
1618   __nodelock=CmiCreateLock();
1619 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1620     mlogData->objID.type = TypeNodeGroup;
1621     mlogData->objID.data.group.onPE = CkMyNode();
1622 #endif
1623
1624 }
1625 NodeGroup::~NodeGroup() {
1626   CmiDestroyLock(__nodelock);
1627 }
1628 void NodeGroup::pup(PUP::er &p)
1629 {
1630   CkNodeReductionMgr::pup(p);
1631   p|reductionInfo;
1632 }
1633
1634 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1635
1636 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1637   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1638  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1639   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1640  }
1641
1642 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1643                                     ((CkNodeReductionMgr *)this),
1644                                     reductionInfo,false)
1645
1646 /* this contribute also adds up the count across all messages it receives.
1647   Useful for summing up number of array elements who have contributed ****/ 
1648 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1649         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1650
1651
1652
1653 //#define BINOMIAL_TREE
1654
1655 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1656   : thisProxy(thisgroup)
1657 {
1658 #ifdef BINOMIAL_TREE
1659   init_BinomialTree();
1660 #else
1661   init_BinaryTree();
1662 #endif
1663   storedCallback=NULL;
1664   redNo=0;
1665   inProgress=CmiFalse;
1666   
1667   startRequested=CmiFalse;
1668   gcount=CkNumNodes();
1669   lcount=1;
1670   nContrib=nRemote=0;
1671   lockEverything = CmiCreateLock();
1672
1673
1674   creating=CmiFalse;
1675   interrupt = 0;
1676   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1677         /*
1678                 FAULT_EVAC
1679         */
1680         blocked = false;
1681         maxModificationRedNo = INT_MAX;
1682         killed=0;
1683         additionalGCount = newAdditionalGCount = 0;
1684 }
1685
1686 void CkNodeReductionMgr::flushStates()
1687 {
1688  if(CkMyRank() == 0){
1689   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1690   redNo=0;
1691   inProgress=CmiFalse;
1692
1693   startRequested=CmiFalse;
1694   gcount=CkNumNodes();
1695   lcount=1;
1696   nContrib=nRemote=0;
1697
1698   creating=CmiFalse;
1699   interrupt = 0;
1700   while (!msgs.isEmpty()) { delete msgs.deq(); }
1701   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1702   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1703   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1704   }
1705 }
1706
1707 //////////// Reduction Manager Client API /////////////
1708
1709 //Add the given client function.  Overwrites any previous client.
1710 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1711 {
1712   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1713   if(cb->isInvalid()){
1714         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1715   }else{
1716         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1717   }
1718
1719   if (CkMyNode()!=0)
1720           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1721   delete storedCallback;
1722   storedCallback=cb;
1723 }
1724
1725
1726
1727 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1728 {
1729 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1730     Chare *oldObj =CpvAccess(_currentObj);
1731     CpvAccess(_currentObj) = this;
1732 #endif
1733
1734   //m->ci=ci;
1735   m->redNo=ci->redNo++;
1736   m->sourceFlag=-1;//A single contribution
1737   m->gcount=0;
1738   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1739   addContribution(m);
1740
1741 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1742     CpvAccess(_currentObj) = oldObj;
1743 #endif
1744 }
1745
1746
1747 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1748 {
1749 #if CMK_BIGSIM_CHARM
1750   _TRACE_BG_TLINE_END(&m->log);
1751 #endif
1752 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1753     Chare *oldObj =CpvAccess(_currentObj);
1754     CpvAccess(_currentObj) = this;
1755 #endif
1756   //m->ci=ci;
1757   m->redNo=ci->redNo++;
1758   m->gcount=count;
1759   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1760   addContribution(m);
1761   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1762
1763 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1764     CpvAccess(_currentObj) = oldObj;
1765 #endif
1766 }
1767
1768
1769 //////////// Reduction Manager Remote Entry Points /////////////
1770
1771 //Sent down the reduction tree (used by barren PEs)
1772 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1773 {
1774   CmiLock(lockEverything);
1775   if(blocked){
1776         delete m;
1777         CmiUnlock(lockEverything);
1778         return ;
1779   }
1780   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1781   if (isPresent(m->num) && !inProgress)
1782   {
1783     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1784     startReduction(m->num,srcNode);
1785     finishReduction();
1786   } else if (isFuture(m->num)){
1787         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1788   }
1789   else //is Past
1790     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1791   CmiUnlock(lockEverything);
1792   delete m;
1793
1794 }
1795
1796
1797 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1798         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1799         /*
1800                 FAULT_EVAC
1801         */
1802         if(blocked){
1803                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1804                 bufferedRemoteMsgs.enq(m);
1805                 return;
1806         }
1807         
1808         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1809             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1810             startReduction(m->redNo,CkMyNode());
1811             msgs.enq(m);
1812             nRemote++;
1813             finishReduction();
1814         }
1815         else {
1816             if (isFuture(m->redNo)) {
1817                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1818                     futureRemoteMsgs.enq(m);
1819             }else{
1820                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1821                    CkAbort("Recv'd late remote contribution!\n");
1822             }
1823         }
1824         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1825 }
1826
1827 //Sent up the reduction tree with reduced data
1828 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1829 {
1830 #if CMK_BIGSIM_CHARM
1831   _TRACE_BG_TLINE_END(&m->log);
1832 #endif
1833 #ifndef CMK_CPV_IS_SMP
1834 #if CMK_IMMEDIATE_MSG
1835         if(interrupt == 1){
1836                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1837                 CpvAccess(_qd)->process(-1);
1838                 CmiDelayImmediate();
1839                 return;
1840         }
1841 #endif  
1842 #endif
1843    interrupt = 1;       
1844    CmiLock(lockEverything);   
1845    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1846    doRecvMsg(m);
1847    CmiUnlock(lockEverything);    
1848    interrupt = 0;
1849    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1850 }
1851
1852 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1853 {
1854         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1855         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1856         if (inProgress){
1857                 DEBR((AA"This Node reduction is already in progress\n"AB));
1858                 return;//This reduction already started
1859         }
1860         if (creating) //Don't start yet-- we're creating elements
1861         {
1862                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1863                 startRequested=CmiTrue;
1864                 return;
1865         }
1866         
1867         //If none of these cases, we need to start the reduction--
1868         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1869         inProgress=CmiTrue;
1870         //Sent start requests to our kids (in case they don't already know)
1871         
1872         for (int k=0;k<treeKids();k++)
1873         {
1874 #ifdef BINOMIAL_TREE
1875                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1876                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1877 #else
1878                 if(kids[k] != srcNode){
1879                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1880                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1881                 }       
1882 #endif
1883         }
1884
1885         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1886         // in case, nodegroup does not has the attached red group,
1887         // it has to restart groups again
1888         startLocalGroupReductions(number);
1889 /*
1890         if (startLocalGroupReductions(number) == 0)
1891           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1892 */
1893 }
1894
1895 // restart local groups until succeed
1896 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1897   CmiLock(lockEverything);    
1898   if (startLocalGroupReductions(number) == 0)
1899     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1900   CmiUnlock(lockEverything);    
1901 }
1902
1903 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1904         /*
1905                 FAULT_EVAC
1906         */
1907         if(blocked){
1908                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1909                 bufferedMsgs.enq(m);
1910                 return;
1911         }
1912         
1913         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1914                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1915                 futureMsgs.enq(m);
1916         } else {// An ordinary contribution
1917                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1918                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1919                 startReduction(m->redNo,CkMyNode());
1920                 msgs.enq(m);
1921                 nContrib++;
1922                 finishReduction();
1923         }
1924 }
1925
1926 //Handle a message from one element for the reduction
1927 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1928 {
1929   interrupt = 1;
1930   CmiLock(lockEverything);
1931   doAddContribution(m);
1932   CmiUnlock(lockEverything);
1933   interrupt = 0;
1934 }
1935
1936 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1937         if(blocked){
1938                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1939                 bufferedMsgs.enq(m);
1940                 return;
1941         }
1942         
1943         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1944                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1945 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1946                 futureLateMigrantMsgs.enq(m);
1947         } else {// An ordinary contribution
1948                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1949 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1950                 msgs.enq(m);
1951                 finishReduction();
1952         }
1953 }
1954
1955
1956
1957
1958
1959 /** check if the nodegroup reduction is finished at this node. In that case send it
1960 up the reduction tree **/
1961
1962 void CkNodeReductionMgr::finishReduction(void)
1963 {
1964   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1965   /***Check if reduction is finished in the next few ifs***/
1966   if ((!inProgress) || creating){
1967         DEBR((AA"Either not in Progress or creating\n"AB));
1968         return;
1969   }
1970
1971   if (nContrib<(lcount)){
1972         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1973
1974          return;//Need more local messages
1975   }
1976   if (nRemote<treeKids()){
1977         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1978
1979         return;//Need more remote messages
1980   }
1981   if (nRemote>treeKids()){
1982
1983           interrupt = 0;
1984            CkAbort("Nodegrp Excess remote reduction message received!\n");
1985   }
1986
1987   DEBR((AA"Reducing node data...\n"AB));
1988
1989   /**reduce all messages received at this node **/
1990   CkReductionMsg *result=reduceMessages();
1991
1992   if (hasParent())
1993   {//Pass data up tree to parent
1994         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1995         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1996         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1997                 /*
1998                         FAULT_EVAC
1999                 */
2000                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
2001             thisProxy[treeParent()].RecvMsg(result);
2002       delete result;
2003         }
2004
2005   }
2006   else
2007   {
2008                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2009                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
2010                         msgs.enq(result);
2011                         return;
2012                 }
2013                 result->gcount += additionalGCount;
2014           /** if the reduction is finished and I am the root of the reduction tree
2015           then call the reductionhandler and other stuff ***/
2016                 
2017
2018                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2019     CkSetRefNum(result, result->getUserFlag());
2020     if (!result->callback.isInvalid()){
2021       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2022             result->callback.send(result);
2023     }
2024     else if (storedCallback!=NULL){
2025       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2026             storedCallback->send(result);
2027     }
2028     else{
2029                 DEBR((AA"Invalid Callback \n"AB));
2030             CkAbort("No reduction client!\n"
2031                     "You must register a client with either SetReductionClient or during contribute.\n");
2032                 }
2033   }
2034
2035   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2036   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2037   redNo++;
2038         updateTree();
2039   int i;
2040   inProgress=CmiFalse;
2041   startRequested=CmiFalse;
2042   nRemote=nContrib=0;
2043
2044   //Look through the future queue for messages we can now handle
2045   int n=futureMsgs.length();
2046
2047   for (i=0;i<n;i++)
2048   {
2049     interrupt = 1;
2050
2051     CkReductionMsg *m=futureMsgs.deq();
2052
2053     interrupt = 0;
2054     if (m!=NULL){ //One of these addContributions may have finished us.
2055       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2056       doAddContribution(m);//<- if *still* early, puts it back in the queue
2057     }
2058   }
2059
2060   interrupt = 1;
2061
2062   n=futureRemoteMsgs.length();
2063
2064   interrupt = 0;
2065   for (i=0;i<n;i++)
2066   {
2067     interrupt = 1;
2068
2069     CkReductionMsg *m=futureRemoteMsgs.deq();
2070
2071     interrupt = 0;
2072     if (m!=NULL)
2073       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2074   }
2075   
2076   n = futureLateMigrantMsgs.length();
2077   for(i=0;i<n;i++){
2078     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2079     if(m != NULL){
2080       if(m->redNo == redNo){
2081         msgs.enq(m);
2082       }else{
2083         futureLateMigrantMsgs.enq(m);
2084       }
2085     }
2086   }
2087 }
2088
2089 //////////// Reduction Manager Utilities /////////////
2090
2091 void CkNodeReductionMgr::init_BinaryTree(){
2092         parent = (CkMyNode()-1)/TREE_WID;
2093         int firstkid = CkMyNode()*TREE_WID+1;
2094         numKids=CkNumNodes()-firstkid;
2095   if (numKids>TREE_WID) numKids=TREE_WID;
2096   if (numKids<0) numKids=0;
2097
2098         for(int i=0;i<numKids;i++){
2099                 kids.push_back(firstkid+i);
2100                 newKids.push_back(firstkid+i);
2101         }
2102 }
2103
2104 void CkNodeReductionMgr::init_BinomialTree(){
2105         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2106         /*upperSize = (unsigned )pow((double)2,depth);*/
2107         upperSize = (unsigned) 1 << depth;
2108         label = upperSize-CkMyNode()-1;
2109         int p=label;
2110         int count=0;
2111         while( p > 0){
2112                 if(p % 2 == 0)
2113                         break;
2114                 else{
2115                         p = p/2;
2116                         count++;
2117                 }
2118         }
2119         /*parent = label + rint(pow((double)2,count));*/
2120         parent = label + (1<<count);
2121         parent = upperSize -1 -parent;
2122         int temp;
2123         if(count != 0){
2124                 numKids = 0;
2125                 for(int i=0;i<count;i++){
2126                         /*temp = label - rint(pow((double)2,i));*/
2127                         temp = label - (1<<i);
2128                         temp = upperSize-1-temp;
2129                         if(temp <= CkNumNodes()-1){
2130                 //              kids[numKids] = temp;
2131                                 kids.push_back(temp);
2132                                 numKids++;
2133                         }
2134                 }
2135         }else{
2136                 numKids = 0;
2137         //      kids = NULL;
2138         }
2139 }
2140
2141
2142 int CkNodeReductionMgr::treeRoot(void)
2143 {
2144   return 0;
2145 }
2146 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2147 {
2148   return (CmiBool)(CkMyNode()!=treeRoot());
2149 }
2150 int CkNodeReductionMgr::treeParent(void) //My parent Node
2151 {
2152   return parent;
2153 }
2154
2155 int CkNodeReductionMgr::firstKid(void) //My first child Node
2156 {
2157   return CkMyNode()*TREE_WID+1;
2158 }
2159 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2160 {
2161 #ifdef BINOMIAL_TREE
2162         return numKids;
2163 #else
2164 /*  int nKids=CkNumNodes()-firstKid();
2165   if (nKids>TREE_WID) nKids=TREE_WID;
2166   if (nKids<0) nKids=0;
2167   return nKids;*/
2168         return numKids;
2169 #endif
2170 }
2171
2172 //Combine (& free) the current message vector msgs.
2173 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2174 {
2175 #if CMK_BIGSIM_CHARM
2176   _TRACE_BG_END_EXECUTE(1);
2177   void* _bgParentLog = NULL;
2178   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2179 #endif
2180   CkReductionMsg *ret=NULL;
2181
2182   //Look through the vector for a valid reducer, swapping out placeholder messages
2183   CkReduction::reducerType r=CkReduction::invalid;
2184   int msgs_gcount=0;//Reduced gcount
2185   int msgs_nSources=0;//Reduced nSources
2186   int msgs_userFlag=-1;
2187   CkCallback msgs_callback;
2188   CkCallback msgs_secondaryCallback;
2189   int i;
2190   int nMsgs=0;
2191   CkReductionMsg *m;
2192   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2193   bool isMigratableContributor;
2194         
2195
2196   while(NULL!=(m=msgs.deq()))
2197   {
2198     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2199     msgs_gcount+=m->gcount;
2200     if (m->sourceFlag!=0)
2201     { //This is a real message from an element, not just a placeholder
2202       msgArr[nMsgs++]=m;
2203       msgs_nSources+=m->nSources();
2204       r=m->reducer;
2205       if (!m->callback.isInvalid())
2206         msgs_callback=m->callback;
2207       if(!m->secondaryCallback.isInvalid()){
2208         msgs_secondaryCallback = m->secondaryCallback;
2209       }
2210       if (m->userFlag!=-1)
2211         msgs_userFlag=m->userFlag;
2212
2213         isMigratableContributor= m->isMigratableContributor();
2214 #if CMK_BIGSIM_CHARM
2215       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2216 #endif
2217                                 
2218     }
2219     else
2220     { //This is just a placeholder message-- replace it
2221       delete m;
2222     }
2223   }
2224
2225   if (nMsgs==0||r==CkReduction::invalid)
2226   //No valid reducer in the whole vector
2227     ret=CkReductionMsg::buildNew(0,NULL);
2228   else
2229   {//Use the reducer to reduce the messages
2230     if(nMsgs == 1){
2231         ret = msgArr[0];
2232     }else{
2233         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2234         ret=(*f)(nMsgs,msgArr);
2235     }
2236     ret->reducer=r;
2237   }
2238
2239         
2240 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2241 #if CRITICAL_PATH_DEBUG > 3
2242         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2243 #endif
2244         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2245         path.updateMax(UsrToEnv(ret));
2246         // Combine the critical paths from all the reduction messages into the header for the new result
2247         for (i=0;i<nMsgs;i++){
2248           if (msgArr[i]!=ret){
2249             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2250             path.updateMax(UsrToEnv(msgArr[i]));
2251           } else {
2252             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2253           }
2254         }
2255 #if CRITICAL_PATH_DEBUG > 3
2256         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2257 #endif
2258
2259 #endif
2260
2261
2262         //Go back through the vector, deleting old messages
2263   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2264   delete [] msgArr;
2265   //Set the message counts
2266   ret->redNo=redNo;
2267   ret->gcount=msgs_gcount;
2268   ret->userFlag=msgs_userFlag;
2269   ret->callback=msgs_callback;
2270   ret->secondaryCallback = msgs_secondaryCallback;
2271   ret->sourceFlag=msgs_nSources;
2272   ret->setMigratableContributor(isMigratableContributor);
2273   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2274 #if CMK_BIGSIM_CHARM
2275   _TRACE_BG_TLINE_END(&ret->log);
2276 #endif
2277
2278   return ret;
2279 }
2280
2281 void CkNodeReductionMgr::pup(PUP::er &p)
2282 {
2283 //We do not store the client function pointer or the client function parameter,
2284 //it is the responsibility of the programmer to correctly restore these
2285   IrrGroup::pup(p);
2286   p(redNo);
2287   p(inProgress); p(creating); p(startRequested);
2288   p(lcount);
2289   p(nContrib); p(nRemote);
2290   p(interrupt);
2291   p|msgs;
2292   p|futureMsgs;
2293   p|futureRemoteMsgs;
2294   p|futureLateMigrantMsgs;
2295   p|parent;
2296   p|additionalGCount;
2297   p|newAdditionalGCount;
2298   if(p.isUnpacking()) {
2299     gcount=CkNumNodes();
2300     thisProxy = thisgroup;
2301     lockEverything = CmiCreateLock();
2302 #ifdef BINOMIAL_TREE
2303     init_BinomialTree();
2304 #else
2305     init_BinaryTree();
2306 #endif          
2307   }
2308   p | blocked;
2309   p | maxModificationRedNo;
2310
2311 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2312   int isnull = (storedCallback == NULL);
2313   p | isnull;
2314   if (!isnull) {
2315     if (p.isUnpacking()) {
2316       storedCallback = new CkCallback;
2317     }
2318     p|*storedCallback;
2319   }
2320 #endif
2321
2322 }
2323
2324 /*
2325         FAULT_EVAC
2326         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2327         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2328         reduction tree. 
2329 */
2330 void CkNodeReductionMgr::evacuate(){
2331         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2332         if(treeKids() == 0){
2333         /*
2334                 if the node going down is a leaf
2335         */
2336                 oldleaf=true;
2337                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2338                 /*
2339                         Need to ask parent for the reduction number that it has seen. 
2340                         Since it is a leaf, the tree does not need to be rewired. 
2341                         We reuse the oldparent type of tree modification message to get 
2342                         the parent to block and tell us about the highest reduction number it has seen.
2343                         
2344                 */
2345                 int data[2];
2346                 data[0]=CkMyNode();
2347                 data[1]=getTotalGCount()+additionalGCount;
2348                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2349                 newParent = treeParent();
2350         }else{
2351                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2352                 oldleaf= false;
2353         /*
2354                 It is not a leaf. It needs to rewire the tree around itself.
2355                 It also needs to decide on a reduction No after which the new tree will be used
2356                 Till it decides on the new tree and the redNo at which it becomes valid,
2357                 all received messages will be buffered
2358         */
2359                 newParent = kids[0];
2360                 for(int i=numKids-1;i>=0;i--){
2361                         newKids.remove(i);
2362                 }
2363                 /*
2364                         Ask everybody for the highest reduction number they have seen and
2365                         also tell them about the new tree
2366                 */
2367                 /*
2368                         Tell parent about its new child;
2369                 */
2370                 int oldParentData[2];
2371                 oldParentData[0] = CkMyNode();
2372                 oldParentData[1] = newParent;
2373                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2374
2375                 /*
2376                         Tell the other children about their new parent
2377                 */
2378                 int childrenData=newParent;
2379                 for(int i=1;i<numKids;i++){
2380                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2381                 }
2382                 
2383                 /*
2384                         Tell newParent (1st child) about its new children,
2385                         the current node and its children except the newParent
2386                 */
2387                 int *newParentData = new int[numKids+2];
2388                 for(int i=1;i<numKids;i++){
2389                         newParentData[i] = kids[i];
2390                 }
2391                 newParentData[0] = CkMyNode();
2392                 newParentData[numKids] = parent;
2393                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2394                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2395         }
2396         readyDeletion = false;
2397         blocked = true;
2398         numModificationReplies = 0;
2399         tempModificationRedNo = findMaxRedNo();
2400 }
2401
2402 /*
2403         Depending on the code, use the data to change the tree
2404         1. OLDPARENT : replace the old child with a new one
2405         2. OLDCHILDREN: replace the parent
2406         3. NEWPARENT:  add the children and change the parent
2407         4. LEAFPARENT: delete the old child
2408 */
2409
2410 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2411         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2412         int sender;
2413         newKids = kids;
2414         readyDeletion = false;
2415         newAdditionalGCount = additionalGCount;
2416         switch(code){
2417                 case OLDPARENT: 
2418                         for(int i=0;i<numKids;i++){
2419                                 if(newKids[i] == data[0]){
2420                                         newKids[i] = data[1];
2421                                         break;
2422                                 }
2423                         }
2424                         sender = data[0];
2425                         newParent = parent;
2426                         break;
2427                 case OLDCHILDREN:
2428                         newParent = data[0];
2429                         sender = parent;
2430                         break;
2431                 case NEWPARENT:
2432                         for(int i=0;i<size-2;i++){
2433                                 newKids.push_back(data[i]);
2434                         }
2435                         newParent = data[size-2];
2436                         newAdditionalGCount += data[size-1];
2437                         sender = parent;
2438                         break;
2439                 case LEAFPARENT:
2440                         for(int i=0;i<numKids;i++){
2441                                 if(newKids[i] == data[0]){
2442                                         newKids.remove(i);
2443                                         break;
2444                                 }
2445                         }
2446                         sender = data[0];
2447                         newParent = parent;
2448                         newAdditionalGCount += data[1];
2449                         break;
2450         };
2451         blocked = true;
2452         int maxRedNo = findMaxRedNo();
2453         
2454         thisProxy[sender].collectMaxRedNo(maxRedNo);
2455 }
2456
2457 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2458         /*
2459                 Find out the maximum redNo that has been seen by 
2460                 the affected nodes
2461         */
2462         numModificationReplies++;
2463         if(maxRedNo > tempModificationRedNo){
2464                 tempModificationRedNo = maxRedNo;
2465         }
2466         if(numModificationReplies == numKids+1){
2467                 maxModificationRedNo = tempModificationRedNo;
2468                 /*
2469                         when all the affected nodes have replied, tell them the maximum.
2470                         Unblock yourself. deal with the buffered messages local and remote
2471                 */
2472                 if(maxModificationRedNo == -1){
2473                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2474                 }else{
2475                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2476                 }
2477                 thisProxy[parent].unblockNode(maxModificationRedNo);
2478                 for(int i=0;i<numKids;i++){
2479                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2480                 }
2481                 blocked = false;
2482                 updateTree();
2483                 clearBlockedMsgs();
2484         }
2485 }
2486
2487 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2488         maxModificationRedNo = maxRedNo;
2489         updateTree();
2490         blocked = false;
2491         clearBlockedMsgs();
2492 }
2493
2494
2495 void CkNodeReductionMgr::clearBlockedMsgs(){
2496         int len = bufferedMsgs.length();
2497         for(int i=0;i<len;i++){
2498                 CkReductionMsg *m = bufferedMsgs.deq();
2499                 doAddContribution(m);
2500         }
2501         len = bufferedRemoteMsgs.length();
2502         for(int i=0;i<len;i++){
2503                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2504                 doRecvMsg(m);
2505         }
2506
2507 }
2508 /*
2509         if the reduction number exceeds the maxModificationRedNo, change the tree
2510         to become the new one
2511 */
2512
2513 void CkNodeReductionMgr::updateTree(){
2514         if(redNo > maxModificationRedNo){
2515                 parent = newParent;
2516                 kids = newKids;
2517                 maxModificationRedNo = INT_MAX;
2518                 numKids = kids.size();
2519                 readyDeletion = true;
2520                 additionalGCount = newAdditionalGCount;
2521                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2522                 for(int i=0;i<(int)(newKids.size());i++){
2523                         DEBREVAC(("%d ",newKids[i]));
2524                 }
2525                 DEBREVAC(("\n"));
2526         //      startReduction(redNo,CkMyNode());
2527         }else{
2528                 if(maxModificationRedNo != INT_MAX){
2529                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2530                         startReduction(redNo,CkMyNode());
2531                         finishReduction();
2532                 }       
2533         }
2534 }
2535
2536
2537 void CkNodeReductionMgr::doneEvacuate(){
2538         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2539 /*      if(oldleaf){
2540                 
2541                         It used to be a leaf
2542                         Then as soon as future messages have been emptied you can 
2543                         send the parent a message telling them that they are not going
2544                         to receive anymore messages from this child
2545                 
2546                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2547                 while(futureMsgs.length() != 0){
2548                         int n = futureMsgs.length();
2549                         for(int i=0;i<n;i++){
2550                                 CkReductionMsg *m = futureMsgs.deq();
2551                                 if(isPresent(m->redNo)){
2552                                         msgs.enq(m);
2553                                 }else{
2554                                         futureMsgs.enq(m);
2555                                 }
2556                         }
2557                         CkReductionMsg *result = reduceMessages();
2558                         thisProxy[treeParent()].RecvMsg(result);
2559       delete result;
2560                         redNo++;
2561                 }
2562                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2563                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2564         }else{*/
2565                 if(readyDeletion){
2566                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2567                 }else{
2568                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2569                 }
2570 //      }
2571 }
2572
2573 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2574         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2575         for(int i=0;i<numKids;i++){
2576                 if(kids[i] == deletedChild){
2577                         kids.remove(i);
2578                         break;
2579                 }
2580         }
2581         numKids = kids.length();
2582         finishReduction();
2583 }
2584
2585 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2586         for(int i=0;i<(int)(newKids.length());i++){
2587                 if(newKids[i] == deletedChild){
2588                         newKids.remove(i);
2589                         break;
2590                 }
2591         }
2592         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2593         for(int i=0;i<(int)(newKids.size());i++){
2594                 DEBREVAC(("%d ",newKids[i]));
2595         }
2596         DEBREVAC(("\n"));
2597         finishReduction();
2598 }
2599
2600 int CkNodeReductionMgr::findMaxRedNo(){
2601         int max = redNo;
2602         for(int i=0;i<futureRemoteMsgs.length();i++){
2603                 if(futureRemoteMsgs[i]->redNo  > max){
2604                         max = futureRemoteMsgs[i]->redNo;
2605                 }
2606         }
2607         /*
2608                 if redNo is max (that is no future message) and the current reduction has not started
2609                 then tree can be changed before the reduction redNo can be started
2610         */ 
2611         if(redNo == max && msgs.length() == 0){
2612                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2613                 max--;
2614         }
2615         return max;
2616 }
2617
2618 // initnode call. check the size of reduction table
2619 void CkReductionMgr::sanitycheck()
2620 {
2621 #if CMK_ERROR_CHECKING
2622   int count = 0;
2623   while (CkReduction::reducerTable[count] != NULL) count++;
2624   CmiAssert(CkReduction::nReducers == count);
2625 #endif
2626 }
2627
2628 #include "CkReduction.def.h"