More small changes
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if 0
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBUGRED 1
61 #define DEBR(x) CkPrintf x
62 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
63 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
64
65 #define DEBN(x) CkPrintf x
66 #define AAN "Red Node%d "
67 #define ABN ,CkMyNode()
68
69 // For status and data messages from the builtin reducer functions.
70 #define RED_DEB(x) //CkPrintf x
71 #define DEBREVAC(x) CkPrintf x
72 #else
73 //No debugging info-- empty defines
74 #define DEBUGRED 0
75 #define DEBR(x) // CkPrintf x
76 #define DEBRMLOG(x) CkPrintf x
77 #define AA
78 #define AB
79 #define DEBN(x) //CkPrintf x
80 #define RED_DEB(x) //CkPrintf x
81 #define DEBREVAC(x) //CkPrintf x
82 #endif
83
84 #ifndef INT_MAX
85 #define INT_MAX 2147483647
86 #endif
87
88 extern int _inrestart;
89
90 Group::Group()
91 {
92         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
93
94         creatingContributors();
95         contributorStamped(&reductionInfo);
96         contributorCreated(&reductionInfo);
97         doneCreatingContributors();
98 #if DEBUGRED
99         CkPrintf("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr));
100 #endif                  
101 #if !GROUP_LEVEL_REDUCTION
102         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
103         nodeProxy = nodetemp;
104 #endif
105 }
106
107 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
108 {
109         creatingContributors();
110         contributorStamped(&reductionInfo);
111         contributorCreated(&reductionInfo);
112         doneCreatingContributors();
113 }
114
115 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
116                                     ((CkReductionMgr *)this),
117                                     reductionInfo,false)
118 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
119
120
121
122 CkGroupInitCallback::CkGroupInitCallback(void) {}
123 /*
124 The callback is just used to tell the caller that this group
125 has been constructed.  (Now they can safely call CkLocalBranch)
126 */
127 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
128 {
129   m->call();
130   delete m;
131 }
132
133 /*
134 The callback is just used to tell the caller that this group
135 is constructed and ready to process other calls.
136 */
137 CkGroupReadyCallback::CkGroupReadyCallback(void)
138 {
139   _isReady = 0;
140 }
141 void
142 CkGroupReadyCallback::callBuffered(void)
143 {
144   int n = _msgs.length();
145   for(int i=0;i<n;i++)
146   {
147     CkGroupCallbackMsg *msg = _msgs.deq();
148     msg->call();
149     delete msg;
150   }
151 }
152 void
153 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
154 {
155   if(_isReady) {
156     msg->call();
157     delete msg;
158   } else {
159     _msgs.enq(msg);
160   }
161 }
162
163 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
164         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
165 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
166 {
167         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
168         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
169         b->fn(b->param,m->getSize(),m->getData());
170         delete m;
171 }
172
173 ///////////////// Reduction Manager //////////////////
174 /*
175 One CkReductionMgr runs a non-overlapping set of reductions.
176 It collects messages from all local contributors, then sends
177 the reduced message up the reduction tree to node zero, where
178 they're passed to the user's client function.
179 */
180
181 CkReductionMgr::CkReductionMgr()//Constructor
182   : thisProxy(thisgroup)
183
184 #if GROUP_LEVEL_REDUCTION
185 #ifdef BINOMIAL_TREE
186   init_BinomialTree();
187 #else
188   init_BinaryTree();
189 #endif
190 #endif
191   redNo=0;
192   completedRedNo = -1;
193   inProgress=CmiFalse;
194   creating=CmiFalse;
195   startRequested=CmiFalse;
196   gcount=lcount=0;
197   nContrib=nRemote=0;
198   maxStartRequest=0;
199 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
200     if(CkMyPe() != 0){
201         perProcessorCounts = NULL;
202     }else{
203         perProcessorCounts = new int[CmiNumPes()];
204         for(int i=0;i<CmiNumPes();i++){
205             perProcessorCounts[i] = -1;
206         }
207     }
208     totalCount = 0;
209     processorCount = 0;
210 #endif
211   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
212 }
213
214 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
215 {
216   redNo=0;
217   completedRedNo = -1;
218   inProgress=CmiFalse;
219   creating=CmiFalse;
220   startRequested=CmiFalse;
221   gcount=lcount=0;
222   nContrib=nRemote=0;
223   maxStartRequest=0;
224   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
225 }
226
227 void CkReductionMgr::flushStates()
228 {
229   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
230   redNo=0;
231   completedRedNo = -1;
232   inProgress=CmiFalse;
233   creating=CmiFalse;
234   gcount=lcount=0;
235   startRequested=CmiFalse;
236   nContrib=nRemote=0;
237   maxStartRequest=0;
238
239   while (!msgs.isEmpty()) { delete msgs.deq(); }
240   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
241   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
242   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
243
244   adjVec.length()=0;
245
246   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
247 }
248
249 //////////// Reduction Manager Client API /////////////
250
251 //Add the given client function.  Overwrites any previous client.
252 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
253 {
254   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
255
256   if (CkMyPe()!=0)
257           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
258   storedCallback=*cb;
259 #if ! GROUP_LEVEL_REDUCTION
260   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
261   nodeProxy.ckSetReductionClient(callback);
262 #endif
263 }
264
265 ///////////////////////////// Contributor ////////////////////////
266 //Contributors keep a copy of this structure:
267
268 /*Contributor migration support:
269 */
270 void contributorInfo::pup(PUP::er &p)
271 {
272   p(redNo);
273 }
274
275 ////////////////////// Contributor list maintainance: /////////////////
276 //These just set and clear the "creating" flag to prevent
277 // reductions from finishing early because not all elements
278 // have been created.
279 void CkReductionMgr::creatingContributors(void)
280 {
281   DEBR((AA"Creating contributors...\n"AB));
282   creating=CmiTrue;
283 }
284 void CkReductionMgr::doneCreatingContributors(void)
285 {
286   DEBR((AA"Done creating contributors...\n"AB));
287   creating=CmiFalse;
288   if (startRequested) startReduction(redNo,CkMyPe());
289   finishReduction();
290 }
291
292 //A new contributor will be created
293 void CkReductionMgr::contributorStamped(contributorInfo *ci)
294 {
295   DEBR((AA"Contributor %p stamped\n"AB,ci));
296   //There is another contributor
297   gcount++;
298   if (inProgress)
299   {
300     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
301     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
302   } else
303     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
304 }
305
306 //A new contributor was actually created
307 void CkReductionMgr::contributorCreated(contributorInfo *ci)
308 {
309   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
310   //We've got another contributor
311   lcount++;
312   //He may not need to contribute to some of our reductions:
313   for (int r=redNo;r<ci->redNo;r++)
314     adj(r).lcount--;//He won't be contributing to r here
315 }
316
317 /*Don't expect any more contributions from this one.
318 This is rather horrifying because we now have to make
319 sure the global element count accurately reflects all the
320 contributions the element made before it died-- these may stretch
321 far into the future.  The adj() vector is what saves us here.
322 */
323 void CkReductionMgr::contributorDied(contributorInfo *ci)
324 {
325 #if CMK_MEM_CHECKPOINT
326   // ignore from listener if it is during restart from crash
327   if (CkInRestarting()) return;
328 #endif
329   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
330   //We lost a contributor
331   gcount--;
332
333   if (ci->redNo<redNo)
334   {//Must have been migrating during reductions-- root is waiting for his
335   // contribution, which will never come.
336     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
337     for (int r=ci->redNo;r<redNo;r++)
338       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
339   }
340
341   //Add to the global count for all his future messages (wherever they are)
342   int r;
343   for (r=redNo;r<ci->redNo;r++)
344   {//He already contributed to this reduction, but won't show up in global count.
345     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
346     adj(r).gcount++;
347   }
348
349   lcount--;
350   //He's already contributed to several reductions here
351   for (r=redNo;r<ci->redNo;r++)
352     adj(r).lcount++;//He'll be contributing to r here
353
354   finishReduction();
355 }
356
357 //Migrating away (note that global count doesn't change)
358 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
359 {
360   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
361   lcount--;//We lost a local
362   //He's already contributed to several reductions here
363   for (int r=redNo;r<ci->redNo;r++)
364     adj(r).lcount++;//He'll be contributing to r here
365
366   finishReduction();
367 }
368
369 //Migrating in (note that global count doesn't change)
370 void CkReductionMgr::contributorArriving(contributorInfo *ci)
371 {
372   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
373   lcount++;//We gained a local
374 #if CMK_MEM_CHECKPOINT
375   // ignore from listener if it is during restart from crash
376   // because the ci may be old.
377   if (CkInRestarting()) return;
378 #endif
379   //He has already contributed (elsewhere) to several reductions:
380   for (int r=redNo;r<ci->redNo;r++)
381     adj(r).lcount--;//He won't be contributing to r here
382
383 }
384
385 //Contribute-- the given msg can contain any data.  The reducerType
386 // field of the message must be valid.
387 // Each contributor must contribute exactly once to the each reduction.
388 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
389 {
390 #if CMK_BLUEGENE_CHARM
391   _TRACE_BG_TLINE_END(&(m->log));
392 #endif
393   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
394   //m->ci=ci;
395   m->redNo=ci->redNo++;
396   m->sourceFlag=-1;//A single contribution
397   m->gcount=0;
398
399 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
400     if(lcount == 0){
401         m->sourceProcessorCount = 1;
402     }else{
403         m->sourceProcessorCount = lcount;
404     }
405     m->fromPE = CmiMyPe();
406     Chare *oldObj =CpvAccess(_currentObj);
407     char currentObjName[100];
408     DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
409     thisProxy[0].contributeViaMessage(m);
410 #else
411   addContribution(m);
412 #endif
413 }
414
415 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
416 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
417     CmiAssert(CmiMyPe() == 0);
418     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
419     if(redNo == 0){
420         if(perProcessorCounts[m->fromPE] == -1){
421             processorCount++;
422             perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
423             DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
424         }else{
425             if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
426                 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
427                 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
428             }
429         }
430         if(processorCount == CmiNumPes()){
431             totalCount = 0;
432             for(int i=0;i<CmiNumPes();i++){
433                 CmiAssert(perProcessorCounts[i] != -1);
434                 totalCount += perProcessorCounts[i];
435             }
436             DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
437         }
438         if(m->sourceProcessorCount == 0){
439             if(processorCount == CmiNumPes()){
440                 finishReduction();
441             }
442             return;
443         }
444     }
445     addContribution(m);
446 }
447 #else
448 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
449 #endif
450
451 //////////// Reduction Manager Remote Entry Points /////////////
452 //Sent down the reduction tree (used by barren PEs)
453 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
454 {
455  if(CkMyPe()==0){
456         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
457         //delete m;
458         //return;
459  }
460  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
461  int srcPE = (UsrToEnv(m))->getSrcPe();
462 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_)) 
463   if (isPresent(m->num) && !inProgress)
464   {
465     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
466     startReduction(m->num,srcPE);
467     finishReduction();
468   } else if (isFuture(m->num)){
469 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
470           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
471           if(maxStartRequest < m->num){
472                   maxStartRequest = m->num;
473           }
474  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
475           
476     }
477   else //is Past
478     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
479   delete m;
480 #else
481     if(redNo == 0){
482         if(lcount == 0){
483             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
484             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
485             dummy->fromPE = CmiMyPe();
486             dummy->sourceProcessorCount = 0;
487             dummy->redNo = 0;
488             thisProxy[0].contributeViaMessage(dummy);
489         }
490     }
491 #endif
492 }
493
494 //Sent to root of reduction tree with reduction contribution
495 // of migrants that missed the main reduction.
496 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
497 {
498 #if GROUP_LEVEL_REDUCTION
499 #if CMK_BLUEGENE_CHARM
500   _TRACE_BG_TLINE_END(&(m->log));
501 #endif
502   addContribution(m);
503 #else
504   m->secondaryCallback = m->callback;
505   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
506   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
507   nodeMgr->LateMigrantMsg(m);
508 /*      int len = finalMsgs.length();
509         finalMsgs.enq(m);
510 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
511         endArrayReduction();*/
512 #endif
513 }
514
515 //A late migrating contributor will never contribute to this reduction
516 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
517 {
518   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
519   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
520  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
521   adj(m->num).gcount--;//He won't be contributing to this one.
522   finishReduction();
523   delete m;
524 }
525
526 //////////// Reduction Manager State /////////////
527 void CkReductionMgr::startReduction(int number,int srcPE)
528 {
529   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
530   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
531   if (inProgress){
532         DEBR((AA"This reduction is already in progress\n"AB));
533         return;//This reduction already started
534   }
535   if (creating) //Don't start yet-- we're creating elements
536   {
537     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
538     startRequested=CmiTrue;
539     return;
540   }
541
542 //If none of these cases, we need to start the reduction--
543   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
544   inProgress=CmiTrue;
545  
546
547         /*
548                 FAULT_EVAC
549         */
550   if(!CmiNodeAlive(CkMyPe())){
551         return;
552   }
553
554 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
555   if(CmiMyPe() == 0 && redNo == 0){
556             for(int j=0;j<CkNumPes();j++){
557                 if(j != CkMyPe() && j != srcPE){
558                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
559                 }
560             }
561             if(lcount == 0){
562                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
563                 dummy->fromPE = CmiMyPe();
564                 dummy->sourceProcessorCount = 0;
565                 dummy->redNo = 0;
566                 thisProxy[0].contributeViaMessage(dummy);
567             }
568     }   else{
569         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
570     }
571
572
573 #else
574   //Sent start requests to our kids (in case they don't already know)
575 #if GROUP_LEVEL_REDUCTION
576   for (int k=0;k<treeKids();k++)
577   {
578     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
579     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
580   }
581 #else
582   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
583 #endif
584 #endif
585         
586         /*
587   int temp;
588   //making it a broadcast done only by PE 0
589   if(!hasParent()){
590                 temp = completedRedNo+1;
591                 for(int i=temp;i<=number;i++){
592                         for(int j=0;j<CkNumPes();j++){
593                                 if(j != CkMyPe() && j != srcPE){
594                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
595                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
596                                         }
597                                 }
598                         }
599                 }       
600         }       else{
601                 temp = number;
602         }*/
603 /*  if(!hasParent()){
604                 temp = completedRedNo+1;
605         }       else{
606                 temp = number;
607         }
608         for(int i=temp;i<=number;i++){
609         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
610                 if(hasParent()){
611           // kick-start your parent too ...
612                         if(treeParent() != srcPE){
613                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
614 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
615     CpvAccess(_currentObj) = oldObj;
616 #endif
617                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
618                                 }       
619                         }       
620                 }
621           for (int k=0;k<treeKids();k++)
622           {
623                         if(firstKid()+k != srcPE){
624                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
625                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
626                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
627                                 }       
628                         }       
629         }
630         }
631         */
632 }       
633
634 /*Handle a message from one element for the reduction*/
635 void CkReductionMgr::addContribution(CkReductionMsg *m)
636 {
637   if (isPast(m->redNo))
638   {
639 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
640         CmiAbort("this version should not have late migrations");
641 #else
642         //We've moved on-- forward late contribution straight to root
643     DEBR((AA"Migrant %p gives late contribution for #%d!\n"AB,m->ci,m->redNo));
644         // if (!hasParent()) //Root moved on too soon-- should never happen
645         //   CkAbort("Late reduction contribution received at root!\n");
646     thisProxy[0].LateMigrantMsg(m);
647 #endif
648   }
649   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
650     DEBR((AA"Contributor %p gives early contribution-- for #%d\n"AB,m->ci,m->redNo));
651     futureMsgs.enq(m);
652   } else {// An ordinary contribution
653     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
654    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
655     startReduction(m->redNo,CkMyPe());
656     msgs.enq(m);
657     nContrib++;
658     finishReduction();
659   }
660 }
661
662 /**function checks if it has got all contributions that it is supposed to
663 get at this processor. If it is done it sends the reduced result to the local
664 nodegroup */
665 void CkReductionMgr::finishReduction(void)
666 {
667   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
668   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
669   if ((!inProgress) || creating){
670         DEBR((AA"Either not in Progress or creating\n"AB));
671         return;
672   }
673   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
674 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
675
676   if (nContrib<(lcount+adj(redNo).lcount)){
677          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
678          return;//Need more local messages
679   }
680 #if GROUP_LEVEL_REDUCTION
681   if (nRemote<treeKids()) return;//Need more remote messages
682 #endif
683  
684 #else
685
686     if(CkMyPe() != 0){
687         if(redNo != 0){
688             return;
689         }else{
690             CmiAssert(lcount == 0);
691             CkReductionMsg *dummy = reduceMessages();
692             dummy->fromPE = CmiMyPe();
693             dummy->sourceProcessorCount = 0;
694             thisProxy[0].contributeViaMessage(dummy);
695             return;
696         }
697     }
698     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
699         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
700          return;//Need more local messages
701   }else{
702         DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
703     }
704
705
706 #endif
707
708   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
709   CkReductionMsg *result=reduceMessages();
710   result->redNo=redNo;
711 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
712
713 #if GROUP_LEVEL_REDUCTION
714   if (hasParent())
715   {//Pass data up tree to parent
716     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
717     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
718     result->gcount+=gcount+adj(redNo).gcount;
719     thisProxy[treeParent()].RecvMsg(result);
720   }
721   else 
722   {//We are root-- pass data to client
723     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
724     int totalElements=result->gcount+gcount+adj(redNo).gcount;
725     if (totalElements>result->nSources()) 
726     {
727       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
728       msgs.enq(result);
729       return; // Wait for migrants to contribute
730     } else if (totalElements<result->nSources()) {
731       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
732       CkAbort("ERROR! Too many contributions at root!\n");
733     }
734     DEBR((AA"Passing result to client function\n"AB));
735     if (!result->callback.isInvalid())
736             result->callback.send(result);
737     else if (!storedCallback.isInvalid())
738             storedCallback.send(result);
739     else
740             CkAbort("No reduction client!\n"
741                     "You must register a client with either SetReductionClient or during contribute.\n");
742   }
743
744 #else
745   result->gcount+=gcount+adj(redNo).gcount;
746
747   result->secondaryCallback = result->callback;
748   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
749         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
750
751   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
752
753 #if DEBUGRED
754  // CkPrintf("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount);
755 #endif
756   
757   // Find our node reduction manager, and pass reduction to him:
758   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
759   nodeMgr->contributeArrayReduction(result);
760 #endif
761 #else                // _FAULT_MLOG_
762 #if DEBUGRED
763     CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer());
764 #endif
765     if (!result->callback.isInvalid())
766         result->callback.send(result);
767     else if (!storedCallback.isInvalid())
768         storedCallback.send(result);
769     else{
770 #if DEBUGRED
771         CkPrintf("No reduction client for group %d \n",thisgroup.idx);
772 #endif
773         CkAbort("No reduction client!\n"
774             "You must register a client with either SetReductionClient or during contribute.\n");
775     }
776 #if DEBUGRED
777        CkPrintf("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer());
778 #endif
779
780 #endif               // _FAULT_MLOG_
781
782   //House Keeping Operations will have to check later what needs to be changed
783   redNo++;
784   //Shift the count adjustment vector down one slot (to match new redNo)
785   int i;
786 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
787         if(CkMyPe()!=0){
788 #else
789     {
790 #endif
791 //      int i;
792         completedRedNo++;
793         for (i=1;i<(int)(adjVec.length());i++){
794            adjVec[i-1]=adjVec[i];
795         }
796         adjVec.length()--;  
797   }
798
799   inProgress=CmiFalse;
800   startRequested=CmiFalse;
801   nRemote=nContrib=0;
802
803   //Look through the future queue for messages we can now handle
804   int n=futureMsgs.length();
805   for (i=0;i<n;i++)
806   {
807     CkReductionMsg *m=futureMsgs.deq();
808     if (m!=NULL) //One of these addContributions may have finished us.
809       addContribution(m);//<- if *still* early, puts it back in the queue
810   }
811 #if GROUP_LEVEL_REDUCTION
812   n=futureRemoteMsgs.length();
813   for (i=0;i<n;i++)
814   {
815     CkReductionMsg *m=futureRemoteMsgs.deq();
816     if (m!=NULL) {
817       RecvMsg(m);//<- if *still* early, puts it back in the queue
818     }
819   }
820 #endif
821
822 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
823   if(maxStartRequest >= redNo){
824           startReduction(redNo,CkMyPe());
825           finishReduction();
826   }
827  
828 #endif
829 }
830
831 //Sent up the reduction tree with reduced data
832   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
833 {
834 #if GROUP_LEVEL_REDUCTION
835 #if CMK_BLUEGENE_CHARM
836   _TRACE_BG_TLINE_END(&m->log);
837 #endif
838   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
839     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
840     startReduction(m->redNo, CkMyPe());
841     msgs.enq(m);
842     nRemote++;
843     finishReduction();
844   }
845   else if (isFuture(m->redNo)) {
846     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
847     futureRemoteMsgs.enq(m);
848   } 
849   else CkAbort("Recv'd late remote contribution!\n");
850 #endif
851 }
852
853 //////////// Reduction Manager Utilities /////////////
854
855 //Return the countAdjustment struct for the given redNo:
856 countAdjustment &CkReductionMgr::adj(int number)
857 {
858   number-=completedRedNo;
859   number--;
860   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
861   //Pad the adjustment vector with zeros until it's at least number long
862   while ((int)(adjVec.length())<=number)
863     adjVec.push_back(countAdjustment());
864   return adjVec[number];
865 }
866
867 //Combine (& free) the current message vector msgs.
868 CkReductionMsg *CkReductionMgr::reduceMessages(void)
869 {
870 #if CMK_BLUEGENE_CHARM
871   _TRACE_BG_END_EXECUTE(1);
872   void* _bgParentLog = NULL;
873   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
874 #endif
875   CkReductionMsg *ret=NULL;
876
877   //Look through the vector for a valid reducer, swapping out placeholder messages
878   CkReduction::reducerType r=CkReduction::invalid;
879   int msgs_gcount=0;//Reduced gcount
880   int msgs_nSources=0;//Reduced nSources
881   int msgs_userFlag=-1;
882   CkCallback msgs_callback;
883   int i;
884   int nMsgs=0;
885   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
886   CkReductionMsg *m;
887   bool isMigratableContributor;
888
889   // Copy message queue into msgArr, skipping placeholders:
890   while (NULL!=(m=msgs.deq()))
891   {
892     msgs_gcount+=m->gcount;
893     if (m->sourceFlag!=0)
894     { //This is a real message from an element, not just a placeholder
895       msgArr[nMsgs++]=m;
896       msgs_nSources+=m->nSources();
897       r=m->reducer;
898       if (!m->callback.isInvalid())
899         msgs_callback=m->callback;
900       if (m->userFlag!=-1)
901         msgs_userFlag=m->userFlag;
902                         
903         isMigratableContributor=m->isMigratableContributor();
904 #if CMK_BLUEGENE_CHARM
905         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
906 #endif
907     }
908     else
909     { //This is just a placeholder message-- forget it
910       delete m;
911     }
912   }
913
914   if (nMsgs==0||r==CkReduction::invalid)
915   //No valid reducer in the whole vector
916     ret=CkReductionMsg::buildNew(0,NULL);
917   else
918   {//Use the reducer to reduce the messages
919                 //if there is only one msg to be reduced just return that message
920     if(nMsgs == 1){
921         ret = msgArr[0];        
922     }else{
923         CkReduction::reducerFn f=CkReduction::reducerTable[r];
924         ret=(*f)(nMsgs,msgArr);
925     }
926     ret->reducer=r;
927   }
928
929
930
931 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
932
933 #if CRITICAL_PATH_DEBUG > 3
934   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
935 #endif
936
937   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
938   path.updateMax(UsrToEnv(ret));
939   // Combine the critical paths from all the reduction messages
940   for (i=0;i<nMsgs;i++){
941     if (msgArr[i]!=ret){
942       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
943       path.updateMax(UsrToEnv(msgArr[i]));
944     }
945   }
946   
947
948 #if CRITICAL_PATH_DEBUG > 3
949   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
950 #endif
951   
952   PathHistoryTableEntry tableEntry(path);
953   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
954   
955 #endif
956
957         //Go back through the vector, deleting old messages
958   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
959         delete [] msgArr;
960
961   //Set the message counts
962   ret->redNo=redNo;
963   ret->gcount=msgs_gcount;
964   ret->userFlag=msgs_userFlag;
965   ret->callback=msgs_callback;
966   ret->sourceFlag=msgs_nSources;
967         ret->setMigratableContributor(isMigratableContributor);
968   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
969
970   return ret;
971 }
972
973
974 //Checkpointing utilities
975 //pack-unpack method for CkReductionMsg
976 //if packing pack the message and then unpack and return it
977 //if unpacking allocate memory for it read it off disk and then unapck
978 //and return it
979 void CkReductionMgr::pup(PUP::er &p)
980 {
981 //We do not store the client function pointer or the client function parameter,
982 //it is the responsibility of the programmer to correctly restore these
983   CkGroupInitCallback::pup(p);
984   p(redNo);
985   p(completedRedNo);
986   p(inProgress); p(creating); p(startRequested);
987   p(nContrib); p(nRemote);
988   p|msgs;
989   p|futureMsgs;
990   p|futureRemoteMsgs;
991   p|finalMsgs;
992   p|adjVec;
993   p|nodeProxy;
994   p|storedCallback;
995     // handle CkReductionClientBundle
996   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
997   {
998     CkReductionClientBundle *bd;
999     if (p.isUnpacking()) 
1000       bd = new CkReductionClientBundle;
1001     else
1002       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
1003     p|*bd;
1004     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1005   }
1006
1007   // subtle --- Gengbin
1008   // Group : CkReductionMgr
1009   // CkArray: CkReductionMgr
1010   // lcount/gcount in Group is set in Group constructor
1011   // lcount/gcount in CkArray is not, it is set when array elements are created
1012   // we can not pup because inserting array elems will add the counters again
1013 //  p|lcount;
1014 //  p|gcount;
1015 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1016 //  p|lcount;
1017 //  //  p|gcount;
1018 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1019 #endif
1020   if(p.isUnpacking()){
1021     thisProxy = thisgroup;
1022     maxStartRequest=0;
1023 #if GROUP_LEVEL_REDUCTION
1024 #ifdef BINOMIAL_TREE
1025     init_BinomialTree();
1026 #else
1027    init_BinaryTree();
1028 #endif
1029 #endif
1030   }
1031 #if DEBUGRED
1032   CkPrintf("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount);
1033 #endif
1034 }
1035
1036
1037 //Callback for doing Reduction through NodeGroups added by Sayantan
1038
1039 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1040
1041         int total = m->gcount+adj(m->redNo).gcount;
1042         finalMsgs.enq(m);
1043         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1044         adj(m->redNo).mainRecvd = 1;
1045 #if DEBUGRED
1046         CkPrintf("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer());
1047 #endif  
1048         endArrayReduction();
1049 }
1050
1051 void CkReductionMgr :: endArrayReduction(){
1052         CkReductionMsg *ret=NULL;
1053         int nMsgs=finalMsgs.length();
1054         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1055         //Look through the vector for a valid reducer, swapping out placeholder messages
1056         //CkPrintf("Length of Final Message %d \n",nMsgs);
1057         CkReduction::reducerType r=CkReduction::invalid;
1058         int msgs_gcount=0;//Reduced gcount
1059         int msgs_nSources=0;//Reduced nSources
1060         int msgs_userFlag=-1;
1061         CkCallback msgs_callback;
1062         CkCallback msgs_secondaryCallback;
1063         CkVec<CkReductionMsg *> tempMsgs;
1064         int i;
1065         int numMsgs = 0;
1066         for (i=0;i<nMsgs;i++)
1067         {
1068                 CkReductionMsg *m=finalMsgs.deq();
1069                 if(m->redNo == completedRedNo +1){
1070                         msgs_gcount+=m->gcount;
1071                         if (m->sourceFlag!=0)
1072                         { //This is a real message from an element, not just a placeholder
1073                                 msgs_nSources+=m->nSources();
1074                                 r=m->reducer;
1075                                 if (!m->callback.isInvalid())
1076                                 msgs_callback=m->callback;
1077                                 if(!m->secondaryCallback.isInvalid())
1078                                         msgs_secondaryCallback = m->secondaryCallback;
1079                                 if (m->userFlag!=-1)
1080                                         msgs_userFlag=m->userFlag;
1081                                 tempMsgs.push_back(m);
1082                         }
1083                 }else{
1084                         finalMsgs.enq(m);
1085                 }
1086
1087         }
1088         numMsgs = tempMsgs.length();
1089 #if DEBUGRED
1090         CkPrintf("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd);
1091 #endif  
1092         if(numMsgs == 0){
1093                 return;
1094         }
1095         if(adj(completedRedNo+1).mainRecvd == 0){
1096                 for(i=0;i<numMsgs;i++){
1097                         finalMsgs.enq(tempMsgs[i]);
1098                 }
1099                 return;
1100         }
1101
1102 /*
1103         NOT NEEDED ANYMORE DONE at nodegroup level
1104         if(msgs_gcount  > msgs_nSources){
1105                 for(i=0;i<numMsgs;i++){
1106                         finalMsgs.enq(tempMsgs[i]);
1107                 }
1108                 return;
1109         }*/
1110
1111         if (nMsgs==0||r==CkReduction::invalid)
1112                 //No valid reducer in the whole vector
1113                 ret=CkReductionMsg::buildNew(0,NULL);
1114         else{//Use the reducer to reduce the messages
1115                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1116                 // has to be corrected elements from above need to be put into a temporary vector
1117                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1118                 ret=(*f)(numMsgs,msgArr);
1119                 ret->reducer=r;
1120
1121         }
1122
1123         
1124 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1125
1126 #if CRITICAL_PATH_DEBUG > 3
1127         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1128 #endif
1129
1130         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1131         path.updateMax(UsrToEnv(ret));
1132         // Combine the critical paths from all the reduction messages into the header for the new result
1133         for (i=0;i<numMsgs;i++){
1134           if (tempMsgs[i]!=ret){
1135             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1136             path.updateMax(UsrToEnv(tempMsgs[i]));
1137           } else {
1138             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1139           }
1140         }
1141         // Also consider the path which got us into this entry method
1142
1143 #if CRITICAL_PATH_DEBUG > 3
1144         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1145 #endif
1146
1147 #endif
1148   
1149
1150
1151
1152
1153         for(i = 0;i<numMsgs;i++){
1154                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1155         }
1156
1157         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1158         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1159
1160         ret->redNo=completedRedNo+1;
1161         ret->gcount=msgs_gcount;
1162         ret->userFlag=msgs_userFlag;
1163         ret->callback=msgs_callback;
1164         ret->secondaryCallback = msgs_secondaryCallback;
1165         ret->sourceFlag=msgs_nSources;
1166
1167 #if DEBUGRED
1168         CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer());
1169 #endif
1170         if (!ret->secondaryCallback.isInvalid())
1171             ret->secondaryCallback.send(ret);
1172     else if (!storedCallback.isInvalid())
1173             storedCallback.send(ret);
1174     else{
1175 #if DEBUGRED
1176             CkPrintf("No reduction client for group %d \n",thisgroup.idx);
1177 #endif
1178             CkAbort("No reduction client!\n"
1179                     "You must register a client with either SetReductionClient or during contribute.\n");
1180     }
1181         completedRedNo++;
1182 #if DEBUGRED
1183        CkPrintf("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer());
1184 #endif
1185         for (i=1;i<(int)(adjVec.length());i++)
1186                 adjVec[i-1]=adjVec[i];
1187         adjVec.length()--;
1188         endArrayReduction();
1189 }
1190
1191 #if GROUP_LEVEL_REDUCTION
1192
1193 void CkReductionMgr::init_BinaryTree(){
1194         parent = (CkMyPe()-1)/TREE_WID;
1195         int firstkid = CkMyPe()*TREE_WID+1;
1196         numKids=CkNumPes()-firstkid;
1197         if (numKids>TREE_WID) numKids=TREE_WID;
1198         if (numKids<0) numKids=0;
1199
1200         for(int i=0;i<numKids;i++){
1201                 kids.push_back(firstkid+i);
1202                 newKids.push_back(firstkid+i);
1203         }
1204 }
1205
1206 void CkReductionMgr::init_BinomialTree(){
1207         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1208         /*upperSize = (unsigned )pow((double)2,depth);*/
1209         upperSize = (unsigned) 1 << depth;
1210         label = upperSize-CkMyPe()-1;
1211         int p=label;
1212         int count=0;
1213         while( p > 0){
1214                 if(p % 2 == 0)
1215                         break;
1216                 else{
1217                         p = p/2;
1218                         count++;
1219                 }
1220         }
1221         /*parent = label + rint(pow((double)2,count));*/
1222         parent = label + (1<<count);
1223         parent = upperSize -1 -parent;
1224         int temp;
1225         if(count != 0){
1226                 numKids = 0;
1227                 for(int i=0;i<count;i++){
1228                         /*temp = label - rint(pow((double)2,i));*/
1229                         temp = label - (1<<i);
1230                         temp = upperSize-1-temp;
1231                         if(temp <= CkNumPes()-1){
1232                                 kids.push_back(temp);
1233                                 numKids++;
1234                         }
1235                 }
1236         }else{
1237                 numKids = 0;
1238         //      kids = NULL;
1239         }
1240 }
1241
1242
1243 int CkReductionMgr::treeRoot(void)
1244 {
1245   return 0;
1246 }
1247 CmiBool CkReductionMgr::hasParent(void) //Root Node
1248 {
1249   return (CmiBool)(CkMyPe()!=treeRoot());
1250 }
1251 int CkReductionMgr::treeParent(void) //My parent Node
1252 {
1253   return parent;
1254 }
1255
1256 int CkReductionMgr::firstKid(void) //My first child Node
1257 {
1258   return CkMyPe()*TREE_WID+1;
1259 }
1260 int CkReductionMgr::treeKids(void)//Number of children in tree
1261 {
1262   return numKids;
1263 }
1264
1265 #endif
1266
1267 /////////////////////////////////////////////////////////////////////////
1268
1269 ////////////////////////////////
1270 //CkReductionMsg support
1271
1272 //ReductionMessage default private constructor-- does nothing
1273 CkReductionMsg::CkReductionMsg(){}
1274 CkReductionMsg::~CkReductionMsg(){}
1275
1276 //This define gives the distance from the start of the CkReductionMsg
1277 // object to the start of the user data area (just below last object field)
1278 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1279
1280 //"Constructor"-- builds and returns a new CkReductionMsg.
1281 //  the "data" array you specify will be copied into this object.
1282 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1283     CkReduction::reducerType reducer, CkReductionMsg *buf)
1284 {
1285   int len[1];
1286   len[0]=NdataSize;
1287   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1288   
1289   ret->dataSize=NdataSize;
1290   if (srcData!=NULL && !buf)
1291     memcpy(ret->data,srcData,NdataSize);
1292   ret->userFlag=-1;
1293   ret->reducer=reducer;
1294   //ret->ci=NULL;
1295   ret->sourceFlag=-1000;
1296   ret->gcount=0;
1297   ret->migratableContributor = true;
1298 #if CMK_BLUEGENE_CHARM
1299   ret->log = NULL;
1300 #endif
1301   return ret;
1302 }
1303
1304 // Charm kernel message runtime support:
1305 void *
1306 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1307 {
1308   int totalsize=ARM_DATASTART+(*sz);
1309   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1310   CkReductionMsg *ret = (CkReductionMsg *)
1311     CkAllocMsg(msgnum,totalsize,priobits);
1312   ret->data=(void *)(&ret->dataStorage);
1313   return (void *) ret;
1314 }
1315
1316 void *
1317 CkReductionMsg::pack(CkReductionMsg* in)
1318 {
1319   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1320   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1321   in->data = NULL;
1322   return (void*) in;
1323 }
1324
1325 CkReductionMsg* CkReductionMsg::unpack(void *in)
1326 {
1327   CkReductionMsg *ret = (CkReductionMsg *)in;
1328   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1329   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1330   ret->data=(void *)(&ret->dataStorage);
1331   return ret;
1332 }
1333
1334
1335 /////////////////////////////////////////////////////////////////////////////////////
1336 ///////////////// Builtin Reducer Functions //////////////
1337 /* A simple reducer, like sum_int, looks like this:
1338 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1339 {
1340   int i,ret=0;
1341   for (i=0;i<nMsg;i++)
1342     ret+=*(int *)(msg[i]->getData());
1343   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1344 }
1345
1346 To keep the code small and easy to change, the implementations below
1347 are built with preprocessor macros.
1348 */
1349
1350 //////////////// simple reducers ///////////////////
1351 /*A define used to quickly and tersely construct simple reductions.
1352 The basic idea is to use the first message's data array as
1353 (pre-initialized!) scratch space for folding in the other messages.
1354  */
1355
1356 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1357 {
1358         CkAbort("Called the invalid reducer type 0.  This probably\n"
1359                 "means you forgot to initialize your custom reducer index.\n");
1360         return NULL;
1361 }
1362
1363 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1364 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1365 {\
1366   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1367   int m,i;\
1368   int nElem=msg[0]->getLength()/sizeof(dataType);\
1369   dataType *ret=(dataType *)(msg[0]->getData());\
1370   for (m=1;m<nMsg;m++)\
1371   {\
1372     dataType *value=(dataType *)(msg[m]->getData());\
1373     for (i=0;i<nElem;i++)\
1374     {\
1375       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1376       loop\
1377     }\
1378   }\
1379   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1380   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1381 }
1382
1383 //Use this macro for reductions that have the same type for all inputs
1384 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1385   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1386   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1387   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1388
1389
1390 //Compute the sum the numbers passed by each element.
1391 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1392
1393 //Compute the product of the numbers passed by each element.
1394 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1395
1396 //Compute the largest number passed by any element.
1397 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1398
1399 //Compute the smallest integer passed by any element.
1400 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1401
1402
1403 //Compute the logical AND of the integers passed by each element.
1404 // The resulting integer will be zero if any source integer is zero; else 1.
1405 SIMPLE_REDUCTION(logical_and,int,"%d",
1406         if (value[i]==0)
1407      ret[i]=0;
1408   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1409 )
1410
1411 //Compute the logical OR of the integers passed by each element.
1412 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1413 SIMPLE_REDUCTION(logical_or,int,"%d",
1414   if (value[i]!=0)
1415            ret[i]=1;
1416   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1417 )
1418
1419 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1420 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1421
1422 //Select one random message to pass on
1423 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1424   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1425 }
1426
1427 /////////////// concat ////////////////
1428 /*
1429 This reducer simply appends the data it recieves from each element,
1430 without any housekeeping data to separate them.
1431 */
1432 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1433 {
1434   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1435   //Figure out how big a message we'll need
1436   int i,retSize=0;
1437   for (i=0;i<nMsg;i++)
1438       retSize+=msg[i]->getSize();
1439
1440   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1441
1442   //Allocate a new message
1443   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1444
1445   //Copy the source message data into the return message
1446   char *cur=(char *)(ret->getData());
1447   for (i=0;i<nMsg;i++) {
1448     int messageBytes=msg[i]->getSize();
1449     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1450     cur+=messageBytes;
1451   }
1452   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1453   return ret;
1454 }
1455
1456 /////////////// set ////////////////
1457 /*
1458 This reducer appends the data it recieves from each element
1459 along with some housekeeping data indicating contribution boundaries.
1460 The message data is thus a list of reduction_set_element structures
1461 terminated by a dummy reduction_set_element with a sourceElement of -1.
1462 */
1463
1464 //This rounds an integer up to the nearest multiple of sizeof(double)
1465 static const int alignSize=sizeof(double);
1466 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1467
1468 //This gives the size (in bytes) of a reduction_set_element
1469 static int SET_SIZE(int dataSize)
1470 {return SET_ALIGN(sizeof(int)+dataSize);}
1471
1472 //This returns a pointer to the next reduction_set_element in the list
1473 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1474 {
1475   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1476   return (CkReduction::setElement *)next;
1477 }
1478
1479 //Combine the data passed by each element into an list of reduction_set_elements.
1480 // Each element may contribute arbitrary data (with arbitrary length).
1481 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1482 {
1483   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1484   //Figure out how big a message we'll need
1485   int i,retSize=0;
1486   for (i=0;i<nMsg;i++) {
1487     if (!msg[i]->isFromUser())
1488     //This message is composite-- it will just be copied over (less terminating -1)
1489       retSize+=(msg[i]->getSize()-sizeof(int));
1490     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1491       retSize+=SET_SIZE(msg[i]->getSize());
1492   }
1493   retSize+=sizeof(int);//Leave room for terminating -1.
1494
1495   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1496
1497   //Allocate a new message
1498   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1499
1500   //Copy the source message data into the return message
1501   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1502   for (i=0;i<nMsg;i++)
1503     if (!msg[i]->isFromUser())
1504     {//This message is composite-- just copy it over (less terminating -1)
1505                         int messageBytes=msg[i]->getSize()-sizeof(int);
1506                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1507                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1508                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1509     }
1510     else //This is a message from an element-- wrap it in a reduction_set_element
1511     {
1512       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1513       cur->dataSize=msg[i]->getSize();
1514       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1515       cur=SET_NEXT(cur);
1516     }
1517   cur->dataSize=-1;//Add a terminating -1.
1518   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1519   return ret;
1520 }
1521
1522 //Utility routine: get the next reduction_set_element in the list
1523 // if there is one, or return NULL if there are none.
1524 //To get all the elements, just keep feeding this procedure's output back to
1525 // its input until it returns NULL.
1526 CkReduction::setElement *CkReduction::setElement::next(void)
1527 {
1528   CkReduction::setElement *n=SET_NEXT(this);
1529   if (n->dataSize==-1)
1530     return NULL;//This is the end of the list
1531   else
1532     return n;//This is just another element
1533 }
1534
1535 /////////////////// Reduction Function Table /////////////////////
1536 CkReduction::CkReduction() {} //Dummy private constructor
1537
1538 //Add the given reducer to the list.  Returns the new reducer's
1539 // reducerType.  Must be called in the same order on every node.
1540 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1541 {
1542   reducerTable[nReducers]=fn;
1543   return (reducerType)nReducers++;
1544 }
1545
1546 /*Reducer table: maps reducerTypes to reducerFns.
1547 It's indexed by reducerType, so the order in this table
1548 must *exactly* match the reducerType enum declaration.
1549 The names don't have to match, but it helps.
1550 */
1551 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1552
1553 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1554     ::invalid_reducer,
1555   //Compute the sum the numbers passed by each element.
1556     ::sum_int,::sum_float,::sum_double,
1557
1558   //Compute the product the numbers passed by each element.
1559     ::product_int,::product_float,::product_double,
1560
1561   //Compute the largest number passed by any element.
1562     ::max_int,::max_float,::max_double,
1563
1564   //Compute the smallest number passed by any element.
1565     ::min_int,::min_float,::min_double,
1566
1567   //Compute the logical AND of the integers passed by each element.
1568   // The resulting integer will be zero if any source integer is zero.
1569     ::logical_and,
1570
1571   //Compute the logical OR of the integers passed by each element.
1572   // The resulting integer will be 1 if any source integer is nonzero.
1573     ::logical_or,
1574
1575     // Compute the logical bitvector AND of the integers passed by each element.
1576     ::bitvec_and,
1577
1578     // Compute the logical bitvector OR of the integers passed by each element.
1579     ::bitvec_or,
1580
1581     // Select one of the messages at random to pass on
1582     ::random,
1583
1584   //Concatenate the (arbitrary) data passed by each element
1585     ::concat,
1586
1587   //Combine the data passed by each element into an list of setElements.
1588   // Each element may contribute arbitrary data (with arbitrary length).
1589     ::set
1590 };
1591
1592
1593
1594
1595
1596
1597
1598
1599 /********** Code added by Sayantan *********************/
1600 /** Locking is a big problem in the nodegroup code for smp.
1601  So a few assumptions have had to be made. There is one lock
1602  called lockEverything. It protects all the data structures 
1603  of the nodegroup reduction mgr. I tried grabbing it separately 
1604  for each datastructure, modifying it and then releasing it and
1605  then grabbing it again, for the next change.
1606  That doesn't really help because the interleaved execution of 
1607  different threads makes the state of the reduction manager 
1608  inconsistent. 
1609  
1610  1. Grab lockEverything before calling finishreduction or startReduction
1611     or doRecvMsg
1612  2. lockEverything is grabbed only in entry methods reductionStarting
1613     or RecvMesg or  addcontribution.
1614  ****/
1615  
1616 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1617 NodeGroup::NodeGroup(void) {
1618   __nodelock=CmiCreateLock();
1619 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1620     mlogData->objID.type = TypeNodeGroup;
1621     mlogData->objID.data.group.onPE = CkMyNode();
1622 #endif
1623
1624 }
1625 NodeGroup::~NodeGroup() {
1626   CmiDestroyLock(__nodelock);
1627 }
1628 void NodeGroup::pup(PUP::er &p)
1629 {
1630   CkNodeReductionMgr::pup(p);
1631   p|reductionInfo;
1632 }
1633
1634 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1635
1636 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1637   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1638  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1639   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1640  }
1641
1642 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1643                                     ((CkNodeReductionMgr *)this),
1644                                     reductionInfo,false)
1645
1646 /* this contribute also adds up the count across all messages it receives.
1647   Useful for summing up number of array elements who have contributed ****/ 
1648 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1649         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1650
1651
1652
1653 //#define BINOMIAL_TREE
1654
1655 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1656   : thisProxy(thisgroup)
1657 {
1658 #ifdef BINOMIAL_TREE
1659   init_BinomialTree();
1660 #else
1661   init_BinaryTree();
1662 #endif
1663   storedCallback=NULL;
1664   redNo=0;
1665   inProgress=CmiFalse;
1666   
1667   startRequested=CmiFalse;
1668   gcount=CkNumNodes();
1669   lcount=1;
1670   nContrib=nRemote=0;
1671   lockEverything = CmiCreateLock();
1672
1673
1674   creating=CmiFalse;
1675   interrupt = 0;
1676   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1677         /*
1678                 FAULT_EVAC
1679         */
1680         blocked = false;
1681         maxModificationRedNo = INT_MAX;
1682         killed=0;
1683         additionalGCount = newAdditionalGCount = 0;
1684 }
1685
1686 void CkNodeReductionMgr::flushStates()
1687 {
1688  if(CkMyRank() == 0){
1689   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1690   redNo=0;
1691   inProgress=CmiFalse;
1692
1693   startRequested=CmiFalse;
1694   gcount=CkNumNodes();
1695   lcount=1;
1696   nContrib=nRemote=0;
1697
1698   creating=CmiFalse;
1699   interrupt = 0;
1700   while (!msgs.isEmpty()) { delete msgs.deq(); }
1701   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1702   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1703   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1704   }
1705 }
1706
1707 //////////// Reduction Manager Client API /////////////
1708
1709 //Add the given client function.  Overwrites any previous client.
1710 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1711 {
1712   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1713   if(cb->isInvalid()){
1714         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1715   }else{
1716         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1717   }
1718
1719   if (CkMyNode()!=0)
1720           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1721   delete storedCallback;
1722   storedCallback=cb;
1723 }
1724
1725
1726
1727 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1728 {
1729 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1730     Chare *oldObj =CpvAccess(_currentObj);
1731     CpvAccess(_currentObj) = this;
1732 #endif
1733
1734   //m->ci=ci;
1735   m->redNo=ci->redNo++;
1736   m->sourceFlag=-1;//A single contribution
1737   m->gcount=0;
1738 #if DEBUGRED
1739         CkPrintf("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo);
1740 #endif
1741   addContribution(m);
1742
1743 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1744     CpvAccess(_currentObj) = oldObj;
1745 #endif
1746 }
1747
1748
1749 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1750 {
1751 #if CMK_BLUEGENE_CHARM
1752   _TRACE_BG_TLINE_END(&m->log);
1753 #endif
1754 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1755     Chare *oldObj =CpvAccess(_currentObj);
1756     CpvAccess(_currentObj) = this;
1757 #endif
1758   //m->ci=ci;
1759   m->redNo=ci->redNo++;
1760   m->gcount=count;
1761 #if DEBUGRED
1762  CkPrintf("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1763 #endif
1764   addContribution(m);
1765 #if DEBUGRED
1766   CkPrintf("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1767 #endif
1768
1769 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1770     CpvAccess(_currentObj) = oldObj;
1771 #endif
1772 }
1773
1774
1775 //////////// Reduction Manager Remote Entry Points /////////////
1776
1777 //Sent down the reduction tree (used by barren PEs)
1778 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1779 {
1780   CmiLock(lockEverything);
1781   if(blocked){
1782         delete m;
1783         CmiUnlock(lockEverything);
1784         return ;
1785   }
1786   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1787   if (isPresent(m->num) && !inProgress)
1788   {
1789     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1790     startReduction(m->num,srcNode);
1791     finishReduction();
1792   } else if (isFuture(m->num)){
1793         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1794   }
1795   else //is Past
1796     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1797   CmiUnlock(lockEverything);
1798   delete m;
1799
1800 }
1801
1802
1803 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1804 #if DEBUGRED
1805         CkPrintf("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1806 #endif
1807         /*
1808                 FAULT_EVAC
1809         */
1810         if(blocked){
1811                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1812                 bufferedRemoteMsgs.enq(m);
1813                 return;
1814         }
1815         
1816         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1817             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1818             startReduction(m->redNo,CkMyNode());
1819             msgs.enq(m);
1820             nRemote++;
1821             finishReduction();
1822         }
1823         else {
1824             if (isFuture(m->redNo)) {
1825                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1826                     futureRemoteMsgs.enq(m);
1827             }else{
1828                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1829                    CkAbort("Recv'd late remote contribution!\n");
1830             }
1831   }
1832 #if DEBUGRED        
1833        CkPrintf("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1834 #endif       
1835 }
1836
1837 //Sent up the reduction tree with reduced data
1838 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1839 {
1840 #if CMK_BLUEGENE_CHARM
1841   _TRACE_BG_TLINE_END(&m->log);
1842 #endif
1843 #ifndef CMK_CPV_IS_SMP
1844 #if CMK_IMMEDIATE_MSG
1845         if(interrupt == 1){
1846                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1847                 CpvAccess(_qd)->process(-1);
1848                 CmiDelayImmediate();
1849                 return;
1850         }
1851 #endif  
1852 #endif
1853    interrupt = 1;       
1854    CmiLock(lockEverything);   
1855 #if DEBUGRED   
1856    CkPrintf("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1857 #endif   
1858    doRecvMsg(m);
1859    CmiUnlock(lockEverything);    
1860    interrupt = 0;
1861 #if DEBUGRED  
1862    CkPrintf("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1863 #endif 
1864 }
1865
1866 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1867 {
1868         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1869         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1870         if (inProgress){
1871                 DEBR((AA"This Node reduction is already in progress\n"AB));
1872                 return;//This reduction already started
1873         }
1874         if (creating) //Don't start yet-- we're creating elements
1875         {
1876                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1877                 startRequested=CmiTrue;
1878                 return;
1879         }
1880         
1881         //If none of these cases, we need to start the reduction--
1882         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1883         inProgress=CmiTrue;
1884         //Sent start requests to our kids (in case they don't already know)
1885
1886         for (int k=0;k<treeKids();k++)
1887         {
1888 #ifdef BINOMIAL_TREE
1889                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1890                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1891 #else
1892                 if(kids[k] != srcNode){
1893                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1894                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1895                 }       
1896 #endif
1897         }
1898
1899         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1900         // in case, nodegroup does not has the attached red group,
1901         // it has to restart groups again
1902         startLocalGroupReductions(number);
1903 /*
1904         if (startLocalGroupReductions(number) == 0)
1905           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1906 */
1907 }
1908
1909 // restart local groups until succeed
1910 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1911   CmiLock(lockEverything);    
1912   if (startLocalGroupReductions(number) == 0)
1913     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1914   CmiUnlock(lockEverything);    
1915 }
1916
1917 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1918         /*
1919                 FAULT_EVAC
1920         */
1921         if(blocked){
1922                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1923                 bufferedMsgs.enq(m);
1924                 return;
1925         }
1926         
1927         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1928                 DEBR((AA"Contributor %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1929                 futureMsgs.enq(m);
1930         } else {// An ordinary contribution
1931                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1932                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1933                 startReduction(m->redNo,CkMyNode());
1934                 msgs.enq(m);
1935                 nContrib++;
1936                 finishReduction();
1937         }
1938 }
1939
1940 //Handle a message from one element for the reduction
1941 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1942 {
1943   interrupt = 1;
1944   CmiLock(lockEverything);
1945   doAddContribution(m);
1946   CmiUnlock(lockEverything);
1947   interrupt = 0;
1948 }
1949
1950 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1951         if(blocked){
1952                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1953                 bufferedMsgs.enq(m);
1954                 return;
1955         }
1956         
1957         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1958                 DEBR((AA"Latemigrant %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1959 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1960                 futureLateMigrantMsgs.enq(m);
1961         } else {// An ordinary contribution
1962                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1963 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1964                 msgs.enq(m);
1965                 finishReduction();
1966         }
1967 }
1968
1969
1970
1971
1972
1973 /** check if the nodegroup reduction is finished at this node. In that case send it
1974 up the reduction tree **/
1975
1976 void CkNodeReductionMgr::finishReduction(void)
1977 {
1978   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1979   /***Check if reduction is finished in the next few ifs***/
1980   if ((!inProgress) || creating){
1981         DEBR((AA"Either not in Progress or creating\n"AB));
1982         return;
1983   }
1984
1985   if (nContrib<(lcount)){
1986         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1987
1988          return;//Need more local messages
1989   }
1990   if (nRemote<treeKids()){
1991         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1992
1993         return;//Need more remote messages
1994   }
1995   if (nRemote>treeKids()){
1996
1997           interrupt = 0;
1998            CkAbort("Nodegrp Excess remote reduction message received!\n");
1999   }
2000
2001   DEBR((AA"Reducing node data...\n"AB));
2002
2003   /**reduce all messages received at this node **/
2004   CkReductionMsg *result=reduceMessages();
2005
2006   if (hasParent())
2007   {//Pass data up tree to parent
2008         if(CmiNodeAlive(CkMyNode()) || killed == 0){
2009         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
2010 #if DEBUGRED
2011         CkPrintf("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib);
2012 #endif
2013                 /*
2014                         FAULT_EVAC
2015                 */
2016                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
2017             thisProxy[treeParent()].RecvMsg(result);
2018         }
2019
2020   }
2021   else
2022   {
2023                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2024 #if DEBUGRED
2025                         CkPrintf("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor());
2026 #endif                  
2027                         msgs.enq(result);
2028                         return;
2029                 }
2030                 result->gcount += additionalGCount;
2031           /** if the reduction is finished and I am the root of the reduction tree
2032           then call the reductionhandler and other stuff ***/
2033                 
2034
2035 #if DEBUGRED
2036    CkPrintf("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer());
2037 #endif
2038     if (!result->callback.isInvalid()){
2039 #if DEBUGRED
2040             CkPrintf("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe());
2041 #endif      
2042             result->callback.send(result);
2043     }
2044     else if (storedCallback!=NULL){
2045 #if DEBUGRED
2046             CkPrintf("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe());
2047 #endif
2048             storedCallback->send(result);
2049     }
2050     else{
2051                 DEBR((AA"Invalid Callback \n"AB));
2052             CkAbort("No reduction client!\n"
2053                     "You must register a client with either SetReductionClient or during contribute.\n");
2054                 }
2055   }
2056
2057   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2058   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2059   redNo++;
2060         updateTree();
2061   int i;
2062   inProgress=CmiFalse;
2063   startRequested=CmiFalse;
2064   nRemote=nContrib=0;
2065
2066   //Look through the future queue for messages we can now handle
2067   int n=futureMsgs.length();
2068
2069   for (i=0;i<n;i++)
2070   {
2071     interrupt = 1;
2072
2073     CkReductionMsg *m=futureMsgs.deq();
2074
2075     interrupt = 0;
2076     if (m!=NULL){ //One of these addContributions may have finished us.
2077 #if DEBUGRED
2078                 CkPrintf("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
2079 #endif
2080       doAddContribution(m);//<- if *still* early, puts it back in the queue
2081     }
2082   }
2083
2084   interrupt = 1;
2085
2086   n=futureRemoteMsgs.length();
2087
2088   interrupt = 0;
2089   for (i=0;i<n;i++)
2090   {
2091     interrupt = 1;
2092
2093     CkReductionMsg *m=futureRemoteMsgs.deq();
2094
2095     interrupt = 0;
2096     if (m!=NULL)
2097       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2098   }
2099   
2100   n = futureLateMigrantMsgs.length();
2101   for(i=0;i<n;i++){
2102     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2103     if(m != NULL){
2104       if(m->redNo == redNo){
2105         msgs.enq(m);
2106       }else{
2107         futureLateMigrantMsgs.enq(m);
2108       }
2109     }
2110   }
2111 }
2112
2113 //////////// Reduction Manager Utilities /////////////
2114
2115 void CkNodeReductionMgr::init_BinaryTree(){
2116         parent = (CkMyNode()-1)/TREE_WID;
2117         int firstkid = CkMyNode()*TREE_WID+1;
2118         numKids=CkNumNodes()-firstkid;
2119   if (numKids>TREE_WID) numKids=TREE_WID;
2120   if (numKids<0) numKids=0;
2121
2122         for(int i=0;i<numKids;i++){
2123                 kids.push_back(firstkid+i);
2124                 newKids.push_back(firstkid+i);
2125         }
2126 }
2127
2128 void CkNodeReductionMgr::init_BinomialTree(){
2129         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2130         /*upperSize = (unsigned )pow((double)2,depth);*/
2131         upperSize = (unsigned) 1 << depth;
2132         label = upperSize-CkMyNode()-1;
2133         int p=label;
2134         int count=0;
2135         while( p > 0){
2136                 if(p % 2 == 0)
2137                         break;
2138                 else{
2139                         p = p/2;
2140                         count++;
2141                 }
2142         }
2143         /*parent = label + rint(pow((double)2,count));*/
2144         parent = label + (1<<count);
2145         parent = upperSize -1 -parent;
2146         int temp;
2147         if(count != 0){
2148                 numKids = 0;
2149                 for(int i=0;i<count;i++){
2150                         /*temp = label - rint(pow((double)2,i));*/
2151                         temp = label - (1<<i);
2152                         temp = upperSize-1-temp;
2153                         if(temp <= CkNumNodes()-1){
2154                 //              kids[numKids] = temp;
2155                                 kids.push_back(temp);
2156                                 numKids++;
2157                         }
2158                 }
2159         }else{
2160                 numKids = 0;
2161         //      kids = NULL;
2162         }
2163 }
2164
2165
2166 int CkNodeReductionMgr::treeRoot(void)
2167 {
2168   return 0;
2169 }
2170 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2171 {
2172   return (CmiBool)(CkMyNode()!=treeRoot());
2173 }
2174 int CkNodeReductionMgr::treeParent(void) //My parent Node
2175 {
2176 #ifdef BINOMIAL_TREE
2177         return parent;
2178 #else
2179   return parent;
2180 #endif
2181 }
2182
2183 int CkNodeReductionMgr::firstKid(void) //My first child Node
2184 {
2185   return CkMyNode()*TREE_WID+1;
2186 }
2187 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2188 {
2189 #ifdef BINOMIAL_TREE
2190         return numKids;
2191 #else
2192 /*  int nKids=CkNumNodes()-firstKid();
2193   if (nKids>TREE_WID) nKids=TREE_WID;
2194   if (nKids<0) nKids=0;
2195   return nKids;*/
2196         return numKids;
2197 #endif
2198 }
2199
2200 //Combine (& free) the current message vector msgs.
2201 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2202 {
2203 #if CMK_BLUEGENE_CHARM
2204   _TRACE_BG_END_EXECUTE(1);
2205   void* _bgParentLog = NULL;
2206   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2207 #endif
2208   CkReductionMsg *ret=NULL;
2209
2210   //Look through the vector for a valid reducer, swapping out placeholder messages
2211   CkReduction::reducerType r=CkReduction::invalid;
2212   int msgs_gcount=0;//Reduced gcount
2213   int msgs_nSources=0;//Reduced nSources
2214   int msgs_userFlag=-1;
2215   CkCallback msgs_callback;
2216   CkCallback msgs_secondaryCallback;
2217   int i;
2218   int nMsgs=0;
2219   CkReductionMsg *m;
2220   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2221   bool isMigratableContributor;
2222         
2223
2224   while(NULL!=(m=msgs.deq()))
2225   {
2226     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2227     msgs_gcount+=m->gcount;
2228     if (m->sourceFlag!=0)
2229     { //This is a real message from an element, not just a placeholder
2230       msgArr[nMsgs++]=m;
2231       msgs_nSources+=m->nSources();
2232       r=m->reducer;
2233       if (!m->callback.isInvalid())
2234         msgs_callback=m->callback;
2235       if(!m->secondaryCallback.isInvalid()){
2236         msgs_secondaryCallback = m->secondaryCallback;
2237       }
2238       if (m->userFlag!=-1)
2239         msgs_userFlag=m->userFlag;
2240
2241         isMigratableContributor= m->isMigratableContributor();
2242 #if CMK_BLUEGENE_CHARM
2243       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2244 #endif
2245                                 
2246     }
2247     else
2248     { //This is just a placeholder message-- replace it
2249       delete m;
2250     }
2251   }
2252
2253   if (nMsgs==0||r==CkReduction::invalid)
2254   //No valid reducer in the whole vector
2255     ret=CkReductionMsg::buildNew(0,NULL);
2256   else
2257   {//Use the reducer to reduce the messages
2258     if(nMsgs == 1){
2259         ret = msgArr[0];
2260     }else{
2261         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2262         ret=(*f)(nMsgs,msgArr);
2263     }
2264     ret->reducer=r;
2265   }
2266
2267         
2268 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2269 #if CRITICAL_PATH_DEBUG > 3
2270         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2271 #endif
2272         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2273         path.updateMax(UsrToEnv(ret));
2274         // Combine the critical paths from all the reduction messages into the header for the new result
2275         for (i=0;i<nMsgs;i++){
2276           if (msgArr[i]!=ret){
2277             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2278             path.updateMax(UsrToEnv(msgArr[i]));
2279           } else {
2280             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2281           }
2282         }
2283 #if CRITICAL_PATH_DEBUG > 3
2284         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2285 #endif
2286
2287 #endif
2288
2289
2290         //Go back through the vector, deleting old messages
2291   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2292   delete [] msgArr;
2293   //Set the message counts
2294   ret->redNo=redNo;
2295   ret->gcount=msgs_gcount;
2296   ret->userFlag=msgs_userFlag;
2297   ret->callback=msgs_callback;
2298   ret->secondaryCallback = msgs_secondaryCallback;
2299   ret->sourceFlag=msgs_nSources;
2300   ret->setMigratableContributor(isMigratableContributor);
2301   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2302 #if CMK_BLUEGENE_CHARM
2303   _TRACE_BG_TLINE_END(&ret->log);
2304 #endif
2305
2306   return ret;
2307 }
2308
2309 void CkNodeReductionMgr::pup(PUP::er &p)
2310 {
2311 //We do not store the client function pointer or the client function parameter,
2312 //it is the responsibility of the programmer to correctly restore these
2313   IrrGroup::pup(p);
2314   p(redNo);
2315   p(inProgress); p(creating); p(startRequested);
2316   p(lcount);
2317   p(nContrib); p(nRemote);
2318   p(interrupt);
2319   p|msgs;
2320   p|futureMsgs;
2321   p|futureRemoteMsgs;
2322   p|futureLateMigrantMsgs;
2323   p|parent;
2324   p|additionalGCount;
2325   p|newAdditionalGCount;
2326   if(p.isUnpacking()) {
2327     gcount=CkNumNodes();
2328     thisProxy = thisgroup;
2329     lockEverything = CmiCreateLock();
2330 #ifdef BINOMIAL_TREE
2331     init_BinomialTree();
2332 #else
2333     init_BinaryTree();
2334 #endif          
2335   }
2336   p | blocked;
2337   p | maxModificationRedNo;
2338
2339 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2340   int isnull = (storedCallback == NULL);
2341   p | isnull;
2342   if (!isnull) {
2343     if (p.isUnpacking()) {
2344       storedCallback = new CkCallback;
2345     }
2346     p|*storedCallback;
2347   }
2348 #endif
2349
2350 }
2351
2352 /*
2353         FAULT_EVAC
2354         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2355         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2356         reduction tree. 
2357 */
2358 void CkNodeReductionMgr::evacuate(){
2359         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2360         if(treeKids() == 0){
2361         /*
2362                 if the node going down is a leaf
2363         */
2364                 oldleaf=true;
2365                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2366                 /*
2367                         Need to ask parent for the reduction number that it has seen. 
2368                         Since it is a leaf, the tree does not need to be rewired. 
2369                         We reuse the oldparent type of tree modification message to get 
2370                         the parent to block and tell us about the highest reduction number it has seen.
2371                         
2372                 */
2373                 int data[2];
2374                 data[0]=CkMyNode();
2375                 data[1]=getTotalGCount()+additionalGCount;
2376                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2377                 newParent = treeParent();
2378         }else{
2379                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2380                 oldleaf= false;
2381         /*
2382                 It is not a leaf. It needs to rewire the tree around itself.
2383                 It also needs to decide on a reduction No after which the new tree will be used
2384                 Till it decides on the new tree and the redNo at which it becomes valid,
2385                 all received messages will be buffered
2386         */
2387                 newParent = kids[0];
2388                 for(int i=numKids-1;i>=0;i--){
2389                         newKids.remove(i);
2390                 }
2391                 /*
2392                         Ask everybody for the highest reduction number they have seen and
2393                         also tell them about the new tree
2394                 */
2395                 /*
2396                         Tell parent about its new child;
2397                 */
2398                 int oldParentData[2];
2399                 oldParentData[0] = CkMyNode();
2400                 oldParentData[1] = newParent;
2401                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2402
2403                 /*
2404                         Tell the other children about their new parent
2405                 */
2406                 int childrenData=newParent;
2407                 for(int i=1;i<numKids;i++){
2408                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2409                 }
2410                 
2411                 /*
2412                         Tell newParent (1st child) about its new children,
2413                         the current node and its children except the newParent
2414                 */
2415                 int *newParentData = new int[numKids+2];
2416                 for(int i=1;i<numKids;i++){
2417                         newParentData[i] = kids[i];
2418                 }
2419                 newParentData[0] = CkMyNode();
2420                 newParentData[numKids] = parent;
2421                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2422                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2423         }
2424         readyDeletion = false;
2425         blocked = true;
2426         numModificationReplies = 0;
2427         tempModificationRedNo = findMaxRedNo();
2428 }
2429
2430 /*
2431         Depending on the code, use the data to change the tree
2432         1. OLDPARENT : replace the old child with a new one
2433         2. OLDCHILDREN: replace the parent
2434         3. NEWPARENT:  add the children and change the parent
2435         4. LEAFPARENT: delete the old child
2436 */
2437
2438 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2439         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2440         int sender;
2441         newKids = kids;
2442         readyDeletion = false;
2443         newAdditionalGCount = additionalGCount;
2444         switch(code){
2445                 case OLDPARENT: 
2446                         for(int i=0;i<numKids;i++){
2447                                 if(newKids[i] == data[0]){
2448                                         newKids[i] = data[1];
2449                                         break;
2450                                 }
2451                         }
2452                         sender = data[0];
2453                         newParent = parent;
2454                         break;
2455                 case OLDCHILDREN:
2456                         newParent = data[0];
2457                         sender = parent;
2458                         break;
2459                 case NEWPARENT:
2460                         for(int i=0;i<size-2;i++){
2461                                 newKids.push_back(data[i]);
2462                         }
2463                         newParent = data[size-2];
2464                         newAdditionalGCount += data[size-1];
2465                         sender = parent;
2466                         break;
2467                 case LEAFPARENT:
2468                         for(int i=0;i<numKids;i++){
2469                                 if(newKids[i] == data[0]){
2470                                         newKids.remove(i);
2471                                         break;
2472                                 }
2473                         }
2474                         sender = data[0];
2475                         newParent = parent;
2476                         newAdditionalGCount += data[1];
2477                         break;
2478         };
2479         blocked = true;
2480         int maxRedNo = findMaxRedNo();
2481         
2482         thisProxy[sender].collectMaxRedNo(maxRedNo);
2483 }
2484
2485 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2486         /*
2487                 Find out the maximum redNo that has been seen by 
2488                 the affected nodes
2489         */
2490         numModificationReplies++;
2491         if(maxRedNo > tempModificationRedNo){
2492                 tempModificationRedNo = maxRedNo;
2493         }
2494         if(numModificationReplies == numKids+1){
2495                 maxModificationRedNo = tempModificationRedNo;
2496                 /*
2497                         when all the affected nodes have replied, tell them the maximum.
2498                         Unblock yourself. deal with the buffered messages local and remote
2499                 */
2500                 if(maxModificationRedNo == -1){
2501                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2502                 }else{
2503                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2504                 }
2505                 thisProxy[parent].unblockNode(maxModificationRedNo);
2506                 for(int i=0;i<numKids;i++){
2507                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2508                 }
2509                 blocked = false;
2510                 updateTree();
2511                 clearBlockedMsgs();
2512         }
2513 }
2514
2515 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2516         maxModificationRedNo = maxRedNo;
2517         updateTree();
2518         blocked = false;
2519         clearBlockedMsgs();
2520 }
2521
2522
2523 void CkNodeReductionMgr::clearBlockedMsgs(){
2524         int len = bufferedMsgs.length();
2525         for(int i=0;i<len;i++){
2526                 CkReductionMsg *m = bufferedMsgs.deq();
2527                 doAddContribution(m);
2528         }
2529         len = bufferedRemoteMsgs.length();
2530         for(int i=0;i<len;i++){
2531                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2532                 doRecvMsg(m);
2533         }
2534
2535 }
2536 /*
2537         if the reduction number exceeds the maxModificationRedNo, change the tree
2538         to become the new one
2539 */
2540
2541 void CkNodeReductionMgr::updateTree(){
2542         if(redNo > maxModificationRedNo){
2543                 parent = newParent;
2544                 kids = newKids;
2545                 maxModificationRedNo = INT_MAX;
2546                 numKids = kids.size();
2547                 readyDeletion = true;
2548                 additionalGCount = newAdditionalGCount;
2549                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2550                 for(int i=0;i<(int)(newKids.size());i++){
2551                         DEBREVAC(("%d ",newKids[i]));
2552                 }
2553                 DEBREVAC(("\n"));
2554         //      startReduction(redNo,CkMyNode());
2555         }else{
2556                 if(maxModificationRedNo != INT_MAX){
2557                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2558                         startReduction(redNo,CkMyNode());
2559                         finishReduction();
2560                 }       
2561         }
2562 }
2563
2564
2565 void CkNodeReductionMgr::doneEvacuate(){
2566         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2567 /*      if(oldleaf){
2568                 
2569                         It used to be a leaf
2570                         Then as soon as future messages have been emptied you can 
2571                         send the parent a message telling them that they are not going
2572                         to receive anymore messages from this child
2573                 
2574                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2575                 while(futureMsgs.length() != 0){
2576                         int n = futureMsgs.length();
2577                         for(int i=0;i<n;i++){
2578                                 CkReductionMsg *m = futureMsgs.deq();
2579                                 if(isPresent(m->redNo)){
2580                                         msgs.enq(m);
2581                                 }else{
2582                                         futureMsgs.enq(m);
2583                                 }
2584                         }
2585                         CkReductionMsg *result = reduceMessages();
2586                         thisProxy[treeParent()].RecvMsg(result);
2587                         redNo++;
2588                 }
2589                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2590                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2591         }else{*/
2592                 if(readyDeletion){
2593                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2594                 }else{
2595                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2596                 }
2597 //      }
2598 }
2599
2600 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2601         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2602         for(int i=0;i<numKids;i++){
2603                 if(kids[i] == deletedChild){
2604                         kids.remove(i);
2605                         break;
2606                 }
2607         }
2608         numKids = kids.length();
2609         finishReduction();
2610 }
2611
2612 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2613         for(int i=0;i<(int)(newKids.length());i++){
2614                 if(newKids[i] == deletedChild){
2615                         newKids.remove(i);
2616                         break;
2617                 }
2618         }
2619         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2620         for(int i=0;i<(int)(newKids.size());i++){
2621                 DEBREVAC(("%d ",newKids[i]));
2622         }
2623         DEBREVAC(("\n"));
2624         finishReduction();
2625 }
2626
2627 int CkNodeReductionMgr::findMaxRedNo(){
2628         int max = redNo;
2629         for(int i=0;i<futureRemoteMsgs.length();i++){
2630                 if(futureRemoteMsgs[i]->redNo  > max){
2631                         max = futureRemoteMsgs[i]->redNo;
2632                 }
2633         }
2634         /*
2635                 if redNo is max (that is no future message) and the current reduction has not started
2636                 then tree can be changed before the reduction redNo can be started
2637         */ 
2638         if(redNo == max && msgs.length() == 0){
2639                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2640                 max--;
2641         }
2642         return max;
2643 }
2644
2645 #include "CkReduction.def.h"