added a reduction operation "nop" which does nothing.
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207   disableNotifyChildrenStart = CmiFalse;
208   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
209 }
210
211 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
212 {
213   redNo=0;
214   completedRedNo = -1;
215   inProgress=CmiFalse;
216   creating=CmiFalse;
217   startRequested=CmiFalse;
218   gcount=lcount=0;
219   nContrib=nRemote=0;
220   maxStartRequest=0;
221   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
222 }
223
224 void CkReductionMgr::flushStates()
225 {
226   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
227   redNo=0;
228   completedRedNo = -1;
229   inProgress=CmiFalse;
230   creating=CmiFalse;
231   gcount=lcount=0;
232   startRequested=CmiFalse;
233   nContrib=nRemote=0;
234   maxStartRequest=0;
235
236   while (!msgs.isEmpty()) { delete msgs.deq(); }
237   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
238   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
239   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
240
241   adjVec.length()=0;
242
243   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
244 }
245
246 //////////// Reduction Manager Client API /////////////
247
248 //Add the given client function.  Overwrites any previous client.
249 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
250 {
251   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
252
253   if (CkMyPe()!=0)
254           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
255   storedCallback=*cb;
256 #if ! GROUP_LEVEL_REDUCTION
257   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
258   nodeProxy.ckSetReductionClient(callback);
259 #endif
260 }
261
262 ///////////////////////////// Contributor ////////////////////////
263 //Contributors keep a copy of this structure:
264
265 /*Contributor migration support:
266 */
267 void contributorInfo::pup(PUP::er &p)
268 {
269   p(redNo);
270 }
271
272 ////////////////////// Contributor list maintainance: /////////////////
273 //These just set and clear the "creating" flag to prevent
274 // reductions from finishing early because not all elements
275 // have been created.
276 void CkReductionMgr::creatingContributors(void)
277 {
278   DEBR((AA"Creating contributors...\n"AB));
279   creating=CmiTrue;
280 }
281 void CkReductionMgr::doneCreatingContributors(void)
282 {
283   DEBR((AA"Done creating contributors...\n"AB));
284   creating=CmiFalse;
285   if (startRequested) startReduction(redNo,CkMyPe());
286   finishReduction();
287 }
288
289 //A new contributor will be created
290 void CkReductionMgr::contributorStamped(contributorInfo *ci)
291 {
292   DEBR((AA"Contributor %p stamped\n"AB,ci));
293   //There is another contributor
294   gcount++;
295   if (inProgress)
296   {
297     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
298     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
299   } else
300     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
301 }
302
303 //A new contributor was actually created
304 void CkReductionMgr::contributorCreated(contributorInfo *ci)
305 {
306   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
307   //We've got another contributor
308   lcount++;
309   //He may not need to contribute to some of our reductions:
310   for (int r=redNo;r<ci->redNo;r++)
311     adj(r).lcount--;//He won't be contributing to r here
312 }
313
314 /*Don't expect any more contributions from this one.
315 This is rather horrifying because we now have to make
316 sure the global element count accurately reflects all the
317 contributions the element made before it died-- these may stretch
318 far into the future.  The adj() vector is what saves us here.
319 */
320 void CkReductionMgr::contributorDied(contributorInfo *ci)
321 {
322 #if CMK_MEM_CHECKPOINT
323   // ignore from listener if it is during restart from crash
324   if (CkInRestarting()) return;
325 #endif
326   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
327   //We lost a contributor
328   gcount--;
329
330   if (ci->redNo<redNo)
331   {//Must have been migrating during reductions-- root is waiting for his
332   // contribution, which will never come.
333     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
334     for (int r=ci->redNo;r<redNo;r++)
335       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
336   }
337
338   //Add to the global count for all his future messages (wherever they are)
339   int r;
340   for (r=redNo;r<ci->redNo;r++)
341   {//He already contributed to this reduction, but won't show up in global count.
342     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
343     adj(r).gcount++;
344   }
345
346   lcount--;
347   //He's already contributed to several reductions here
348   for (r=redNo;r<ci->redNo;r++)
349     adj(r).lcount++;//He'll be contributing to r here
350
351   finishReduction();
352 }
353
354 //Migrating away (note that global count doesn't change)
355 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
356 {
357   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
358   lcount--;//We lost a local
359   //He's already contributed to several reductions here
360   for (int r=redNo;r<ci->redNo;r++)
361     adj(r).lcount++;//He'll be contributing to r here
362
363   finishReduction();
364 }
365
366 //Migrating in (note that global count doesn't change)
367 void CkReductionMgr::contributorArriving(contributorInfo *ci)
368 {
369   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
370   lcount++;//We gained a local
371 #if CMK_MEM_CHECKPOINT
372   // ignore from listener if it is during restart from crash
373   // because the ci may be old.
374   if (CkInRestarting()) return;
375 #endif
376   //He has already contributed (elsewhere) to several reductions:
377   for (int r=redNo;r<ci->redNo;r++)
378     adj(r).lcount--;//He won't be contributing to r here
379
380 }
381
382 //Contribute-- the given msg can contain any data.  The reducerType
383 // field of the message must be valid.
384 // Each contributor must contribute exactly once to the each reduction.
385 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
386 {
387 #if CMK_BLUEGENE_CHARM
388   _TRACE_BG_TLINE_END(&(m->log));
389 #endif
390   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
391   //m->ci=ci;
392   m->redNo=ci->redNo++;
393   m->sourceFlag=-1;//A single contribution
394   m->gcount=0;
395
396 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
397     if(lcount == 0){
398         m->sourceProcessorCount = 1;
399     }else{
400         m->sourceProcessorCount = lcount;
401     }
402     m->fromPE = CmiMyPe();
403     Chare *oldObj =CpvAccess(_currentObj);
404     char currentObjName[100];
405     DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
406     thisProxy[0].contributeViaMessage(m);
407 #else
408   addContribution(m);
409 #endif
410 }
411
412 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
413 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
414     CmiAssert(CmiMyPe() == 0);
415     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
416     if(redNo == 0){
417         if(perProcessorCounts[m->fromPE] == -1){
418             processorCount++;
419             perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
420             DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
421         }else{
422             if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
423                 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
424                 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
425             }
426         }
427         if(processorCount == CmiNumPes()){
428             totalCount = 0;
429             for(int i=0;i<CmiNumPes();i++){
430                 CmiAssert(perProcessorCounts[i] != -1);
431                 totalCount += perProcessorCounts[i];
432             }
433             DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
434         }
435         if(m->sourceProcessorCount == 0){
436             if(processorCount == CmiNumPes()){
437                 finishReduction();
438             }
439             return;
440         }
441     }
442     addContribution(m);
443 }
444 #else
445 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
446 #endif
447
448 //////////// Reduction Manager Remote Entry Points /////////////
449 //Sent down the reduction tree (used by barren PEs)
450 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
451 {
452  if(CkMyPe()==0){
453         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
454         //delete m;
455         //return;
456  }
457  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
458  int srcPE = (UsrToEnv(m))->getSrcPe();
459 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_)) 
460   if (isPresent(m->num) && !inProgress)
461   {
462     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
463     startReduction(m->num,srcPE);
464     finishReduction();
465   } else if (isFuture(m->num)){
466 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
467           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
468           if(maxStartRequest < m->num){
469                   maxStartRequest = m->num;
470           }
471  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
472           
473     }
474   else //is Past
475     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
476   delete m;
477 #else
478     if(redNo == 0){
479         if(lcount == 0){
480             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
481             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
482             dummy->fromPE = CmiMyPe();
483             dummy->sourceProcessorCount = 0;
484             dummy->redNo = 0;
485             thisProxy[0].contributeViaMessage(dummy);
486         }
487     }
488 #endif
489 }
490
491 //Sent to root of reduction tree with reduction contribution
492 // of migrants that missed the main reduction.
493 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
494 {
495 #if GROUP_LEVEL_REDUCTION
496 #if CMK_BLUEGENE_CHARM
497   _TRACE_BG_TLINE_END(&(m->log));
498 #endif
499   addContribution(m);
500 #else
501   m->secondaryCallback = m->callback;
502   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
503   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
504   nodeMgr->LateMigrantMsg(m);
505 /*      int len = finalMsgs.length();
506         finalMsgs.enq(m);
507 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
508         endArrayReduction();*/
509 #endif
510 }
511
512 //A late migrating contributor will never contribute to this reduction
513 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
514 {
515   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
516   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
517  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
518   adj(m->num).gcount--;//He won't be contributing to this one.
519   finishReduction();
520   delete m;
521 }
522
523 //////////// Reduction Manager State /////////////
524 void CkReductionMgr::startReduction(int number,int srcPE)
525 {
526   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
527   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
528   if (inProgress){
529         DEBR((AA"This reduction is already in progress\n"AB));
530         return;//This reduction already started
531   }
532   if (creating) //Don't start yet-- we're creating elements
533   {
534     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
535     startRequested=CmiTrue;
536     return;
537   }
538
539 //If none of these cases, we need to start the reduction--
540   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
541   inProgress=CmiTrue;
542  
543
544         /*
545                 FAULT_EVAC
546         */
547   if(!CmiNodeAlive(CkMyPe())){
548         return;
549   }
550
551   if(disableNotifyChildrenStart) return;
552   
553 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
554   if(CmiMyPe() == 0 && redNo == 0){
555             for(int j=0;j<CkNumPes();j++){
556                 if(j != CkMyPe() && j != srcPE){
557                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
558                 }
559             }
560             if(lcount == 0){
561                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
562                 dummy->fromPE = CmiMyPe();
563                 dummy->sourceProcessorCount = 0;
564                 dummy->redNo = 0;
565                 thisProxy[0].contributeViaMessage(dummy);
566             }
567     }   else{
568         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
569     }
570
571
572 #else
573   //Sent start requests to our kids (in case they don't already know)
574 #if GROUP_LEVEL_REDUCTION
575   for (int k=0;k<treeKids();k++)
576   {
577     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
578     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
579   }
580 #else
581   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
582 #endif
583 #endif
584         
585         /*
586   int temp;
587   //making it a broadcast done only by PE 0
588   if(!hasParent()){
589                 temp = completedRedNo+1;
590                 for(int i=temp;i<=number;i++){
591                         for(int j=0;j<CkNumPes();j++){
592                                 if(j != CkMyPe() && j != srcPE){
593                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
594                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
595                                         }
596                                 }
597                         }
598                 }       
599         }       else{
600                 temp = number;
601         }*/
602 /*  if(!hasParent()){
603                 temp = completedRedNo+1;
604         }       else{
605                 temp = number;
606         }
607         for(int i=temp;i<=number;i++){
608         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
609                 if(hasParent()){
610           // kick-start your parent too ...
611                         if(treeParent() != srcPE){
612                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
613 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
614     CpvAccess(_currentObj) = oldObj;
615 #endif
616                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
617                                 }       
618                         }       
619                 }
620           for (int k=0;k<treeKids();k++)
621           {
622                         if(firstKid()+k != srcPE){
623                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
624                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
625                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
626                                 }       
627                         }       
628         }
629         }
630         */
631 }       
632
633 /*Handle a message from one element for the reduction*/
634 void CkReductionMgr::addContribution(CkReductionMsg *m)
635 {
636   if (isPast(m->redNo))
637   {
638 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
639         CmiAbort("this version should not have late migrations");
640 #else
641         //We've moved on-- forward late contribution straight to root
642     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
643         // if (!hasParent()) //Root moved on too soon-- should never happen
644         //   CkAbort("Late reduction contribution received at root!\n");
645     thisProxy[0].LateMigrantMsg(m);
646 #endif
647   }
648   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
649     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
650     futureMsgs.enq(m);
651   } else {// An ordinary contribution
652     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
653    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
654     startReduction(m->redNo,CkMyPe());
655     msgs.enq(m);
656     nContrib++;
657     finishReduction();
658   }
659 }
660
661 /**function checks if it has got all contributions that it is supposed to
662 get at this processor. If it is done it sends the reduced result to the local
663 nodegroup */
664 void CkReductionMgr::finishReduction(void)
665 {
666   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
667   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
668   if ((!inProgress) || creating){
669         DEBR((AA"Either not in Progress or creating\n"AB));
670         return;
671   }
672   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
673 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
674
675   if (nContrib<(lcount+adj(redNo).lcount)){
676          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
677          return;//Need more local messages
678   }
679 #if GROUP_LEVEL_REDUCTION
680   if (nRemote<treeKids()) return;//Need more remote messages
681 #endif
682  
683 #else
684
685     if(CkMyPe() != 0){
686         if(redNo != 0){
687             return;
688         }else{
689             CmiAssert(lcount == 0);
690             CkReductionMsg *dummy = reduceMessages();
691             dummy->fromPE = CmiMyPe();
692             dummy->sourceProcessorCount = 0;
693             thisProxy[0].contributeViaMessage(dummy);
694             return;
695         }
696     }
697     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
698         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
699          return;//Need more local messages
700   }else{
701         DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
702     }
703
704
705 #endif
706
707   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
708   CkReductionMsg *result=reduceMessages();
709   result->redNo=redNo;
710 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
711
712 #if GROUP_LEVEL_REDUCTION
713   if (hasParent())
714   {//Pass data up tree to parent
715     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
716     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
717     result->gcount+=gcount+adj(redNo).gcount;
718     thisProxy[treeParent()].RecvMsg(result);
719   }
720   else 
721   {//We are root-- pass data to client
722     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
723     int totalElements=result->gcount+gcount+adj(redNo).gcount;
724     if (totalElements>result->nSources()) 
725     {
726       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
727       msgs.enq(result);
728       return; // Wait for migrants to contribute
729     } else if (totalElements<result->nSources()) {
730       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
731       CkAbort("ERROR! Too many contributions at root!\n");
732     }
733     DEBR((AA"Passing result to client function\n"AB));
734     CkSetRefNum(result, result->getUserFlag());
735     if (!result->callback.isInvalid())
736             result->callback.send(result);
737     else if (!storedCallback.isInvalid())
738             storedCallback.send(result);
739     else
740             CkAbort("No reduction client!\n"
741                     "You must register a client with either SetReductionClient or during contribute.\n");
742   }
743
744 #else
745   result->gcount+=gcount+adj(redNo).gcount;
746
747   result->secondaryCallback = result->callback;
748   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
749         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
750
751   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
752
753  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
754   
755   // Find our node reduction manager, and pass reduction to him:
756   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
757   nodeMgr->contributeArrayReduction(result);
758 #endif
759 #else                // _FAULT_MLOG_
760   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
761
762     CkSetRefNum(result, result->getUserFlag());
763     if (!result->callback.isInvalid())
764         result->callback.send(result);
765     else if (!storedCallback.isInvalid())
766         storedCallback.send(result);
767     else{
768       DEBR(("No reduction client for group %d \n",thisgroup.idx));
769         CkAbort("No reduction client!\n"
770             "You must register a client with either SetReductionClient or during contribute.\n");
771     }
772
773     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
774
775 #endif               // _FAULT_MLOG_
776
777   //House Keeping Operations will have to check later what needs to be changed
778   redNo++;
779   //Shift the count adjustment vector down one slot (to match new redNo)
780   int i;
781 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
782         if(CkMyPe()!=0){
783 #else
784     {
785 #endif
786 //      int i;
787         completedRedNo++;
788         for (i=1;i<(int)(adjVec.length());i++){
789            adjVec[i-1]=adjVec[i];
790         }
791         adjVec.length()--;  
792   }
793
794   inProgress=CmiFalse;
795   startRequested=CmiFalse;
796   nRemote=nContrib=0;
797
798   //Look through the future queue for messages we can now handle
799   int n=futureMsgs.length();
800   for (i=0;i<n;i++)
801   {
802     CkReductionMsg *m=futureMsgs.deq();
803     if (m!=NULL) //One of these addContributions may have finished us.
804       addContribution(m);//<- if *still* early, puts it back in the queue
805   }
806 #if GROUP_LEVEL_REDUCTION
807   n=futureRemoteMsgs.length();
808   for (i=0;i<n;i++)
809   {
810     CkReductionMsg *m=futureRemoteMsgs.deq();
811     if (m!=NULL) {
812       RecvMsg(m);//<- if *still* early, puts it back in the queue
813     }
814   }
815 #endif
816
817 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
818   if(maxStartRequest >= redNo){
819           startReduction(redNo,CkMyPe());
820           finishReduction();
821   }
822  
823 #endif
824 }
825
826 //Sent up the reduction tree with reduced data
827   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
828 {
829 #if GROUP_LEVEL_REDUCTION
830 #if CMK_BLUEGENE_CHARM
831   _TRACE_BG_TLINE_END(&m->log);
832 #endif
833   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
834     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
835     startReduction(m->redNo, CkMyPe());
836     msgs.enq(m);
837     nRemote++;
838     finishReduction();
839   }
840   else if (isFuture(m->redNo)) {
841     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
842     futureRemoteMsgs.enq(m);
843   } 
844   else CkAbort("Recv'd late remote contribution!\n");
845 #endif
846 }
847
848 //////////// Reduction Manager Utilities /////////////
849
850 //Return the countAdjustment struct for the given redNo:
851 countAdjustment &CkReductionMgr::adj(int number)
852 {
853   number-=completedRedNo;
854   number--;
855   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
856   //Pad the adjustment vector with zeros until it's at least number long
857   while ((int)(adjVec.length())<=number)
858     adjVec.push_back(countAdjustment());
859   return adjVec[number];
860 }
861
862 //Combine (& free) the current message vector msgs.
863 CkReductionMsg *CkReductionMgr::reduceMessages(void)
864 {
865 #if CMK_BLUEGENE_CHARM
866   _TRACE_BG_END_EXECUTE(1);
867   void* _bgParentLog = NULL;
868   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
869 #endif
870   CkReductionMsg *ret=NULL;
871
872   //Look through the vector for a valid reducer, swapping out placeholder messages
873   CkReduction::reducerType r=CkReduction::invalid;
874   int msgs_gcount=0;//Reduced gcount
875   int msgs_nSources=0;//Reduced nSources
876   int msgs_userFlag=-1;
877   CkCallback msgs_callback;
878   int i;
879   int nMsgs=0;
880   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
881   CkReductionMsg *m;
882   bool isMigratableContributor;
883
884   // Copy message queue into msgArr, skipping placeholders:
885   while (NULL!=(m=msgs.deq()))
886   {
887     msgs_gcount+=m->gcount;
888     if (m->sourceFlag!=0)
889     { //This is a real message from an element, not just a placeholder
890       msgArr[nMsgs++]=m;
891       msgs_nSources+=m->nSources();
892       r=m->reducer;
893       if (!m->callback.isInvalid())
894         msgs_callback=m->callback;
895       if (m->userFlag!=-1)
896         msgs_userFlag=m->userFlag;
897                         
898         isMigratableContributor=m->isMigratableContributor();
899 #if CMK_BLUEGENE_CHARM
900         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
901 #endif
902     }
903     else
904     { //This is just a placeholder message-- forget it
905       delete m;
906     }
907   }
908
909   if (nMsgs==0||r==CkReduction::invalid)
910   //No valid reducer in the whole vector
911     ret=CkReductionMsg::buildNew(0,NULL);
912   else
913   {//Use the reducer to reduce the messages
914                 //if there is only one msg to be reduced just return that message
915     if(nMsgs == 1){
916         ret = msgArr[0];        
917     }else{
918         CkReduction::reducerFn f=CkReduction::reducerTable[r];
919         ret=(*f)(nMsgs,msgArr);
920     }
921     ret->reducer=r;
922   }
923
924
925
926 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
927
928 #if CRITICAL_PATH_DEBUG > 3
929   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
930 #endif
931
932   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
933   path.updateMax(UsrToEnv(ret));
934   // Combine the critical paths from all the reduction messages
935   for (i=0;i<nMsgs;i++){
936     if (msgArr[i]!=ret){
937       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
938       path.updateMax(UsrToEnv(msgArr[i]));
939     }
940   }
941   
942
943 #if CRITICAL_PATH_DEBUG > 3
944   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
945 #endif
946   
947   PathHistoryTableEntry tableEntry(path);
948   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
949   
950 #endif
951
952         //Go back through the vector, deleting old messages
953   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
954         delete [] msgArr;
955
956   //Set the message counts
957   ret->redNo=redNo;
958   ret->gcount=msgs_gcount;
959   ret->userFlag=msgs_userFlag;
960   ret->callback=msgs_callback;
961   ret->sourceFlag=msgs_nSources;
962         ret->setMigratableContributor(isMigratableContributor);
963   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
964
965   return ret;
966 }
967
968
969 //Checkpointing utilities
970 //pack-unpack method for CkReductionMsg
971 //if packing pack the message and then unpack and return it
972 //if unpacking allocate memory for it read it off disk and then unapck
973 //and return it
974 void CkReductionMgr::pup(PUP::er &p)
975 {
976 //We do not store the client function pointer or the client function parameter,
977 //it is the responsibility of the programmer to correctly restore these
978   CkGroupInitCallback::pup(p);
979   p(redNo);
980   p(completedRedNo);
981   p(inProgress); p(creating); p(startRequested);
982   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
983   p|msgs;
984   p|futureMsgs;
985   p|futureRemoteMsgs;
986   p|finalMsgs;
987   p|adjVec;
988   p|nodeProxy;
989   p|storedCallback;
990     // handle CkReductionClientBundle
991   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
992   {
993     CkReductionClientBundle *bd;
994     if (p.isUnpacking()) 
995       bd = new CkReductionClientBundle;
996     else
997       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
998     p|*bd;
999     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1000   }
1001
1002   // subtle --- Gengbin
1003   // Group : CkReductionMgr
1004   // CkArray: CkReductionMgr
1005   // lcount/gcount in Group is set in Group constructor
1006   // lcount/gcount in CkArray is not, it is set when array elements are created
1007   // we can not pup because inserting array elems will add the counters again
1008 //  p|lcount;
1009 //  p|gcount;
1010 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1011 //  p|lcount;
1012 //  //  p|gcount;
1013 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1014 #endif
1015   if(p.isUnpacking()){
1016     thisProxy = thisgroup;
1017     maxStartRequest=0;
1018 #if GROUP_LEVEL_REDUCTION
1019 #ifdef BINOMIAL_TREE
1020     init_BinomialTree();
1021 #else
1022    init_BinaryTree();
1023 #endif
1024 #endif
1025   }
1026
1027   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1028 }
1029
1030
1031 //Callback for doing Reduction through NodeGroups added by Sayantan
1032
1033 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1034
1035         int total = m->gcount+adj(m->redNo).gcount;
1036         finalMsgs.enq(m);
1037         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1038         adj(m->redNo).mainRecvd = 1;
1039         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1040         endArrayReduction();
1041 }
1042
1043 void CkReductionMgr :: endArrayReduction(){
1044         CkReductionMsg *ret=NULL;
1045         int nMsgs=finalMsgs.length();
1046         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1047         //Look through the vector for a valid reducer, swapping out placeholder messages
1048         //CkPrintf("Length of Final Message %d \n",nMsgs);
1049         CkReduction::reducerType r=CkReduction::invalid;
1050         int msgs_gcount=0;//Reduced gcount
1051         int msgs_nSources=0;//Reduced nSources
1052         int msgs_userFlag=-1;
1053         CkCallback msgs_callback;
1054         CkCallback msgs_secondaryCallback;
1055         CkVec<CkReductionMsg *> tempMsgs;
1056         int i;
1057         int numMsgs = 0;
1058         for (i=0;i<nMsgs;i++)
1059         {
1060                 CkReductionMsg *m=finalMsgs.deq();
1061                 if(m->redNo == completedRedNo +1){
1062                         msgs_gcount+=m->gcount;
1063                         if (m->sourceFlag!=0)
1064                         { //This is a real message from an element, not just a placeholder
1065                                 msgs_nSources+=m->nSources();
1066                                 r=m->reducer;
1067                                 if (!m->callback.isInvalid())
1068                                 msgs_callback=m->callback;
1069                                 if(!m->secondaryCallback.isInvalid())
1070                                         msgs_secondaryCallback = m->secondaryCallback;
1071                                 if (m->userFlag!=-1)
1072                                         msgs_userFlag=m->userFlag;
1073                                 tempMsgs.push_back(m);
1074                         }
1075                 }else{
1076                         finalMsgs.enq(m);
1077                 }
1078
1079         }
1080         numMsgs = tempMsgs.length();
1081
1082         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1083
1084         if(numMsgs == 0){
1085                 return;
1086         }
1087         if(adj(completedRedNo+1).mainRecvd == 0){
1088                 for(i=0;i<numMsgs;i++){
1089                         finalMsgs.enq(tempMsgs[i]);
1090                 }
1091                 return;
1092         }
1093
1094 /*
1095         NOT NEEDED ANYMORE DONE at nodegroup level
1096         if(msgs_gcount  > msgs_nSources){
1097                 for(i=0;i<numMsgs;i++){
1098                         finalMsgs.enq(tempMsgs[i]);
1099                 }
1100                 return;
1101         }*/
1102
1103         if (nMsgs==0||r==CkReduction::invalid)
1104                 //No valid reducer in the whole vector
1105                 ret=CkReductionMsg::buildNew(0,NULL);
1106         else{//Use the reducer to reduce the messages
1107                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1108                 // has to be corrected elements from above need to be put into a temporary vector
1109                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1110                 ret=(*f)(numMsgs,msgArr);
1111                 ret->reducer=r;
1112
1113         }
1114
1115         
1116 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1117
1118 #if CRITICAL_PATH_DEBUG > 3
1119         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1120 #endif
1121
1122         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1123         path.updateMax(UsrToEnv(ret));
1124         // Combine the critical paths from all the reduction messages into the header for the new result
1125         for (i=0;i<numMsgs;i++){
1126           if (tempMsgs[i]!=ret){
1127             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1128             path.updateMax(UsrToEnv(tempMsgs[i]));
1129           } else {
1130             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1131           }
1132         }
1133         // Also consider the path which got us into this entry method
1134
1135 #if CRITICAL_PATH_DEBUG > 3
1136         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1137 #endif
1138
1139 #endif
1140   
1141
1142
1143
1144
1145         for(i = 0;i<numMsgs;i++){
1146                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1147         }
1148
1149         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1150         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1151
1152         ret->redNo=completedRedNo+1;
1153         ret->gcount=msgs_gcount;
1154         ret->userFlag=msgs_userFlag;
1155         ret->callback=msgs_callback;
1156         ret->secondaryCallback = msgs_secondaryCallback;
1157         ret->sourceFlag=msgs_nSources;
1158
1159         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1160
1161         CkSetRefNum(ret, ret->getUserFlag());
1162         if (!ret->secondaryCallback.isInvalid())
1163             ret->secondaryCallback.send(ret);
1164     else if (!storedCallback.isInvalid())
1165             storedCallback.send(ret);
1166     else{
1167       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1168             CkAbort("No reduction client!\n"
1169                     "You must register a client with either SetReductionClient or during contribute.\n");
1170     }
1171         completedRedNo++;
1172
1173         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1174
1175         for (i=1;i<(int)(adjVec.length());i++)
1176                 adjVec[i-1]=adjVec[i];
1177         adjVec.length()--;
1178         endArrayReduction();
1179 }
1180
1181 #if GROUP_LEVEL_REDUCTION
1182
1183 void CkReductionMgr::init_BinaryTree(){
1184         parent = (CkMyPe()-1)/TREE_WID;
1185         int firstkid = CkMyPe()*TREE_WID+1;
1186         numKids=CkNumPes()-firstkid;
1187         if (numKids>TREE_WID) numKids=TREE_WID;
1188         if (numKids<0) numKids=0;
1189
1190         for(int i=0;i<numKids;i++){
1191                 kids.push_back(firstkid+i);
1192                 newKids.push_back(firstkid+i);
1193         }
1194 }
1195
1196 void CkReductionMgr::init_BinomialTree(){
1197         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1198         /*upperSize = (unsigned )pow((double)2,depth);*/
1199         upperSize = (unsigned) 1 << depth;
1200         label = upperSize-CkMyPe()-1;
1201         int p=label;
1202         int count=0;
1203         while( p > 0){
1204                 if(p % 2 == 0)
1205                         break;
1206                 else{
1207                         p = p/2;
1208                         count++;
1209                 }
1210         }
1211         /*parent = label + rint(pow((double)2,count));*/
1212         parent = label + (1<<count);
1213         parent = upperSize -1 -parent;
1214         int temp;
1215         if(count != 0){
1216                 numKids = 0;
1217                 for(int i=0;i<count;i++){
1218                         /*temp = label - rint(pow((double)2,i));*/
1219                         temp = label - (1<<i);
1220                         temp = upperSize-1-temp;
1221                         if(temp <= CkNumPes()-1){
1222                                 kids.push_back(temp);
1223                                 numKids++;
1224                         }
1225                 }
1226         }else{
1227                 numKids = 0;
1228         //      kids = NULL;
1229         }
1230 }
1231
1232
1233 int CkReductionMgr::treeRoot(void)
1234 {
1235   return 0;
1236 }
1237 CmiBool CkReductionMgr::hasParent(void) //Root Node
1238 {
1239   return (CmiBool)(CkMyPe()!=treeRoot());
1240 }
1241 int CkReductionMgr::treeParent(void) //My parent Node
1242 {
1243   return parent;
1244 }
1245
1246 int CkReductionMgr::firstKid(void) //My first child Node
1247 {
1248   return CkMyPe()*TREE_WID+1;
1249 }
1250 int CkReductionMgr::treeKids(void)//Number of children in tree
1251 {
1252   return numKids;
1253 }
1254
1255 #endif
1256
1257 /////////////////////////////////////////////////////////////////////////
1258
1259 ////////////////////////////////
1260 //CkReductionMsg support
1261
1262 //ReductionMessage default private constructor-- does nothing
1263 CkReductionMsg::CkReductionMsg(){}
1264 CkReductionMsg::~CkReductionMsg(){}
1265
1266 //This define gives the distance from the start of the CkReductionMsg
1267 // object to the start of the user data area (just below last object field)
1268 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1269
1270 //"Constructor"-- builds and returns a new CkReductionMsg.
1271 //  the "data" array you specify will be copied into this object.
1272 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1273     CkReduction::reducerType reducer, CkReductionMsg *buf)
1274 {
1275   int len[1];
1276   len[0]=NdataSize;
1277   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1278   
1279   ret->dataSize=NdataSize;
1280   if (srcData!=NULL && !buf)
1281     memcpy(ret->data,srcData,NdataSize);
1282   ret->userFlag=-1;
1283   ret->reducer=reducer;
1284   //ret->ci=NULL;
1285   ret->sourceFlag=-1000;
1286   ret->gcount=0;
1287   ret->migratableContributor = true;
1288 #if CMK_BLUEGENE_CHARM
1289   ret->log = NULL;
1290 #endif
1291   return ret;
1292 }
1293
1294 // Charm kernel message runtime support:
1295 void *
1296 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1297 {
1298   int totalsize=ARM_DATASTART+(*sz);
1299   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1300   CkReductionMsg *ret = (CkReductionMsg *)
1301     CkAllocMsg(msgnum,totalsize,priobits);
1302   ret->data=(void *)(&ret->dataStorage);
1303   return (void *) ret;
1304 }
1305
1306 void *
1307 CkReductionMsg::pack(CkReductionMsg* in)
1308 {
1309   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1310   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1311   in->data = NULL;
1312   return (void*) in;
1313 }
1314
1315 CkReductionMsg* CkReductionMsg::unpack(void *in)
1316 {
1317   CkReductionMsg *ret = (CkReductionMsg *)in;
1318   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1319   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1320   ret->data=(void *)(&ret->dataStorage);
1321   return ret;
1322 }
1323
1324
1325 /////////////////////////////////////////////////////////////////////////////////////
1326 ///////////////// Builtin Reducer Functions //////////////
1327 /* A simple reducer, like sum_int, looks like this:
1328 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1329 {
1330   int i,ret=0;
1331   for (i=0;i<nMsg;i++)
1332     ret+=*(int *)(msg[i]->getData());
1333   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1334 }
1335
1336 To keep the code small and easy to change, the implementations below
1337 are built with preprocessor macros.
1338 */
1339
1340 //////////////// simple reducers ///////////////////
1341 /*A define used to quickly and tersely construct simple reductions.
1342 The basic idea is to use the first message's data array as
1343 (pre-initialized!) scratch space for folding in the other messages.
1344  */
1345
1346 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1347 {
1348         CkAbort("Called the invalid reducer type 0.  This probably\n"
1349                 "means you forgot to initialize your custom reducer index.\n");
1350         return NULL;
1351 }
1352
1353 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1354 {
1355   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1356 }
1357
1358 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1359 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1360 {\
1361   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1362   int m,i;\
1363   int nElem=msg[0]->getLength()/sizeof(dataType);\
1364   dataType *ret=(dataType *)(msg[0]->getData());\
1365   for (m=1;m<nMsg;m++)\
1366   {\
1367     dataType *value=(dataType *)(msg[m]->getData());\
1368     for (i=0;i<nElem;i++)\
1369     {\
1370       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1371       loop\
1372     }\
1373   }\
1374   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1375   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1376 }
1377
1378 //Use this macro for reductions that have the same type for all inputs
1379 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1380   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1381   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1382   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1383
1384
1385 //Compute the sum the numbers passed by each element.
1386 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1387
1388 //Compute the product of the numbers passed by each element.
1389 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1390
1391 //Compute the largest number passed by any element.
1392 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1393
1394 //Compute the smallest integer passed by any element.
1395 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1396
1397
1398 //Compute the logical AND of the integers passed by each element.
1399 // The resulting integer will be zero if any source integer is zero; else 1.
1400 SIMPLE_REDUCTION(logical_and,int,"%d",
1401         if (value[i]==0)
1402      ret[i]=0;
1403   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1404 )
1405
1406 //Compute the logical OR of the integers passed by each element.
1407 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1408 SIMPLE_REDUCTION(logical_or,int,"%d",
1409   if (value[i]!=0)
1410            ret[i]=1;
1411   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1412 )
1413
1414 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1415 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1416
1417 //Select one random message to pass on
1418 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1419   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1420 }
1421
1422 /////////////// concat ////////////////
1423 /*
1424 This reducer simply appends the data it recieves from each element,
1425 without any housekeeping data to separate them.
1426 */
1427 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1428 {
1429   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1430   //Figure out how big a message we'll need
1431   int i,retSize=0;
1432   for (i=0;i<nMsg;i++)
1433       retSize+=msg[i]->getSize();
1434
1435   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1436
1437   //Allocate a new message
1438   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1439
1440   //Copy the source message data into the return message
1441   char *cur=(char *)(ret->getData());
1442   for (i=0;i<nMsg;i++) {
1443     int messageBytes=msg[i]->getSize();
1444     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1445     cur+=messageBytes;
1446   }
1447   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1448   return ret;
1449 }
1450
1451 /////////////// set ////////////////
1452 /*
1453 This reducer appends the data it recieves from each element
1454 along with some housekeeping data indicating contribution boundaries.
1455 The message data is thus a list of reduction_set_element structures
1456 terminated by a dummy reduction_set_element with a sourceElement of -1.
1457 */
1458
1459 //This rounds an integer up to the nearest multiple of sizeof(double)
1460 static const int alignSize=sizeof(double);
1461 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1462
1463 //This gives the size (in bytes) of a reduction_set_element
1464 static int SET_SIZE(int dataSize)
1465 {return SET_ALIGN(sizeof(int)+dataSize);}
1466
1467 //This returns a pointer to the next reduction_set_element in the list
1468 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1469 {
1470   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1471   return (CkReduction::setElement *)next;
1472 }
1473
1474 //Combine the data passed by each element into an list of reduction_set_elements.
1475 // Each element may contribute arbitrary data (with arbitrary length).
1476 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1477 {
1478   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1479   //Figure out how big a message we'll need
1480   int i,retSize=0;
1481   for (i=0;i<nMsg;i++) {
1482     if (!msg[i]->isFromUser())
1483     //This message is composite-- it will just be copied over (less terminating -1)
1484       retSize+=(msg[i]->getSize()-sizeof(int));
1485     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1486       retSize+=SET_SIZE(msg[i]->getSize());
1487   }
1488   retSize+=sizeof(int);//Leave room for terminating -1.
1489
1490   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1491
1492   //Allocate a new message
1493   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1494
1495   //Copy the source message data into the return message
1496   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1497   for (i=0;i<nMsg;i++)
1498     if (!msg[i]->isFromUser())
1499     {//This message is composite-- just copy it over (less terminating -1)
1500                         int messageBytes=msg[i]->getSize()-sizeof(int);
1501                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1502                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1503                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1504     }
1505     else //This is a message from an element-- wrap it in a reduction_set_element
1506     {
1507       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1508       cur->dataSize=msg[i]->getSize();
1509       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1510       cur=SET_NEXT(cur);
1511     }
1512   cur->dataSize=-1;//Add a terminating -1.
1513   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1514   return ret;
1515 }
1516
1517 //Utility routine: get the next reduction_set_element in the list
1518 // if there is one, or return NULL if there are none.
1519 //To get all the elements, just keep feeding this procedure's output back to
1520 // its input until it returns NULL.
1521 CkReduction::setElement *CkReduction::setElement::next(void)
1522 {
1523   CkReduction::setElement *n=SET_NEXT(this);
1524   if (n->dataSize==-1)
1525     return NULL;//This is the end of the list
1526   else
1527     return n;//This is just another element
1528 }
1529
1530 /////////////////// Reduction Function Table /////////////////////
1531 CkReduction::CkReduction() {} //Dummy private constructor
1532
1533 //Add the given reducer to the list.  Returns the new reducer's
1534 // reducerType.  Must be called in the same order on every node.
1535 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1536 {
1537   reducerTable[nReducers]=fn;
1538   return (reducerType)nReducers++;
1539 }
1540
1541 /*Reducer table: maps reducerTypes to reducerFns.
1542 It's indexed by reducerType, so the order in this table
1543 must *exactly* match the reducerType enum declaration.
1544 The names don't have to match, but it helps.
1545 */
1546 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1547
1548 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1549     ::invalid_reducer,
1550     ::nop,
1551   //Compute the sum the numbers passed by each element.
1552     ::sum_int,::sum_float,::sum_double,
1553
1554   //Compute the product the numbers passed by each element.
1555     ::product_int,::product_float,::product_double,
1556
1557   //Compute the largest number passed by any element.
1558     ::max_int,::max_float,::max_double,
1559
1560   //Compute the smallest number passed by any element.
1561     ::min_int,::min_float,::min_double,
1562
1563   //Compute the logical AND of the integers passed by each element.
1564   // The resulting integer will be zero if any source integer is zero.
1565     ::logical_and,
1566
1567   //Compute the logical OR of the integers passed by each element.
1568   // The resulting integer will be 1 if any source integer is nonzero.
1569     ::logical_or,
1570
1571     // Compute the logical bitvector AND of the integers passed by each element.
1572     ::bitvec_and,
1573
1574     // Compute the logical bitvector OR of the integers passed by each element.
1575     ::bitvec_or,
1576
1577     // Select one of the messages at random to pass on
1578     ::random,
1579
1580   //Concatenate the (arbitrary) data passed by each element
1581     ::concat,
1582
1583   //Combine the data passed by each element into an list of setElements.
1584   // Each element may contribute arbitrary data (with arbitrary length).
1585     ::set
1586 };
1587
1588
1589
1590
1591
1592
1593
1594
1595 /********** Code added by Sayantan *********************/
1596 /** Locking is a big problem in the nodegroup code for smp.
1597  So a few assumptions have had to be made. There is one lock
1598  called lockEverything. It protects all the data structures 
1599  of the nodegroup reduction mgr. I tried grabbing it separately 
1600  for each datastructure, modifying it and then releasing it and
1601  then grabbing it again, for the next change.
1602  That doesn't really help because the interleaved execution of 
1603  different threads makes the state of the reduction manager 
1604  inconsistent. 
1605  
1606  1. Grab lockEverything before calling finishreduction or startReduction
1607     or doRecvMsg
1608  2. lockEverything is grabbed only in entry methods reductionStarting
1609     or RecvMesg or  addcontribution.
1610  ****/
1611  
1612 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1613 NodeGroup::NodeGroup(void) {
1614   __nodelock=CmiCreateLock();
1615 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1616     mlogData->objID.type = TypeNodeGroup;
1617     mlogData->objID.data.group.onPE = CkMyNode();
1618 #endif
1619
1620 }
1621 NodeGroup::~NodeGroup() {
1622   CmiDestroyLock(__nodelock);
1623 }
1624 void NodeGroup::pup(PUP::er &p)
1625 {
1626   CkNodeReductionMgr::pup(p);
1627   p|reductionInfo;
1628 }
1629
1630 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1631
1632 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1633   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1634  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1635   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1636  }
1637
1638 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1639                                     ((CkNodeReductionMgr *)this),
1640                                     reductionInfo,false)
1641
1642 /* this contribute also adds up the count across all messages it receives.
1643   Useful for summing up number of array elements who have contributed ****/ 
1644 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1645         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1646
1647
1648
1649 //#define BINOMIAL_TREE
1650
1651 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1652   : thisProxy(thisgroup)
1653 {
1654 #ifdef BINOMIAL_TREE
1655   init_BinomialTree();
1656 #else
1657   init_BinaryTree();
1658 #endif
1659   storedCallback=NULL;
1660   redNo=0;
1661   inProgress=CmiFalse;
1662   
1663   startRequested=CmiFalse;
1664   gcount=CkNumNodes();
1665   lcount=1;
1666   nContrib=nRemote=0;
1667   lockEverything = CmiCreateLock();
1668
1669
1670   creating=CmiFalse;
1671   interrupt = 0;
1672   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1673         /*
1674                 FAULT_EVAC
1675         */
1676         blocked = false;
1677         maxModificationRedNo = INT_MAX;
1678         killed=0;
1679         additionalGCount = newAdditionalGCount = 0;
1680 }
1681
1682 void CkNodeReductionMgr::flushStates()
1683 {
1684  if(CkMyRank() == 0){
1685   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1686   redNo=0;
1687   inProgress=CmiFalse;
1688
1689   startRequested=CmiFalse;
1690   gcount=CkNumNodes();
1691   lcount=1;
1692   nContrib=nRemote=0;
1693
1694   creating=CmiFalse;
1695   interrupt = 0;
1696   while (!msgs.isEmpty()) { delete msgs.deq(); }
1697   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1698   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1699   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1700   }
1701 }
1702
1703 //////////// Reduction Manager Client API /////////////
1704
1705 //Add the given client function.  Overwrites any previous client.
1706 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1707 {
1708   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1709   if(cb->isInvalid()){
1710         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1711   }else{
1712         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1713   }
1714
1715   if (CkMyNode()!=0)
1716           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1717   delete storedCallback;
1718   storedCallback=cb;
1719 }
1720
1721
1722
1723 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1724 {
1725 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1726     Chare *oldObj =CpvAccess(_currentObj);
1727     CpvAccess(_currentObj) = this;
1728 #endif
1729
1730   //m->ci=ci;
1731   m->redNo=ci->redNo++;
1732   m->sourceFlag=-1;//A single contribution
1733   m->gcount=0;
1734   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1735   addContribution(m);
1736
1737 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1738     CpvAccess(_currentObj) = oldObj;
1739 #endif
1740 }
1741
1742
1743 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1744 {
1745 #if CMK_BLUEGENE_CHARM
1746   _TRACE_BG_TLINE_END(&m->log);
1747 #endif
1748 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1749     Chare *oldObj =CpvAccess(_currentObj);
1750     CpvAccess(_currentObj) = this;
1751 #endif
1752   //m->ci=ci;
1753   m->redNo=ci->redNo++;
1754   m->gcount=count;
1755   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1756   addContribution(m);
1757   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1758
1759 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1760     CpvAccess(_currentObj) = oldObj;
1761 #endif
1762 }
1763
1764
1765 //////////// Reduction Manager Remote Entry Points /////////////
1766
1767 //Sent down the reduction tree (used by barren PEs)
1768 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1769 {
1770   CmiLock(lockEverything);
1771   if(blocked){
1772         delete m;
1773         CmiUnlock(lockEverything);
1774         return ;
1775   }
1776   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1777   if (isPresent(m->num) && !inProgress)
1778   {
1779     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1780     startReduction(m->num,srcNode);
1781     finishReduction();
1782   } else if (isFuture(m->num)){
1783         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1784   }
1785   else //is Past
1786     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1787   CmiUnlock(lockEverything);
1788   delete m;
1789
1790 }
1791
1792
1793 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1794         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1795         /*
1796                 FAULT_EVAC
1797         */
1798         if(blocked){
1799                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1800                 bufferedRemoteMsgs.enq(m);
1801                 return;
1802         }
1803         
1804         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1805             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1806             startReduction(m->redNo,CkMyNode());
1807             msgs.enq(m);
1808             nRemote++;
1809             finishReduction();
1810         }
1811         else {
1812             if (isFuture(m->redNo)) {
1813                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1814                     futureRemoteMsgs.enq(m);
1815             }else{
1816                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1817                    CkAbort("Recv'd late remote contribution!\n");
1818             }
1819         }
1820         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1821 }
1822
1823 //Sent up the reduction tree with reduced data
1824 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1825 {
1826 #if CMK_BLUEGENE_CHARM
1827   _TRACE_BG_TLINE_END(&m->log);
1828 #endif
1829 #ifndef CMK_CPV_IS_SMP
1830 #if CMK_IMMEDIATE_MSG
1831         if(interrupt == 1){
1832                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1833                 CpvAccess(_qd)->process(-1);
1834                 CmiDelayImmediate();
1835                 return;
1836         }
1837 #endif  
1838 #endif
1839    interrupt = 1;       
1840    CmiLock(lockEverything);   
1841    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1842    doRecvMsg(m);
1843    CmiUnlock(lockEverything);    
1844    interrupt = 0;
1845    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1846 }
1847
1848 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1849 {
1850         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1851         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1852         if (inProgress){
1853                 DEBR((AA"This Node reduction is already in progress\n"AB));
1854                 return;//This reduction already started
1855         }
1856         if (creating) //Don't start yet-- we're creating elements
1857         {
1858                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1859                 startRequested=CmiTrue;
1860                 return;
1861         }
1862         
1863         //If none of these cases, we need to start the reduction--
1864         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1865         inProgress=CmiTrue;
1866         //Sent start requests to our kids (in case they don't already know)
1867         
1868         for (int k=0;k<treeKids();k++)
1869         {
1870 #ifdef BINOMIAL_TREE
1871                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1872                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1873 #else
1874                 if(kids[k] != srcNode){
1875                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1876                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1877                 }       
1878 #endif
1879         }
1880
1881         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1882         // in case, nodegroup does not has the attached red group,
1883         // it has to restart groups again
1884         startLocalGroupReductions(number);
1885 /*
1886         if (startLocalGroupReductions(number) == 0)
1887           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1888 */
1889 }
1890
1891 // restart local groups until succeed
1892 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1893   CmiLock(lockEverything);    
1894   if (startLocalGroupReductions(number) == 0)
1895     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1896   CmiUnlock(lockEverything);    
1897 }
1898
1899 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1900         /*
1901                 FAULT_EVAC
1902         */
1903         if(blocked){
1904                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1905                 bufferedMsgs.enq(m);
1906                 return;
1907         }
1908         
1909         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1910                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1911                 futureMsgs.enq(m);
1912         } else {// An ordinary contribution
1913                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1914                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1915                 startReduction(m->redNo,CkMyNode());
1916                 msgs.enq(m);
1917                 nContrib++;
1918                 finishReduction();
1919         }
1920 }
1921
1922 //Handle a message from one element for the reduction
1923 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1924 {
1925   interrupt = 1;
1926   CmiLock(lockEverything);
1927   doAddContribution(m);
1928   CmiUnlock(lockEverything);
1929   interrupt = 0;
1930 }
1931
1932 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1933         if(blocked){
1934                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1935                 bufferedMsgs.enq(m);
1936                 return;
1937         }
1938         
1939         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1940                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1941 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1942                 futureLateMigrantMsgs.enq(m);
1943         } else {// An ordinary contribution
1944                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1945 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1946                 msgs.enq(m);
1947                 finishReduction();
1948         }
1949 }
1950
1951
1952
1953
1954
1955 /** check if the nodegroup reduction is finished at this node. In that case send it
1956 up the reduction tree **/
1957
1958 void CkNodeReductionMgr::finishReduction(void)
1959 {
1960   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1961   /***Check if reduction is finished in the next few ifs***/
1962   if ((!inProgress) || creating){
1963         DEBR((AA"Either not in Progress or creating\n"AB));
1964         return;
1965   }
1966
1967   if (nContrib<(lcount)){
1968         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1969
1970          return;//Need more local messages
1971   }
1972   if (nRemote<treeKids()){
1973         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1974
1975         return;//Need more remote messages
1976   }
1977   if (nRemote>treeKids()){
1978
1979           interrupt = 0;
1980            CkAbort("Nodegrp Excess remote reduction message received!\n");
1981   }
1982
1983   DEBR((AA"Reducing node data...\n"AB));
1984
1985   /**reduce all messages received at this node **/
1986   CkReductionMsg *result=reduceMessages();
1987
1988   if (hasParent())
1989   {//Pass data up tree to parent
1990         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1991         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1992         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1993                 /*
1994                         FAULT_EVAC
1995                 */
1996                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1997             thisProxy[treeParent()].RecvMsg(result);
1998         }
1999
2000   }
2001   else
2002   {
2003                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2004                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
2005                         msgs.enq(result);
2006                         return;
2007                 }
2008                 result->gcount += additionalGCount;
2009           /** if the reduction is finished and I am the root of the reduction tree
2010           then call the reductionhandler and other stuff ***/
2011                 
2012
2013                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2014     CkSetRefNum(result, result->getUserFlag());
2015     if (!result->callback.isInvalid()){
2016       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2017             result->callback.send(result);
2018     }
2019     else if (storedCallback!=NULL){
2020       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2021             storedCallback->send(result);
2022     }
2023     else{
2024                 DEBR((AA"Invalid Callback \n"AB));
2025             CkAbort("No reduction client!\n"
2026                     "You must register a client with either SetReductionClient or during contribute.\n");
2027                 }
2028   }
2029
2030   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2031   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2032   redNo++;
2033         updateTree();
2034   int i;
2035   inProgress=CmiFalse;
2036   startRequested=CmiFalse;
2037   nRemote=nContrib=0;
2038
2039   //Look through the future queue for messages we can now handle
2040   int n=futureMsgs.length();
2041
2042   for (i=0;i<n;i++)
2043   {
2044     interrupt = 1;
2045
2046     CkReductionMsg *m=futureMsgs.deq();
2047
2048     interrupt = 0;
2049     if (m!=NULL){ //One of these addContributions may have finished us.
2050       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2051       doAddContribution(m);//<- if *still* early, puts it back in the queue
2052     }
2053   }
2054
2055   interrupt = 1;
2056
2057   n=futureRemoteMsgs.length();
2058
2059   interrupt = 0;
2060   for (i=0;i<n;i++)
2061   {
2062     interrupt = 1;
2063
2064     CkReductionMsg *m=futureRemoteMsgs.deq();
2065
2066     interrupt = 0;
2067     if (m!=NULL)
2068       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2069   }
2070   
2071   n = futureLateMigrantMsgs.length();
2072   for(i=0;i<n;i++){
2073     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2074     if(m != NULL){
2075       if(m->redNo == redNo){
2076         msgs.enq(m);
2077       }else{
2078         futureLateMigrantMsgs.enq(m);
2079       }
2080     }
2081   }
2082 }
2083
2084 //////////// Reduction Manager Utilities /////////////
2085
2086 void CkNodeReductionMgr::init_BinaryTree(){
2087         parent = (CkMyNode()-1)/TREE_WID;
2088         int firstkid = CkMyNode()*TREE_WID+1;
2089         numKids=CkNumNodes()-firstkid;
2090   if (numKids>TREE_WID) numKids=TREE_WID;
2091   if (numKids<0) numKids=0;
2092
2093         for(int i=0;i<numKids;i++){
2094                 kids.push_back(firstkid+i);
2095                 newKids.push_back(firstkid+i);
2096         }
2097 }
2098
2099 void CkNodeReductionMgr::init_BinomialTree(){
2100         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2101         /*upperSize = (unsigned )pow((double)2,depth);*/
2102         upperSize = (unsigned) 1 << depth;
2103         label = upperSize-CkMyNode()-1;
2104         int p=label;
2105         int count=0;
2106         while( p > 0){
2107                 if(p % 2 == 0)
2108                         break;
2109                 else{
2110                         p = p/2;
2111                         count++;
2112                 }
2113         }
2114         /*parent = label + rint(pow((double)2,count));*/
2115         parent = label + (1<<count);
2116         parent = upperSize -1 -parent;
2117         int temp;
2118         if(count != 0){
2119                 numKids = 0;
2120                 for(int i=0;i<count;i++){
2121                         /*temp = label - rint(pow((double)2,i));*/
2122                         temp = label - (1<<i);
2123                         temp = upperSize-1-temp;
2124                         if(temp <= CkNumNodes()-1){
2125                 //              kids[numKids] = temp;
2126                                 kids.push_back(temp);
2127                                 numKids++;
2128                         }
2129                 }
2130         }else{
2131                 numKids = 0;
2132         //      kids = NULL;
2133         }
2134 }
2135
2136
2137 int CkNodeReductionMgr::treeRoot(void)
2138 {
2139   return 0;
2140 }
2141 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2142 {
2143   return (CmiBool)(CkMyNode()!=treeRoot());
2144 }
2145 int CkNodeReductionMgr::treeParent(void) //My parent Node
2146 {
2147   return parent;
2148 }
2149
2150 int CkNodeReductionMgr::firstKid(void) //My first child Node
2151 {
2152   return CkMyNode()*TREE_WID+1;
2153 }
2154 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2155 {
2156 #ifdef BINOMIAL_TREE
2157         return numKids;
2158 #else
2159 /*  int nKids=CkNumNodes()-firstKid();
2160   if (nKids>TREE_WID) nKids=TREE_WID;
2161   if (nKids<0) nKids=0;
2162   return nKids;*/
2163         return numKids;
2164 #endif
2165 }
2166
2167 //Combine (& free) the current message vector msgs.
2168 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2169 {
2170 #if CMK_BLUEGENE_CHARM
2171   _TRACE_BG_END_EXECUTE(1);
2172   void* _bgParentLog = NULL;
2173   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2174 #endif
2175   CkReductionMsg *ret=NULL;
2176
2177   //Look through the vector for a valid reducer, swapping out placeholder messages
2178   CkReduction::reducerType r=CkReduction::invalid;
2179   int msgs_gcount=0;//Reduced gcount
2180   int msgs_nSources=0;//Reduced nSources
2181   int msgs_userFlag=-1;
2182   CkCallback msgs_callback;
2183   CkCallback msgs_secondaryCallback;
2184   int i;
2185   int nMsgs=0;
2186   CkReductionMsg *m;
2187   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2188   bool isMigratableContributor;
2189         
2190
2191   while(NULL!=(m=msgs.deq()))
2192   {
2193     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2194     msgs_gcount+=m->gcount;
2195     if (m->sourceFlag!=0)
2196     { //This is a real message from an element, not just a placeholder
2197       msgArr[nMsgs++]=m;
2198       msgs_nSources+=m->nSources();
2199       r=m->reducer;
2200       if (!m->callback.isInvalid())
2201         msgs_callback=m->callback;
2202       if(!m->secondaryCallback.isInvalid()){
2203         msgs_secondaryCallback = m->secondaryCallback;
2204       }
2205       if (m->userFlag!=-1)
2206         msgs_userFlag=m->userFlag;
2207
2208         isMigratableContributor= m->isMigratableContributor();
2209 #if CMK_BLUEGENE_CHARM
2210       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2211 #endif
2212                                 
2213     }
2214     else
2215     { //This is just a placeholder message-- replace it
2216       delete m;
2217     }
2218   }
2219
2220   if (nMsgs==0||r==CkReduction::invalid)
2221   //No valid reducer in the whole vector
2222     ret=CkReductionMsg::buildNew(0,NULL);
2223   else
2224   {//Use the reducer to reduce the messages
2225     if(nMsgs == 1){
2226         ret = msgArr[0];
2227     }else{
2228         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2229         ret=(*f)(nMsgs,msgArr);
2230     }
2231     ret->reducer=r;
2232   }
2233
2234         
2235 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2236 #if CRITICAL_PATH_DEBUG > 3
2237         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2238 #endif
2239         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2240         path.updateMax(UsrToEnv(ret));
2241         // Combine the critical paths from all the reduction messages into the header for the new result
2242         for (i=0;i<nMsgs;i++){
2243           if (msgArr[i]!=ret){
2244             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2245             path.updateMax(UsrToEnv(msgArr[i]));
2246           } else {
2247             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2248           }
2249         }
2250 #if CRITICAL_PATH_DEBUG > 3
2251         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2252 #endif
2253
2254 #endif
2255
2256
2257         //Go back through the vector, deleting old messages
2258   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2259   delete [] msgArr;
2260   //Set the message counts
2261   ret->redNo=redNo;
2262   ret->gcount=msgs_gcount;
2263   ret->userFlag=msgs_userFlag;
2264   ret->callback=msgs_callback;
2265   ret->secondaryCallback = msgs_secondaryCallback;
2266   ret->sourceFlag=msgs_nSources;
2267   ret->setMigratableContributor(isMigratableContributor);
2268   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2269 #if CMK_BLUEGENE_CHARM
2270   _TRACE_BG_TLINE_END(&ret->log);
2271 #endif
2272
2273   return ret;
2274 }
2275
2276 void CkNodeReductionMgr::pup(PUP::er &p)
2277 {
2278 //We do not store the client function pointer or the client function parameter,
2279 //it is the responsibility of the programmer to correctly restore these
2280   IrrGroup::pup(p);
2281   p(redNo);
2282   p(inProgress); p(creating); p(startRequested);
2283   p(lcount);
2284   p(nContrib); p(nRemote);
2285   p(interrupt);
2286   p|msgs;
2287   p|futureMsgs;
2288   p|futureRemoteMsgs;
2289   p|futureLateMigrantMsgs;
2290   p|parent;
2291   p|additionalGCount;
2292   p|newAdditionalGCount;
2293   if(p.isUnpacking()) {
2294     gcount=CkNumNodes();
2295     thisProxy = thisgroup;
2296     lockEverything = CmiCreateLock();
2297 #ifdef BINOMIAL_TREE
2298     init_BinomialTree();
2299 #else
2300     init_BinaryTree();
2301 #endif          
2302   }
2303   p | blocked;
2304   p | maxModificationRedNo;
2305
2306 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2307   int isnull = (storedCallback == NULL);
2308   p | isnull;
2309   if (!isnull) {
2310     if (p.isUnpacking()) {
2311       storedCallback = new CkCallback;
2312     }
2313     p|*storedCallback;
2314   }
2315 #endif
2316
2317 }
2318
2319 /*
2320         FAULT_EVAC
2321         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2322         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2323         reduction tree. 
2324 */
2325 void CkNodeReductionMgr::evacuate(){
2326         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2327         if(treeKids() == 0){
2328         /*
2329                 if the node going down is a leaf
2330         */
2331                 oldleaf=true;
2332                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2333                 /*
2334                         Need to ask parent for the reduction number that it has seen. 
2335                         Since it is a leaf, the tree does not need to be rewired. 
2336                         We reuse the oldparent type of tree modification message to get 
2337                         the parent to block and tell us about the highest reduction number it has seen.
2338                         
2339                 */
2340                 int data[2];
2341                 data[0]=CkMyNode();
2342                 data[1]=getTotalGCount()+additionalGCount;
2343                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2344                 newParent = treeParent();
2345         }else{
2346                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2347                 oldleaf= false;
2348         /*
2349                 It is not a leaf. It needs to rewire the tree around itself.
2350                 It also needs to decide on a reduction No after which the new tree will be used
2351                 Till it decides on the new tree and the redNo at which it becomes valid,
2352                 all received messages will be buffered
2353         */
2354                 newParent = kids[0];
2355                 for(int i=numKids-1;i>=0;i--){
2356                         newKids.remove(i);
2357                 }
2358                 /*
2359                         Ask everybody for the highest reduction number they have seen and
2360                         also tell them about the new tree
2361                 */
2362                 /*
2363                         Tell parent about its new child;
2364                 */
2365                 int oldParentData[2];
2366                 oldParentData[0] = CkMyNode();
2367                 oldParentData[1] = newParent;
2368                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2369
2370                 /*
2371                         Tell the other children about their new parent
2372                 */
2373                 int childrenData=newParent;
2374                 for(int i=1;i<numKids;i++){
2375                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2376                 }
2377                 
2378                 /*
2379                         Tell newParent (1st child) about its new children,
2380                         the current node and its children except the newParent
2381                 */
2382                 int *newParentData = new int[numKids+2];
2383                 for(int i=1;i<numKids;i++){
2384                         newParentData[i] = kids[i];
2385                 }
2386                 newParentData[0] = CkMyNode();
2387                 newParentData[numKids] = parent;
2388                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2389                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2390         }
2391         readyDeletion = false;
2392         blocked = true;
2393         numModificationReplies = 0;
2394         tempModificationRedNo = findMaxRedNo();
2395 }
2396
2397 /*
2398         Depending on the code, use the data to change the tree
2399         1. OLDPARENT : replace the old child with a new one
2400         2. OLDCHILDREN: replace the parent
2401         3. NEWPARENT:  add the children and change the parent
2402         4. LEAFPARENT: delete the old child
2403 */
2404
2405 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2406         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2407         int sender;
2408         newKids = kids;
2409         readyDeletion = false;
2410         newAdditionalGCount = additionalGCount;
2411         switch(code){
2412                 case OLDPARENT: 
2413                         for(int i=0;i<numKids;i++){
2414                                 if(newKids[i] == data[0]){
2415                                         newKids[i] = data[1];
2416                                         break;
2417                                 }
2418                         }
2419                         sender = data[0];
2420                         newParent = parent;
2421                         break;
2422                 case OLDCHILDREN:
2423                         newParent = data[0];
2424                         sender = parent;
2425                         break;
2426                 case NEWPARENT:
2427                         for(int i=0;i<size-2;i++){
2428                                 newKids.push_back(data[i]);
2429                         }
2430                         newParent = data[size-2];
2431                         newAdditionalGCount += data[size-1];
2432                         sender = parent;
2433                         break;
2434                 case LEAFPARENT:
2435                         for(int i=0;i<numKids;i++){
2436                                 if(newKids[i] == data[0]){
2437                                         newKids.remove(i);
2438                                         break;
2439                                 }
2440                         }
2441                         sender = data[0];
2442                         newParent = parent;
2443                         newAdditionalGCount += data[1];
2444                         break;
2445         };
2446         blocked = true;
2447         int maxRedNo = findMaxRedNo();
2448         
2449         thisProxy[sender].collectMaxRedNo(maxRedNo);
2450 }
2451
2452 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2453         /*
2454                 Find out the maximum redNo that has been seen by 
2455                 the affected nodes
2456         */
2457         numModificationReplies++;
2458         if(maxRedNo > tempModificationRedNo){
2459                 tempModificationRedNo = maxRedNo;
2460         }
2461         if(numModificationReplies == numKids+1){
2462                 maxModificationRedNo = tempModificationRedNo;
2463                 /*
2464                         when all the affected nodes have replied, tell them the maximum.
2465                         Unblock yourself. deal with the buffered messages local and remote
2466                 */
2467                 if(maxModificationRedNo == -1){
2468                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2469                 }else{
2470                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2471                 }
2472                 thisProxy[parent].unblockNode(maxModificationRedNo);
2473                 for(int i=0;i<numKids;i++){
2474                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2475                 }
2476                 blocked = false;
2477                 updateTree();
2478                 clearBlockedMsgs();
2479         }
2480 }
2481
2482 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2483         maxModificationRedNo = maxRedNo;
2484         updateTree();
2485         blocked = false;
2486         clearBlockedMsgs();
2487 }
2488
2489
2490 void CkNodeReductionMgr::clearBlockedMsgs(){
2491         int len = bufferedMsgs.length();
2492         for(int i=0;i<len;i++){
2493                 CkReductionMsg *m = bufferedMsgs.deq();
2494                 doAddContribution(m);
2495         }
2496         len = bufferedRemoteMsgs.length();
2497         for(int i=0;i<len;i++){
2498                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2499                 doRecvMsg(m);
2500         }
2501
2502 }
2503 /*
2504         if the reduction number exceeds the maxModificationRedNo, change the tree
2505         to become the new one
2506 */
2507
2508 void CkNodeReductionMgr::updateTree(){
2509         if(redNo > maxModificationRedNo){
2510                 parent = newParent;
2511                 kids = newKids;
2512                 maxModificationRedNo = INT_MAX;
2513                 numKids = kids.size();
2514                 readyDeletion = true;
2515                 additionalGCount = newAdditionalGCount;
2516                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2517                 for(int i=0;i<(int)(newKids.size());i++){
2518                         DEBREVAC(("%d ",newKids[i]));
2519                 }
2520                 DEBREVAC(("\n"));
2521         //      startReduction(redNo,CkMyNode());
2522         }else{
2523                 if(maxModificationRedNo != INT_MAX){
2524                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2525                         startReduction(redNo,CkMyNode());
2526                         finishReduction();
2527                 }       
2528         }
2529 }
2530
2531
2532 void CkNodeReductionMgr::doneEvacuate(){
2533         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2534 /*      if(oldleaf){
2535                 
2536                         It used to be a leaf
2537                         Then as soon as future messages have been emptied you can 
2538                         send the parent a message telling them that they are not going
2539                         to receive anymore messages from this child
2540                 
2541                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2542                 while(futureMsgs.length() != 0){
2543                         int n = futureMsgs.length();
2544                         for(int i=0;i<n;i++){
2545                                 CkReductionMsg *m = futureMsgs.deq();
2546                                 if(isPresent(m->redNo)){
2547                                         msgs.enq(m);
2548                                 }else{
2549                                         futureMsgs.enq(m);
2550                                 }
2551                         }
2552                         CkReductionMsg *result = reduceMessages();
2553                         thisProxy[treeParent()].RecvMsg(result);
2554                         redNo++;
2555                 }
2556                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2557                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2558         }else{*/
2559                 if(readyDeletion){
2560                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2561                 }else{
2562                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2563                 }
2564 //      }
2565 }
2566
2567 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2568         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2569         for(int i=0;i<numKids;i++){
2570                 if(kids[i] == deletedChild){
2571                         kids.remove(i);
2572                         break;
2573                 }
2574         }
2575         numKids = kids.length();
2576         finishReduction();
2577 }
2578
2579 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2580         for(int i=0;i<(int)(newKids.length());i++){
2581                 if(newKids[i] == deletedChild){
2582                         newKids.remove(i);
2583                         break;
2584                 }
2585         }
2586         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2587         for(int i=0;i<(int)(newKids.size());i++){
2588                 DEBREVAC(("%d ",newKids[i]));
2589         }
2590         DEBREVAC(("\n"));
2591         finishReduction();
2592 }
2593
2594 int CkNodeReductionMgr::findMaxRedNo(){
2595         int max = redNo;
2596         for(int i=0;i<futureRemoteMsgs.length();i++){
2597                 if(futureRemoteMsgs[i]->redNo  > max){
2598                         max = futureRemoteMsgs[i]->redNo;
2599                 }
2600         }
2601         /*
2602                 if redNo is max (that is no future message) and the current reduction has not started
2603                 then tree can be changed before the reduction redNo can be started
2604         */ 
2605         if(redNo == max && msgs.length() == 0){
2606                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2607                 max--;
2608         }
2609         return max;
2610 }
2611
2612 #include "CkReduction.def.h"