Merged arrays into Charj mainline.
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
208 }
209
210 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
211 {
212   redNo=0;
213   completedRedNo = -1;
214   inProgress=CmiFalse;
215   creating=CmiFalse;
216   startRequested=CmiFalse;
217   gcount=lcount=0;
218   nContrib=nRemote=0;
219   maxStartRequest=0;
220   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
221 }
222
223 void CkReductionMgr::flushStates()
224 {
225   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
226   redNo=0;
227   completedRedNo = -1;
228   inProgress=CmiFalse;
229   creating=CmiFalse;
230   gcount=lcount=0;
231   startRequested=CmiFalse;
232   nContrib=nRemote=0;
233   maxStartRequest=0;
234
235   while (!msgs.isEmpty()) { delete msgs.deq(); }
236   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
237   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
238   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
239
240   adjVec.length()=0;
241
242   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
243 }
244
245 //////////// Reduction Manager Client API /////////////
246
247 //Add the given client function.  Overwrites any previous client.
248 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
249 {
250   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
251
252   if (CkMyPe()!=0)
253           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
254   storedCallback=*cb;
255 #if ! GROUP_LEVEL_REDUCTION
256   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
257   nodeProxy.ckSetReductionClient(callback);
258 #endif
259 }
260
261 ///////////////////////////// Contributor ////////////////////////
262 //Contributors keep a copy of this structure:
263
264 /*Contributor migration support:
265 */
266 void contributorInfo::pup(PUP::er &p)
267 {
268   p(redNo);
269 }
270
271 ////////////////////// Contributor list maintainance: /////////////////
272 //These just set and clear the "creating" flag to prevent
273 // reductions from finishing early because not all elements
274 // have been created.
275 void CkReductionMgr::creatingContributors(void)
276 {
277   DEBR((AA"Creating contributors...\n"AB));
278   creating=CmiTrue;
279 }
280 void CkReductionMgr::doneCreatingContributors(void)
281 {
282   DEBR((AA"Done creating contributors...\n"AB));
283   creating=CmiFalse;
284   if (startRequested) startReduction(redNo,CkMyPe());
285   finishReduction();
286 }
287
288 //A new contributor will be created
289 void CkReductionMgr::contributorStamped(contributorInfo *ci)
290 {
291   DEBR((AA"Contributor %p stamped\n"AB,ci));
292   //There is another contributor
293   gcount++;
294   if (inProgress)
295   {
296     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
297     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
298   } else
299     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
300 }
301
302 //A new contributor was actually created
303 void CkReductionMgr::contributorCreated(contributorInfo *ci)
304 {
305   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
306   //We've got another contributor
307   lcount++;
308   //He may not need to contribute to some of our reductions:
309   for (int r=redNo;r<ci->redNo;r++)
310     adj(r).lcount--;//He won't be contributing to r here
311 }
312
313 /*Don't expect any more contributions from this one.
314 This is rather horrifying because we now have to make
315 sure the global element count accurately reflects all the
316 contributions the element made before it died-- these may stretch
317 far into the future.  The adj() vector is what saves us here.
318 */
319 void CkReductionMgr::contributorDied(contributorInfo *ci)
320 {
321 #if CMK_MEM_CHECKPOINT
322   // ignore from listener if it is during restart from crash
323   if (CkInRestarting()) return;
324 #endif
325   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
326   //We lost a contributor
327   gcount--;
328
329   if (ci->redNo<redNo)
330   {//Must have been migrating during reductions-- root is waiting for his
331   // contribution, which will never come.
332     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
333     for (int r=ci->redNo;r<redNo;r++)
334       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
335   }
336
337   //Add to the global count for all his future messages (wherever they are)
338   int r;
339   for (r=redNo;r<ci->redNo;r++)
340   {//He already contributed to this reduction, but won't show up in global count.
341     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
342     adj(r).gcount++;
343   }
344
345   lcount--;
346   //He's already contributed to several reductions here
347   for (r=redNo;r<ci->redNo;r++)
348     adj(r).lcount++;//He'll be contributing to r here
349
350   finishReduction();
351 }
352
353 //Migrating away (note that global count doesn't change)
354 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
355 {
356   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
357   lcount--;//We lost a local
358   //He's already contributed to several reductions here
359   for (int r=redNo;r<ci->redNo;r++)
360     adj(r).lcount++;//He'll be contributing to r here
361
362   finishReduction();
363 }
364
365 //Migrating in (note that global count doesn't change)
366 void CkReductionMgr::contributorArriving(contributorInfo *ci)
367 {
368   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
369   lcount++;//We gained a local
370 #if CMK_MEM_CHECKPOINT
371   // ignore from listener if it is during restart from crash
372   // because the ci may be old.
373   if (CkInRestarting()) return;
374 #endif
375   //He has already contributed (elsewhere) to several reductions:
376   for (int r=redNo;r<ci->redNo;r++)
377     adj(r).lcount--;//He won't be contributing to r here
378
379 }
380
381 //Contribute-- the given msg can contain any data.  The reducerType
382 // field of the message must be valid.
383 // Each contributor must contribute exactly once to the each reduction.
384 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
385 {
386 #if CMK_BLUEGENE_CHARM
387   _TRACE_BG_TLINE_END(&(m->log));
388 #endif
389   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
390   //m->ci=ci;
391   m->redNo=ci->redNo++;
392   m->sourceFlag=-1;//A single contribution
393   m->gcount=0;
394
395 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
396     if(lcount == 0){
397         m->sourceProcessorCount = 1;
398     }else{
399         m->sourceProcessorCount = lcount;
400     }
401     m->fromPE = CmiMyPe();
402     Chare *oldObj =CpvAccess(_currentObj);
403     char currentObjName[100];
404     DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
405     thisProxy[0].contributeViaMessage(m);
406 #else
407   addContribution(m);
408 #endif
409 }
410
411 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
412 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
413     CmiAssert(CmiMyPe() == 0);
414     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
415     if(redNo == 0){
416         if(perProcessorCounts[m->fromPE] == -1){
417             processorCount++;
418             perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
419             DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
420         }else{
421             if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
422                 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
423                 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
424             }
425         }
426         if(processorCount == CmiNumPes()){
427             totalCount = 0;
428             for(int i=0;i<CmiNumPes();i++){
429                 CmiAssert(perProcessorCounts[i] != -1);
430                 totalCount += perProcessorCounts[i];
431             }
432             DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
433         }
434         if(m->sourceProcessorCount == 0){
435             if(processorCount == CmiNumPes()){
436                 finishReduction();
437             }
438             return;
439         }
440     }
441     addContribution(m);
442 }
443 #else
444 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
445 #endif
446
447 //////////// Reduction Manager Remote Entry Points /////////////
448 //Sent down the reduction tree (used by barren PEs)
449 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
450 {
451  if(CkMyPe()==0){
452         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
453         //delete m;
454         //return;
455  }
456  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
457  int srcPE = (UsrToEnv(m))->getSrcPe();
458 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_)) 
459   if (isPresent(m->num) && !inProgress)
460   {
461     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
462     startReduction(m->num,srcPE);
463     finishReduction();
464   } else if (isFuture(m->num)){
465 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
466           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
467           if(maxStartRequest < m->num){
468                   maxStartRequest = m->num;
469           }
470  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
471           
472     }
473   else //is Past
474     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
475   delete m;
476 #else
477     if(redNo == 0){
478         if(lcount == 0){
479             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
480             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
481             dummy->fromPE = CmiMyPe();
482             dummy->sourceProcessorCount = 0;
483             dummy->redNo = 0;
484             thisProxy[0].contributeViaMessage(dummy);
485         }
486     }
487 #endif
488 }
489
490 //Sent to root of reduction tree with reduction contribution
491 // of migrants that missed the main reduction.
492 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
493 {
494 #if GROUP_LEVEL_REDUCTION
495 #if CMK_BLUEGENE_CHARM
496   _TRACE_BG_TLINE_END(&(m->log));
497 #endif
498   addContribution(m);
499 #else
500   m->secondaryCallback = m->callback;
501   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
502   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
503   nodeMgr->LateMigrantMsg(m);
504 /*      int len = finalMsgs.length();
505         finalMsgs.enq(m);
506 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
507         endArrayReduction();*/
508 #endif
509 }
510
511 //A late migrating contributor will never contribute to this reduction
512 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
513 {
514   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
515   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
516  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
517   adj(m->num).gcount--;//He won't be contributing to this one.
518   finishReduction();
519   delete m;
520 }
521
522 //////////// Reduction Manager State /////////////
523 void CkReductionMgr::startReduction(int number,int srcPE)
524 {
525   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
526   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
527   if (inProgress){
528         DEBR((AA"This reduction is already in progress\n"AB));
529         return;//This reduction already started
530   }
531   if (creating) //Don't start yet-- we're creating elements
532   {
533     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
534     startRequested=CmiTrue;
535     return;
536   }
537
538 //If none of these cases, we need to start the reduction--
539   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
540   inProgress=CmiTrue;
541  
542
543         /*
544                 FAULT_EVAC
545         */
546   if(!CmiNodeAlive(CkMyPe())){
547         return;
548   }
549
550 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
551   if(CmiMyPe() == 0 && redNo == 0){
552             for(int j=0;j<CkNumPes();j++){
553                 if(j != CkMyPe() && j != srcPE){
554                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
555                 }
556             }
557             if(lcount == 0){
558                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
559                 dummy->fromPE = CmiMyPe();
560                 dummy->sourceProcessorCount = 0;
561                 dummy->redNo = 0;
562                 thisProxy[0].contributeViaMessage(dummy);
563             }
564     }   else{
565         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
566     }
567
568
569 #else
570   //Sent start requests to our kids (in case they don't already know)
571 #if GROUP_LEVEL_REDUCTION
572   for (int k=0;k<treeKids();k++)
573   {
574     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
575     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
576   }
577 #else
578   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
579 #endif
580 #endif
581         
582         /*
583   int temp;
584   //making it a broadcast done only by PE 0
585   if(!hasParent()){
586                 temp = completedRedNo+1;
587                 for(int i=temp;i<=number;i++){
588                         for(int j=0;j<CkNumPes();j++){
589                                 if(j != CkMyPe() && j != srcPE){
590                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
591                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
592                                         }
593                                 }
594                         }
595                 }       
596         }       else{
597                 temp = number;
598         }*/
599 /*  if(!hasParent()){
600                 temp = completedRedNo+1;
601         }       else{
602                 temp = number;
603         }
604         for(int i=temp;i<=number;i++){
605         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
606                 if(hasParent()){
607           // kick-start your parent too ...
608                         if(treeParent() != srcPE){
609                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
610 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
611     CpvAccess(_currentObj) = oldObj;
612 #endif
613                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
614                                 }       
615                         }       
616                 }
617           for (int k=0;k<treeKids();k++)
618           {
619                         if(firstKid()+k != srcPE){
620                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
621                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
622                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
623                                 }       
624                         }       
625         }
626         }
627         */
628 }       
629
630 /*Handle a message from one element for the reduction*/
631 void CkReductionMgr::addContribution(CkReductionMsg *m)
632 {
633   if (isPast(m->redNo))
634   {
635 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
636         CmiAbort("this version should not have late migrations");
637 #else
638         //We've moved on-- forward late contribution straight to root
639     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
640         // if (!hasParent()) //Root moved on too soon-- should never happen
641         //   CkAbort("Late reduction contribution received at root!\n");
642     thisProxy[0].LateMigrantMsg(m);
643 #endif
644   }
645   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
646     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
647     futureMsgs.enq(m);
648   } else {// An ordinary contribution
649     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
650    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
651     startReduction(m->redNo,CkMyPe());
652     msgs.enq(m);
653     nContrib++;
654     finishReduction();
655   }
656 }
657
658 /**function checks if it has got all contributions that it is supposed to
659 get at this processor. If it is done it sends the reduced result to the local
660 nodegroup */
661 void CkReductionMgr::finishReduction(void)
662 {
663   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
664   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
665   if ((!inProgress) || creating){
666         DEBR((AA"Either not in Progress or creating\n"AB));
667         return;
668   }
669   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
670 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
671
672   if (nContrib<(lcount+adj(redNo).lcount)){
673          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
674          return;//Need more local messages
675   }
676 #if GROUP_LEVEL_REDUCTION
677   if (nRemote<treeKids()) return;//Need more remote messages
678 #endif
679  
680 #else
681
682     if(CkMyPe() != 0){
683         if(redNo != 0){
684             return;
685         }else{
686             CmiAssert(lcount == 0);
687             CkReductionMsg *dummy = reduceMessages();
688             dummy->fromPE = CmiMyPe();
689             dummy->sourceProcessorCount = 0;
690             thisProxy[0].contributeViaMessage(dummy);
691             return;
692         }
693     }
694     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
695         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
696          return;//Need more local messages
697   }else{
698         DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
699     }
700
701
702 #endif
703
704   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
705   CkReductionMsg *result=reduceMessages();
706   result->redNo=redNo;
707 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
708
709 #if GROUP_LEVEL_REDUCTION
710   if (hasParent())
711   {//Pass data up tree to parent
712     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
713     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
714     result->gcount+=gcount+adj(redNo).gcount;
715     thisProxy[treeParent()].RecvMsg(result);
716   }
717   else 
718   {//We are root-- pass data to client
719     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
720     int totalElements=result->gcount+gcount+adj(redNo).gcount;
721     if (totalElements>result->nSources()) 
722     {
723       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
724       msgs.enq(result);
725       return; // Wait for migrants to contribute
726     } else if (totalElements<result->nSources()) {
727       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
728       CkAbort("ERROR! Too many contributions at root!\n");
729     }
730     DEBR((AA"Passing result to client function\n"AB));
731     CkSetRefNum(result, redNo);
732     if (!result->callback.isInvalid())
733             result->callback.send(result);
734     else if (!storedCallback.isInvalid())
735             storedCallback.send(result);
736     else
737             CkAbort("No reduction client!\n"
738                     "You must register a client with either SetReductionClient or during contribute.\n");
739   }
740
741 #else
742   result->gcount+=gcount+adj(redNo).gcount;
743
744   result->secondaryCallback = result->callback;
745   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
746         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
747
748   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
749
750  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
751   
752   // Find our node reduction manager, and pass reduction to him:
753   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
754   nodeMgr->contributeArrayReduction(result);
755 #endif
756 #else                // _FAULT_MLOG_
757   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
758
759     CkSetRefNum(result, redNo);
760     if (!result->callback.isInvalid())
761         result->callback.send(result);
762     else if (!storedCallback.isInvalid())
763         storedCallback.send(result);
764     else{
765       DEBR(("No reduction client for group %d \n",thisgroup.idx));
766         CkAbort("No reduction client!\n"
767             "You must register a client with either SetReductionClient or during contribute.\n");
768     }
769
770     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
771
772 #endif               // _FAULT_MLOG_
773
774   //House Keeping Operations will have to check later what needs to be changed
775   redNo++;
776   //Shift the count adjustment vector down one slot (to match new redNo)
777   int i;
778 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
779         if(CkMyPe()!=0){
780 #else
781     {
782 #endif
783 //      int i;
784         completedRedNo++;
785         for (i=1;i<(int)(adjVec.length());i++){
786            adjVec[i-1]=adjVec[i];
787         }
788         adjVec.length()--;  
789   }
790
791   inProgress=CmiFalse;
792   startRequested=CmiFalse;
793   nRemote=nContrib=0;
794
795   //Look through the future queue for messages we can now handle
796   int n=futureMsgs.length();
797   for (i=0;i<n;i++)
798   {
799     CkReductionMsg *m=futureMsgs.deq();
800     if (m!=NULL) //One of these addContributions may have finished us.
801       addContribution(m);//<- if *still* early, puts it back in the queue
802   }
803 #if GROUP_LEVEL_REDUCTION
804   n=futureRemoteMsgs.length();
805   for (i=0;i<n;i++)
806   {
807     CkReductionMsg *m=futureRemoteMsgs.deq();
808     if (m!=NULL) {
809       RecvMsg(m);//<- if *still* early, puts it back in the queue
810     }
811   }
812 #endif
813
814 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
815   if(maxStartRequest >= redNo){
816           startReduction(redNo,CkMyPe());
817           finishReduction();
818   }
819  
820 #endif
821 }
822
823 //Sent up the reduction tree with reduced data
824   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
825 {
826 #if GROUP_LEVEL_REDUCTION
827 #if CMK_BLUEGENE_CHARM
828   _TRACE_BG_TLINE_END(&m->log);
829 #endif
830   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
831     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
832     startReduction(m->redNo, CkMyPe());
833     msgs.enq(m);
834     nRemote++;
835     finishReduction();
836   }
837   else if (isFuture(m->redNo)) {
838     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
839     futureRemoteMsgs.enq(m);
840   } 
841   else CkAbort("Recv'd late remote contribution!\n");
842 #endif
843 }
844
845 //////////// Reduction Manager Utilities /////////////
846
847 //Return the countAdjustment struct for the given redNo:
848 countAdjustment &CkReductionMgr::adj(int number)
849 {
850   number-=completedRedNo;
851   number--;
852   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
853   //Pad the adjustment vector with zeros until it's at least number long
854   while ((int)(adjVec.length())<=number)
855     adjVec.push_back(countAdjustment());
856   return adjVec[number];
857 }
858
859 //Combine (& free) the current message vector msgs.
860 CkReductionMsg *CkReductionMgr::reduceMessages(void)
861 {
862 #if CMK_BLUEGENE_CHARM
863   _TRACE_BG_END_EXECUTE(1);
864   void* _bgParentLog = NULL;
865   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
866 #endif
867   CkReductionMsg *ret=NULL;
868
869   //Look through the vector for a valid reducer, swapping out placeholder messages
870   CkReduction::reducerType r=CkReduction::invalid;
871   int msgs_gcount=0;//Reduced gcount
872   int msgs_nSources=0;//Reduced nSources
873   int msgs_userFlag=-1;
874   CkCallback msgs_callback;
875   int i;
876   int nMsgs=0;
877   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
878   CkReductionMsg *m;
879   bool isMigratableContributor;
880
881   // Copy message queue into msgArr, skipping placeholders:
882   while (NULL!=(m=msgs.deq()))
883   {
884     msgs_gcount+=m->gcount;
885     if (m->sourceFlag!=0)
886     { //This is a real message from an element, not just a placeholder
887       msgArr[nMsgs++]=m;
888       msgs_nSources+=m->nSources();
889       r=m->reducer;
890       if (!m->callback.isInvalid())
891         msgs_callback=m->callback;
892       if (m->userFlag!=-1)
893         msgs_userFlag=m->userFlag;
894                         
895         isMigratableContributor=m->isMigratableContributor();
896 #if CMK_BLUEGENE_CHARM
897         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
898 #endif
899     }
900     else
901     { //This is just a placeholder message-- forget it
902       delete m;
903     }
904   }
905
906   if (nMsgs==0||r==CkReduction::invalid)
907   //No valid reducer in the whole vector
908     ret=CkReductionMsg::buildNew(0,NULL);
909   else
910   {//Use the reducer to reduce the messages
911                 //if there is only one msg to be reduced just return that message
912     if(nMsgs == 1){
913         ret = msgArr[0];        
914     }else{
915         CkReduction::reducerFn f=CkReduction::reducerTable[r];
916         ret=(*f)(nMsgs,msgArr);
917     }
918     ret->reducer=r;
919   }
920
921
922
923 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
924
925 #if CRITICAL_PATH_DEBUG > 3
926   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
927 #endif
928
929   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
930   path.updateMax(UsrToEnv(ret));
931   // Combine the critical paths from all the reduction messages
932   for (i=0;i<nMsgs;i++){
933     if (msgArr[i]!=ret){
934       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
935       path.updateMax(UsrToEnv(msgArr[i]));
936     }
937   }
938   
939
940 #if CRITICAL_PATH_DEBUG > 3
941   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
942 #endif
943   
944   PathHistoryTableEntry tableEntry(path);
945   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
946   
947 #endif
948
949         //Go back through the vector, deleting old messages
950   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
951         delete [] msgArr;
952
953   //Set the message counts
954   ret->redNo=redNo;
955   ret->gcount=msgs_gcount;
956   ret->userFlag=msgs_userFlag;
957   ret->callback=msgs_callback;
958   ret->sourceFlag=msgs_nSources;
959         ret->setMigratableContributor(isMigratableContributor);
960   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
961
962   return ret;
963 }
964
965
966 //Checkpointing utilities
967 //pack-unpack method for CkReductionMsg
968 //if packing pack the message and then unpack and return it
969 //if unpacking allocate memory for it read it off disk and then unapck
970 //and return it
971 void CkReductionMgr::pup(PUP::er &p)
972 {
973 //We do not store the client function pointer or the client function parameter,
974 //it is the responsibility of the programmer to correctly restore these
975   CkGroupInitCallback::pup(p);
976   p(redNo);
977   p(completedRedNo);
978   p(inProgress); p(creating); p(startRequested);
979   p(nContrib); p(nRemote);
980   p|msgs;
981   p|futureMsgs;
982   p|futureRemoteMsgs;
983   p|finalMsgs;
984   p|adjVec;
985   p|nodeProxy;
986   p|storedCallback;
987     // handle CkReductionClientBundle
988   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
989   {
990     CkReductionClientBundle *bd;
991     if (p.isUnpacking()) 
992       bd = new CkReductionClientBundle;
993     else
994       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
995     p|*bd;
996     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
997   }
998
999   // subtle --- Gengbin
1000   // Group : CkReductionMgr
1001   // CkArray: CkReductionMgr
1002   // lcount/gcount in Group is set in Group constructor
1003   // lcount/gcount in CkArray is not, it is set when array elements are created
1004   // we can not pup because inserting array elems will add the counters again
1005 //  p|lcount;
1006 //  p|gcount;
1007 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1008 //  p|lcount;
1009 //  //  p|gcount;
1010 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1011 #endif
1012   if(p.isUnpacking()){
1013     thisProxy = thisgroup;
1014     maxStartRequest=0;
1015 #if GROUP_LEVEL_REDUCTION
1016 #ifdef BINOMIAL_TREE
1017     init_BinomialTree();
1018 #else
1019    init_BinaryTree();
1020 #endif
1021 #endif
1022   }
1023
1024   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1025 }
1026
1027
1028 //Callback for doing Reduction through NodeGroups added by Sayantan
1029
1030 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1031
1032         int total = m->gcount+adj(m->redNo).gcount;
1033         finalMsgs.enq(m);
1034         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1035         adj(m->redNo).mainRecvd = 1;
1036         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1037         endArrayReduction();
1038 }
1039
1040 void CkReductionMgr :: endArrayReduction(){
1041         CkReductionMsg *ret=NULL;
1042         int nMsgs=finalMsgs.length();
1043         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1044         //Look through the vector for a valid reducer, swapping out placeholder messages
1045         //CkPrintf("Length of Final Message %d \n",nMsgs);
1046         CkReduction::reducerType r=CkReduction::invalid;
1047         int msgs_gcount=0;//Reduced gcount
1048         int msgs_nSources=0;//Reduced nSources
1049         int msgs_userFlag=-1;
1050         CkCallback msgs_callback;
1051         CkCallback msgs_secondaryCallback;
1052         CkVec<CkReductionMsg *> tempMsgs;
1053         int i;
1054         int numMsgs = 0;
1055         for (i=0;i<nMsgs;i++)
1056         {
1057                 CkReductionMsg *m=finalMsgs.deq();
1058                 if(m->redNo == completedRedNo +1){
1059                         msgs_gcount+=m->gcount;
1060                         if (m->sourceFlag!=0)
1061                         { //This is a real message from an element, not just a placeholder
1062                                 msgs_nSources+=m->nSources();
1063                                 r=m->reducer;
1064                                 if (!m->callback.isInvalid())
1065                                 msgs_callback=m->callback;
1066                                 if(!m->secondaryCallback.isInvalid())
1067                                         msgs_secondaryCallback = m->secondaryCallback;
1068                                 if (m->userFlag!=-1)
1069                                         msgs_userFlag=m->userFlag;
1070                                 tempMsgs.push_back(m);
1071                         }
1072                 }else{
1073                         finalMsgs.enq(m);
1074                 }
1075
1076         }
1077         numMsgs = tempMsgs.length();
1078
1079         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1080
1081         if(numMsgs == 0){
1082                 return;
1083         }
1084         if(adj(completedRedNo+1).mainRecvd == 0){
1085                 for(i=0;i<numMsgs;i++){
1086                         finalMsgs.enq(tempMsgs[i]);
1087                 }
1088                 return;
1089         }
1090
1091 /*
1092         NOT NEEDED ANYMORE DONE at nodegroup level
1093         if(msgs_gcount  > msgs_nSources){
1094                 for(i=0;i<numMsgs;i++){
1095                         finalMsgs.enq(tempMsgs[i]);
1096                 }
1097                 return;
1098         }*/
1099
1100         if (nMsgs==0||r==CkReduction::invalid)
1101                 //No valid reducer in the whole vector
1102                 ret=CkReductionMsg::buildNew(0,NULL);
1103         else{//Use the reducer to reduce the messages
1104                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1105                 // has to be corrected elements from above need to be put into a temporary vector
1106                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1107                 ret=(*f)(numMsgs,msgArr);
1108                 ret->reducer=r;
1109
1110         }
1111
1112         
1113 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1114
1115 #if CRITICAL_PATH_DEBUG > 3
1116         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1117 #endif
1118
1119         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1120         path.updateMax(UsrToEnv(ret));
1121         // Combine the critical paths from all the reduction messages into the header for the new result
1122         for (i=0;i<numMsgs;i++){
1123           if (tempMsgs[i]!=ret){
1124             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1125             path.updateMax(UsrToEnv(tempMsgs[i]));
1126           } else {
1127             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1128           }
1129         }
1130         // Also consider the path which got us into this entry method
1131
1132 #if CRITICAL_PATH_DEBUG > 3
1133         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1134 #endif
1135
1136 #endif
1137   
1138
1139
1140
1141
1142         for(i = 0;i<numMsgs;i++){
1143                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1144         }
1145
1146         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1147         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1148
1149         ret->redNo=completedRedNo+1;
1150         ret->gcount=msgs_gcount;
1151         ret->userFlag=msgs_userFlag;
1152         ret->callback=msgs_callback;
1153         ret->secondaryCallback = msgs_secondaryCallback;
1154         ret->sourceFlag=msgs_nSources;
1155
1156         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1157
1158         CkSetRefNum(ret, ret->redNo);
1159         if (!ret->secondaryCallback.isInvalid())
1160             ret->secondaryCallback.send(ret);
1161     else if (!storedCallback.isInvalid())
1162             storedCallback.send(ret);
1163     else{
1164       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1165             CkAbort("No reduction client!\n"
1166                     "You must register a client with either SetReductionClient or during contribute.\n");
1167     }
1168         completedRedNo++;
1169
1170         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1171
1172         for (i=1;i<(int)(adjVec.length());i++)
1173                 adjVec[i-1]=adjVec[i];
1174         adjVec.length()--;
1175         endArrayReduction();
1176 }
1177
1178 #if GROUP_LEVEL_REDUCTION
1179
1180 void CkReductionMgr::init_BinaryTree(){
1181         parent = (CkMyPe()-1)/TREE_WID;
1182         int firstkid = CkMyPe()*TREE_WID+1;
1183         numKids=CkNumPes()-firstkid;
1184         if (numKids>TREE_WID) numKids=TREE_WID;
1185         if (numKids<0) numKids=0;
1186
1187         for(int i=0;i<numKids;i++){
1188                 kids.push_back(firstkid+i);
1189                 newKids.push_back(firstkid+i);
1190         }
1191 }
1192
1193 void CkReductionMgr::init_BinomialTree(){
1194         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1195         /*upperSize = (unsigned )pow((double)2,depth);*/
1196         upperSize = (unsigned) 1 << depth;
1197         label = upperSize-CkMyPe()-1;
1198         int p=label;
1199         int count=0;
1200         while( p > 0){
1201                 if(p % 2 == 0)
1202                         break;
1203                 else{
1204                         p = p/2;
1205                         count++;
1206                 }
1207         }
1208         /*parent = label + rint(pow((double)2,count));*/
1209         parent = label + (1<<count);
1210         parent = upperSize -1 -parent;
1211         int temp;
1212         if(count != 0){
1213                 numKids = 0;
1214                 for(int i=0;i<count;i++){
1215                         /*temp = label - rint(pow((double)2,i));*/
1216                         temp = label - (1<<i);
1217                         temp = upperSize-1-temp;
1218                         if(temp <= CkNumPes()-1){
1219                                 kids.push_back(temp);
1220                                 numKids++;
1221                         }
1222                 }
1223         }else{
1224                 numKids = 0;
1225         //      kids = NULL;
1226         }
1227 }
1228
1229
1230 int CkReductionMgr::treeRoot(void)
1231 {
1232   return 0;
1233 }
1234 CmiBool CkReductionMgr::hasParent(void) //Root Node
1235 {
1236   return (CmiBool)(CkMyPe()!=treeRoot());
1237 }
1238 int CkReductionMgr::treeParent(void) //My parent Node
1239 {
1240   return parent;
1241 }
1242
1243 int CkReductionMgr::firstKid(void) //My first child Node
1244 {
1245   return CkMyPe()*TREE_WID+1;
1246 }
1247 int CkReductionMgr::treeKids(void)//Number of children in tree
1248 {
1249   return numKids;
1250 }
1251
1252 #endif
1253
1254 /////////////////////////////////////////////////////////////////////////
1255
1256 ////////////////////////////////
1257 //CkReductionMsg support
1258
1259 //ReductionMessage default private constructor-- does nothing
1260 CkReductionMsg::CkReductionMsg(){}
1261 CkReductionMsg::~CkReductionMsg(){}
1262
1263 //This define gives the distance from the start of the CkReductionMsg
1264 // object to the start of the user data area (just below last object field)
1265 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1266
1267 //"Constructor"-- builds and returns a new CkReductionMsg.
1268 //  the "data" array you specify will be copied into this object.
1269 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1270     CkReduction::reducerType reducer, CkReductionMsg *buf)
1271 {
1272   int len[1];
1273   len[0]=NdataSize;
1274   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1275   
1276   ret->dataSize=NdataSize;
1277   if (srcData!=NULL && !buf)
1278     memcpy(ret->data,srcData,NdataSize);
1279   ret->userFlag=-1;
1280   ret->reducer=reducer;
1281   //ret->ci=NULL;
1282   ret->sourceFlag=-1000;
1283   ret->gcount=0;
1284   ret->migratableContributor = true;
1285 #if CMK_BLUEGENE_CHARM
1286   ret->log = NULL;
1287 #endif
1288   return ret;
1289 }
1290
1291 // Charm kernel message runtime support:
1292 void *
1293 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1294 {
1295   int totalsize=ARM_DATASTART+(*sz);
1296   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1297   CkReductionMsg *ret = (CkReductionMsg *)
1298     CkAllocMsg(msgnum,totalsize,priobits);
1299   ret->data=(void *)(&ret->dataStorage);
1300   return (void *) ret;
1301 }
1302
1303 void *
1304 CkReductionMsg::pack(CkReductionMsg* in)
1305 {
1306   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1307   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1308   in->data = NULL;
1309   return (void*) in;
1310 }
1311
1312 CkReductionMsg* CkReductionMsg::unpack(void *in)
1313 {
1314   CkReductionMsg *ret = (CkReductionMsg *)in;
1315   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1316   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1317   ret->data=(void *)(&ret->dataStorage);
1318   return ret;
1319 }
1320
1321
1322 /////////////////////////////////////////////////////////////////////////////////////
1323 ///////////////// Builtin Reducer Functions //////////////
1324 /* A simple reducer, like sum_int, looks like this:
1325 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1326 {
1327   int i,ret=0;
1328   for (i=0;i<nMsg;i++)
1329     ret+=*(int *)(msg[i]->getData());
1330   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1331 }
1332
1333 To keep the code small and easy to change, the implementations below
1334 are built with preprocessor macros.
1335 */
1336
1337 //////////////// simple reducers ///////////////////
1338 /*A define used to quickly and tersely construct simple reductions.
1339 The basic idea is to use the first message's data array as
1340 (pre-initialized!) scratch space for folding in the other messages.
1341  */
1342
1343 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1344 {
1345         CkAbort("Called the invalid reducer type 0.  This probably\n"
1346                 "means you forgot to initialize your custom reducer index.\n");
1347         return NULL;
1348 }
1349
1350 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1351 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1352 {\
1353   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1354   int m,i;\
1355   int nElem=msg[0]->getLength()/sizeof(dataType);\
1356   dataType *ret=(dataType *)(msg[0]->getData());\
1357   for (m=1;m<nMsg;m++)\
1358   {\
1359     dataType *value=(dataType *)(msg[m]->getData());\
1360     for (i=0;i<nElem;i++)\
1361     {\
1362       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1363       loop\
1364     }\
1365   }\
1366   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1367   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1368 }
1369
1370 //Use this macro for reductions that have the same type for all inputs
1371 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1372   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1373   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1374   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1375
1376
1377 //Compute the sum the numbers passed by each element.
1378 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1379
1380 //Compute the product of the numbers passed by each element.
1381 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1382
1383 //Compute the largest number passed by any element.
1384 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1385
1386 //Compute the smallest integer passed by any element.
1387 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1388
1389
1390 //Compute the logical AND of the integers passed by each element.
1391 // The resulting integer will be zero if any source integer is zero; else 1.
1392 SIMPLE_REDUCTION(logical_and,int,"%d",
1393         if (value[i]==0)
1394      ret[i]=0;
1395   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1396 )
1397
1398 //Compute the logical OR of the integers passed by each element.
1399 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1400 SIMPLE_REDUCTION(logical_or,int,"%d",
1401   if (value[i]!=0)
1402            ret[i]=1;
1403   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1404 )
1405
1406 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1407 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1408
1409 //Select one random message to pass on
1410 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1411   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1412 }
1413
1414 /////////////// concat ////////////////
1415 /*
1416 This reducer simply appends the data it recieves from each element,
1417 without any housekeeping data to separate them.
1418 */
1419 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1420 {
1421   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1422   //Figure out how big a message we'll need
1423   int i,retSize=0;
1424   for (i=0;i<nMsg;i++)
1425       retSize+=msg[i]->getSize();
1426
1427   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1428
1429   //Allocate a new message
1430   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1431
1432   //Copy the source message data into the return message
1433   char *cur=(char *)(ret->getData());
1434   for (i=0;i<nMsg;i++) {
1435     int messageBytes=msg[i]->getSize();
1436     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1437     cur+=messageBytes;
1438   }
1439   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1440   return ret;
1441 }
1442
1443 /////////////// set ////////////////
1444 /*
1445 This reducer appends the data it recieves from each element
1446 along with some housekeeping data indicating contribution boundaries.
1447 The message data is thus a list of reduction_set_element structures
1448 terminated by a dummy reduction_set_element with a sourceElement of -1.
1449 */
1450
1451 //This rounds an integer up to the nearest multiple of sizeof(double)
1452 static const int alignSize=sizeof(double);
1453 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1454
1455 //This gives the size (in bytes) of a reduction_set_element
1456 static int SET_SIZE(int dataSize)
1457 {return SET_ALIGN(sizeof(int)+dataSize);}
1458
1459 //This returns a pointer to the next reduction_set_element in the list
1460 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1461 {
1462   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1463   return (CkReduction::setElement *)next;
1464 }
1465
1466 //Combine the data passed by each element into an list of reduction_set_elements.
1467 // Each element may contribute arbitrary data (with arbitrary length).
1468 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1469 {
1470   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1471   //Figure out how big a message we'll need
1472   int i,retSize=0;
1473   for (i=0;i<nMsg;i++) {
1474     if (!msg[i]->isFromUser())
1475     //This message is composite-- it will just be copied over (less terminating -1)
1476       retSize+=(msg[i]->getSize()-sizeof(int));
1477     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1478       retSize+=SET_SIZE(msg[i]->getSize());
1479   }
1480   retSize+=sizeof(int);//Leave room for terminating -1.
1481
1482   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1483
1484   //Allocate a new message
1485   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1486
1487   //Copy the source message data into the return message
1488   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1489   for (i=0;i<nMsg;i++)
1490     if (!msg[i]->isFromUser())
1491     {//This message is composite-- just copy it over (less terminating -1)
1492                         int messageBytes=msg[i]->getSize()-sizeof(int);
1493                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1494                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1495                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1496     }
1497     else //This is a message from an element-- wrap it in a reduction_set_element
1498     {
1499       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1500       cur->dataSize=msg[i]->getSize();
1501       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1502       cur=SET_NEXT(cur);
1503     }
1504   cur->dataSize=-1;//Add a terminating -1.
1505   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1506   return ret;
1507 }
1508
1509 //Utility routine: get the next reduction_set_element in the list
1510 // if there is one, or return NULL if there are none.
1511 //To get all the elements, just keep feeding this procedure's output back to
1512 // its input until it returns NULL.
1513 CkReduction::setElement *CkReduction::setElement::next(void)
1514 {
1515   CkReduction::setElement *n=SET_NEXT(this);
1516   if (n->dataSize==-1)
1517     return NULL;//This is the end of the list
1518   else
1519     return n;//This is just another element
1520 }
1521
1522 /////////////////// Reduction Function Table /////////////////////
1523 CkReduction::CkReduction() {} //Dummy private constructor
1524
1525 //Add the given reducer to the list.  Returns the new reducer's
1526 // reducerType.  Must be called in the same order on every node.
1527 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1528 {
1529   reducerTable[nReducers]=fn;
1530   return (reducerType)nReducers++;
1531 }
1532
1533 /*Reducer table: maps reducerTypes to reducerFns.
1534 It's indexed by reducerType, so the order in this table
1535 must *exactly* match the reducerType enum declaration.
1536 The names don't have to match, but it helps.
1537 */
1538 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1539
1540 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1541     ::invalid_reducer,
1542   //Compute the sum the numbers passed by each element.
1543     ::sum_int,::sum_float,::sum_double,
1544
1545   //Compute the product the numbers passed by each element.
1546     ::product_int,::product_float,::product_double,
1547
1548   //Compute the largest number passed by any element.
1549     ::max_int,::max_float,::max_double,
1550
1551   //Compute the smallest number passed by any element.
1552     ::min_int,::min_float,::min_double,
1553
1554   //Compute the logical AND of the integers passed by each element.
1555   // The resulting integer will be zero if any source integer is zero.
1556     ::logical_and,
1557
1558   //Compute the logical OR of the integers passed by each element.
1559   // The resulting integer will be 1 if any source integer is nonzero.
1560     ::logical_or,
1561
1562     // Compute the logical bitvector AND of the integers passed by each element.
1563     ::bitvec_and,
1564
1565     // Compute the logical bitvector OR of the integers passed by each element.
1566     ::bitvec_or,
1567
1568     // Select one of the messages at random to pass on
1569     ::random,
1570
1571   //Concatenate the (arbitrary) data passed by each element
1572     ::concat,
1573
1574   //Combine the data passed by each element into an list of setElements.
1575   // Each element may contribute arbitrary data (with arbitrary length).
1576     ::set
1577 };
1578
1579
1580
1581
1582
1583
1584
1585
1586 /********** Code added by Sayantan *********************/
1587 /** Locking is a big problem in the nodegroup code for smp.
1588  So a few assumptions have had to be made. There is one lock
1589  called lockEverything. It protects all the data structures 
1590  of the nodegroup reduction mgr. I tried grabbing it separately 
1591  for each datastructure, modifying it and then releasing it and
1592  then grabbing it again, for the next change.
1593  That doesn't really help because the interleaved execution of 
1594  different threads makes the state of the reduction manager 
1595  inconsistent. 
1596  
1597  1. Grab lockEverything before calling finishreduction or startReduction
1598     or doRecvMsg
1599  2. lockEverything is grabbed only in entry methods reductionStarting
1600     or RecvMesg or  addcontribution.
1601  ****/
1602  
1603 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1604 NodeGroup::NodeGroup(void) {
1605   __nodelock=CmiCreateLock();
1606 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1607     mlogData->objID.type = TypeNodeGroup;
1608     mlogData->objID.data.group.onPE = CkMyNode();
1609 #endif
1610
1611 }
1612 NodeGroup::~NodeGroup() {
1613   CmiDestroyLock(__nodelock);
1614 }
1615 void NodeGroup::pup(PUP::er &p)
1616 {
1617   CkNodeReductionMgr::pup(p);
1618   p|reductionInfo;
1619 }
1620
1621 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1622
1623 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1624   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1625  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1626   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1627  }
1628
1629 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1630                                     ((CkNodeReductionMgr *)this),
1631                                     reductionInfo,false)
1632
1633 /* this contribute also adds up the count across all messages it receives.
1634   Useful for summing up number of array elements who have contributed ****/ 
1635 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1636         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1637
1638
1639
1640 //#define BINOMIAL_TREE
1641
1642 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1643   : thisProxy(thisgroup)
1644 {
1645 #ifdef BINOMIAL_TREE
1646   init_BinomialTree();
1647 #else
1648   init_BinaryTree();
1649 #endif
1650   storedCallback=NULL;
1651   redNo=0;
1652   inProgress=CmiFalse;
1653   
1654   startRequested=CmiFalse;
1655   gcount=CkNumNodes();
1656   lcount=1;
1657   nContrib=nRemote=0;
1658   lockEverything = CmiCreateLock();
1659
1660
1661   creating=CmiFalse;
1662   interrupt = 0;
1663   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1664         /*
1665                 FAULT_EVAC
1666         */
1667         blocked = false;
1668         maxModificationRedNo = INT_MAX;
1669         killed=0;
1670         additionalGCount = newAdditionalGCount = 0;
1671 }
1672
1673 void CkNodeReductionMgr::flushStates()
1674 {
1675  if(CkMyRank() == 0){
1676   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1677   redNo=0;
1678   inProgress=CmiFalse;
1679
1680   startRequested=CmiFalse;
1681   gcount=CkNumNodes();
1682   lcount=1;
1683   nContrib=nRemote=0;
1684
1685   creating=CmiFalse;
1686   interrupt = 0;
1687   while (!msgs.isEmpty()) { delete msgs.deq(); }
1688   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1689   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1690   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1691   }
1692 }
1693
1694 //////////// Reduction Manager Client API /////////////
1695
1696 //Add the given client function.  Overwrites any previous client.
1697 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1698 {
1699   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1700   if(cb->isInvalid()){
1701         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1702   }else{
1703         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1704   }
1705
1706   if (CkMyNode()!=0)
1707           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1708   delete storedCallback;
1709   storedCallback=cb;
1710 }
1711
1712
1713
1714 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1715 {
1716 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1717     Chare *oldObj =CpvAccess(_currentObj);
1718     CpvAccess(_currentObj) = this;
1719 #endif
1720
1721   //m->ci=ci;
1722   m->redNo=ci->redNo++;
1723   m->sourceFlag=-1;//A single contribution
1724   m->gcount=0;
1725   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1726   addContribution(m);
1727
1728 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1729     CpvAccess(_currentObj) = oldObj;
1730 #endif
1731 }
1732
1733
1734 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1735 {
1736 #if CMK_BLUEGENE_CHARM
1737   _TRACE_BG_TLINE_END(&m->log);
1738 #endif
1739 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1740     Chare *oldObj =CpvAccess(_currentObj);
1741     CpvAccess(_currentObj) = this;
1742 #endif
1743   //m->ci=ci;
1744   m->redNo=ci->redNo++;
1745   m->gcount=count;
1746   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1747   addContribution(m);
1748   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1749
1750 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1751     CpvAccess(_currentObj) = oldObj;
1752 #endif
1753 }
1754
1755
1756 //////////// Reduction Manager Remote Entry Points /////////////
1757
1758 //Sent down the reduction tree (used by barren PEs)
1759 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1760 {
1761   CmiLock(lockEverything);
1762   if(blocked){
1763         delete m;
1764         CmiUnlock(lockEverything);
1765         return ;
1766   }
1767   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1768   if (isPresent(m->num) && !inProgress)
1769   {
1770     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1771     startReduction(m->num,srcNode);
1772     finishReduction();
1773   } else if (isFuture(m->num)){
1774         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1775   }
1776   else //is Past
1777     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1778   CmiUnlock(lockEverything);
1779   delete m;
1780
1781 }
1782
1783
1784 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1785         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1786         /*
1787                 FAULT_EVAC
1788         */
1789         if(blocked){
1790                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1791                 bufferedRemoteMsgs.enq(m);
1792                 return;
1793         }
1794         
1795         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1796             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1797             startReduction(m->redNo,CkMyNode());
1798             msgs.enq(m);
1799             nRemote++;
1800             finishReduction();
1801         }
1802         else {
1803             if (isFuture(m->redNo)) {
1804                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1805                     futureRemoteMsgs.enq(m);
1806             }else{
1807                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1808                    CkAbort("Recv'd late remote contribution!\n");
1809             }
1810         }
1811         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1812 }
1813
1814 //Sent up the reduction tree with reduced data
1815 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1816 {
1817 #if CMK_BLUEGENE_CHARM
1818   _TRACE_BG_TLINE_END(&m->log);
1819 #endif
1820 #ifndef CMK_CPV_IS_SMP
1821 #if CMK_IMMEDIATE_MSG
1822         if(interrupt == 1){
1823                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1824                 CpvAccess(_qd)->process(-1);
1825                 CmiDelayImmediate();
1826                 return;
1827         }
1828 #endif  
1829 #endif
1830    interrupt = 1;       
1831    CmiLock(lockEverything);   
1832    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1833    doRecvMsg(m);
1834    CmiUnlock(lockEverything);    
1835    interrupt = 0;
1836    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1837 }
1838
1839 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1840 {
1841         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1842         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1843         if (inProgress){
1844                 DEBR((AA"This Node reduction is already in progress\n"AB));
1845                 return;//This reduction already started
1846         }
1847         if (creating) //Don't start yet-- we're creating elements
1848         {
1849                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1850                 startRequested=CmiTrue;
1851                 return;
1852         }
1853         
1854         //If none of these cases, we need to start the reduction--
1855         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1856         inProgress=CmiTrue;
1857         //Sent start requests to our kids (in case they don't already know)
1858
1859         for (int k=0;k<treeKids();k++)
1860         {
1861 #ifdef BINOMIAL_TREE
1862                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1863                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1864 #else
1865                 if(kids[k] != srcNode){
1866                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1867                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1868                 }       
1869 #endif
1870         }
1871
1872         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1873         // in case, nodegroup does not has the attached red group,
1874         // it has to restart groups again
1875         startLocalGroupReductions(number);
1876 /*
1877         if (startLocalGroupReductions(number) == 0)
1878           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1879 */
1880 }
1881
1882 // restart local groups until succeed
1883 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1884   CmiLock(lockEverything);    
1885   if (startLocalGroupReductions(number) == 0)
1886     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1887   CmiUnlock(lockEverything);    
1888 }
1889
1890 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1891         /*
1892                 FAULT_EVAC
1893         */
1894         if(blocked){
1895                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1896                 bufferedMsgs.enq(m);
1897                 return;
1898         }
1899         
1900         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1901                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1902                 futureMsgs.enq(m);
1903         } else {// An ordinary contribution
1904                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1905                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1906                 startReduction(m->redNo,CkMyNode());
1907                 msgs.enq(m);
1908                 nContrib++;
1909                 finishReduction();
1910         }
1911 }
1912
1913 //Handle a message from one element for the reduction
1914 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1915 {
1916   interrupt = 1;
1917   CmiLock(lockEverything);
1918   doAddContribution(m);
1919   CmiUnlock(lockEverything);
1920   interrupt = 0;
1921 }
1922
1923 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1924         if(blocked){
1925                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1926                 bufferedMsgs.enq(m);
1927                 return;
1928         }
1929         
1930         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1931                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1932 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1933                 futureLateMigrantMsgs.enq(m);
1934         } else {// An ordinary contribution
1935                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1936 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1937                 msgs.enq(m);
1938                 finishReduction();
1939         }
1940 }
1941
1942
1943
1944
1945
1946 /** check if the nodegroup reduction is finished at this node. In that case send it
1947 up the reduction tree **/
1948
1949 void CkNodeReductionMgr::finishReduction(void)
1950 {
1951   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1952   /***Check if reduction is finished in the next few ifs***/
1953   if ((!inProgress) || creating){
1954         DEBR((AA"Either not in Progress or creating\n"AB));
1955         return;
1956   }
1957
1958   if (nContrib<(lcount)){
1959         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1960
1961          return;//Need more local messages
1962   }
1963   if (nRemote<treeKids()){
1964         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1965
1966         return;//Need more remote messages
1967   }
1968   if (nRemote>treeKids()){
1969
1970           interrupt = 0;
1971            CkAbort("Nodegrp Excess remote reduction message received!\n");
1972   }
1973
1974   DEBR((AA"Reducing node data...\n"AB));
1975
1976   /**reduce all messages received at this node **/
1977   CkReductionMsg *result=reduceMessages();
1978
1979   if (hasParent())
1980   {//Pass data up tree to parent
1981         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1982         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1983         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1984                 /*
1985                         FAULT_EVAC
1986                 */
1987                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1988             thisProxy[treeParent()].RecvMsg(result);
1989         }
1990
1991   }
1992   else
1993   {
1994                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1995                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
1996                         msgs.enq(result);
1997                         return;
1998                 }
1999                 result->gcount += additionalGCount;
2000           /** if the reduction is finished and I am the root of the reduction tree
2001           then call the reductionhandler and other stuff ***/
2002                 
2003
2004                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2005    CkSetRefNum(result, redNo);
2006     if (!result->callback.isInvalid()){
2007       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2008             result->callback.send(result);
2009     }
2010     else if (storedCallback!=NULL){
2011       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2012             storedCallback->send(result);
2013     }
2014     else{
2015                 DEBR((AA"Invalid Callback \n"AB));
2016             CkAbort("No reduction client!\n"
2017                     "You must register a client with either SetReductionClient or during contribute.\n");
2018                 }
2019   }
2020
2021   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2022   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2023   redNo++;
2024         updateTree();
2025   int i;
2026   inProgress=CmiFalse;
2027   startRequested=CmiFalse;
2028   nRemote=nContrib=0;
2029
2030   //Look through the future queue for messages we can now handle
2031   int n=futureMsgs.length();
2032
2033   for (i=0;i<n;i++)
2034   {
2035     interrupt = 1;
2036
2037     CkReductionMsg *m=futureMsgs.deq();
2038
2039     interrupt = 0;
2040     if (m!=NULL){ //One of these addContributions may have finished us.
2041       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2042       doAddContribution(m);//<- if *still* early, puts it back in the queue
2043     }
2044   }
2045
2046   interrupt = 1;
2047
2048   n=futureRemoteMsgs.length();
2049
2050   interrupt = 0;
2051   for (i=0;i<n;i++)
2052   {
2053     interrupt = 1;
2054
2055     CkReductionMsg *m=futureRemoteMsgs.deq();
2056
2057     interrupt = 0;
2058     if (m!=NULL)
2059       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2060   }
2061   
2062   n = futureLateMigrantMsgs.length();
2063   for(i=0;i<n;i++){
2064     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2065     if(m != NULL){
2066       if(m->redNo == redNo){
2067         msgs.enq(m);
2068       }else{
2069         futureLateMigrantMsgs.enq(m);
2070       }
2071     }
2072   }
2073 }
2074
2075 //////////// Reduction Manager Utilities /////////////
2076
2077 void CkNodeReductionMgr::init_BinaryTree(){
2078         parent = (CkMyNode()-1)/TREE_WID;
2079         int firstkid = CkMyNode()*TREE_WID+1;
2080         numKids=CkNumNodes()-firstkid;
2081   if (numKids>TREE_WID) numKids=TREE_WID;
2082   if (numKids<0) numKids=0;
2083
2084         for(int i=0;i<numKids;i++){
2085                 kids.push_back(firstkid+i);
2086                 newKids.push_back(firstkid+i);
2087         }
2088 }
2089
2090 void CkNodeReductionMgr::init_BinomialTree(){
2091         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2092         /*upperSize = (unsigned )pow((double)2,depth);*/
2093         upperSize = (unsigned) 1 << depth;
2094         label = upperSize-CkMyNode()-1;
2095         int p=label;
2096         int count=0;
2097         while( p > 0){
2098                 if(p % 2 == 0)
2099                         break;
2100                 else{
2101                         p = p/2;
2102                         count++;
2103                 }
2104         }
2105         /*parent = label + rint(pow((double)2,count));*/
2106         parent = label + (1<<count);
2107         parent = upperSize -1 -parent;
2108         int temp;
2109         if(count != 0){
2110                 numKids = 0;
2111                 for(int i=0;i<count;i++){
2112                         /*temp = label - rint(pow((double)2,i));*/
2113                         temp = label - (1<<i);
2114                         temp = upperSize-1-temp;
2115                         if(temp <= CkNumNodes()-1){
2116                 //              kids[numKids] = temp;
2117                                 kids.push_back(temp);
2118                                 numKids++;
2119                         }
2120                 }
2121         }else{
2122                 numKids = 0;
2123         //      kids = NULL;
2124         }
2125 }
2126
2127
2128 int CkNodeReductionMgr::treeRoot(void)
2129 {
2130   return 0;
2131 }
2132 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2133 {
2134   return (CmiBool)(CkMyNode()!=treeRoot());
2135 }
2136 int CkNodeReductionMgr::treeParent(void) //My parent Node
2137 {
2138   return parent;
2139 }
2140
2141 int CkNodeReductionMgr::firstKid(void) //My first child Node
2142 {
2143   return CkMyNode()*TREE_WID+1;
2144 }
2145 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2146 {
2147 #ifdef BINOMIAL_TREE
2148         return numKids;
2149 #else
2150 /*  int nKids=CkNumNodes()-firstKid();
2151   if (nKids>TREE_WID) nKids=TREE_WID;
2152   if (nKids<0) nKids=0;
2153   return nKids;*/
2154         return numKids;
2155 #endif
2156 }
2157
2158 //Combine (& free) the current message vector msgs.
2159 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2160 {
2161 #if CMK_BLUEGENE_CHARM
2162   _TRACE_BG_END_EXECUTE(1);
2163   void* _bgParentLog = NULL;
2164   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2165 #endif
2166   CkReductionMsg *ret=NULL;
2167
2168   //Look through the vector for a valid reducer, swapping out placeholder messages
2169   CkReduction::reducerType r=CkReduction::invalid;
2170   int msgs_gcount=0;//Reduced gcount
2171   int msgs_nSources=0;//Reduced nSources
2172   int msgs_userFlag=-1;
2173   CkCallback msgs_callback;
2174   CkCallback msgs_secondaryCallback;
2175   int i;
2176   int nMsgs=0;
2177   CkReductionMsg *m;
2178   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2179   bool isMigratableContributor;
2180         
2181
2182   while(NULL!=(m=msgs.deq()))
2183   {
2184     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2185     msgs_gcount+=m->gcount;
2186     if (m->sourceFlag!=0)
2187     { //This is a real message from an element, not just a placeholder
2188       msgArr[nMsgs++]=m;
2189       msgs_nSources+=m->nSources();
2190       r=m->reducer;
2191       if (!m->callback.isInvalid())
2192         msgs_callback=m->callback;
2193       if(!m->secondaryCallback.isInvalid()){
2194         msgs_secondaryCallback = m->secondaryCallback;
2195       }
2196       if (m->userFlag!=-1)
2197         msgs_userFlag=m->userFlag;
2198
2199         isMigratableContributor= m->isMigratableContributor();
2200 #if CMK_BLUEGENE_CHARM
2201       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2202 #endif
2203                                 
2204     }
2205     else
2206     { //This is just a placeholder message-- replace it
2207       delete m;
2208     }
2209   }
2210
2211   if (nMsgs==0||r==CkReduction::invalid)
2212   //No valid reducer in the whole vector
2213     ret=CkReductionMsg::buildNew(0,NULL);
2214   else
2215   {//Use the reducer to reduce the messages
2216     if(nMsgs == 1){
2217         ret = msgArr[0];
2218     }else{
2219         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2220         ret=(*f)(nMsgs,msgArr);
2221     }
2222     ret->reducer=r;
2223   }
2224
2225         
2226 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2227 #if CRITICAL_PATH_DEBUG > 3
2228         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2229 #endif
2230         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2231         path.updateMax(UsrToEnv(ret));
2232         // Combine the critical paths from all the reduction messages into the header for the new result
2233         for (i=0;i<nMsgs;i++){
2234           if (msgArr[i]!=ret){
2235             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2236             path.updateMax(UsrToEnv(msgArr[i]));
2237           } else {
2238             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2239           }
2240         }
2241 #if CRITICAL_PATH_DEBUG > 3
2242         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2243 #endif
2244
2245 #endif
2246
2247
2248         //Go back through the vector, deleting old messages
2249   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2250   delete [] msgArr;
2251   //Set the message counts
2252   ret->redNo=redNo;
2253   ret->gcount=msgs_gcount;
2254   ret->userFlag=msgs_userFlag;
2255   ret->callback=msgs_callback;
2256   ret->secondaryCallback = msgs_secondaryCallback;
2257   ret->sourceFlag=msgs_nSources;
2258   ret->setMigratableContributor(isMigratableContributor);
2259   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2260 #if CMK_BLUEGENE_CHARM
2261   _TRACE_BG_TLINE_END(&ret->log);
2262 #endif
2263
2264   return ret;
2265 }
2266
2267 void CkNodeReductionMgr::pup(PUP::er &p)
2268 {
2269 //We do not store the client function pointer or the client function parameter,
2270 //it is the responsibility of the programmer to correctly restore these
2271   IrrGroup::pup(p);
2272   p(redNo);
2273   p(inProgress); p(creating); p(startRequested);
2274   p(lcount);
2275   p(nContrib); p(nRemote);
2276   p(interrupt);
2277   p|msgs;
2278   p|futureMsgs;
2279   p|futureRemoteMsgs;
2280   p|futureLateMigrantMsgs;
2281   p|parent;
2282   p|additionalGCount;
2283   p|newAdditionalGCount;
2284   if(p.isUnpacking()) {
2285     gcount=CkNumNodes();
2286     thisProxy = thisgroup;
2287     lockEverything = CmiCreateLock();
2288 #ifdef BINOMIAL_TREE
2289     init_BinomialTree();
2290 #else
2291     init_BinaryTree();
2292 #endif          
2293   }
2294   p | blocked;
2295   p | maxModificationRedNo;
2296
2297 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2298   int isnull = (storedCallback == NULL);
2299   p | isnull;
2300   if (!isnull) {
2301     if (p.isUnpacking()) {
2302       storedCallback = new CkCallback;
2303     }
2304     p|*storedCallback;
2305   }
2306 #endif
2307
2308 }
2309
2310 /*
2311         FAULT_EVAC
2312         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2313         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2314         reduction tree. 
2315 */
2316 void CkNodeReductionMgr::evacuate(){
2317         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2318         if(treeKids() == 0){
2319         /*
2320                 if the node going down is a leaf
2321         */
2322                 oldleaf=true;
2323                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2324                 /*
2325                         Need to ask parent for the reduction number that it has seen. 
2326                         Since it is a leaf, the tree does not need to be rewired. 
2327                         We reuse the oldparent type of tree modification message to get 
2328                         the parent to block and tell us about the highest reduction number it has seen.
2329                         
2330                 */
2331                 int data[2];
2332                 data[0]=CkMyNode();
2333                 data[1]=getTotalGCount()+additionalGCount;
2334                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2335                 newParent = treeParent();
2336         }else{
2337                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2338                 oldleaf= false;
2339         /*
2340                 It is not a leaf. It needs to rewire the tree around itself.
2341                 It also needs to decide on a reduction No after which the new tree will be used
2342                 Till it decides on the new tree and the redNo at which it becomes valid,
2343                 all received messages will be buffered
2344         */
2345                 newParent = kids[0];
2346                 for(int i=numKids-1;i>=0;i--){
2347                         newKids.remove(i);
2348                 }
2349                 /*
2350                         Ask everybody for the highest reduction number they have seen and
2351                         also tell them about the new tree
2352                 */
2353                 /*
2354                         Tell parent about its new child;
2355                 */
2356                 int oldParentData[2];
2357                 oldParentData[0] = CkMyNode();
2358                 oldParentData[1] = newParent;
2359                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2360
2361                 /*
2362                         Tell the other children about their new parent
2363                 */
2364                 int childrenData=newParent;
2365                 for(int i=1;i<numKids;i++){
2366                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2367                 }
2368                 
2369                 /*
2370                         Tell newParent (1st child) about its new children,
2371                         the current node and its children except the newParent
2372                 */
2373                 int *newParentData = new int[numKids+2];
2374                 for(int i=1;i<numKids;i++){
2375                         newParentData[i] = kids[i];
2376                 }
2377                 newParentData[0] = CkMyNode();
2378                 newParentData[numKids] = parent;
2379                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2380                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2381         }
2382         readyDeletion = false;
2383         blocked = true;
2384         numModificationReplies = 0;
2385         tempModificationRedNo = findMaxRedNo();
2386 }
2387
2388 /*
2389         Depending on the code, use the data to change the tree
2390         1. OLDPARENT : replace the old child with a new one
2391         2. OLDCHILDREN: replace the parent
2392         3. NEWPARENT:  add the children and change the parent
2393         4. LEAFPARENT: delete the old child
2394 */
2395
2396 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2397         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2398         int sender;
2399         newKids = kids;
2400         readyDeletion = false;
2401         newAdditionalGCount = additionalGCount;
2402         switch(code){
2403                 case OLDPARENT: 
2404                         for(int i=0;i<numKids;i++){
2405                                 if(newKids[i] == data[0]){
2406                                         newKids[i] = data[1];
2407                                         break;
2408                                 }
2409                         }
2410                         sender = data[0];
2411                         newParent = parent;
2412                         break;
2413                 case OLDCHILDREN:
2414                         newParent = data[0];
2415                         sender = parent;
2416                         break;
2417                 case NEWPARENT:
2418                         for(int i=0;i<size-2;i++){
2419                                 newKids.push_back(data[i]);
2420                         }
2421                         newParent = data[size-2];
2422                         newAdditionalGCount += data[size-1];
2423                         sender = parent;
2424                         break;
2425                 case LEAFPARENT:
2426                         for(int i=0;i<numKids;i++){
2427                                 if(newKids[i] == data[0]){
2428                                         newKids.remove(i);
2429                                         break;
2430                                 }
2431                         }
2432                         sender = data[0];
2433                         newParent = parent;
2434                         newAdditionalGCount += data[1];
2435                         break;
2436         };
2437         blocked = true;
2438         int maxRedNo = findMaxRedNo();
2439         
2440         thisProxy[sender].collectMaxRedNo(maxRedNo);
2441 }
2442
2443 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2444         /*
2445                 Find out the maximum redNo that has been seen by 
2446                 the affected nodes
2447         */
2448         numModificationReplies++;
2449         if(maxRedNo > tempModificationRedNo){
2450                 tempModificationRedNo = maxRedNo;
2451         }
2452         if(numModificationReplies == numKids+1){
2453                 maxModificationRedNo = tempModificationRedNo;
2454                 /*
2455                         when all the affected nodes have replied, tell them the maximum.
2456                         Unblock yourself. deal with the buffered messages local and remote
2457                 */
2458                 if(maxModificationRedNo == -1){
2459                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2460                 }else{
2461                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2462                 }
2463                 thisProxy[parent].unblockNode(maxModificationRedNo);
2464                 for(int i=0;i<numKids;i++){
2465                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2466                 }
2467                 blocked = false;
2468                 updateTree();
2469                 clearBlockedMsgs();
2470         }
2471 }
2472
2473 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2474         maxModificationRedNo = maxRedNo;
2475         updateTree();
2476         blocked = false;
2477         clearBlockedMsgs();
2478 }
2479
2480
2481 void CkNodeReductionMgr::clearBlockedMsgs(){
2482         int len = bufferedMsgs.length();
2483         for(int i=0;i<len;i++){
2484                 CkReductionMsg *m = bufferedMsgs.deq();
2485                 doAddContribution(m);
2486         }
2487         len = bufferedRemoteMsgs.length();
2488         for(int i=0;i<len;i++){
2489                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2490                 doRecvMsg(m);
2491         }
2492
2493 }
2494 /*
2495         if the reduction number exceeds the maxModificationRedNo, change the tree
2496         to become the new one
2497 */
2498
2499 void CkNodeReductionMgr::updateTree(){
2500         if(redNo > maxModificationRedNo){
2501                 parent = newParent;
2502                 kids = newKids;
2503                 maxModificationRedNo = INT_MAX;
2504                 numKids = kids.size();
2505                 readyDeletion = true;
2506                 additionalGCount = newAdditionalGCount;
2507                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2508                 for(int i=0;i<(int)(newKids.size());i++){
2509                         DEBREVAC(("%d ",newKids[i]));
2510                 }
2511                 DEBREVAC(("\n"));
2512         //      startReduction(redNo,CkMyNode());
2513         }else{
2514                 if(maxModificationRedNo != INT_MAX){
2515                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2516                         startReduction(redNo,CkMyNode());
2517                         finishReduction();
2518                 }       
2519         }
2520 }
2521
2522
2523 void CkNodeReductionMgr::doneEvacuate(){
2524         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2525 /*      if(oldleaf){
2526                 
2527                         It used to be a leaf
2528                         Then as soon as future messages have been emptied you can 
2529                         send the parent a message telling them that they are not going
2530                         to receive anymore messages from this child
2531                 
2532                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2533                 while(futureMsgs.length() != 0){
2534                         int n = futureMsgs.length();
2535                         for(int i=0;i<n;i++){
2536                                 CkReductionMsg *m = futureMsgs.deq();
2537                                 if(isPresent(m->redNo)){
2538                                         msgs.enq(m);
2539                                 }else{
2540                                         futureMsgs.enq(m);
2541                                 }
2542                         }
2543                         CkReductionMsg *result = reduceMessages();
2544                         thisProxy[treeParent()].RecvMsg(result);
2545                         redNo++;
2546                 }
2547                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2548                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2549         }else{*/
2550                 if(readyDeletion){
2551                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2552                 }else{
2553                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2554                 }
2555 //      }
2556 }
2557
2558 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2559         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2560         for(int i=0;i<numKids;i++){
2561                 if(kids[i] == deletedChild){
2562                         kids.remove(i);
2563                         break;
2564                 }
2565         }
2566         numKids = kids.length();
2567         finishReduction();
2568 }
2569
2570 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2571         for(int i=0;i<(int)(newKids.length());i++){
2572                 if(newKids[i] == deletedChild){
2573                         newKids.remove(i);
2574                         break;
2575                 }
2576         }
2577         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2578         for(int i=0;i<(int)(newKids.size());i++){
2579                 DEBREVAC(("%d ",newKids[i]));
2580         }
2581         DEBREVAC(("\n"));
2582         finishReduction();
2583 }
2584
2585 int CkNodeReductionMgr::findMaxRedNo(){
2586         int max = redNo;
2587         for(int i=0;i<futureRemoteMsgs.length();i++){
2588                 if(futureRemoteMsgs[i]->redNo  > max){
2589                         max = futureRemoteMsgs[i]->redNo;
2590                 }
2591         }
2592         /*
2593                 if redNo is max (that is no future message) and the current reduction has not started
2594                 then tree can be changed before the reduction redNo can be started
2595         */ 
2596         if(redNo == max && msgs.length() == 0){
2597                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2598                 max--;
2599         }
2600         return max;
2601 }
2602
2603 #include "CkReduction.def.h"