A few bug fixes related to the mlogft machine.
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #if 0
54 //Debugging messages:
55 // Reduction mananger internal information:
56 #define DEBUGRED 1
57 #define DEBR(x) CkPrintf x
58 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
59 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
60
61 #define DEBN(x) CkPrintf x
62 #define AAN "Red Node%d "
63 #define ABN ,CkMyNode()
64
65 // For status and data messages from the builtin reducer functions.
66 #define RED_DEB(x) //CkPrintf x
67 #define DEBREVAC(x) CkPrintf x
68 #else
69 //No debugging info-- empty defines
70 #define DEBUGRED 0
71 #define DEBR(x) // CkPrintf x
72 #define DEBRMLOG(x) CkPrintf x
73 #define AA
74 #define AB
75 #define DEBN(x) //CkPrintf x
76 #define RED_DEB(x) //CkPrintf x
77 #define DEBREVAC(x) //CkPrintf x
78 #endif
79
80 #ifndef INT_MAX
81 #define INT_MAX 2147483647
82 #endif
83
84 extern int _inrestart;
85
86 Group::Group()
87 {
88         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
89
90         creatingContributors();
91         contributorStamped(&reductionInfo);
92         contributorCreated(&reductionInfo);
93         doneCreatingContributors();
94 #if DEBUGRED
95         CkPrintf("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr));
96 #endif                  
97         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
98         nodeProxy = nodetemp;
99 }
100
101 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
102 {
103         creatingContributors();
104         contributorStamped(&reductionInfo);
105         contributorCreated(&reductionInfo);
106         doneCreatingContributors();
107 }
108
109 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
110                                     ((CkReductionMgr *)this),
111                                     reductionInfo,false);
112 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid));
113
114
115
116 CkGroupInitCallback::CkGroupInitCallback(void) {}
117 /*
118 The callback is just used to tell the caller that this group
119 has been constructed.  (Now they can safely call CkLocalBranch)
120 */
121 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
122 {
123   m->call();
124   delete m;
125 }
126
127 /*
128 The callback is just used to tell the caller that this group
129 is constructed and ready to process other calls.
130 */
131 CkGroupReadyCallback::CkGroupReadyCallback(void)
132 {
133   _isReady = 0;
134 }
135 void
136 CkGroupReadyCallback::callBuffered(void)
137 {
138   int n = _msgs.length();
139   for(int i=0;i<n;i++)
140   {
141     CkGroupCallbackMsg *msg = _msgs.deq();
142     msg->call();
143     delete msg;
144   }
145 }
146 void
147 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
148 {
149   if(_isReady) {
150     msg->call();
151     delete msg;
152   } else {
153     _msgs.enq(msg);
154   }
155 }
156
157 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
158         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
159 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
160 {
161         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
162         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
163         b->fn(b->param,m->getSize(),m->getData());
164         delete m;
165 }
166
167 ///////////////// Reduction Manager //////////////////
168 /*
169 One CkReductionMgr runs a non-overlapping set of reductions.
170 It collects messages from all local contributors, then sends
171 the reduced message up the reduction tree to node zero, where
172 they're passed to the user's client function.
173 */
174
175 CkReductionMgr::CkReductionMgr()//Constructor
176   : thisProxy(thisgroup)
177
178   redNo=0;
179   completedRedNo = -1;
180   inProgress=CmiFalse;
181   creating=CmiFalse;
182   startRequested=CmiFalse;
183   gcount=lcount=0;
184   nContrib=nRemote=0;
185   maxStartRequest=0;
186 #ifdef _FAULT_MLOG_
187     if(CkMyPe() != 0){
188         perProcessorCounts = NULL;
189     }else{
190         perProcessorCounts = new int[CmiNumPes()];
191         for(int i=0;i<CmiNumPes();i++){
192             perProcessorCounts[i] = -1;
193         }
194     }
195     totalCount = 0;
196     processorCount = 0;
197 #endif
198   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
199 }
200
201 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
202 {
203   redNo=0;
204   completedRedNo = -1;
205   inProgress=CmiFalse;
206   creating=CmiFalse;
207   startRequested=CmiFalse;
208   gcount=lcount=0;
209   nContrib=nRemote=0;
210   maxStartRequest=0;
211   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
212 }
213
214 void CkReductionMgr::flushStates()
215 {
216   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
217   redNo=0;
218   completedRedNo = -1;
219   inProgress=CmiFalse;
220   creating=CmiFalse;
221   gcount=lcount=0;
222   startRequested=CmiFalse;
223   nContrib=nRemote=0;
224   maxStartRequest=0;
225
226   while (!msgs.isEmpty()) { delete msgs.deq(); }
227   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
228   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
229   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
230
231   adjVec.length()=0;
232
233   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
234 }
235
236 //////////// Reduction Manager Client API /////////////
237
238 //Add the given client function.  Overwrites any previous client.
239 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
240 {
241   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
242
243   if (CkMyPe()!=0)
244           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
245   storedCallback=*cb;
246   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
247   nodeProxy.ckSetReductionClient(callback);
248 }
249
250 ///////////////////////////// Contributor ////////////////////////
251 //Contributors keep a copy of this structure:
252
253 /*Contributor migration support:
254 */
255 void contributorInfo::pup(PUP::er &p)
256 {
257   p(redNo);
258 }
259
260 ////////////////////// Contributor list maintainance: /////////////////
261 //These just set and clear the "creating" flag to prevent
262 // reductions from finishing early because not all elements
263 // have been created.
264 void CkReductionMgr::creatingContributors(void)
265 {
266   DEBR((AA"Creating contributors...\n"AB));
267   creating=CmiTrue;
268 }
269 void CkReductionMgr::doneCreatingContributors(void)
270 {
271   DEBR((AA"Done creating contributors...\n"AB));
272   creating=CmiFalse;
273   if (startRequested) startReduction(redNo,CkMyPe());
274   finishReduction();
275 }
276
277 //A new contributor will be created
278 void CkReductionMgr::contributorStamped(contributorInfo *ci)
279 {
280   DEBR((AA"Contributor %p stamped\n"AB,ci));
281   //There is another contributor
282   gcount++;
283   if (inProgress)
284   {
285     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
286     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
287   } else
288     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
289 }
290
291 //A new contributor was actually created
292 void CkReductionMgr::contributorCreated(contributorInfo *ci)
293 {
294   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
295   //We've got another contributor
296   lcount++;
297   //He may not need to contribute to some of our reductions:
298   for (int r=redNo;r<ci->redNo;r++)
299     adj(r).lcount--;//He won't be contributing to r here
300 }
301
302 /*Don't expect any more contributions from this one.
303 This is rather horrifying because we now have to make
304 sure the global element count accurately reflects all the
305 contributions the element made before it died-- these may stretch
306 far into the future.  The adj() vector is what saves us here.
307 */
308 void CkReductionMgr::contributorDied(contributorInfo *ci)
309 {
310 #if CMK_MEM_CHECKPOINT
311   // ignore from listener if it is during restart from crash
312   if (CkInRestarting()) return;
313 #endif
314   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
315   //We lost a contributor
316   gcount--;
317
318   if (ci->redNo<redNo)
319   {//Must have been migrating during reductions-- root is waiting for his
320   // contribution, which will never come.
321     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
322     for (int r=ci->redNo;r<redNo;r++)
323       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
324   }
325
326   //Add to the global count for all his future messages (wherever they are)
327   int r;
328   for (r=redNo;r<ci->redNo;r++)
329   {//He already contributed to this reduction, but won't show up in global count.
330     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
331     adj(r).gcount++;
332   }
333
334   lcount--;
335   //He's already contributed to several reductions here
336   for (r=redNo;r<ci->redNo;r++)
337     adj(r).lcount++;//He'll be contributing to r here
338
339   finishReduction();
340 }
341
342 //Migrating away (note that global count doesn't change)
343 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
344 {
345   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
346   lcount--;//We lost a local
347   //He's already contributed to several reductions here
348   for (int r=redNo;r<ci->redNo;r++)
349     adj(r).lcount++;//He'll be contributing to r here
350
351   finishReduction();
352 }
353
354 //Migrating in (note that global count doesn't change)
355 void CkReductionMgr::contributorArriving(contributorInfo *ci)
356 {
357   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
358   lcount++;//We gained a local
359 #if CMK_MEM_CHECKPOINT
360   // ignore from listener if it is during restart from crash
361   // because the ci may be old.
362   if (CkInRestarting()) return;
363 #endif
364   //He has already contributed (elsewhere) to several reductions:
365   for (int r=redNo;r<ci->redNo;r++)
366     adj(r).lcount--;//He won't be contributing to r here
367
368 }
369
370 //Contribute-- the given msg can contain any data.  The reducerType
371 // field of the message must be valid.
372 // Each contributor must contribute exactly once to the each reduction.
373 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
374 {
375   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
376   m->ci=ci;
377   m->redNo=ci->redNo++;
378   m->sourceFlag=-1;//A single contribution
379   m->gcount=0;
380
381 #ifdef _FAULT_MLOG_
382     if(lcount == 0){
383         m->sourceProcessorCount = 1;
384     }else{
385         m->sourceProcessorCount = lcount;
386     }
387     m->fromPE = CmiMyPe();
388     Chare *oldObj =CpvAccess(_currentObj);
389     char currentObjName[100];
390     DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
391     thisProxy[0].contributeViaMessage(m);
392 #else
393   addContribution(m);
394 #endif
395 }
396
397 #ifdef _FAULT_MLOG_
398 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
399     CmiAssert(CmiMyPe() == 0);
400     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
401     if(redNo == 0){
402         if(perProcessorCounts[m->fromPE] == -1){
403             processorCount++;
404             perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
405             DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
406         }else{
407             if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
408                 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
409                 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
410             }
411         }
412         if(processorCount == CmiNumPes()){
413             totalCount = 0;
414             for(int i=0;i<CmiNumPes();i++){
415                 CmiAssert(perProcessorCounts[i] != -1);
416                 totalCount += perProcessorCounts[i];
417             }
418             DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
419         }
420         if(m->sourceProcessorCount == 0){
421             if(processorCount == CmiNumPes()){
422                 finishReduction();
423             }
424             return;
425         }
426     }
427     addContribution(m);
428 }
429 #else
430 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
431 #endif
432
433 //////////// Reduction Manager Remote Entry Points /////////////
434 //Sent down the reduction tree (used by barren PEs)
435 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
436 {
437  if(CkMyPe()==0){
438         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
439         //delete m;
440         //return;
441  }
442  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
443  int srcPE = (UsrToEnv(m))->getSrcPe();
444 #ifndef _FAULT_MLOG_ 
445   if (isPresent(m->num) && !inProgress)
446   {
447     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
448     startReduction(m->num,srcPE);
449     finishReduction();
450   } else if (isFuture(m->num)){
451 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
452           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
453           if(maxStartRequest < m->num){
454                   maxStartRequest = m->num;
455           }
456  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
457           
458     }
459   else //is Past
460     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
461   delete m;
462 #else
463     if(redNo == 0){
464         if(lcount == 0){
465             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
466             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
467             dummy->fromPE = CmiMyPe();
468             dummy->sourceProcessorCount = 0;
469             dummy->redNo = 0;
470             thisProxy[0].contributeViaMessage(dummy);
471         }
472     }
473 #endif
474 }
475
476 //Sent to root of reduction tree with reduction contribution
477 // of migrants that missed the main reduction.
478 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
479 {
480         m->secondaryCallback = m->callback;
481   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
482   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
483   nodeMgr->LateMigrantMsg(m);
484 /*      int len = finalMsgs.length();
485         finalMsgs.enq(m);
486 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
487         endArrayReduction();*/
488 }
489
490 //A late migrating contributor will never contribute to this reduction
491 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
492 {
493   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
494   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
495  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
496   adj(m->num).gcount--;//He won't be contributing to this one.
497   finishReduction();
498   delete m;
499 }
500
501 //////////// Reduction Manager State /////////////
502 void CkReductionMgr::startReduction(int number,int srcPE)
503 {
504   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
505   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
506   if (inProgress){
507         DEBR((AA"This reduction is already in progress\n"AB));
508         return;//This reduction already started
509   }
510   if (creating) //Don't start yet-- we're creating elements
511   {
512     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
513     startRequested=CmiTrue;
514     return;
515   }
516
517 //If none of these cases, we need to start the reduction--
518   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
519   inProgress=CmiTrue;
520   //Sent start requests to our kids (in case they don't already know)
521  
522
523         /*
524                 FAULT_EVAC
525         */
526   if(!CmiNodeAlive(CkMyPe())){
527         return;
528   }
529
530 #ifdef _FAULT_MLOG_ 
531   if(CmiMyPe() == 0 && redNo == 0){
532             for(int j=0;j<CkNumPes();j++){
533                 if(j != CkMyPe() && j != srcPE){
534                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
535                 }
536             }
537             if(lcount == 0){
538                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
539                 dummy->fromPE = CmiMyPe();
540                 dummy->sourceProcessorCount = 0;
541                 dummy->redNo = 0;
542                 thisProxy[0].contributeViaMessage(dummy);
543             }
544     }   else{
545         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
546     }
547
548
549 #else
550         nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
551 #endif
552         
553   int temp;
554         /*
555   //making it a broadcast done only by PE 0
556   if(!hasParent()){
557                 temp = completedRedNo+1;
558                 for(int i=temp;i<=number;i++){
559                         for(int j=0;j<CkNumPes();j++){
560                                 if(j != CkMyPe() && j != srcPE){
561                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
562                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
563                                         }
564                                 }
565                         }
566                 }       
567         }       else{
568                 temp = number;
569         }*/
570 /*  if(!hasParent()){
571                 temp = completedRedNo+1;
572         }       else{
573                 temp = number;
574         }
575         for(int i=temp;i<=number;i++){
576         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
577                 if(hasParent()){
578           // kick-start your parent too ...
579                         if(treeParent() != srcPE){
580                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
581 #ifdef _FAULT_MLOG_
582     CpvAccess(_currentObj) = oldObj;
583 #endif
584                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
585                                 }       
586                         }       
587                 }
588           for (int k=0;k<treeKids();k++)
589           {
590                         if(firstKid()+k != srcPE){
591                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
592                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
593                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
594                                 }       
595                         }       
596         }
597         }
598         */
599 }       
600
601 /*Handle a message from one element for the reduction*/
602 void CkReductionMgr::addContribution(CkReductionMsg *m)
603 {
604   if (isPast(m->redNo))
605   {
606 #ifdef _FAULT_MLOG_
607         CmiAbort("this version should not have late migrations");
608 #else
609         //We've moved on-- forward late contribution straight to root
610     DEBR((AA"Migrant %p gives late contribution for #%d!\n"AB,m->ci,m->redNo));
611         // if (!hasParent()) //Root moved on too soon-- should never happen
612         //   CkAbort("Late reduction contribution received at root!\n");
613     thisProxy[0].LateMigrantMsg(m);
614 #endif
615   }
616   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
617     DEBR((AA"Contributor %p gives early contribution-- for #%d\n"AB,m->ci,m->redNo));
618     futureMsgs.enq(m);
619   } else {// An ordinary contribution
620     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
621    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
622     startReduction(m->redNo,CkMyPe());
623     msgs.enq(m);
624     nContrib++;
625     finishReduction();
626   }
627 }
628
629 /**function checks if it has got all contributions that it is supposed to
630 get at this processor. If it is done it sends the reduced result to the local
631 nodegroup */
632 void CkReductionMgr::finishReduction(void)
633 {
634   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
635   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
636   if ((!inProgress) || creating){
637         DEBR((AA"Either not in Progress or creating\n"AB));
638         return;
639   }
640   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
641 #ifndef _FAULT_MLOG_   
642
643   if (nContrib<(lcount+adj(redNo).lcount)){
644         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
645          return;//Need more local messages
646   }
647  
648 #else
649
650     if(CkMyPe() != 0){
651         if(redNo != 0){
652             return;
653         }else{
654             CmiAssert(lcount == 0);
655             CkReductionMsg *dummy = reduceMessages();
656             dummy->fromPE = CmiMyPe();
657             dummy->sourceProcessorCount = 0;
658             thisProxy[0].contributeViaMessage(dummy);
659             return;
660         }
661     }
662     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
663         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
664          return;//Need more local messages
665   }else{
666         DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
667     }
668
669
670 #endif
671
672   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
673   CkReductionMsg *result=reduceMessages();
674   result->redNo=redNo;
675 #ifndef _FAULT_MLOG_
676
677   result->gcount+=gcount+adj(redNo).gcount;
678
679   result->secondaryCallback = result->callback;
680   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
681         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
682
683   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
684
685 #if DEBUGRED
686  // CkPrintf("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount);
687 #endif
688   
689   // Find our node reduction manager, and pass reduction to him:
690   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
691   nodeMgr->contributeArrayReduction(result);
692 #else
693 #if DEBUGRED
694     CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer());
695 #endif
696     if (!result->callback.isInvalid())
697         result->callback.send(result);
698     else if (!storedCallback.isInvalid())
699         storedCallback.send(result);
700     else{
701 #if DEBUGRED
702         CkPrintf("No reduction client for group %d \n",thisgroup.idx);
703 #endif
704         CkAbort("No reduction client!\n"
705             "You must register a client with either SetReductionClient or during contribute.\n");
706     }
707 #if DEBUGRED
708        CkPrintf("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer());
709 #endif
710
711 #endif
712
713   //House Keeping Operations will have to check later what needs to be changed
714   redNo++;
715   //Shift the count adjustment vector down one slot (to match new redNo)
716   int i;
717 #ifndef _FAULT_MLOG_
718         if(CkMyPe()!=0){
719 #else
720     {
721 #endif
722 //      int i;
723         completedRedNo++;
724         for (i=1;i<(int)(adjVec.length());i++){
725            adjVec[i-1]=adjVec[i];
726         }
727         adjVec.length()--;  
728   }
729
730   inProgress=CmiFalse;
731   startRequested=CmiFalse;
732   nRemote=nContrib=0;
733
734   //Look through the future queue for messages we can now handle
735   int n=futureMsgs.length();
736   for (i=0;i<n;i++)
737   {
738     CkReductionMsg *m=futureMsgs.deq();
739     if (m!=NULL) //One of these addContributions may have finished us.
740       addContribution(m);//<- if *still* early, puts it back in the queue
741   }
742 #ifndef _FAULT_MLOG_   
743
744   if(maxStartRequest >= redNo){
745           startReduction(redNo,CkMyPe());
746           finishReduction();
747   }
748  
749 #endif
750 }
751
752 //////////// Reduction Manager Utilities /////////////
753
754 //Return the countAdjustment struct for the given redNo:
755 countAdjustment &CkReductionMgr::adj(int number)
756 {
757   number-=completedRedNo;
758   number--;
759   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
760   //Pad the adjustment vector with zeros until it's at least number long
761   while ((int)(adjVec.length())<=number)
762     adjVec.push_back(countAdjustment());
763   return adjVec[number];
764 }
765
766 //Combine (& free) the current message vector msgs.
767 CkReductionMsg *CkReductionMgr::reduceMessages(void)
768 {
769   CkReductionMsg *ret=NULL;
770
771   //Look through the vector for a valid reducer, swapping out placeholder messages
772   CkReduction::reducerType r=CkReduction::invalid;
773   int msgs_gcount=0;//Reduced gcount
774   int msgs_nSources=0;//Reduced nSources
775   int msgs_userFlag=-1;
776   CkCallback msgs_callback;
777   int i;
778   int nMsgs=0;
779   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
780   CkReductionMsg *m;
781         bool isMigratableContributor;
782
783   // Copy message queue into msgArr, skipping placeholders:
784   while (NULL!=(m=msgs.deq()))
785   {
786     msgs_gcount+=m->gcount;
787     if (m->sourceFlag!=0)
788     { //This is a real message from an element, not just a placeholder
789       msgArr[nMsgs++]=m;
790       msgs_nSources+=m->nSources();
791       r=m->reducer;
792       if (!m->callback.isInvalid())
793         msgs_callback=m->callback;
794       if (m->userFlag!=-1)
795         msgs_userFlag=m->userFlag;
796                         
797                         isMigratableContributor=m->isMigratableContributor();
798     }
799     else
800     { //This is just a placeholder message-- forget it
801       delete m;
802     }
803   }
804
805   if (nMsgs==0||r==CkReduction::invalid)
806   //No valid reducer in the whole vector
807     ret=CkReductionMsg::buildNew(0,NULL);
808   else
809   {//Use the reducer to reduce the messages
810                 //if there is only one msg to be reduced just return that message
811                 if(nMsgs == 1){
812                         ret = msgArr[0];        
813                 }else{
814             CkReduction::reducerFn f=CkReduction::reducerTable[r];
815           ret=(*f)(nMsgs,msgArr);
816                 }
817     ret->reducer=r;
818   }
819
820
821
822 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
823   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
824   PathHistory &path = UsrToEnv(ret)->pathHistory;
825   // Combine the critical paths from all the reduction messages into the header for the new result
826   for (i=0;i<nMsgs;i++){
827     if (msgArr[i]!=ret){
828       CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
829       path.updateMax(UsrToEnv(msgArr[i])->pathHistory);
830     }
831   }
832   
833   // Also consider the path which got us into this entry method
834   CkPrintf("[%d] this path = %lf\n", CkMyPe(), currentlyExecutingMsg->pathHistory.getTime() );
835   path.updateMax(currentlyExecutingMsg->pathHistory);
836   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
837   
838 #endif
839
840
841
842
843
844
845
846
847
848
849   
850
851
852         //Go back through the vector, deleting old messages
853   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
854         delete [] msgArr;
855
856   //Set the message counts
857   ret->redNo=redNo;
858   ret->gcount=msgs_gcount;
859   ret->userFlag=msgs_userFlag;
860   ret->callback=msgs_callback;
861   ret->sourceFlag=msgs_nSources;
862         ret->setMigratableContributor(isMigratableContributor);
863   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
864
865   return ret;
866 }
867
868
869 //Checkpointing utilities
870 //pack-unpack method for CkReductionMsg
871 //if packing pack the message and then unpack and return it
872 //if unpacking allocate memory for it read it off disk and then unapck
873 //and return it
874 void CkReductionMgr::pup(PUP::er &p)
875 {
876 //We do not store the client function pointer or the client function parameter,
877 //it is the responsibility of the programmer to correctly restore these
878   CkGroupInitCallback::pup(p);
879   p(redNo);
880   p(completedRedNo);
881   p(inProgress); p(creating); p(startRequested);
882   p(nContrib); p(nRemote);
883   p|msgs;
884   p|futureMsgs;
885   p|futureRemoteMsgs;
886   p|finalMsgs;
887   p|adjVec;
888   p|nodeProxy;
889   p|storedCallback;
890
891   // subtle --- Gengbin
892   // Group : CkReductionMgr
893   // CkArray: CkReductionMgr
894   // lcount/gcount in Group is set in Group constructor
895   // lcount/gcount in CkArray is not, it is set when array elements are created
896   // we can not pup because inserting array elems will add the counters again
897 //  p|lcount;
898 //  p|gcount;
899 #ifdef _FAULT_MLOG_ 
900 //  p|lcount;
901 //  //  p|gcount;
902 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
903 #endif
904   if(p.isUnpacking()){
905     thisProxy = thisgroup;
906     maxStartRequest=0;
907   }
908 #if DEBUGRED
909   CkPrintf("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount);
910 #endif
911 }
912
913
914 //Callback for doing Reduction through NodeGroups added by Sayantan
915
916 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
917
918         int total = m->gcount+adj(m->redNo).gcount;
919         finalMsgs.enq(m);
920         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
921         adj(m->redNo).mainRecvd = 1;
922 #if DEBUGRED
923         CkPrintf("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer());
924 #endif  
925         endArrayReduction();
926 }
927
928 void CkReductionMgr :: endArrayReduction(){
929         CkReductionMsg *ret=NULL;
930         int nMsgs=finalMsgs.length();
931         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
932         //Look through the vector for a valid reducer, swapping out placeholder messages
933         //CkPrintf("Length of Final Message %d \n",nMsgs);
934         CkReduction::reducerType r=CkReduction::invalid;
935         int msgs_gcount=0;//Reduced gcount
936         int msgs_nSources=0;//Reduced nSources
937         int msgs_userFlag=-1;
938         CkCallback msgs_callback;
939         CkCallback msgs_secondaryCallback;
940         CkVec<CkReductionMsg *> tempMsgs;
941         int i;
942         int numMsgs = 0;
943         for (i=0;i<nMsgs;i++)
944         {
945                 CkReductionMsg *m=finalMsgs.deq();
946                 if(m->redNo == completedRedNo +1){
947                         msgs_gcount+=m->gcount;
948                         if (m->sourceFlag!=0)
949                         { //This is a real message from an element, not just a placeholder
950                                 msgs_nSources+=m->nSources();
951                                 r=m->reducer;
952                                 if (!m->callback.isInvalid())
953                                 msgs_callback=m->callback;
954                                 if(!m->secondaryCallback.isInvalid())
955                                         msgs_secondaryCallback = m->secondaryCallback;
956                                 if (m->userFlag!=-1)
957                                         msgs_userFlag=m->userFlag;
958                                 tempMsgs.push_back(m);
959                         }
960                 }else{
961                         finalMsgs.enq(m);
962                 }
963
964         }
965         numMsgs = tempMsgs.length();
966 #if DEBUGRED
967         CkPrintf("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd);
968 #endif  
969         if(numMsgs == 0){
970                 return;
971         }
972         if(adj(completedRedNo+1).mainRecvd == 0){
973                 for(i=0;i<numMsgs;i++){
974                         finalMsgs.enq(tempMsgs[i]);
975                 }
976                 return;
977         }
978
979 /*
980         NOT NEEDED ANYMORE DONE at nodegroup level
981         if(msgs_gcount  > msgs_nSources){
982                 for(i=0;i<numMsgs;i++){
983                         finalMsgs.enq(tempMsgs[i]);
984                 }
985                 return;
986         }*/
987
988         if (nMsgs==0||r==CkReduction::invalid)
989                 //No valid reducer in the whole vector
990                 ret=CkReductionMsg::buildNew(0,NULL);
991         else{//Use the reducer to reduce the messages
992                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
993                 // has to be corrected elements from above need to be put into a temporary vector
994                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
995                 ret=(*f)(numMsgs,msgArr);
996                 ret->reducer=r;
997
998         }
999
1000         
1001 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1002         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1003         PathHistory &path = UsrToEnv(ret)->pathHistory;
1004         // Combine the critical paths from all the reduction messages into the header for the new result
1005         for (i=0;i<numMsgs;i++){
1006           if (tempMsgs[i]!=ret){
1007             CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1008             path.updateMax(UsrToEnv(tempMsgs[i])->pathHistory);
1009           } else {
1010             CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1011           }
1012         }
1013         // Also consider the path which got us into this entry method
1014         CkPrintf("[%d] this path = %lf\n", CkMyPe(), currentlyExecutingMsg->pathHistory.getTime() );
1015         path.updateMax(currentlyExecutingMsg->pathHistory);
1016         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1017
1018 #endif
1019   
1020
1021
1022
1023
1024         for(i = 0;i<numMsgs;i++){
1025                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1026         }
1027
1028         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1029         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1030
1031         ret->redNo=completedRedNo+1;
1032         ret->gcount=msgs_gcount;
1033         ret->userFlag=msgs_userFlag;
1034         ret->callback=msgs_callback;
1035         ret->secondaryCallback = msgs_secondaryCallback;
1036         ret->sourceFlag=msgs_nSources;
1037
1038 #if DEBUGRED
1039         CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer());
1040 #endif
1041         if (!ret->secondaryCallback.isInvalid())
1042             ret->secondaryCallback.send(ret);
1043     else if (!storedCallback.isInvalid())
1044             storedCallback.send(ret);
1045     else{
1046 #if DEBUGRED
1047             CkPrintf("No reduction client for group %d \n",thisgroup.idx);
1048 #endif
1049             CkAbort("No reduction client!\n"
1050                     "You must register a client with either SetReductionClient or during contribute.\n");
1051     }
1052         completedRedNo++;
1053 #if DEBUGRED
1054        CkPrintf("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer());
1055 #endif
1056         for (i=1;i<(int)(adjVec.length());i++)
1057                 adjVec[i-1]=adjVec[i];
1058         adjVec.length()--;
1059         endArrayReduction();
1060 }
1061
1062
1063 /////////////////////////////////////////////////////////////////////////
1064
1065 ////////////////////////////////
1066 //CkReductionMsg support
1067
1068 //ReductionMessage default private constructor-- does nothing
1069 CkReductionMsg::CkReductionMsg(){}
1070 CkReductionMsg::~CkReductionMsg(){}
1071
1072 //This define gives the distance from the start of the CkReductionMsg
1073 // object to the start of the user data area (just below last object field)
1074 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1075
1076 //"Constructor"-- builds and returns a new CkReductionMsg.
1077 //  the "data" array you specify will be copied into this object.
1078 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1079     CkReduction::reducerType reducer, CkReductionMsg *buf)
1080 {
1081   int len[1];
1082   len[0]=NdataSize;
1083   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1084   
1085   ret->dataSize=NdataSize;
1086   if (srcData!=NULL && !buf)
1087     memcpy(ret->data,srcData,NdataSize);
1088   ret->userFlag=-1;
1089   ret->reducer=reducer;
1090   ret->ci=NULL;
1091   ret->sourceFlag=-1000;
1092   ret->gcount=0;
1093   ret->migratableContributor = true;
1094   return ret;
1095 }
1096
1097 // Charm kernel message runtime support:
1098 void *
1099 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1100 {
1101   int totalsize=ARM_DATASTART+(*sz);
1102   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1103   CkReductionMsg *ret = (CkReductionMsg *)
1104     CkAllocMsg(msgnum,totalsize,priobits);
1105   ret->data=(void *)(&ret->dataStorage);
1106   return (void *) ret;
1107 }
1108
1109 void *
1110 CkReductionMsg::pack(CkReductionMsg* in)
1111 {
1112   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1113   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1114   in->data = NULL;
1115   return (void*) in;
1116 }
1117
1118 CkReductionMsg* CkReductionMsg::unpack(void *in)
1119 {
1120   CkReductionMsg *ret = (CkReductionMsg *)in;
1121   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1122   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1123   ret->data=(void *)(&ret->dataStorage);
1124   return ret;
1125 }
1126
1127
1128 /////////////////////////////////////////////////////////////////////////////////////
1129 ///////////////// Builtin Reducer Functions //////////////
1130 /* A simple reducer, like sum_int, looks like this:
1131 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1132 {
1133   int i,ret=0;
1134   for (i=0;i<nMsg;i++)
1135     ret+=*(int *)(msg[i]->getData());
1136   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1137 }
1138
1139 To keep the code small and easy to change, the implementations below
1140 are built with preprocessor macros.
1141 */
1142
1143 //////////////// simple reducers ///////////////////
1144 /*A define used to quickly and tersely construct simple reductions.
1145 The basic idea is to use the first message's data array as
1146 (pre-initialized!) scratch space for folding in the other messages.
1147  */
1148
1149 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1150 {
1151         CkAbort("Called the invalid reducer type 0.  This probably\n"
1152                 "means you forgot to initialize your custom reducer index.\n");
1153         return NULL;
1154 }
1155
1156 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1157 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1158 {\
1159   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1160   int m,i;\
1161   int nElem=msg[0]->getLength()/sizeof(dataType);\
1162   dataType *ret=(dataType *)(msg[0]->getData());\
1163   for (m=1;m<nMsg;m++)\
1164   {\
1165     dataType *value=(dataType *)(msg[m]->getData());\
1166     for (i=0;i<nElem;i++)\
1167     {\
1168       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1169       loop\
1170     }\
1171   }\
1172   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1173   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1174 }
1175
1176 //Use this macro for reductions that have the same type for all inputs
1177 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1178   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1179   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1180   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1181
1182
1183 //Compute the sum the numbers passed by each element.
1184 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1185
1186 //Compute the product of the numbers passed by each element.
1187 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1188
1189 //Compute the largest number passed by any element.
1190 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1191
1192 //Compute the smallest integer passed by any element.
1193 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1194
1195
1196 //Compute the logical AND of the integers passed by each element.
1197 // The resulting integer will be zero if any source integer is zero; else 1.
1198 SIMPLE_REDUCTION(logical_and,int,"%d",
1199         if (value[i]==0)
1200      ret[i]=0;
1201   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1202 )
1203
1204 //Compute the logical OR of the integers passed by each element.
1205 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1206 SIMPLE_REDUCTION(logical_or,int,"%d",
1207   if (value[i]!=0)
1208            ret[i]=1;
1209   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1210 )
1211
1212 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1213 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1214
1215 //Select one random message to pass on
1216 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1217   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1218 }
1219
1220 /////////////// concat ////////////////
1221 /*
1222 This reducer simply appends the data it recieves from each element,
1223 without any housekeeping data to separate them.
1224 */
1225 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1226 {
1227   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1228   //Figure out how big a message we'll need
1229   int i,retSize=0;
1230   for (i=0;i<nMsg;i++)
1231       retSize+=msg[i]->getSize();
1232
1233   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1234
1235   //Allocate a new message
1236   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1237
1238   //Copy the source message data into the return message
1239   char *cur=(char *)(ret->getData());
1240   for (i=0;i<nMsg;i++) {
1241     int messageBytes=msg[i]->getSize();
1242     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1243     cur+=messageBytes;
1244   }
1245   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1246   return ret;
1247 }
1248
1249 /////////////// set ////////////////
1250 /*
1251 This reducer appends the data it recieves from each element
1252 along with some housekeeping data indicating contribution boundaries.
1253 The message data is thus a list of reduction_set_element structures
1254 terminated by a dummy reduction_set_element with a sourceElement of -1.
1255 */
1256
1257 //This rounds an integer up to the nearest multiple of sizeof(double)
1258 static const int alignSize=sizeof(double);
1259 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1260
1261 //This gives the size (in bytes) of a reduction_set_element
1262 static int SET_SIZE(int dataSize)
1263 {return SET_ALIGN(sizeof(int)+dataSize);}
1264
1265 //This returns a pointer to the next reduction_set_element in the list
1266 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1267 {
1268   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1269   return (CkReduction::setElement *)next;
1270 }
1271
1272 //Combine the data passed by each element into an list of reduction_set_elements.
1273 // Each element may contribute arbitrary data (with arbitrary length).
1274 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1275 {
1276   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1277   //Figure out how big a message we'll need
1278   int i,retSize=0;
1279   for (i=0;i<nMsg;i++) {
1280     if (!msg[i]->isFromUser())
1281     //This message is composite-- it will just be copied over (less terminating -1)
1282       retSize+=(msg[i]->getSize()-sizeof(int));
1283     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1284       retSize+=SET_SIZE(msg[i]->getSize());
1285   }
1286   retSize+=sizeof(int);//Leave room for terminating -1.
1287
1288   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1289
1290   //Allocate a new message
1291   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1292
1293   //Copy the source message data into the return message
1294   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1295   for (i=0;i<nMsg;i++)
1296     if (!msg[i]->isFromUser())
1297     {//This message is composite-- just copy it over (less terminating -1)
1298                         int messageBytes=msg[i]->getSize()-sizeof(int);
1299                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1300                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1301                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1302     }
1303     else //This is a message from an element-- wrap it in a reduction_set_element
1304     {
1305       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1306       cur->dataSize=msg[i]->getSize();
1307       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1308       cur=SET_NEXT(cur);
1309     }
1310   cur->dataSize=-1;//Add a terminating -1.
1311   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1312   return ret;
1313 }
1314
1315 //Utility routine: get the next reduction_set_element in the list
1316 // if there is one, or return NULL if there are none.
1317 //To get all the elements, just keep feeding this procedure's output back to
1318 // its input until it returns NULL.
1319 CkReduction::setElement *CkReduction::setElement::next(void)
1320 {
1321   CkReduction::setElement *n=SET_NEXT(this);
1322   if (n->dataSize==-1)
1323     return NULL;//This is the end of the list
1324   else
1325     return n;//This is just another element
1326 }
1327
1328 /////////////////// Reduction Function Table /////////////////////
1329 CkReduction::CkReduction() {} //Dummy private constructor
1330
1331 //Add the given reducer to the list.  Returns the new reducer's
1332 // reducerType.  Must be called in the same order on every node.
1333 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1334 {
1335   reducerTable[nReducers]=fn;
1336   return (reducerType)nReducers++;
1337 }
1338
1339 /*Reducer table: maps reducerTypes to reducerFns.
1340 It's indexed by reducerType, so the order in this table
1341 must *exactly* match the reducerType enum declaration.
1342 The names don't have to match, but it helps.
1343 */
1344 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1345
1346 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1347     ::invalid_reducer,
1348   //Compute the sum the numbers passed by each element.
1349     ::sum_int,::sum_float,::sum_double,
1350
1351   //Compute the product the numbers passed by each element.
1352     ::product_int,::product_float,::product_double,
1353
1354   //Compute the largest number passed by any element.
1355     ::max_int,::max_float,::max_double,
1356
1357   //Compute the smallest number passed by any element.
1358     ::min_int,::min_float,::min_double,
1359
1360   //Compute the logical AND of the integers passed by each element.
1361   // The resulting integer will be zero if any source integer is zero.
1362     ::logical_and,
1363
1364   //Compute the logical OR of the integers passed by each element.
1365   // The resulting integer will be 1 if any source integer is nonzero.
1366     ::logical_or,
1367
1368     // Compute the logical bitvector AND of the integers passed by each element.
1369     ::bitvec_and,
1370
1371     // Compute the logical bitvector OR of the integers passed by each element.
1372     ::bitvec_or,
1373
1374     // Select one of the messages at random to pass on
1375     ::random,
1376
1377   //Concatenate the (arbitrary) data passed by each element
1378     ::concat,
1379
1380   //Combine the data passed by each element into an list of setElements.
1381   // Each element may contribute arbitrary data (with arbitrary length).
1382     ::set
1383 };
1384
1385
1386
1387
1388
1389
1390
1391
1392 /********** Code added by Sayantan *********************/
1393 /** Locking is a big problem in the nodegroup code for smp.
1394  So a few assumptions have had to be made. There is one lock
1395  called lockEverything. It protects all the data structures 
1396  of the nodegroup reduction mgr. I tried grabbing it separately 
1397  for each datastructure, modifying it and then releasing it and
1398  then grabbing it again, for the next change.
1399  That doesn't really help because the interleaved execution of 
1400  different threads makes the state of the reduction manager 
1401  inconsistent. 
1402  
1403  1. Grab lockEverything before calling finishreduction or startReduction
1404     or doRecvMsg
1405  2. lockEverything is grabbed only in entry methods reductionStarting
1406     or RecvMesg or  addcontribution.
1407  ****/
1408  
1409 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1410 NodeGroup::NodeGroup(void) {
1411   __nodelock=CmiCreateLock();
1412 #ifdef _FAULT_MLOG_
1413     mlogData->objID.type = TypeNodeGroup;
1414     mlogData->objID.data.group.onPE = CkMyNode();
1415 #endif
1416
1417 }
1418 NodeGroup::~NodeGroup() {
1419   CmiDestroyLock(__nodelock);
1420 }
1421 void NodeGroup::pup(PUP::er &p)
1422 {
1423   CkNodeReductionMgr::pup(p);
1424   p|reductionInfo;
1425 }
1426
1427 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1428
1429 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1430   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1431  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1432   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1433  }
1434
1435 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1436                                     ((CkNodeReductionMgr *)this),
1437                                     reductionInfo,false);
1438
1439 /* this contribute also adds up the count across all messages it receives.
1440   Useful for summing up number of array elements who have contributed ****/ 
1441 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1442         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1443
1444
1445
1446 //#define BINOMIAL_TREE
1447
1448 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1449   : thisProxy(thisgroup)
1450 {
1451 #ifdef BINOMIAL_TREE
1452   init_BinomialTree();
1453 #else
1454   init_BinaryTree();
1455 #endif
1456   storedCallback=NULL;
1457   redNo=0;
1458   inProgress=CmiFalse;
1459   
1460   startRequested=CmiFalse;
1461   gcount=CkNumNodes();
1462   lcount=1;
1463   nContrib=nRemote=0;
1464   lockEverything = CmiCreateLock();
1465
1466
1467   creating=CmiFalse;
1468   interrupt = 0;
1469   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1470         /*
1471                 FAULT_EVAC
1472         */
1473         blocked = false;
1474         maxModificationRedNo = INT_MAX;
1475         killed=0;
1476         additionalGCount = newAdditionalGCount = 0;
1477 }
1478
1479 void CkNodeReductionMgr::flushStates()
1480 {
1481  if(CkMyRank() == 0){
1482   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1483   redNo=0;
1484   inProgress=CmiFalse;
1485
1486   startRequested=CmiFalse;
1487   gcount=CkNumNodes();
1488   lcount=1;
1489   nContrib=nRemote=0;
1490
1491   creating=CmiFalse;
1492   interrupt = 0;
1493   while (!msgs.isEmpty()) { delete msgs.deq(); }
1494   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1495   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1496   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1497   }
1498 }
1499
1500 //////////// Reduction Manager Client API /////////////
1501
1502 //Add the given client function.  Overwrites any previous client.
1503 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1504 {
1505   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1506   if(cb->isInvalid()){
1507         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1508   }else{
1509         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1510   }
1511
1512   if (CkMyNode()!=0)
1513           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1514   delete storedCallback;
1515   storedCallback=cb;
1516 }
1517
1518
1519
1520 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1521 {
1522 #ifdef _FAULT_MLOG_
1523     Chare *oldObj =CpvAccess(_currentObj);
1524     CpvAccess(_currentObj) = this;
1525 #endif
1526
1527   m->ci=ci;
1528   m->redNo=ci->redNo++;
1529   m->sourceFlag=-1;//A single contribution
1530   m->gcount=0;
1531 #if DEBUGRED
1532         CkPrintf("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo);
1533 #endif
1534   addContribution(m);
1535
1536 #ifdef _FAULT_MLOG_
1537     CpvAccess(_currentObj) = oldObj;
1538 #endif
1539 }
1540
1541
1542 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1543 {
1544 #ifdef _FAULT_MLOG_
1545     Chare *oldObj =CpvAccess(_currentObj);
1546     CpvAccess(_currentObj) = this;
1547 #endif
1548   m->ci=ci;
1549   m->redNo=ci->redNo++;
1550   m->gcount=count;
1551 #if DEBUGRED
1552  CkPrintf("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1553 #endif
1554   addContribution(m);
1555 #if DEBUGRED
1556   CkPrintf("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1557 #endif
1558
1559 #ifdef _FAULT_MLOG_
1560     CpvAccess(_currentObj) = oldObj;
1561 #endif
1562 }
1563
1564
1565 //////////// Reduction Manager Remote Entry Points /////////////
1566
1567 //Sent down the reduction tree (used by barren PEs)
1568 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1569 {
1570   CmiLock(lockEverything);
1571   if(blocked){
1572         delete m;
1573         CmiUnlock(lockEverything);
1574         return ;
1575   }
1576   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1577   if (isPresent(m->num) && !inProgress)
1578   {
1579     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1580     startReduction(m->num,srcNode);
1581     finishReduction();
1582   } else if (isFuture(m->num)){
1583         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1584   }
1585   else //is Past
1586     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1587   CmiUnlock(lockEverything);
1588   delete m;
1589
1590 }
1591
1592
1593 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1594 #if DEBUGRED
1595         CkPrintf("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1596 #endif
1597         /*
1598                 FAULT_EVAC
1599         */
1600         if(blocked){
1601                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1602                 bufferedRemoteMsgs.enq(m);
1603                 return;
1604         }
1605         
1606         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1607             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1608             startReduction(m->redNo,CkMyNode());
1609             msgs.enq(m);
1610             nRemote++;
1611             finishReduction();
1612         }
1613         else {
1614             if (isFuture(m->redNo)) {
1615                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1616                     futureRemoteMsgs.enq(m);
1617             }else{
1618                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1619                    CkAbort("Recv'd late remote contribution!\n");
1620             }
1621   }
1622 #if DEBUGRED        
1623        CkPrintf("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1624 #endif       
1625 }
1626
1627 //Sent up the reduction tree with reduced data
1628 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1629 {
1630 #ifndef CMK_CPV_IS_SMP
1631 #if CMK_IMMEDIATE_MSG
1632         if(interrupt == 1){
1633                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1634                 CpvAccess(_qd)->process(-1);
1635                 CmiDelayImmediate();
1636                 return;
1637         }
1638 #endif  
1639 #endif
1640    interrupt = 1;       
1641    CmiLock(lockEverything);   
1642 #if DEBUGRED   
1643    CkPrintf("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1644 #endif   
1645    doRecvMsg(m);
1646    CmiUnlock(lockEverything);    
1647    interrupt = 0;
1648 #if DEBUGRED  
1649    CkPrintf("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1650 #endif 
1651 }
1652
1653 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1654 {
1655         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1656         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1657         if (inProgress){
1658                 DEBR((AA"This Node reduction is already in progress\n"AB));
1659                 return;//This reduction already started
1660         }
1661         if (creating) //Don't start yet-- we're creating elements
1662         {
1663                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1664                 startRequested=CmiTrue;
1665                 return;
1666         }
1667         
1668         //If none of these cases, we need to start the reduction--
1669         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1670         inProgress=CmiTrue;
1671         //Sent start requests to our kids (in case they don't already know)
1672
1673         for (int k=0;k<treeKids();k++)
1674         {
1675 #ifdef BINOMIAL_TREE
1676                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1677                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1678 #else
1679                 if(kids[k] != srcNode){
1680                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1681                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1682                 }       
1683 #endif
1684         }
1685
1686         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1687         // in case, nodegroup does not has the attached red group,
1688         // it has to restart groups again
1689         startLocalGroupReductions(number);
1690 /*
1691         if (startLocalGroupReductions(number) == 0)
1692           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1693 */
1694 }
1695
1696 // restart local groups until succeed
1697 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1698   CmiLock(lockEverything);    
1699   if (startLocalGroupReductions(number) == 0)
1700     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1701   CmiUnlock(lockEverything);    
1702 }
1703
1704 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1705         /*
1706                 FAULT_EVAC
1707         */
1708         if(blocked){
1709                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1710                 bufferedMsgs.enq(m);
1711                 return;
1712         }
1713         
1714         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1715                 DEBR((AA"Contributor %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1716                 futureMsgs.enq(m);
1717         } else {// An ordinary contribution
1718                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1719                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1720                 startReduction(m->redNo,CkMyNode());
1721                 msgs.enq(m);
1722                 nContrib++;
1723                 finishReduction();
1724         }
1725 }
1726
1727 //Handle a message from one element for the reduction
1728 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1729 {
1730   interrupt = 1;
1731   CmiLock(lockEverything);
1732   doAddContribution(m);
1733   CmiUnlock(lockEverything);
1734   interrupt = 0;
1735 }
1736
1737 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1738         if(blocked){
1739                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1740                 bufferedMsgs.enq(m);
1741                 return;
1742         }
1743         
1744         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1745                 DEBR((AA"Latemigrant %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1746 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1747                 futureLateMigrantMsgs.enq(m);
1748         } else {// An ordinary contribution
1749                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1750 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1751                 msgs.enq(m);
1752                 finishReduction();
1753         }
1754 };
1755
1756
1757
1758
1759
1760 /** check if the nodegroup reduction is finished at this node. In that case send it
1761 up the reduction tree **/
1762
1763 void CkNodeReductionMgr::finishReduction(void)
1764 {
1765   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1766   /***Check if reduction is finished in the next few ifs***/
1767   if ((!inProgress) || creating){
1768         DEBR((AA"Either not in Progress or creating\n"AB));
1769         return;
1770   }
1771
1772   if (nContrib<(lcount)){
1773         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1774
1775          return;//Need more local messages
1776   }
1777   if (nRemote<treeKids()){
1778         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1779
1780         return;//Need more remote messages
1781   }
1782   if (nRemote>treeKids()){
1783
1784           interrupt = 0;
1785            CkAbort("Nodegrp Excess remote reduction message received!\n");
1786   }
1787
1788   DEBR((AA"Reducing node data...\n"AB));
1789
1790   /**reduce all messages received at this node **/
1791   CkReductionMsg *result=reduceMessages();
1792
1793   if (hasParent())
1794   {//Pass data up tree to parent
1795         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1796         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1797 #if DEBUGRED
1798         CkPrintf("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib);
1799 #endif
1800                 /*
1801                         FAULT_EVAC
1802                 */
1803                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1804             thisProxy[treeParent()].RecvMsg(result);
1805         }
1806
1807   }
1808   else
1809   {
1810                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1811 #if DEBUGRED
1812                         CkPrintf("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor());
1813 #endif                  
1814                         msgs.enq(result);
1815                         return;
1816                 }
1817                 result->gcount += additionalGCount;
1818           /** if the reduction is finished and I am the root of the reduction tree
1819           then call the reductionhandler and other stuff ***/
1820                 
1821
1822 #if DEBUGRED
1823    CkPrintf("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer());
1824 #endif
1825     if (!result->callback.isInvalid()){
1826 #if DEBUGRED
1827             CkPrintf("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe());
1828 #endif      
1829             result->callback.send(result);
1830     }
1831     else if (storedCallback!=NULL){
1832 #if DEBUGRED
1833             CkPrintf("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe());
1834 #endif
1835             storedCallback->send(result);
1836     }
1837     else{
1838                 DEBR((AA"Invalid Callback \n"AB));
1839             CkAbort("No reduction client!\n"
1840                     "You must register a client with either SetReductionClient or during contribute.\n");
1841                 }
1842   }
1843
1844   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
1845   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
1846   redNo++;
1847         updateTree();
1848   int i;
1849   inProgress=CmiFalse;
1850   startRequested=CmiFalse;
1851   nRemote=nContrib=0;
1852
1853   //Look through the future queue for messages we can now handle
1854   int n=futureMsgs.length();
1855
1856   for (i=0;i<n;i++)
1857   {
1858     interrupt = 1;
1859
1860     CkReductionMsg *m=futureMsgs.deq();
1861
1862     interrupt = 0;
1863     if (m!=NULL){ //One of these addContributions may have finished us.
1864 #if DEBUGRED
1865                 CkPrintf("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1866 #endif
1867       doAddContribution(m);//<- if *still* early, puts it back in the queue
1868     }
1869   }
1870
1871   interrupt = 1;
1872
1873   n=futureRemoteMsgs.length();
1874
1875   interrupt = 0;
1876   for (i=0;i<n;i++)
1877   {
1878     interrupt = 1;
1879
1880     CkReductionMsg *m=futureRemoteMsgs.deq();
1881
1882     interrupt = 0;
1883     if (m!=NULL)
1884       doRecvMsg(m);//<- if *still* early, puts it back in the queue
1885   }
1886   
1887   n = futureLateMigrantMsgs.length();
1888   for(i=0;i<n;i++){
1889     CkReductionMsg *m = futureLateMigrantMsgs.deq();
1890     if(m != NULL){
1891       if(m->redNo == redNo){
1892         msgs.enq(m);
1893       }else{
1894         futureLateMigrantMsgs.enq(m);
1895       }
1896     }
1897   }
1898 }
1899
1900 //////////// Reduction Manager Utilities /////////////
1901
1902 void CkNodeReductionMgr::init_BinaryTree(){
1903         parent = (CkMyNode()-1)/TREE_WID;
1904         int firstkid = CkMyNode()*TREE_WID+1;
1905         numKids=CkNumNodes()-firstkid;
1906   if (numKids>TREE_WID) numKids=TREE_WID;
1907   if (numKids<0) numKids=0;
1908
1909         for(int i=0;i<numKids;i++){
1910                 kids.push_back(firstkid+i);
1911                 newKids.push_back(firstkid+i);
1912         }
1913 }
1914
1915 void CkNodeReductionMgr::init_BinomialTree(){
1916         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
1917         /*upperSize = (unsigned )pow((double)2,depth);*/
1918         upperSize = (unsigned) 1 << depth;
1919         label = upperSize-CkMyNode()-1;
1920         int p=label;
1921         int count=0;
1922         while( p > 0){
1923                 if(p % 2 == 0)
1924                         break;
1925                 else{
1926                         p = p/2;
1927                         count++;
1928                 }
1929         }
1930         /*parent = label + rint(pow((double)2,count));*/
1931         parent = label + (1<<count);
1932         parent = upperSize -1 -parent;
1933         int temp;
1934         if(count != 0){
1935                 numKids = 0;
1936                 for(int i=0;i<count;i++){
1937                         /*temp = label - rint(pow((double)2,i));*/
1938                         temp = label - (1<<i);
1939                         temp = upperSize-1-temp;
1940                         if(temp <= CkNumNodes()-1){
1941                 //              kids[numKids] = temp;
1942                                 kids.push_back(temp);
1943                                 numKids++;
1944                         }
1945                 }
1946         }else{
1947                 numKids = 0;
1948         //      kids = NULL;
1949         }
1950 }
1951
1952
1953 int CkNodeReductionMgr::treeRoot(void)
1954 {
1955   return 0;
1956 }
1957 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
1958 {
1959   return (CmiBool)(CkMyNode()!=treeRoot());
1960 }
1961 int CkNodeReductionMgr::treeParent(void) //My parent Node
1962 {
1963 #ifdef BINOMIAL_TREE
1964         return parent;
1965 #else
1966   return parent;
1967 #endif
1968 }
1969
1970 int CkNodeReductionMgr::firstKid(void) //My first child Node
1971 {
1972   return CkMyNode()*TREE_WID+1;
1973 }
1974 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
1975 {
1976 #ifdef BINOMIAL_TREE
1977         return numKids;
1978 #else
1979 /*  int nKids=CkNumNodes()-firstKid();
1980   if (nKids>TREE_WID) nKids=TREE_WID;
1981   if (nKids<0) nKids=0;
1982   return nKids;*/
1983         return numKids;
1984 #endif
1985 }
1986
1987 //Combine (& free) the current message vector msgs.
1988 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
1989 {
1990   CkReductionMsg *ret=NULL;
1991
1992   //Look through the vector for a valid reducer, swapping out placeholder messages
1993   CkReduction::reducerType r=CkReduction::invalid;
1994   int msgs_gcount=0;//Reduced gcount
1995   int msgs_nSources=0;//Reduced nSources
1996   int msgs_userFlag=-1;
1997   CkCallback msgs_callback;
1998   CkCallback msgs_secondaryCallback;
1999   int i;
2000   int nMsgs=0;
2001   CkReductionMsg *m;
2002   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2003         bool isMigratableContributor;
2004         
2005
2006   while(NULL!=(m=msgs.deq()))
2007   {
2008     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2009     msgs_gcount+=m->gcount;
2010     if (m->sourceFlag!=0)
2011     { //This is a real message from an element, not just a placeholder
2012       msgArr[nMsgs++]=m;
2013       msgs_nSources+=m->nSources();
2014       r=m->reducer;
2015       if (!m->callback.isInvalid())
2016         msgs_callback=m->callback;
2017       if(!m->secondaryCallback.isInvalid()){
2018         msgs_secondaryCallback = m->secondaryCallback;
2019       }
2020       if (m->userFlag!=-1)
2021         msgs_userFlag=m->userFlag;
2022
2023                         isMigratableContributor= m->isMigratableContributor();
2024                                 
2025     }
2026     else
2027     { //This is just a placeholder message-- replace it
2028       delete m;
2029     }
2030   }
2031
2032   if (nMsgs==0||r==CkReduction::invalid)
2033   //No valid reducer in the whole vector
2034     ret=CkReductionMsg::buildNew(0,NULL);
2035   else
2036   {//Use the reducer to reduce the messages
2037                 if(nMsgs == 1){
2038                         ret = msgArr[0];
2039                 }else{
2040             CkReduction::reducerFn f=CkReduction::reducerTable[r];
2041           ret=(*f)(nMsgs,msgArr);
2042                 }
2043     ret->reducer=r;
2044   }
2045
2046         
2047 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2048         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2049         PathHistory &path = UsrToEnv(ret)->pathHistory;
2050         // Combine the critical paths from all the reduction messages into the header for the new result
2051         for (i=0;i<nMsgs;i++){
2052           if (msgArr[i]!=ret){
2053             CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2054             path.updateMax(UsrToEnv(msgArr[i])->pathHistory);
2055           } else {
2056             CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2057           }
2058         }
2059         // Also consider the path which got us into this entry method
2060         CkPrintf("[%d] this path = %lf\n", CkMyPe(), currentlyExecutingMsg->pathHistory.getTime() );
2061         path.updateMax(currentlyExecutingMsg->pathHistory);
2062         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2063 #endif
2064
2065
2066
2067         //Go back through the vector, deleting old messages
2068   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2069   delete [] msgArr;
2070   //Set the message counts
2071   ret->redNo=redNo;
2072   ret->gcount=msgs_gcount;
2073   ret->userFlag=msgs_userFlag;
2074   ret->callback=msgs_callback;
2075   ret->secondaryCallback = msgs_secondaryCallback;
2076   ret->sourceFlag=msgs_nSources;
2077         ret->setMigratableContributor(isMigratableContributor);
2078   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2079
2080   return ret;
2081 }
2082
2083 void CkNodeReductionMgr::pup(PUP::er &p)
2084 {
2085 //We do not store the client function pointer or the client function parameter,
2086 //it is the responsibility of the programmer to correctly restore these
2087   IrrGroup::pup(p);
2088   p(redNo);
2089   p(inProgress); p(creating); p(startRequested);
2090   p(lcount);
2091   p(nContrib); p(nRemote);
2092   p(interrupt);
2093   p|msgs;
2094   p|futureMsgs;
2095   p|futureRemoteMsgs;
2096   p|futureLateMigrantMsgs;
2097   p|parent;
2098   p|additionalGCount;
2099   p|newAdditionalGCount;
2100   if(p.isUnpacking()) {
2101     gcount=CkNumNodes();
2102     thisProxy = thisgroup;
2103     lockEverything = CmiCreateLock();
2104 #ifdef BINOMIAL_TREE
2105     init_BinomialTree();
2106 #else
2107     init_BinaryTree();
2108 #endif          
2109   }
2110   p | blocked;
2111   p | maxModificationRedNo;
2112
2113 #ifndef _FAULT_MLOG_
2114   int isnull = (storedCallback == NULL);
2115   p | isnull;
2116   if (!isnull) {
2117     if (p.isUnpacking()) {
2118       storedCallback = new CkCallback;
2119     }
2120     p|*storedCallback;
2121   }
2122 #endif
2123
2124 }
2125
2126 /*
2127         FAULT_EVAC
2128         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2129         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2130         reduction tree. 
2131 */
2132 void CkNodeReductionMgr::evacuate(){
2133         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2134         if(treeKids() == 0){
2135         /*
2136                 if the node going down is a leaf
2137         */
2138                 oldleaf=true;
2139                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2140                 /*
2141                         Need to ask parent for the reduction number that it has seen. 
2142                         Since it is a leaf, the tree does not need to be rewired. 
2143                         We reuse the oldparent type of tree modification message to get 
2144                         the parent to block and tell us about the highest reduction number it has seen.
2145                         
2146                 */
2147                 int data[2];
2148                 data[0]=CkMyNode();
2149                 data[1]=getTotalGCount()+additionalGCount;
2150                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2151                 newParent = treeParent();
2152         }else{
2153                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2154                 oldleaf= false;
2155         /*
2156                 It is not a leaf. It needs to rewire the tree around itself.
2157                 It also needs to decide on a reduction No after which the new tree will be used
2158                 Till it decides on the new tree and the redNo at which it becomes valid,
2159                 all received messages will be buffered
2160         */
2161                 newParent = kids[0];
2162                 for(int i=numKids-1;i>=0;i--){
2163                         newKids.remove(i);
2164                 }
2165                 /*
2166                         Ask everybody for the highest reduction number they have seen and
2167                         also tell them about the new tree
2168                 */
2169                 /*
2170                         Tell parent about its new child;
2171                 */
2172                 int oldParentData[2];
2173                 oldParentData[0] = CkMyNode();
2174                 oldParentData[1] = newParent;
2175                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2176
2177                 /*
2178                         Tell the other children about their new parent
2179                 */
2180                 int childrenData=newParent;
2181                 for(int i=1;i<numKids;i++){
2182                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2183                 }
2184                 
2185                 /*
2186                         Tell newParent (1st child) about its new children,
2187                         the current node and its children except the newParent
2188                 */
2189                 int *newParentData = new int[numKids+2];
2190                 for(int i=1;i<numKids;i++){
2191                         newParentData[i] = kids[i];
2192                 }
2193                 newParentData[0] = CkMyNode();
2194                 newParentData[numKids] = parent;
2195                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2196                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2197         }
2198         readyDeletion = false;
2199         blocked = true;
2200         numModificationReplies = 0;
2201         tempModificationRedNo = findMaxRedNo();
2202 }
2203
2204 /*
2205         Depending on the code, use the data to change the tree
2206         1. OLDPARENT : replace the old child with a new one
2207         2. OLDCHILDREN: replace the parent
2208         3. NEWPARENT:  add the children and change the parent
2209         4. LEAFPARENT: delete the old child
2210 */
2211
2212 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2213         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2214         int sender;
2215         newKids = kids;
2216         readyDeletion = false;
2217         newAdditionalGCount = additionalGCount;
2218         switch(code){
2219                 case OLDPARENT: 
2220                         for(int i=0;i<numKids;i++){
2221                                 if(newKids[i] == data[0]){
2222                                         newKids[i] = data[1];
2223                                         break;
2224                                 }
2225                         }
2226                         sender = data[0];
2227                         newParent = parent;
2228                         break;
2229                 case OLDCHILDREN:
2230                         newParent = data[0];
2231                         sender = parent;
2232                         break;
2233                 case NEWPARENT:
2234                         for(int i=0;i<size-2;i++){
2235                                 newKids.push_back(data[i]);
2236                         }
2237                         newParent = data[size-2];
2238                         newAdditionalGCount += data[size-1];
2239                         sender = parent;
2240                         break;
2241                 case LEAFPARENT:
2242                         for(int i=0;i<numKids;i++){
2243                                 if(newKids[i] == data[0]){
2244                                         newKids.remove(i);
2245                                         break;
2246                                 }
2247                         }
2248                         sender = data[0];
2249                         newParent = parent;
2250                         newAdditionalGCount += data[1];
2251                         break;
2252         };
2253         blocked = true;
2254         int maxRedNo = findMaxRedNo();
2255         
2256         thisProxy[sender].collectMaxRedNo(maxRedNo);
2257 }
2258
2259 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2260         /*
2261                 Find out the maximum redNo that has been seen by 
2262                 the affected nodes
2263         */
2264         numModificationReplies++;
2265         if(maxRedNo > tempModificationRedNo){
2266                 tempModificationRedNo = maxRedNo;
2267         }
2268         if(numModificationReplies == numKids+1){
2269                 maxModificationRedNo = tempModificationRedNo;
2270                 /*
2271                         when all the affected nodes have replied, tell them the maximum.
2272                         Unblock yourself. deal with the buffered messages local and remote
2273                 */
2274                 if(maxModificationRedNo == -1){
2275                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2276                 }else{
2277                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2278                 }
2279                 thisProxy[parent].unblockNode(maxModificationRedNo);
2280                 for(int i=0;i<numKids;i++){
2281                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2282                 }
2283                 blocked = false;
2284                 updateTree();
2285                 clearBlockedMsgs();
2286         }
2287 };
2288
2289 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2290         maxModificationRedNo = maxRedNo;
2291         updateTree();
2292         blocked = false;
2293         clearBlockedMsgs();
2294 }
2295
2296
2297 void CkNodeReductionMgr::clearBlockedMsgs(){
2298         int len = bufferedMsgs.length();
2299         for(int i=0;i<len;i++){
2300                 CkReductionMsg *m = bufferedMsgs.deq();
2301                 doAddContribution(m);
2302         }
2303         len = bufferedRemoteMsgs.length();
2304         for(int i=0;i<len;i++){
2305                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2306                 doRecvMsg(m);
2307         }
2308
2309 }
2310 /*
2311         if the reduction number exceeds the maxModificationRedNo, change the tree
2312         to become the new one
2313 */
2314
2315 void CkNodeReductionMgr::updateTree(){
2316         if(redNo > maxModificationRedNo){
2317                 parent = newParent;
2318                 kids = newKids;
2319                 maxModificationRedNo = INT_MAX;
2320                 numKids = kids.size();
2321                 readyDeletion = true;
2322                 additionalGCount = newAdditionalGCount;
2323                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2324                 for(int i=0;i<(int)(newKids.size());i++){
2325                         DEBREVAC(("%d ",newKids[i]));
2326                 }
2327                 DEBREVAC(("\n"));
2328         //      startReduction(redNo,CkMyNode());
2329         }else{
2330                 if(maxModificationRedNo != INT_MAX){
2331                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2332                         startReduction(redNo,CkMyNode());
2333                         finishReduction();
2334                 }       
2335         }
2336 }
2337
2338
2339 void CkNodeReductionMgr::doneEvacuate(){
2340         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2341 /*      if(oldleaf){
2342                 
2343                         It used to be a leaf
2344                         Then as soon as future messages have been emptied you can 
2345                         send the parent a message telling them that they are not going
2346                         to receive anymore messages from this child
2347                 
2348                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2349                 while(futureMsgs.length() != 0){
2350                         int n = futureMsgs.length();
2351                         for(int i=0;i<n;i++){
2352                                 CkReductionMsg *m = futureMsgs.deq();
2353                                 if(isPresent(m->redNo)){
2354                                         msgs.enq(m);
2355                                 }else{
2356                                         futureMsgs.enq(m);
2357                                 }
2358                         }
2359                         CkReductionMsg *result = reduceMessages();
2360                         thisProxy[treeParent()].RecvMsg(result);
2361                         redNo++;
2362                 }
2363                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2364                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2365         }else{*/
2366                 if(readyDeletion){
2367                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2368                 }else{
2369                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2370                 }
2371 //      }
2372 }
2373
2374 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2375         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2376         for(int i=0;i<numKids;i++){
2377                 if(kids[i] == deletedChild){
2378                         kids.remove(i);
2379                         break;
2380                 }
2381         }
2382         numKids = kids.length();
2383         finishReduction();
2384 }
2385
2386 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2387         for(int i=0;i<(int)(newKids.length());i++){
2388                 if(newKids[i] == deletedChild){
2389                         newKids.remove(i);
2390                         break;
2391                 }
2392         }
2393         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2394         for(int i=0;i<(int)(newKids.size());i++){
2395                 DEBREVAC(("%d ",newKids[i]));
2396         }
2397         DEBREVAC(("\n"));
2398         finishReduction();
2399 }
2400
2401 int CkNodeReductionMgr::findMaxRedNo(){
2402         int max = redNo;
2403         for(int i=0;i<futureRemoteMsgs.length();i++){
2404                 if(futureRemoteMsgs[i]->redNo  > max){
2405                         max = futureRemoteMsgs[i]->redNo;
2406                 }
2407         }
2408         /*
2409                 if redNo is max (that is no future message) and the current reduction has not started
2410                 then tree can be changed before the reduction redNo can be started
2411         */ 
2412         if(redNo == max && msgs.length() == 0){
2413                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2414                 max--;
2415         }
2416         return max;
2417 }
2418
2419 #include "CkReduction.def.h"