5b88d3c270fe203753636118b67daa1ac98b4b32
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #if 0
54 //Debugging messages:
55 // Reduction mananger internal information:
56 #define DEBUGRED 1
57 #define DEBR(x) CkPrintf x
58 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
59 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
60
61 #define DEBN(x) CkPrintf x
62 #define AAN "Red Node%d "
63 #define ABN ,CkMyNode()
64
65 // For status and data messages from the builtin reducer functions.
66 #define RED_DEB(x) //CkPrintf x
67 #define DEBREVAC(x) CkPrintf x
68 #else
69 //No debugging info-- empty defines
70 #define DEBUGRED 0
71 #define DEBR(x) //CkPrintf x
72 #define DEBN(x) //CkPrintf x
73 #define RED_DEB(x) //CkPrintf x
74 #define DEBREVAC(x) //CkPrintf x
75 #endif
76
77 #ifndef INT_MAX
78 #define INT_MAX 2147483647
79 #endif
80
81 extern int _inrestart;
82
83 Group::Group()
84 {
85         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
86
87         creatingContributors();
88         contributorStamped(&reductionInfo);
89         contributorCreated(&reductionInfo);
90         doneCreatingContributors();
91 #if DEBUGRED
92         CkPrintf("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr));
93 #endif                  
94         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
95         nodeProxy = nodetemp;
96 }
97
98 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
99 {
100         creatingContributors();
101         contributorStamped(&reductionInfo);
102         contributorCreated(&reductionInfo);
103         doneCreatingContributors();
104 }
105
106 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
107                                     ((CkReductionMgr *)this),
108                                     reductionInfo,false);
109 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid));
110
111
112
113 CkGroupInitCallback::CkGroupInitCallback(void) {}
114 /*
115 The callback is just used to tell the caller that this group
116 has been constructed.  (Now they can safely call CkLocalBranch)
117 */
118 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
119 {
120   m->call();
121   delete m;
122 }
123
124 /*
125 The callback is just used to tell the caller that this group
126 is constructed and ready to process other calls.
127 */
128 CkGroupReadyCallback::CkGroupReadyCallback(void)
129 {
130   _isReady = 0;
131 }
132 void
133 CkGroupReadyCallback::callBuffered(void)
134 {
135   int n = _msgs.length();
136   for(int i=0;i<n;i++)
137   {
138     CkGroupCallbackMsg *msg = _msgs.deq();
139     msg->call();
140     delete msg;
141   }
142 }
143 void
144 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
145 {
146   if(_isReady) {
147     msg->call();
148     delete msg;
149   } else {
150     _msgs.enq(msg);
151   }
152 }
153
154 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
155         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
156 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
157 {
158         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
159         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
160         b->fn(b->param,m->getSize(),m->getData());
161         delete m;
162 }
163
164 ///////////////// Reduction Manager //////////////////
165 /*
166 One CkReductionMgr runs a non-overlapping set of reductions.
167 It collects messages from all local contributors, then sends
168 the reduced message up the reduction tree to node zero, where
169 they're passed to the user's client function.
170 */
171
172 CkReductionMgr::CkReductionMgr()//Constructor
173   : thisProxy(thisgroup)
174
175   redNo=0;
176   completedRedNo = -1;
177   inProgress=CmiFalse;
178   creating=CmiFalse;
179   startRequested=CmiFalse;
180   gcount=lcount=0;
181   nContrib=nRemote=0;
182   maxStartRequest=0;
183 #ifdef _FAULT_MLOG_
184     if(CkMyPe() != 0){
185         perProcessorCounts = NULL;
186     }else{
187         perProcessorCounts = new int[CmiNumPes()];
188         for(int i=0;i<CmiNumPes();i++){
189             perProcessorCounts[i] = -1;
190         }
191     }
192     totalCount = 0;
193     processorCount = 0;
194 #endif
195   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
196 }
197
198 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
199 {
200   redNo=0;
201   completedRedNo = -1;
202   inProgress=CmiFalse;
203   creating=CmiFalse;
204   startRequested=CmiFalse;
205   gcount=lcount=0;
206   nContrib=nRemote=0;
207   maxStartRequest=0;
208   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
209 }
210
211 void CkReductionMgr::flushStates()
212 {
213   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
214   redNo=0;
215   completedRedNo = -1;
216   inProgress=CmiFalse;
217   creating=CmiFalse;
218   gcount=lcount=0;
219   startRequested=CmiFalse;
220   nContrib=nRemote=0;
221   maxStartRequest=0;
222
223   while (!msgs.isEmpty()) { delete msgs.deq(); }
224   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
225   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
226   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
227
228   adjVec.length()=0;
229
230   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
231 }
232
233 //////////// Reduction Manager Client API /////////////
234
235 //Add the given client function.  Overwrites any previous client.
236 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
237 {
238   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
239
240   if (CkMyPe()!=0)
241           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
242   storedCallback=*cb;
243   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
244   nodeProxy.ckSetReductionClient(callback);
245 }
246
247 ///////////////////////////// Contributor ////////////////////////
248 //Contributors keep a copy of this structure:
249
250 /*Contributor migration support:
251 */
252 void contributorInfo::pup(PUP::er &p)
253 {
254   p(redNo);
255 }
256
257 ////////////////////// Contributor list maintainance: /////////////////
258 //These just set and clear the "creating" flag to prevent
259 // reductions from finishing early because not all elements
260 // have been created.
261 void CkReductionMgr::creatingContributors(void)
262 {
263   DEBR((AA"Creating contributors...\n"AB));
264   creating=CmiTrue;
265 }
266 void CkReductionMgr::doneCreatingContributors(void)
267 {
268   DEBR((AA"Done creating contributors...\n"AB));
269   creating=CmiFalse;
270   if (startRequested) startReduction(redNo,CkMyPe());
271   finishReduction();
272 }
273
274 //A new contributor will be created
275 void CkReductionMgr::contributorStamped(contributorInfo *ci)
276 {
277   DEBR((AA"Contributor %p stamped\n"AB,ci));
278   //There is another contributor
279   gcount++;
280   if (inProgress)
281   {
282     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
283     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
284   } else
285     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
286 }
287
288 //A new contributor was actually created
289 void CkReductionMgr::contributorCreated(contributorInfo *ci)
290 {
291   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
292   //We've got another contributor
293   lcount++;
294   //He may not need to contribute to some of our reductions:
295   for (int r=redNo;r<ci->redNo;r++)
296     adj(r).lcount--;//He won't be contributing to r here
297 }
298
299 /*Don't expect any more contributions from this one.
300 This is rather horrifying because we now have to make
301 sure the global element count accurately reflects all the
302 contributions the element made before it died-- these may stretch
303 far into the future.  The adj() vector is what saves us here.
304 */
305 void CkReductionMgr::contributorDied(contributorInfo *ci)
306 {
307 #if CMK_MEM_CHECKPOINT
308   // ignore from listener if it is during restart from crash
309   if (CkInRestarting()) return;
310 #endif
311   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
312   //We lost a contributor
313   gcount--;
314
315   if (ci->redNo<redNo)
316   {//Must have been migrating during reductions-- root is waiting for his
317   // contribution, which will never come.
318     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
319     for (int r=ci->redNo;r<redNo;r++)
320       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
321   }
322
323   //Add to the global count for all his future messages (wherever they are)
324   int r;
325   for (r=redNo;r<ci->redNo;r++)
326   {//He already contributed to this reduction, but won't show up in global count.
327     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
328     adj(r).gcount++;
329   }
330
331   lcount--;
332   //He's already contributed to several reductions here
333   for (r=redNo;r<ci->redNo;r++)
334     adj(r).lcount++;//He'll be contributing to r here
335
336   finishReduction();
337 }
338
339 //Migrating away (note that global count doesn't change)
340 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
341 {
342   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
343   lcount--;//We lost a local
344   //He's already contributed to several reductions here
345   for (int r=redNo;r<ci->redNo;r++)
346     adj(r).lcount++;//He'll be contributing to r here
347
348   finishReduction();
349 }
350
351 //Migrating in (note that global count doesn't change)
352 void CkReductionMgr::contributorArriving(contributorInfo *ci)
353 {
354   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
355   lcount++;//We gained a local
356 #if CMK_MEM_CHECKPOINT
357   // ignore from listener if it is during restart from crash
358   // because the ci may be old.
359   if (CkInRestarting()) return;
360 #endif
361   //He has already contributed (elsewhere) to several reductions:
362   for (int r=redNo;r<ci->redNo;r++)
363     adj(r).lcount--;//He won't be contributing to r here
364
365 }
366
367 //Contribute-- the given msg can contain any data.  The reducerType
368 // field of the message must be valid.
369 // Each contributor must contribute exactly once to the each reduction.
370 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
371 {
372   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
373   m->ci=ci;
374   m->redNo=ci->redNo++;
375   m->sourceFlag=-1;//A single contribution
376   m->gcount=0;
377
378 #ifdef _FAULT_MLOG_
379     if(lcount == 0){
380         m->sourceProcessorCount = 1;
381     }else{
382         m->sourceProcessorCount = lcount;
383     }
384     m->fromPE = CmiMyPe();
385     Chare *oldObj =CpvAccess(_currentObj);
386     char currentObjName[100];
387     DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
388     thisProxy[0].contributeViaMessage(m);
389 #else
390   addContribution(m);
391 #endif
392 }
393
394 #ifdef _FAULT_MLOG_
395 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
396     CmiAssert(CmiMyPe() == 0);
397     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
398     if(redNo == 0){
399         if(perProcessorCounts[m->fromPE] == -1){
400             processorCount++;
401             perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
402             DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
403         }else{
404             if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
405                 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
406                 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
407             }
408         }
409         if(processorCount == CmiNumPes()){
410             totalCount = 0;
411             for(int i=0;i<CmiNumPes();i++){
412                 CmiAssert(perProcessorCounts[i] != -1);
413                 totalCount += perProcessorCounts[i];
414             }
415             DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
416         }
417         if(m->sourceProcessorCount == 0){
418             if(processorCount == CmiNumPes()){
419                 finishReduction();
420             }
421             return;
422         }
423     }
424     addContribution(m);
425 }
426 #else
427 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
428 #endif
429
430 //////////// Reduction Manager Remote Entry Points /////////////
431 //Sent down the reduction tree (used by barren PEs)
432 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
433 {
434  if(CkMyPe()==0){
435         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
436         //delete m;
437         //return;
438  }
439  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
440  int srcPE = (UsrToEnv(m))->getSrcPe();
441 #ifndef _FAULT_MLOG_ 
442   if (isPresent(m->num) && !inProgress)
443   {
444     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
445     startReduction(m->num,srcPE);
446     finishReduction();
447   } else if (isFuture(m->num)){
448 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
449           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
450           if(maxStartRequest < m->num){
451                   maxStartRequest = m->num;
452           }
453  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
454           
455     }
456   else //is Past
457     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
458   delete m;
459 #else
460     if(redNo == 0){
461         if(lcount == 0){
462             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
463             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
464             dummy->fromPE = CmiMyPe();
465             dummy->sourceProcessorCount = 0;
466             dummy->redNo = 0;
467             thisProxy[0].contributeViaMessage(dummy);
468         }
469     }
470 #endif
471 }
472
473 //Sent to root of reduction tree with reduction contribution
474 // of migrants that missed the main reduction.
475 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
476 {
477         m->secondaryCallback = m->callback;
478   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
479   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
480   nodeMgr->LateMigrantMsg(m);
481 /*      int len = finalMsgs.length();
482         finalMsgs.enq(m);
483 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
484         endArrayReduction();*/
485 }
486
487 //A late migrating contributor will never contribute to this reduction
488 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
489 {
490   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
491   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
492  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
493   adj(m->num).gcount--;//He won't be contributing to this one.
494   finishReduction();
495   delete m;
496 }
497
498 //////////// Reduction Manager State /////////////
499 void CkReductionMgr::startReduction(int number,int srcPE)
500 {
501   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
502   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
503   if (inProgress){
504         DEBR((AA"This reduction is already in progress\n"AB));
505         return;//This reduction already started
506   }
507   if (creating) //Don't start yet-- we're creating elements
508   {
509     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
510     startRequested=CmiTrue;
511     return;
512   }
513
514 //If none of these cases, we need to start the reduction--
515   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
516   inProgress=CmiTrue;
517   //Sent start requests to our kids (in case they don't already know)
518  
519
520         /*
521                 FAULT_EVAC
522         */
523   if(!CmiNodeAlive(CkMyPe())){
524         return;
525   }
526
527 #ifdef _FAULT_MLOG_ 
528   if(CmiMyPe() == 0 && redNo == 0){
529             for(int j=0;j<CkNumPes();j++){
530                 if(j != CkMyPe() && j != srcPE){
531                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
532                 }
533             }
534             if(lcount == 0){
535                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
536                 dummy->fromPE = CmiMyPe();
537                 dummy->sourceProcessorCount = 0;
538                 dummy->redNo = 0;
539                 thisProxy[0].contributeViaMessage(dummy);
540             }
541     }   else{
542         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
543     }
544
545
546 #else
547         nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
548 #endif
549         
550   int temp;
551         /*
552   //making it a broadcast done only by PE 0
553   if(!hasParent()){
554                 temp = completedRedNo+1;
555                 for(int i=temp;i<=number;i++){
556                         for(int j=0;j<CkNumPes();j++){
557                                 if(j != CkMyPe() && j != srcPE){
558                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
559                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
560                                         }
561                                 }
562                         }
563                 }       
564         }       else{
565                 temp = number;
566         }*/
567 /*  if(!hasParent()){
568                 temp = completedRedNo+1;
569         }       else{
570                 temp = number;
571         }
572         for(int i=temp;i<=number;i++){
573         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
574                 if(hasParent()){
575           // kick-start your parent too ...
576                         if(treeParent() != srcPE){
577                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
578 #ifdef _FAULT_MLOG_
579     CpvAccess(_currentObj) = oldObj;
580 #endif
581                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
582                                 }       
583                         }       
584                 }
585           for (int k=0;k<treeKids();k++)
586           {
587                         if(firstKid()+k != srcPE){
588                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
589                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
590                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
591                                 }       
592                         }       
593         }
594         }
595         */
596 }       
597
598 /*Handle a message from one element for the reduction*/
599 void CkReductionMgr::addContribution(CkReductionMsg *m)
600 {
601   if (isPast(m->redNo))
602   {
603 #ifdef _FAULT_MLOG_
604         CmiAbort("this version should not have late migrations");
605 #else
606         //We've moved on-- forward late contribution straight to root
607     DEBR((AA"Migrant %p gives late contribution for #%d!\n"AB,m->ci,m->redNo));
608         // if (!hasParent()) //Root moved on too soon-- should never happen
609         //   CkAbort("Late reduction contribution received at root!\n");
610     thisProxy[0].LateMigrantMsg(m);
611 #endif
612   }
613   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
614     DEBR((AA"Contributor %p gives early contribution-- for #%d\n"AB,m->ci,m->redNo));
615     futureMsgs.enq(m);
616   } else {// An ordinary contribution
617     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
618    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
619     startReduction(m->redNo,CkMyPe());
620     msgs.enq(m);
621     nContrib++;
622     finishReduction();
623   }
624 }
625
626 /**function checks if it has got all contributions that it is supposed to
627 get at this processor. If it is done it sends the reduced result to the local
628 nodegroup */
629 void CkReductionMgr::finishReduction(void)
630 {
631   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
632   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
633   if ((!inProgress) || creating){
634         DEBR((AA"Either not in Progress or creating\n"AB));
635         return;
636   }
637   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
638 #ifndef _FAULT_MLOG_   
639
640   if (nContrib<(lcount+adj(redNo).lcount)){
641         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
642          return;//Need more local messages
643   }
644  
645 #else
646
647     if(CkMyPe() != 0){
648         if(redNo != 0){
649             return;
650         }else{
651             CmiAssert(lcount == 0);
652             CkReductionMsg *dummy = reduceMessages();
653             dummy->fromPE = CmiMyPe();
654             dummy->sourceProcessorCount = 0;
655             thisProxy[0].contributeViaMessage(dummy);
656             return;
657         }
658     }
659
660     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
661         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
662          return;//Need more local messages
663   }else{
664         DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
665     }
666
667
668 #endif
669
670
671   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
672   CkReductionMsg *result=reduceMessages();
673   result->redNo=redNo;
674 #ifndef _FAULT_MLOG_
675
676   result->gcount+=gcount+adj(redNo).gcount;
677   result->secondaryCallback = result->callback;
678   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
679         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
680
681   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
682
683 #if DEBUGRED
684  // CkPrintf("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount);
685 #endif
686   
687   // Find our node reduction manager, and pass reduction to him:
688   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
689   nodeMgr->contributeArrayReduction(result);
690 #else
691 #if DEBUGRED
692     CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer());
693 #endif
694     if (!result->callback.isInvalid())
695         result->callback.send(result);
696     else if (!storedCallback.isInvalid())
697         storedCallback.send(result);
698     else{
699 #if DEBUGRED
700         CkPrintf("No reduction client for group %d \n",thisgroup.idx);
701 #endif
702         CkAbort("No reduction client!\n"
703             "You must register a client with either SetReductionClient or during contribute.\n");
704     }
705 #if DEBUGRED
706        CkPrintf("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer());
707 #endif
708
709 #endif
710
711   //House Keeping Operations will have to check later what needs to be changed
712   redNo++;
713   //Shift the count adjustment vector down one slot (to match new redNo)
714   int i;
715 #ifndef _FAULT_MLOG_
716         if(CkMyPe()!=0){
717 #else
718     {
719 #endif
720         int i;
721         completedRedNo++;
722         for (i=1;i<adjVec.length();i++)
723            adjVec[i-1]=adjVec[i];
724         adjVec.length()--;  
725   }
726   inProgress=CmiFalse;
727   startRequested=CmiFalse;
728   nRemote=nContrib=0;
729
730   //Look through the future queue for messages we can now handle
731   int n=futureMsgs.length();
732   for (i=0;i<n;i++)
733   {
734     CkReductionMsg *m=futureMsgs.deq();
735     if (m!=NULL) //One of these addContributions may have finished us.
736       addContribution(m);//<- if *still* early, puts it back in the queue
737   }
738 #ifndef _FAULT_MLOG_   
739
740   if(maxStartRequest >= redNo){
741           startReduction(redNo,CkMyPe());
742           finishReduction();
743   }
744  
745 #endif
746 }
747
748 //////////// Reduction Manager Utilities /////////////
749
750 //Return the countAdjustment struct for the given redNo:
751 countAdjustment &CkReductionMgr::adj(int number)
752 {
753   number-=completedRedNo;
754   number--;
755   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
756   //Pad the adjustment vector with zeros until it's at least number long
757   while (adjVec.length()<=number)
758     adjVec.push_back(countAdjustment());
759   return adjVec[number];
760 }
761
762 //Combine (& free) the current message vector msgs.
763 CkReductionMsg *CkReductionMgr::reduceMessages(void)
764 {
765   CkReductionMsg *ret=NULL;
766
767   //Look through the vector for a valid reducer, swapping out placeholder messages
768   CkReduction::reducerType r=CkReduction::invalid;
769   int msgs_gcount=0;//Reduced gcount
770   int msgs_nSources=0;//Reduced nSources
771   int msgs_userFlag=-1;
772   CkCallback msgs_callback;
773   int i;
774   int nMsgs=0;
775   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
776   CkReductionMsg *m;
777         bool isMigratableContributor;
778
779   // Copy message queue into msgArr, skipping placeholders:
780   while (NULL!=(m=msgs.deq()))
781   {
782     msgs_gcount+=m->gcount;
783     if (m->sourceFlag!=0)
784     { //This is a real message from an element, not just a placeholder
785       msgArr[nMsgs++]=m;
786       msgs_nSources+=m->nSources();
787       r=m->reducer;
788       if (!m->callback.isInvalid())
789         msgs_callback=m->callback;
790       if (m->userFlag!=-1)
791         msgs_userFlag=m->userFlag;
792                         
793                         isMigratableContributor=m->isMigratableContributor();
794     }
795     else
796     { //This is just a placeholder message-- forget it
797       delete m;
798     }
799   }
800
801   if (nMsgs==0||r==CkReduction::invalid)
802   //No valid reducer in the whole vector
803     ret=CkReductionMsg::buildNew(0,NULL);
804   else
805   {//Use the reducer to reduce the messages
806                 //if there is only one msg to be reduced just return that message
807                 if(nMsgs == 1){
808                         ret = msgArr[0];        
809                 }else{
810             CkReduction::reducerFn f=CkReduction::reducerTable[r];
811           ret=(*f)(nMsgs,msgArr);
812                 }
813     ret->reducer=r;
814   }
815
816
817
818 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
819   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
820   PathHistory &path = UsrToEnv(ret)->pathHistory;
821   // Combine the critical paths from all the reduction messages into the header for the new result
822   for (i=0;i<nMsgs;i++){
823     if (msgArr[i]!=ret){
824       CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
825       path.updateMax(UsrToEnv(msgArr[i])->pathHistory);
826     }
827   }
828   
829   // Also consider the path which got us into this entry method
830   CkPrintf("[%d] this path = %lf\n", CkMyPe(), currentlyExecutingMsg->pathHistory.getTime() );
831   path.updateMax(currentlyExecutingMsg->pathHistory);
832   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
833   
834 #endif
835
836
837
838
839
840
841
842
843
844
845   
846
847
848         //Go back through the vector, deleting old messages
849   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
850         delete [] msgArr;
851
852   //Set the message counts
853   ret->redNo=redNo;
854   ret->gcount=msgs_gcount;
855   ret->userFlag=msgs_userFlag;
856   ret->callback=msgs_callback;
857   ret->sourceFlag=msgs_nSources;
858         ret->setMigratableContributor(isMigratableContributor);
859   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
860
861   return ret;
862 }
863
864
865 //Checkpointing utilities
866 //pack-unpack method for CkReductionMsg
867 //if packing pack the message and then unpack and return it
868 //if unpacking allocate memory for it read it off disk and then unapck
869 //and return it
870 void CkReductionMgr::pup(PUP::er &p)
871 {
872 //We do not store the client function pointer or the client function parameter,
873 //it is the responsibility of the programmer to correctly restore these
874   CkGroupInitCallback::pup(p);
875   p(redNo);
876   p(completedRedNo);
877   p(inProgress); p(creating); p(startRequested);
878   p(nContrib); p(nRemote);
879   p|msgs;
880   p|futureMsgs;
881   p|futureRemoteMsgs;
882   p|finalMsgs;
883   p|adjVec;
884   p|nodeProxy;
885   p|storedCallback;
886
887   // subtle --- Gengbin
888   // Group : CkReductionMgr
889   // CkArray: CkReductionMgr
890   // lcount/gcount in Group is set in Group constructor
891   // lcount/gcount in CkArray is not, it is set when array elements are created
892   // we can not pup because inserting array elems will add the counters again
893 //  p|lcount;
894 //  p|gcount;
895 #ifdef _FAULT_MLOG_ 
896 //  p|lcount;
897 //  //  p|gcount;
898 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
899 #endif
900   if(p.isUnpacking()){
901     thisProxy = thisgroup;
902     maxStartRequest=0;
903   }
904 #if DEBUGRED
905   CkPrintf("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount);
906 #endif
907 }
908
909
910 //Callback for doing Reduction through NodeGroups added by Sayantan
911
912 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
913
914         int total = m->gcount+adj(m->redNo).gcount;
915         finalMsgs.enq(m);
916         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
917         adj(m->redNo).mainRecvd = 1;
918 #if DEBUGRED
919         CkPrintf("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer());
920 #endif  
921         endArrayReduction();
922 }
923
924 void CkReductionMgr :: endArrayReduction(){
925         CkReductionMsg *ret=NULL;
926         int nMsgs=finalMsgs.length();
927         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
928         //Look through the vector for a valid reducer, swapping out placeholder messages
929         //CkPrintf("Length of Final Message %d \n",nMsgs);
930         CkReduction::reducerType r=CkReduction::invalid;
931         int msgs_gcount=0;//Reduced gcount
932         int msgs_nSources=0;//Reduced nSources
933         int msgs_userFlag=-1;
934         CkCallback msgs_callback;
935         CkCallback msgs_secondaryCallback;
936         CkVec<CkReductionMsg *> tempMsgs;
937         int i;
938         int numMsgs = 0;
939         for (i=0;i<nMsgs;i++)
940         {
941                 CkReductionMsg *m=finalMsgs.deq();
942                 if(m->redNo == completedRedNo +1){
943                         msgs_gcount+=m->gcount;
944                         if (m->sourceFlag!=0)
945                         { //This is a real message from an element, not just a placeholder
946                                 msgs_nSources+=m->nSources();
947                                 r=m->reducer;
948                                 if (!m->callback.isInvalid())
949                                 msgs_callback=m->callback;
950                                 if(!m->secondaryCallback.isInvalid())
951                                         msgs_secondaryCallback = m->secondaryCallback;
952                                 if (m->userFlag!=-1)
953                                         msgs_userFlag=m->userFlag;
954                                 tempMsgs.push_back(m);
955                         }
956                 }else{
957                         finalMsgs.enq(m);
958                 }
959
960         }
961         numMsgs = tempMsgs.length();
962 #if DEBUGRED
963         CkPrintf("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd);
964 #endif  
965         if(numMsgs == 0){
966                 return;
967         }
968         if(adj(completedRedNo+1).mainRecvd == 0){
969                 for(i=0;i<numMsgs;i++){
970                         finalMsgs.enq(tempMsgs[i]);
971                 }
972                 return;
973         }
974
975 /*
976         NOT NEEDED ANYMORE DONE at nodegroup level
977         if(msgs_gcount  > msgs_nSources){
978                 for(i=0;i<numMsgs;i++){
979                         finalMsgs.enq(tempMsgs[i]);
980                 }
981                 return;
982         }*/
983
984         if (nMsgs==0||r==CkReduction::invalid)
985                 //No valid reducer in the whole vector
986                 ret=CkReductionMsg::buildNew(0,NULL);
987         else{//Use the reducer to reduce the messages
988                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
989                 // has to be corrected elements from above need to be put into a temporary vector
990                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
991                 ret=(*f)(numMsgs,msgArr);
992                 ret->reducer=r;
993
994         }
995
996         
997 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
998         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
999         PathHistory &path = UsrToEnv(ret)->pathHistory;
1000         // Combine the critical paths from all the reduction messages into the header for the new result
1001         for (i=0;i<numMsgs;i++){
1002           if (tempMsgs[i]!=ret){
1003             CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1004             path.updateMax(UsrToEnv(tempMsgs[i])->pathHistory);
1005           } else {
1006             CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1007           }
1008         }
1009         // Also consider the path which got us into this entry method
1010         CkPrintf("[%d] this path = %lf\n", CkMyPe(), currentlyExecutingMsg->pathHistory.getTime() );
1011         path.updateMax(currentlyExecutingMsg->pathHistory);
1012         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1013
1014 #endif
1015   
1016
1017
1018
1019
1020         for(i = 0;i<numMsgs;i++){
1021                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1022         }
1023
1024         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1025         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1026
1027         ret->redNo=completedRedNo+1;
1028         ret->gcount=msgs_gcount;
1029         ret->userFlag=msgs_userFlag;
1030         ret->callback=msgs_callback;
1031         ret->secondaryCallback = msgs_secondaryCallback;
1032         ret->sourceFlag=msgs_nSources;
1033
1034 #if DEBUGRED
1035         CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer());
1036 #endif
1037         if (!ret->secondaryCallback.isInvalid())
1038             ret->secondaryCallback.send(ret);
1039     else if (!storedCallback.isInvalid())
1040             storedCallback.send(ret);
1041     else{
1042 #if DEBUGRED
1043             CkPrintf("No reduction client for group %d \n",thisgroup.idx);
1044 #endif
1045             CkAbort("No reduction client!\n"
1046                     "You must register a client with either SetReductionClient or during contribute.\n");
1047     }
1048         completedRedNo++;
1049 #if DEBUGRED
1050        CkPrintf("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer());
1051 #endif
1052         for (i=1;i<adjVec.length();i++)
1053                 adjVec[i-1]=adjVec[i];
1054         adjVec.length()--;
1055         endArrayReduction();
1056 }
1057
1058
1059 /////////////////////////////////////////////////////////////////////////
1060
1061 ////////////////////////////////
1062 //CkReductionMsg support
1063
1064 //ReductionMessage default private constructor-- does nothing
1065 CkReductionMsg::CkReductionMsg(){}
1066 CkReductionMsg::~CkReductionMsg(){}
1067
1068 //This define gives the distance from the start of the CkReductionMsg
1069 // object to the start of the user data area (just below last object field)
1070 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1071
1072 //"Constructor"-- builds and returns a new CkReductionMsg.
1073 //  the "data" array you specify will be copied into this object.
1074 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1075     CkReduction::reducerType reducer, CkReductionMsg *buf)
1076 {
1077   int len[1];
1078   len[0]=NdataSize;
1079   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1080   
1081   ret->dataSize=NdataSize;
1082   if (srcData!=NULL && !buf)
1083     memcpy(ret->data,srcData,NdataSize);
1084   ret->userFlag=-1;
1085   ret->reducer=reducer;
1086   ret->ci=NULL;
1087   ret->sourceFlag=-1000;
1088   ret->gcount=0;
1089   ret->migratableContributor = true;
1090   return ret;
1091 }
1092
1093 // Charm kernel message runtime support:
1094 void *
1095 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1096 {
1097   int totalsize=ARM_DATASTART+(*sz);
1098   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1099   CkReductionMsg *ret = (CkReductionMsg *)
1100     CkAllocMsg(msgnum,totalsize,priobits);
1101   ret->data=(void *)(&ret->dataStorage);
1102   return (void *) ret;
1103 }
1104
1105 void *
1106 CkReductionMsg::pack(CkReductionMsg* in)
1107 {
1108   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1109   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1110   in->data = NULL;
1111   return (void*) in;
1112 }
1113
1114 CkReductionMsg* CkReductionMsg::unpack(void *in)
1115 {
1116   CkReductionMsg *ret = (CkReductionMsg *)in;
1117   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1118   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1119   ret->data=(void *)(&ret->dataStorage);
1120   return ret;
1121 }
1122
1123
1124 /////////////////////////////////////////////////////////////////////////////////////
1125 ///////////////// Builtin Reducer Functions //////////////
1126 /* A simple reducer, like sum_int, looks like this:
1127 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1128 {
1129   int i,ret=0;
1130   for (i=0;i<nMsg;i++)
1131     ret+=*(int *)(msg[i]->getData());
1132   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1133 }
1134
1135 To keep the code small and easy to change, the implementations below
1136 are built with preprocessor macros.
1137 */
1138
1139 //////////////// simple reducers ///////////////////
1140 /*A define used to quickly and tersely construct simple reductions.
1141 The basic idea is to use the first message's data array as
1142 (pre-initialized!) scratch space for folding in the other messages.
1143  */
1144
1145 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1146 {
1147         CkAbort("Called the invalid reducer type 0.  This probably\n"
1148                 "means you forgot to initialize your custom reducer index.\n");
1149         return NULL;
1150 }
1151
1152 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1153 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1154 {\
1155   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1156   int m,i;\
1157   int nElem=msg[0]->getLength()/sizeof(dataType);\
1158   dataType *ret=(dataType *)(msg[0]->getData());\
1159   for (m=1;m<nMsg;m++)\
1160   {\
1161     dataType *value=(dataType *)(msg[m]->getData());\
1162     for (i=0;i<nElem;i++)\
1163     {\
1164       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1165       loop\
1166     }\
1167   }\
1168   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1169   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1170 }
1171
1172 //Use this macro for reductions that have the same type for all inputs
1173 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1174   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1175   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1176   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1177
1178
1179 //Compute the sum the numbers passed by each element.
1180 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1181
1182 //Compute the product of the numbers passed by each element.
1183 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1184
1185 //Compute the largest number passed by any element.
1186 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1187
1188 //Compute the smallest integer passed by any element.
1189 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1190
1191
1192 //Compute the logical AND of the integers passed by each element.
1193 // The resulting integer will be zero if any source integer is zero; else 1.
1194 SIMPLE_REDUCTION(logical_and,int,"%d",
1195         if (value[i]==0)
1196      ret[i]=0;
1197   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1198 )
1199
1200 //Compute the logical OR of the integers passed by each element.
1201 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1202 SIMPLE_REDUCTION(logical_or,int,"%d",
1203   if (value[i]!=0)
1204            ret[i]=1;
1205   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1206 )
1207
1208 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1209 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1210
1211 //Select one random message to pass on
1212 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1213   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1214 }
1215
1216 /////////////// concat ////////////////
1217 /*
1218 This reducer simply appends the data it recieves from each element,
1219 without any housekeeping data to separate them.
1220 */
1221 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1222 {
1223   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1224   //Figure out how big a message we'll need
1225   int i,retSize=0;
1226   for (i=0;i<nMsg;i++)
1227       retSize+=msg[i]->getSize();
1228
1229   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1230
1231   //Allocate a new message
1232   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1233
1234   //Copy the source message data into the return message
1235   char *cur=(char *)(ret->getData());
1236   for (i=0;i<nMsg;i++) {
1237     int messageBytes=msg[i]->getSize();
1238     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1239     cur+=messageBytes;
1240   }
1241   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1242   return ret;
1243 }
1244
1245 /////////////// set ////////////////
1246 /*
1247 This reducer appends the data it recieves from each element
1248 along with some housekeeping data indicating contribution boundaries.
1249 The message data is thus a list of reduction_set_element structures
1250 terminated by a dummy reduction_set_element with a sourceElement of -1.
1251 */
1252
1253 //This rounds an integer up to the nearest multiple of sizeof(double)
1254 static const int alignSize=sizeof(double);
1255 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1256
1257 //This gives the size (in bytes) of a reduction_set_element
1258 static int SET_SIZE(int dataSize)
1259 {return SET_ALIGN(sizeof(int)+dataSize);}
1260
1261 //This returns a pointer to the next reduction_set_element in the list
1262 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1263 {
1264   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1265   return (CkReduction::setElement *)next;
1266 }
1267
1268 //Combine the data passed by each element into an list of reduction_set_elements.
1269 // Each element may contribute arbitrary data (with arbitrary length).
1270 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1271 {
1272   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1273   //Figure out how big a message we'll need
1274   int i,retSize=0;
1275   for (i=0;i<nMsg;i++) {
1276     if (!msg[i]->isFromUser())
1277     //This message is composite-- it will just be copied over (less terminating -1)
1278       retSize+=(msg[i]->getSize()-sizeof(int));
1279     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1280       retSize+=SET_SIZE(msg[i]->getSize());
1281   }
1282   retSize+=sizeof(int);//Leave room for terminating -1.
1283
1284   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1285
1286   //Allocate a new message
1287   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1288
1289   //Copy the source message data into the return message
1290   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1291   for (i=0;i<nMsg;i++)
1292     if (!msg[i]->isFromUser())
1293     {//This message is composite-- just copy it over (less terminating -1)
1294                         int messageBytes=msg[i]->getSize()-sizeof(int);
1295                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1296                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1297                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1298     }
1299     else //This is a message from an element-- wrap it in a reduction_set_element
1300     {
1301       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1302       cur->dataSize=msg[i]->getSize();
1303       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1304       cur=SET_NEXT(cur);
1305     }
1306   cur->dataSize=-1;//Add a terminating -1.
1307   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1308   return ret;
1309 }
1310
1311 //Utility routine: get the next reduction_set_element in the list
1312 // if there is one, or return NULL if there are none.
1313 //To get all the elements, just keep feeding this procedure's output back to
1314 // its input until it returns NULL.
1315 CkReduction::setElement *CkReduction::setElement::next(void)
1316 {
1317   CkReduction::setElement *n=SET_NEXT(this);
1318   if (n->dataSize==-1)
1319     return NULL;//This is the end of the list
1320   else
1321     return n;//This is just another element
1322 }
1323
1324 /////////////////// Reduction Function Table /////////////////////
1325 CkReduction::CkReduction() {} //Dummy private constructor
1326
1327 //Add the given reducer to the list.  Returns the new reducer's
1328 // reducerType.  Must be called in the same order on every node.
1329 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1330 {
1331   reducerTable[nReducers]=fn;
1332   return (reducerType)nReducers++;
1333 }
1334
1335 /*Reducer table: maps reducerTypes to reducerFns.
1336 It's indexed by reducerType, so the order in this table
1337 must *exactly* match the reducerType enum declaration.
1338 The names don't have to match, but it helps.
1339 */
1340 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1341
1342 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1343     ::invalid_reducer,
1344   //Compute the sum the numbers passed by each element.
1345     ::sum_int,::sum_float,::sum_double,
1346
1347   //Compute the product the numbers passed by each element.
1348     ::product_int,::product_float,::product_double,
1349
1350   //Compute the largest number passed by any element.
1351     ::max_int,::max_float,::max_double,
1352
1353   //Compute the smallest number passed by any element.
1354     ::min_int,::min_float,::min_double,
1355
1356   //Compute the logical AND of the integers passed by each element.
1357   // The resulting integer will be zero if any source integer is zero.
1358     ::logical_and,
1359
1360   //Compute the logical OR of the integers passed by each element.
1361   // The resulting integer will be 1 if any source integer is nonzero.
1362     ::logical_or,
1363
1364     // Compute the logical bitvector AND of the integers passed by each element.
1365     ::bitvec_and,
1366
1367     // Compute the logical bitvector OR of the integers passed by each element.
1368     ::bitvec_or,
1369
1370     // Select one of the messages at random to pass on
1371     ::random,
1372
1373   //Concatenate the (arbitrary) data passed by each element
1374     ::concat,
1375
1376   //Combine the data passed by each element into an list of setElements.
1377   // Each element may contribute arbitrary data (with arbitrary length).
1378     ::set
1379 };
1380
1381
1382
1383
1384
1385
1386
1387
1388 /********** Code added by Sayantan *********************/
1389 /** Locking is a big problem in the nodegroup code for smp.
1390  So a few assumptions have had to be made. There is one lock
1391  called lockEverything. It protects all the data structures 
1392  of the nodegroup reduction mgr. I tried grabbing it separately 
1393  for each datastructure, modifying it and then releasing it and
1394  then grabbing it again, for the next change.
1395  That doesn't really help because the interleaved execution of 
1396  different threads makes the state of the reduction manager 
1397  inconsistent. 
1398  
1399  1. Grab lockEverything before calling finishreduction or startReduction
1400     or doRecvMsg
1401  2. lockEverything is grabbed only in entry methods reductionStarting
1402     or RecvMesg or  addcontribution.
1403  ****/
1404  
1405 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1406 NodeGroup::NodeGroup(void) {
1407   __nodelock=CmiCreateLock();
1408 #ifdef _FAULT_MLOG_
1409     mlogData->objID.type = TypeNodeGroup;
1410     mlogData->objID.data.group.onPE = CkMyNode();
1411 #endif
1412
1413 }
1414 NodeGroup::~NodeGroup() {
1415   CmiDestroyLock(__nodelock);
1416 }
1417 void NodeGroup::pup(PUP::er &p)
1418 {
1419   CkNodeReductionMgr::pup(p);
1420   p|reductionInfo;
1421 }
1422
1423 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1424
1425 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1426   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1427  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1428   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1429  }
1430
1431 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1432                                     ((CkNodeReductionMgr *)this),
1433                                     reductionInfo,false);
1434
1435 /* this contribute also adds up the count across all messages it receives.
1436   Useful for summing up number of array elements who have contributed ****/ 
1437 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1438         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1439
1440
1441
1442 //#define BINOMIAL_TREE
1443
1444 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1445   : thisProxy(thisgroup)
1446 {
1447 #ifdef BINOMIAL_TREE
1448   init_BinomialTree();
1449 #else
1450   init_BinaryTree();
1451 #endif
1452   storedCallback=NULL;
1453   redNo=0;
1454   inProgress=CmiFalse;
1455   
1456   startRequested=CmiFalse;
1457   gcount=CkNumNodes();
1458   lcount=1;
1459   nContrib=nRemote=0;
1460   lockEverything = CmiCreateLock();
1461
1462
1463   creating=CmiFalse;
1464   interrupt = 0;
1465   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1466         /*
1467                 FAULT_EVAC
1468         */
1469         blocked = false;
1470         maxModificationRedNo = INT_MAX;
1471         killed=0;
1472         additionalGCount = newAdditionalGCount = 0;
1473 }
1474
1475 void CkNodeReductionMgr::flushStates()
1476 {
1477  if(CkMyRank() == 0){
1478   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1479   redNo=0;
1480   inProgress=CmiFalse;
1481
1482   startRequested=CmiFalse;
1483   gcount=CkNumNodes();
1484   lcount=1;
1485   nContrib=nRemote=0;
1486
1487   creating=CmiFalse;
1488   interrupt = 0;
1489   while (!msgs.isEmpty()) { delete msgs.deq(); }
1490   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1491   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1492   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1493   }
1494 }
1495
1496 //////////// Reduction Manager Client API /////////////
1497
1498 //Add the given client function.  Overwrites any previous client.
1499 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1500 {
1501   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1502   if(cb->isInvalid()){
1503         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1504   }else{
1505         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1506   }
1507
1508   if (CkMyNode()!=0)
1509           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1510   delete storedCallback;
1511   storedCallback=cb;
1512 }
1513
1514
1515
1516 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1517 {
1518 #ifdef _FAULT_MLOG_
1519     Chare *oldObj =CpvAccess(_currentObj);
1520     CpvAccess(_currentObj) = this;
1521 #endif
1522
1523   m->ci=ci;
1524   m->redNo=ci->redNo++;
1525   m->sourceFlag=-1;//A single contribution
1526   m->gcount=0;
1527 #if DEBUGRED
1528         CkPrintf("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo);
1529 #endif
1530   addContribution(m);
1531
1532 #ifdef _FAULT_MLOG_
1533     CpvAccess(_currentObj) = oldObj;
1534 #endif
1535 }
1536
1537
1538 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1539 {
1540 #ifdef _FAULT_MLOG_
1541     Chare *oldObj =CpvAccess(_currentObj);
1542     CpvAccess(_currentObj) = this;
1543 #endif
1544   m->ci=ci;
1545   m->redNo=ci->redNo++;
1546   m->gcount=count;
1547 #if DEBUGRED
1548  CkPrintf("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1549 #endif
1550   addContribution(m);
1551 #if DEBUGRED
1552   CkPrintf("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1553 #endif
1554
1555 #ifdef _FAULT_MLOG_
1556     CpvAccess(_currentObj) = oldObj;
1557 #endif
1558 }
1559
1560
1561 //////////// Reduction Manager Remote Entry Points /////////////
1562
1563 //Sent down the reduction tree (used by barren PEs)
1564 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1565 {
1566   CmiLock(lockEverything);
1567   if(blocked){
1568         delete m;
1569         CmiUnlock(lockEverything);
1570         return ;
1571   }
1572   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1573   if (isPresent(m->num) && !inProgress)
1574   {
1575     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1576     startReduction(m->num,srcNode);
1577     finishReduction();
1578   } else if (isFuture(m->num)){
1579         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1580   }
1581   else //is Past
1582     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1583   CmiUnlock(lockEverything);
1584   delete m;
1585
1586 }
1587
1588
1589 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1590 #if DEBUGRED
1591         CkPrintf("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1592 #endif
1593         /*
1594                 FAULT_EVAC
1595         */
1596         if(blocked){
1597                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1598                 bufferedRemoteMsgs.enq(m);
1599                 return;
1600         }
1601         
1602         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1603             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1604             startReduction(m->redNo,CkMyNode());
1605             msgs.enq(m);
1606             nRemote++;
1607             finishReduction();
1608         }
1609         else {
1610             if (isFuture(m->redNo)) {
1611                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1612                     futureRemoteMsgs.enq(m);
1613             }else{
1614                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1615                    CkAbort("Recv'd late remote contribution!\n");
1616             }
1617   }
1618 #if DEBUGRED        
1619        CkPrintf("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1620 #endif       
1621 }
1622
1623 //Sent up the reduction tree with reduced data
1624 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1625 {
1626 #ifndef CMK_CPV_IS_SMP
1627 #if CMK_IMMEDIATE_MSG
1628         if(interrupt == 1){
1629                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1630                 CpvAccess(_qd)->process(-1);
1631                 CmiDelayImmediate();
1632                 return;
1633         }
1634 #endif  
1635 #endif
1636    interrupt = 1;       
1637    CmiLock(lockEverything);   
1638 #if DEBUGRED   
1639    CkPrintf("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1640 #endif   
1641    doRecvMsg(m);
1642    CmiUnlock(lockEverything);    
1643    interrupt = 0;
1644 #if DEBUGRED  
1645    CkPrintf("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1646 #endif 
1647 }
1648
1649 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1650 {
1651         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1652         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1653         if (inProgress){
1654                 DEBR((AA"This Node reduction is already in progress\n"AB));
1655                 return;//This reduction already started
1656         }
1657         if (creating) //Don't start yet-- we're creating elements
1658         {
1659                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1660                 startRequested=CmiTrue;
1661                 return;
1662         }
1663         
1664         //If none of these cases, we need to start the reduction--
1665         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1666         inProgress=CmiTrue;
1667         //Sent start requests to our kids (in case they don't already know)
1668
1669         for (int k=0;k<treeKids();k++)
1670         {
1671 #ifdef BINOMIAL_TREE
1672                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1673                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1674 #else
1675                 if(kids[k] != srcNode){
1676                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1677                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1678                 }       
1679 #endif
1680         }
1681
1682         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1683         // in case, nodegroup does not has the attached red group,
1684         // it has to restart groups again
1685         startLocalGroupReductions(number);
1686 /*
1687         if (startLocalGroupReductions(number) == 0)
1688           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1689 */
1690 }
1691
1692 // restart local groups until succeed
1693 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1694   CmiLock(lockEverything);    
1695   if (startLocalGroupReductions(number) == 0)
1696     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1697   CmiUnlock(lockEverything);    
1698 }
1699
1700 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1701         /*
1702                 FAULT_EVAC
1703         */
1704         if(blocked){
1705                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1706                 bufferedMsgs.enq(m);
1707                 return;
1708         }
1709         
1710         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1711                 DEBR((AA"Contributor %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1712                 futureMsgs.enq(m);
1713         } else {// An ordinary contribution
1714                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1715                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1716                 startReduction(m->redNo,CkMyNode());
1717                 msgs.enq(m);
1718                 nContrib++;
1719                 finishReduction();
1720         }
1721 }
1722
1723 //Handle a message from one element for the reduction
1724 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1725 {
1726   interrupt = 1;
1727   CmiLock(lockEverything);
1728   doAddContribution(m);
1729   CmiUnlock(lockEverything);
1730   interrupt = 0;
1731 }
1732
1733 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1734         if(blocked){
1735                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1736                 bufferedMsgs.enq(m);
1737                 return;
1738         }
1739         
1740         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1741                 DEBR((AA"Latemigrant %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1742 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1743                 futureLateMigrantMsgs.enq(m);
1744         } else {// An ordinary contribution
1745                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1746 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1747                 msgs.enq(m);
1748                 finishReduction();
1749         }
1750 };
1751
1752
1753
1754
1755
1756 /** check if the nodegroup reduction is finished at this node. In that case send it
1757 up the reduction tree **/
1758
1759 void CkNodeReductionMgr::finishReduction(void)
1760 {
1761   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1762   /***Check if reduction is finished in the next few ifs***/
1763   if ((!inProgress) || creating){
1764         DEBR((AA"Either not in Progress or creating\n"AB));
1765         return;
1766   }
1767
1768   if (nContrib<(lcount)){
1769         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1770
1771          return;//Need more local messages
1772   }
1773   if (nRemote<treeKids()){
1774         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1775
1776         return;//Need more remote messages
1777   }
1778   if (nRemote>treeKids()){
1779
1780           interrupt = 0;
1781            CkAbort("Nodegrp Excess remote reduction message received!\n");
1782   }
1783
1784   DEBR((AA"Reducing node data...\n"AB));
1785
1786   /**reduce all messages received at this node **/
1787   CkReductionMsg *result=reduceMessages();
1788
1789   if (hasParent())
1790   {//Pass data up tree to parent
1791         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1792         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1793 #if DEBUGRED
1794         CkPrintf("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib);
1795 #endif
1796                 /*
1797                         FAULT_EVAC
1798                 */
1799                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1800             thisProxy[treeParent()].RecvMsg(result);
1801         }
1802
1803   }
1804   else
1805   {
1806                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1807 #if DEBUGRED
1808                         CkPrintf("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor());
1809 #endif                  
1810                         msgs.enq(result);
1811                         return;
1812                 }
1813                 result->gcount += additionalGCount;
1814           /** if the reduction is finished and I am the root of the reduction tree
1815           then call the reductionhandler and other stuff ***/
1816                 
1817
1818 #if DEBUGRED
1819    CkPrintf("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer());
1820 #endif
1821     if (!result->callback.isInvalid()){
1822 #if DEBUGRED
1823             CkPrintf("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe());
1824 #endif      
1825             result->callback.send(result);
1826     }
1827     else if (storedCallback!=NULL){
1828 #if DEBUGRED
1829             CkPrintf("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe());
1830 #endif
1831             storedCallback->send(result);
1832     }
1833     else{
1834                 DEBR((AA"Invalid Callback \n"AB));
1835             CkAbort("No reduction client!\n"
1836                     "You must register a client with either SetReductionClient or during contribute.\n");
1837                 }
1838   }
1839
1840   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
1841   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
1842   redNo++;
1843         updateTree();
1844   int i;
1845   inProgress=CmiFalse;
1846   startRequested=CmiFalse;
1847   nRemote=nContrib=0;
1848
1849   //Look through the future queue for messages we can now handle
1850   int n=futureMsgs.length();
1851
1852   for (i=0;i<n;i++)
1853   {
1854     interrupt = 1;
1855
1856     CkReductionMsg *m=futureMsgs.deq();
1857
1858     interrupt = 0;
1859     if (m!=NULL){ //One of these addContributions may have finished us.
1860 #if DEBUGRED
1861                 CkPrintf("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1862 #endif
1863       doAddContribution(m);//<- if *still* early, puts it back in the queue
1864     }
1865   }
1866
1867   interrupt = 1;
1868
1869   n=futureRemoteMsgs.length();
1870
1871   interrupt = 0;
1872   for (i=0;i<n;i++)
1873   {
1874     interrupt = 1;
1875
1876     CkReductionMsg *m=futureRemoteMsgs.deq();
1877
1878     interrupt = 0;
1879     if (m!=NULL)
1880       doRecvMsg(m);//<- if *still* early, puts it back in the queue
1881   }
1882   
1883   n = futureLateMigrantMsgs.length();
1884   for(i=0;i<n;i++){
1885     CkReductionMsg *m = futureLateMigrantMsgs.deq();
1886     if(m != NULL){
1887       if(m->redNo == redNo){
1888         msgs.enq(m);
1889       }else{
1890         futureLateMigrantMsgs.enq(m);
1891       }
1892     }
1893   }
1894 }
1895
1896 //////////// Reduction Manager Utilities /////////////
1897
1898 void CkNodeReductionMgr::init_BinaryTree(){
1899         parent = (CkMyNode()-1)/TREE_WID;
1900         int firstkid = CkMyNode()*TREE_WID+1;
1901         numKids=CkNumNodes()-firstkid;
1902   if (numKids>TREE_WID) numKids=TREE_WID;
1903   if (numKids<0) numKids=0;
1904
1905         for(int i=0;i<numKids;i++){
1906                 kids.push_back(firstkid+i);
1907                 newKids.push_back(firstkid+i);
1908         }
1909 }
1910
1911 void CkNodeReductionMgr::init_BinomialTree(){
1912         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
1913         /*upperSize = (unsigned )pow((double)2,depth);*/
1914         upperSize = (unsigned) 1 << depth;
1915         label = upperSize-CkMyNode()-1;
1916         int p=label;
1917         int count=0;
1918         while( p > 0){
1919                 if(p % 2 == 0)
1920                         break;
1921                 else{
1922                         p = p/2;
1923                         count++;
1924                 }
1925         }
1926         /*parent = label + rint(pow((double)2,count));*/
1927         parent = label + (1<<count);
1928         parent = upperSize -1 -parent;
1929         int temp;
1930         if(count != 0){
1931                 numKids = 0;
1932                 for(int i=0;i<count;i++){
1933                         /*temp = label - rint(pow((double)2,i));*/
1934                         temp = label - (1<<i);
1935                         temp = upperSize-1-temp;
1936                         if(temp <= CkNumNodes()-1){
1937                 //              kids[numKids] = temp;
1938                                 kids.push_back(temp);
1939                                 numKids++;
1940                         }
1941                 }
1942         }else{
1943                 numKids = 0;
1944         //      kids = NULL;
1945         }
1946 }
1947
1948
1949 int CkNodeReductionMgr::treeRoot(void)
1950 {
1951   return 0;
1952 }
1953 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
1954 {
1955   return (CmiBool)(CkMyNode()!=treeRoot());
1956 }
1957 int CkNodeReductionMgr::treeParent(void) //My parent Node
1958 {
1959 #ifdef BINOMIAL_TREE
1960         return parent;
1961 #else
1962   return parent;
1963 #endif
1964 }
1965
1966 int CkNodeReductionMgr::firstKid(void) //My first child Node
1967 {
1968   return CkMyNode()*TREE_WID+1;
1969 }
1970 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
1971 {
1972 #ifdef BINOMIAL_TREE
1973         return numKids;
1974 #else
1975 /*  int nKids=CkNumNodes()-firstKid();
1976   if (nKids>TREE_WID) nKids=TREE_WID;
1977   if (nKids<0) nKids=0;
1978   return nKids;*/
1979         return numKids;
1980 #endif
1981 }
1982
1983 //Combine (& free) the current message vector msgs.
1984 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
1985 {
1986   CkReductionMsg *ret=NULL;
1987
1988   //Look through the vector for a valid reducer, swapping out placeholder messages
1989   CkReduction::reducerType r=CkReduction::invalid;
1990   int msgs_gcount=0;//Reduced gcount
1991   int msgs_nSources=0;//Reduced nSources
1992   int msgs_userFlag=-1;
1993   CkCallback msgs_callback;
1994   CkCallback msgs_secondaryCallback;
1995   int i;
1996   int nMsgs=0;
1997   CkReductionMsg *m;
1998   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
1999         bool isMigratableContributor;
2000         
2001
2002   while(NULL!=(m=msgs.deq()))
2003   {
2004     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2005     msgs_gcount+=m->gcount;
2006     if (m->sourceFlag!=0)
2007     { //This is a real message from an element, not just a placeholder
2008       msgArr[nMsgs++]=m;
2009       msgs_nSources+=m->nSources();
2010       r=m->reducer;
2011       if (!m->callback.isInvalid())
2012         msgs_callback=m->callback;
2013       if(!m->secondaryCallback.isInvalid()){
2014         msgs_secondaryCallback = m->secondaryCallback;
2015       }
2016       if (m->userFlag!=-1)
2017         msgs_userFlag=m->userFlag;
2018
2019                         isMigratableContributor= m->isMigratableContributor();
2020                                 
2021     }
2022     else
2023     { //This is just a placeholder message-- replace it
2024       delete m;
2025     }
2026   }
2027
2028   if (nMsgs==0||r==CkReduction::invalid)
2029   //No valid reducer in the whole vector
2030     ret=CkReductionMsg::buildNew(0,NULL);
2031   else
2032   {//Use the reducer to reduce the messages
2033                 if(nMsgs == 1){
2034                         ret = msgArr[0];
2035                 }else{
2036             CkReduction::reducerFn f=CkReduction::reducerTable[r];
2037           ret=(*f)(nMsgs,msgArr);
2038                 }
2039     ret->reducer=r;
2040   }
2041
2042         
2043 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2044         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2045         PathHistory &path = UsrToEnv(ret)->pathHistory;
2046         // Combine the critical paths from all the reduction messages into the header for the new result
2047         for (i=0;i<nMsgs;i++){
2048           if (msgArr[i]!=ret){
2049             CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2050             path.updateMax(UsrToEnv(msgArr[i])->pathHistory);
2051           } else {
2052             CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2053           }
2054         }
2055         // Also consider the path which got us into this entry method
2056         CkPrintf("[%d] this path = %lf\n", CkMyPe(), currentlyExecutingMsg->pathHistory.getTime() );
2057         path.updateMax(currentlyExecutingMsg->pathHistory);
2058         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2059 #endif
2060
2061
2062
2063         //Go back through the vector, deleting old messages
2064   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2065   delete [] msgArr;
2066   //Set the message counts
2067   ret->redNo=redNo;
2068   ret->gcount=msgs_gcount;
2069   ret->userFlag=msgs_userFlag;
2070   ret->callback=msgs_callback;
2071   ret->secondaryCallback = msgs_secondaryCallback;
2072   ret->sourceFlag=msgs_nSources;
2073         ret->setMigratableContributor(isMigratableContributor);
2074   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2075
2076   return ret;
2077 }
2078
2079 void CkNodeReductionMgr::pup(PUP::er &p)
2080 {
2081 //We do not store the client function pointer or the client function parameter,
2082 //it is the responsibility of the programmer to correctly restore these
2083   IrrGroup::pup(p);
2084   p(redNo);
2085   p(inProgress); p(creating); p(startRequested);
2086   p(lcount);
2087   p(nContrib); p(nRemote);
2088   p(interrupt);
2089   p|msgs;
2090   p|futureMsgs;
2091   p|futureRemoteMsgs;
2092   p|futureLateMigrantMsgs;
2093   p|parent;
2094   p|additionalGCount;
2095   p|newAdditionalGCount;
2096   if(p.isUnpacking()) {
2097     gcount=CkNumNodes();
2098     thisProxy = thisgroup;
2099     lockEverything = CmiCreateLock();
2100 #ifdef BINOMIAL_TREE
2101     init_BinomialTree();
2102 #else
2103     init_BinaryTree();
2104 #endif          
2105   }
2106   p | blocked;
2107   p | maxModificationRedNo;
2108
2109   int isnull = (storedCallback == NULL);
2110   p | isnull;
2111   if (!isnull) {
2112     if (p.isUnpacking()) {
2113       storedCallback = new CkCallback;
2114     }
2115     p|*storedCallback;
2116   }
2117 }
2118
2119 /*
2120         FAULT_EVAC
2121         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2122         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2123         reduction tree. 
2124 */
2125 void CkNodeReductionMgr::evacuate(){
2126         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2127         if(treeKids() == 0){
2128         /*
2129                 if the node going down is a leaf
2130         */
2131                 oldleaf=true;
2132                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2133                 /*
2134                         Need to ask parent for the reduction number that it has seen. 
2135                         Since it is a leaf, the tree does not need to be rewired. 
2136                         We reuse the oldparent type of tree modification message to get 
2137                         the parent to block and tell us about the highest reduction number it has seen.
2138                         
2139                 */
2140                 int data[2];
2141                 data[0]=CkMyNode();
2142                 data[1]=getTotalGCount()+additionalGCount;
2143                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2144                 newParent = treeParent();
2145         }else{
2146                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2147                 oldleaf= false;
2148         /*
2149                 It is not a leaf. It needs to rewire the tree around itself.
2150                 It also needs to decide on a reduction No after which the new tree will be used
2151                 Till it decides on the new tree and the redNo at which it becomes valid,
2152                 all received messages will be buffered
2153         */
2154                 newParent = kids[0];
2155                 for(int i=numKids-1;i>=0;i--){
2156                         newKids.remove(i);
2157                 }
2158                 /*
2159                         Ask everybody for the highest reduction number they have seen and
2160                         also tell them about the new tree
2161                 */
2162                 /*
2163                         Tell parent about its new child;
2164                 */
2165                 int oldParentData[2];
2166                 oldParentData[0] = CkMyNode();
2167                 oldParentData[1] = newParent;
2168                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2169
2170                 /*
2171                         Tell the other children about their new parent
2172                 */
2173                 int childrenData=newParent;
2174                 for(int i=1;i<numKids;i++){
2175                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2176                 }
2177                 
2178                 /*
2179                         Tell newParent (1st child) about its new children,
2180                         the current node and its children except the newParent
2181                 */
2182                 int *newParentData = new int[numKids+2];
2183                 for(int i=1;i<numKids;i++){
2184                         newParentData[i] = kids[i];
2185                 }
2186                 newParentData[0] = CkMyNode();
2187                 newParentData[numKids] = parent;
2188                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2189                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2190         }
2191         readyDeletion = false;
2192         blocked = true;
2193         numModificationReplies = 0;
2194         tempModificationRedNo = findMaxRedNo();
2195 }
2196
2197 /*
2198         Depending on the code, use the data to change the tree
2199         1. OLDPARENT : replace the old child with a new one
2200         2. OLDCHILDREN: replace the parent
2201         3. NEWPARENT:  add the children and change the parent
2202         4. LEAFPARENT: delete the old child
2203 */
2204
2205 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2206         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2207         int sender;
2208         newKids = kids;
2209         readyDeletion = false;
2210         newAdditionalGCount = additionalGCount;
2211         switch(code){
2212                 case OLDPARENT: 
2213                         for(int i=0;i<numKids;i++){
2214                                 if(newKids[i] == data[0]){
2215                                         newKids[i] = data[1];
2216                                         break;
2217                                 }
2218                         }
2219                         sender = data[0];
2220                         newParent = parent;
2221                         break;
2222                 case OLDCHILDREN:
2223                         newParent = data[0];
2224                         sender = parent;
2225                         break;
2226                 case NEWPARENT:
2227                         for(int i=0;i<size-2;i++){
2228                                 newKids.push_back(data[i]);
2229                         }
2230                         newParent = data[size-2];
2231                         newAdditionalGCount += data[size-1];
2232                         sender = parent;
2233                         break;
2234                 case LEAFPARENT:
2235                         for(int i=0;i<numKids;i++){
2236                                 if(newKids[i] == data[0]){
2237                                         newKids.remove(i);
2238                                         break;
2239                                 }
2240                         }
2241                         sender = data[0];
2242                         newParent = parent;
2243                         newAdditionalGCount += data[1];
2244                         break;
2245         };
2246         blocked = true;
2247         int maxRedNo = findMaxRedNo();
2248         
2249         thisProxy[sender].collectMaxRedNo(maxRedNo);
2250 }
2251
2252 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2253         /*
2254                 Find out the maximum redNo that has been seen by 
2255                 the affected nodes
2256         */
2257         numModificationReplies++;
2258         if(maxRedNo > tempModificationRedNo){
2259                 tempModificationRedNo = maxRedNo;
2260         }
2261         if(numModificationReplies == numKids+1){
2262                 maxModificationRedNo = tempModificationRedNo;
2263                 /*
2264                         when all the affected nodes have replied, tell them the maximum.
2265                         Unblock yourself. deal with the buffered messages local and remote
2266                 */
2267                 if(maxModificationRedNo == -1){
2268                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2269                 }else{
2270                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2271                 }
2272                 thisProxy[parent].unblockNode(maxModificationRedNo);
2273                 for(int i=0;i<numKids;i++){
2274                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2275                 }
2276                 blocked = false;
2277                 updateTree();
2278                 clearBlockedMsgs();
2279         }
2280 };
2281
2282 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2283         maxModificationRedNo = maxRedNo;
2284         updateTree();
2285         blocked = false;
2286         clearBlockedMsgs();
2287 }
2288
2289
2290 void CkNodeReductionMgr::clearBlockedMsgs(){
2291         int len = bufferedMsgs.length();
2292         for(int i=0;i<len;i++){
2293                 CkReductionMsg *m = bufferedMsgs.deq();
2294                 doAddContribution(m);
2295         }
2296         len = bufferedRemoteMsgs.length();
2297         for(int i=0;i<len;i++){
2298                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2299                 doRecvMsg(m);
2300         }
2301
2302 }
2303 /*
2304         if the reduction number exceeds the maxModificationRedNo, change the tree
2305         to become the new one
2306 */
2307
2308 void CkNodeReductionMgr::updateTree(){
2309         if(redNo > maxModificationRedNo){
2310                 parent = newParent;
2311                 kids = newKids;
2312                 maxModificationRedNo = INT_MAX;
2313                 numKids = kids.size();
2314                 readyDeletion = true;
2315                 additionalGCount = newAdditionalGCount;
2316                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2317                 for(int i=0;i<newKids.size();i++){
2318                         DEBREVAC(("%d ",newKids[i]));
2319                 }
2320                 DEBREVAC(("\n"));
2321         //      startReduction(redNo,CkMyNode());
2322         }else{
2323                 if(maxModificationRedNo != INT_MAX){
2324                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2325                         startReduction(redNo,CkMyNode());
2326                         finishReduction();
2327                 }       
2328         }
2329 }
2330
2331
2332 void CkNodeReductionMgr::doneEvacuate(){
2333         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2334 /*      if(oldleaf){
2335                 
2336                         It used to be a leaf
2337                         Then as soon as future messages have been emptied you can 
2338                         send the parent a message telling them that they are not going
2339                         to receive anymore messages from this child
2340                 
2341                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2342                 while(futureMsgs.length() != 0){
2343                         int n = futureMsgs.length();
2344                         for(int i=0;i<n;i++){
2345                                 CkReductionMsg *m = futureMsgs.deq();
2346                                 if(isPresent(m->redNo)){
2347                                         msgs.enq(m);
2348                                 }else{
2349                                         futureMsgs.enq(m);
2350                                 }
2351                         }
2352                         CkReductionMsg *result = reduceMessages();
2353                         thisProxy[treeParent()].RecvMsg(result);
2354                         redNo++;
2355                 }
2356                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2357                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2358         }else{*/
2359                 if(readyDeletion){
2360                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2361                 }else{
2362                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2363                 }
2364 //      }
2365 }
2366
2367 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2368         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2369         for(int i=0;i<numKids;i++){
2370                 if(kids[i] == deletedChild){
2371                         kids.remove(i);
2372                         break;
2373                 }
2374         }
2375         numKids = kids.length();
2376         finishReduction();
2377 }
2378
2379 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2380         for(int i=0;i<newKids.length();i++){
2381                 if(newKids[i] == deletedChild){
2382                         newKids.remove(i);
2383                         break;
2384                 }
2385         }
2386         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2387         for(int i=0;i<newKids.size();i++){
2388                 DEBREVAC(("%d ",newKids[i]));
2389         }
2390         DEBREVAC(("\n"));
2391         finishReduction();
2392 }
2393
2394 int CkNodeReductionMgr::findMaxRedNo(){
2395         int max = redNo;
2396         for(int i=0;i<futureRemoteMsgs.length();i++){
2397                 if(futureRemoteMsgs[i]->redNo  > max){
2398                         max = futureRemoteMsgs[i]->redNo;
2399                 }
2400         }
2401         /*
2402                 if redNo is max (that is no future message) and the current reduction has not started
2403                 then tree can be changed before the reduction redNo can be started
2404         */ 
2405         if(redNo == max && msgs.length() == 0){
2406                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2407                 max--;
2408         }
2409         return max;
2410 }
2411
2412 #include "CkReduction.def.h"