83c1accee3de1d1db6083722e7ebac70dcf547e7
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207   disableNotifyChildrenStart = CmiFalse;
208   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
209 }
210
211 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
212 {
213   redNo=0;
214   completedRedNo = -1;
215   inProgress=CmiFalse;
216   creating=CmiFalse;
217   startRequested=CmiFalse;
218   gcount=lcount=0;
219   nContrib=nRemote=0;
220   maxStartRequest=0;
221   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
222 }
223
224 void CkReductionMgr::flushStates(int isgroup)
225 {
226   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
227   redNo=0;
228   completedRedNo = -1;
229   inProgress=CmiFalse;
230   creating=CmiFalse;
231   if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
232   startRequested=CmiFalse;
233   nContrib=nRemote=0;
234   maxStartRequest=0;
235
236   while (!msgs.isEmpty()) { delete msgs.deq(); }
237   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
238   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
239   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
240
241   adjVec.length()=0;
242
243 #if ! GROUP_LEVEL_REDUCTION
244   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
245 #endif
246 }
247
248 //////////// Reduction Manager Client API /////////////
249
250 //Add the given client function.  Overwrites any previous client.
251 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
252 {
253   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
254
255   if (CkMyPe()!=0)
256           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
257   storedCallback=*cb;
258 #if ! GROUP_LEVEL_REDUCTION
259   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
260   nodeProxy.ckSetReductionClient(callback);
261 #endif
262 }
263
264 ///////////////////////////// Contributor ////////////////////////
265 //Contributors keep a copy of this structure:
266
267 /*Contributor migration support:
268 */
269 void contributorInfo::pup(PUP::er &p)
270 {
271   p(redNo);
272 }
273
274 ////////////////////// Contributor list maintainance: /////////////////
275 //These just set and clear the "creating" flag to prevent
276 // reductions from finishing early because not all elements
277 // have been created.
278 void CkReductionMgr::creatingContributors(void)
279 {
280   DEBR((AA"Creating contributors...\n"AB));
281   creating=CmiTrue;
282 }
283 void CkReductionMgr::doneCreatingContributors(void)
284 {
285   DEBR((AA"Done creating contributors...\n"AB));
286   creating=CmiFalse;
287   if (startRequested) startReduction(redNo,CkMyPe());
288   finishReduction();
289 }
290
291 //A new contributor will be created
292 void CkReductionMgr::contributorStamped(contributorInfo *ci)
293 {
294   DEBR((AA"Contributor %p stamped\n"AB,ci));
295   //There is another contributor
296   gcount++;
297   if (inProgress)
298   {
299     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
300     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
301   } else
302     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
303 }
304
305 //A new contributor was actually created
306 void CkReductionMgr::contributorCreated(contributorInfo *ci)
307 {
308   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
309   //We've got another contributor
310   lcount++;
311   //He may not need to contribute to some of our reductions:
312   for (int r=redNo;r<ci->redNo;r++)
313     adj(r).lcount--;//He won't be contributing to r here
314 }
315
316 /*Don't expect any more contributions from this one.
317 This is rather horrifying because we now have to make
318 sure the global element count accurately reflects all the
319 contributions the element made before it died-- these may stretch
320 far into the future.  The adj() vector is what saves us here.
321 */
322 void CkReductionMgr::contributorDied(contributorInfo *ci)
323 {
324 #if CMK_MEM_CHECKPOINT
325   // ignore from listener if it is during restart from crash
326   if (CkInRestarting()) return;
327 #endif
328   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
329   //We lost a contributor
330   gcount--;
331
332   if (ci->redNo<redNo)
333   {//Must have been migrating during reductions-- root is waiting for his
334   // contribution, which will never come.
335     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
336     for (int r=ci->redNo;r<redNo;r++)
337       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
338   }
339
340   //Add to the global count for all his future messages (wherever they are)
341   int r;
342   for (r=redNo;r<ci->redNo;r++)
343   {//He already contributed to this reduction, but won't show up in global count.
344     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
345     adj(r).gcount++;
346   }
347
348   lcount--;
349   //He's already contributed to several reductions here
350   for (r=redNo;r<ci->redNo;r++)
351     adj(r).lcount++;//He'll be contributing to r here
352
353   finishReduction();
354 }
355
356 //Migrating away (note that global count doesn't change)
357 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
358 {
359   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
360   lcount--;//We lost a local
361   //He's already contributed to several reductions here
362   for (int r=redNo;r<ci->redNo;r++)
363     adj(r).lcount++;//He'll be contributing to r here
364
365   finishReduction();
366 }
367
368 //Migrating in (note that global count doesn't change)
369 void CkReductionMgr::contributorArriving(contributorInfo *ci)
370 {
371   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
372   lcount++;//We gained a local
373 #if CMK_MEM_CHECKPOINT
374   // ignore from listener if it is during restart from crash
375   // because the ci may be old.
376   if (CkInRestarting()) return;
377 #endif
378   //He has already contributed (elsewhere) to several reductions:
379   for (int r=redNo;r<ci->redNo;r++)
380     adj(r).lcount--;//He won't be contributing to r here
381
382 }
383
384 //Contribute-- the given msg can contain any data.  The reducerType
385 // field of the message must be valid.
386 // Each contributor must contribute exactly once to the each reduction.
387 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
388 {
389 #if CMK_BIGSIM_CHARM
390   _TRACE_BG_TLINE_END(&(m->log));
391 #endif
392   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
393   //m->ci=ci;
394   m->redNo=ci->redNo++;
395   m->sourceFlag=-1;//A single contribution
396   m->gcount=0;
397
398 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
399     if(lcount == 0){
400         m->sourceProcessorCount = 1;
401     }else{
402         m->sourceProcessorCount = lcount;
403     }
404     m->fromPE = CmiMyPe();
405     Chare *oldObj =CpvAccess(_currentObj);
406     char currentObjName[100];
407     DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
408     thisProxy[0].contributeViaMessage(m);
409 #else
410   addContribution(m);
411 #endif
412 }
413
414 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
415 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
416     CmiAssert(CmiMyPe() == 0);
417     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
418     if(redNo == 0){
419         if(perProcessorCounts[m->fromPE] == -1){
420             processorCount++;
421             perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
422             DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
423         }else{
424             if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
425                 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
426                 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
427             }
428         }
429         if(processorCount == CmiNumPes()){
430             totalCount = 0;
431             for(int i=0;i<CmiNumPes();i++){
432                 CmiAssert(perProcessorCounts[i] != -1);
433                 totalCount += perProcessorCounts[i];
434             }
435             DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
436         }
437         if(m->sourceProcessorCount == 0){
438             if(processorCount == CmiNumPes()){
439                 finishReduction();
440             }
441             return;
442         }
443     }
444     addContribution(m);
445 }
446 #else
447 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
448 #endif
449
450 //////////// Reduction Manager Remote Entry Points /////////////
451 //Sent down the reduction tree (used by barren PEs)
452 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
453 {
454  if(CkMyPe()==0){
455         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
456         //delete m;
457         //return;
458  }
459  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
460  int srcPE = (UsrToEnv(m))->getSrcPe();
461 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_)) 
462   if (isPresent(m->num) && !inProgress)
463   {
464     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
465     startReduction(m->num,srcPE);
466     finishReduction();
467   } else if (isFuture(m->num)){
468 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
469           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
470           if(maxStartRequest < m->num){
471                   maxStartRequest = m->num;
472           }
473  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
474           
475     }
476   else //is Past
477     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
478   delete m;
479 #else
480     if(redNo == 0){
481         if(lcount == 0){
482             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
483             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
484             dummy->fromPE = CmiMyPe();
485             dummy->sourceProcessorCount = 0;
486             dummy->redNo = 0;
487             thisProxy[0].contributeViaMessage(dummy);
488         }
489     }
490 #endif
491 }
492
493 //Sent to root of reduction tree with reduction contribution
494 // of migrants that missed the main reduction.
495 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
496 {
497 #if GROUP_LEVEL_REDUCTION
498 #if CMK_BIGSIM_CHARM
499   _TRACE_BG_TLINE_END(&(m->log));
500 #endif
501   addContribution(m);
502 #else
503   m->secondaryCallback = m->callback;
504   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
505   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
506   nodeMgr->LateMigrantMsg(m);
507 /*      int len = finalMsgs.length();
508         finalMsgs.enq(m);
509 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
510         endArrayReduction();*/
511 #endif
512 }
513
514 //A late migrating contributor will never contribute to this reduction
515 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
516 {
517   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
518   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
519  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
520   adj(m->num).gcount--;//He won't be contributing to this one.
521   finishReduction();
522   delete m;
523 }
524
525 //////////// Reduction Manager State /////////////
526 void CkReductionMgr::startReduction(int number,int srcPE)
527 {
528   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
529   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
530   if (inProgress){
531         DEBR((AA"This reduction is already in progress\n"AB));
532         return;//This reduction already started
533   }
534   if (creating) //Don't start yet-- we're creating elements
535   {
536     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
537     startRequested=CmiTrue;
538     return;
539   }
540
541 //If none of these cases, we need to start the reduction--
542   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
543   inProgress=CmiTrue;
544  
545
546         /*
547                 FAULT_EVAC
548         */
549   if(!CmiNodeAlive(CkMyPe())){
550         return;
551   }
552
553   if(disableNotifyChildrenStart) return;
554   
555 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
556   if(CmiMyPe() == 0 && redNo == 0){
557             for(int j=0;j<CkNumPes();j++){
558                 if(j != CkMyPe() && j != srcPE){
559                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
560                 }
561             }
562             if(lcount == 0){
563                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
564                 dummy->fromPE = CmiMyPe();
565                 dummy->sourceProcessorCount = 0;
566                 dummy->redNo = 0;
567                 thisProxy[0].contributeViaMessage(dummy);
568             }
569     }   else{
570         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
571     }
572
573
574 #else
575   //Sent start requests to our kids (in case they don't already know)
576 #if GROUP_LEVEL_REDUCTION
577   for (int k=0;k<treeKids();k++)
578   {
579     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
580     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
581   }
582 #else
583   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
584 #endif
585 #endif
586         
587         /*
588   int temp;
589   //making it a broadcast done only by PE 0
590   if(!hasParent()){
591                 temp = completedRedNo+1;
592                 for(int i=temp;i<=number;i++){
593                         for(int j=0;j<CkNumPes();j++){
594                                 if(j != CkMyPe() && j != srcPE){
595                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
596                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
597                                         }
598                                 }
599                         }
600                 }       
601         }       else{
602                 temp = number;
603         }*/
604 /*  if(!hasParent()){
605                 temp = completedRedNo+1;
606         }       else{
607                 temp = number;
608         }
609         for(int i=temp;i<=number;i++){
610         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
611                 if(hasParent()){
612           // kick-start your parent too ...
613                         if(treeParent() != srcPE){
614                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
615 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
616     CpvAccess(_currentObj) = oldObj;
617 #endif
618                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
619                                 }       
620                         }       
621                 }
622           for (int k=0;k<treeKids();k++)
623           {
624                         if(firstKid()+k != srcPE){
625                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
626                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
627                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
628                                 }       
629                         }       
630         }
631         }
632         */
633 }       
634
635 /*Handle a message from one element for the reduction*/
636 void CkReductionMgr::addContribution(CkReductionMsg *m)
637 {
638   if (isPast(m->redNo))
639   {
640 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
641         CmiAbort("this version should not have late migrations");
642 #else
643         //We've moved on-- forward late contribution straight to root
644     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
645         // if (!hasParent()) //Root moved on too soon-- should never happen
646         //   CkAbort("Late reduction contribution received at root!\n");
647     thisProxy[0].LateMigrantMsg(m);
648 #endif
649   }
650   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
651     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
652     futureMsgs.enq(m);
653   } else {// An ordinary contribution
654     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
655    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
656     startReduction(m->redNo,CkMyPe());
657     msgs.enq(m);
658     nContrib++;
659     finishReduction();
660   }
661 }
662
663 /**function checks if it has got all contributions that it is supposed to
664 get at this processor. If it is done it sends the reduced result to the local
665 nodegroup */
666 void CkReductionMgr::finishReduction(void)
667 {
668   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
669   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
670   if ((!inProgress) || creating){
671         DEBR((AA"Either not in Progress or creating\n"AB));
672         return;
673   }
674   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
675 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
676
677   if (nContrib<(lcount+adj(redNo).lcount)){
678          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
679          return;//Need more local messages
680   }
681 #if GROUP_LEVEL_REDUCTION
682   if (nRemote<treeKids()) return;//Need more remote messages
683 #endif
684  
685 #else
686
687     if(CkMyPe() != 0){
688         if(redNo != 0){
689             return;
690         }else{
691             CmiAssert(lcount == 0);
692             CkReductionMsg *dummy = reduceMessages();
693             dummy->fromPE = CmiMyPe();
694             dummy->sourceProcessorCount = 0;
695             thisProxy[0].contributeViaMessage(dummy);
696             return;
697         }
698     }
699     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
700         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
701          return;//Need more local messages
702   }else{
703         DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
704     }
705
706
707 #endif
708
709   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
710   CkReductionMsg *result=reduceMessages();
711   result->redNo=redNo;
712 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
713
714 #if GROUP_LEVEL_REDUCTION
715   if (hasParent())
716   {//Pass data up tree to parent
717     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
718     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
719     result->gcount+=gcount+adj(redNo).gcount;
720     thisProxy[treeParent()].RecvMsg(result);
721   }
722   else 
723   {//We are root-- pass data to client
724     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
725     int totalElements=result->gcount+gcount+adj(redNo).gcount;
726     if (totalElements>result->nSources()) 
727     {
728       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
729       msgs.enq(result);
730       return; // Wait for migrants to contribute
731     } else if (totalElements<result->nSources()) {
732       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
733       CkAbort("ERROR! Too many contributions at root!\n");
734     }
735     DEBR((AA"Passing result to client function\n"AB));
736     CkSetRefNum(result, result->getUserFlag());
737     if (!result->callback.isInvalid())
738             result->callback.send(result);
739     else if (!storedCallback.isInvalid())
740             storedCallback.send(result);
741     else
742             CkAbort("No reduction client!\n"
743                     "You must register a client with either SetReductionClient or during contribute.\n");
744   }
745
746 #else
747   result->gcount+=gcount+adj(redNo).gcount;
748
749   result->secondaryCallback = result->callback;
750   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
751         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
752
753   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
754
755  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
756   
757   // Find our node reduction manager, and pass reduction to him:
758   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
759   nodeMgr->contributeArrayReduction(result);
760 #endif
761 #else                // _FAULT_MLOG_
762   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
763
764     CkSetRefNum(result, result->getUserFlag());
765     if (!result->callback.isInvalid())
766         result->callback.send(result);
767     else if (!storedCallback.isInvalid())
768         storedCallback.send(result);
769     else{
770       DEBR(("No reduction client for group %d \n",thisgroup.idx));
771         CkAbort("No reduction client!\n"
772             "You must register a client with either SetReductionClient or during contribute.\n");
773     }
774
775     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
776
777 #endif               // _FAULT_MLOG_
778
779   //House Keeping Operations will have to check later what needs to be changed
780   redNo++;
781   //Shift the count adjustment vector down one slot (to match new redNo)
782   int i;
783 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_)) && !GROUP_LEVEL_REDUCTION
784     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
785   if(CkMyPe()!=0)
786 #endif
787   {
788         completedRedNo++;
789         for (i=1;i<(int)(adjVec.length());i++){
790            adjVec[i-1]=adjVec[i];
791         }
792         adjVec.length()--;  
793   }
794
795   inProgress=CmiFalse;
796   startRequested=CmiFalse;
797   nRemote=nContrib=0;
798
799   //Look through the future queue for messages we can now handle
800   int n=futureMsgs.length();
801   for (i=0;i<n;i++)
802   {
803     CkReductionMsg *m=futureMsgs.deq();
804     if (m!=NULL) //One of these addContributions may have finished us.
805       addContribution(m);//<- if *still* early, puts it back in the queue
806   }
807 #if GROUP_LEVEL_REDUCTION
808   n=futureRemoteMsgs.length();
809   for (i=0;i<n;i++)
810   {
811     CkReductionMsg *m=futureRemoteMsgs.deq();
812     if (m!=NULL) {
813       RecvMsg(m);//<- if *still* early, puts it back in the queue
814     }
815   }
816 #endif
817
818 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
819   if(maxStartRequest >= redNo){
820           startReduction(redNo,CkMyPe());
821           finishReduction();
822   }
823  
824 #endif
825 }
826
827 //Sent up the reduction tree with reduced data
828   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
829 {
830 #if GROUP_LEVEL_REDUCTION
831 #if CMK_BIGSIM_CHARM
832   _TRACE_BG_TLINE_END(&m->log);
833 #endif
834   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
835     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
836     startReduction(m->redNo, CkMyPe());
837     msgs.enq(m);
838     nRemote++;
839     finishReduction();
840   }
841   else if (isFuture(m->redNo)) {
842     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
843     futureRemoteMsgs.enq(m);
844   } 
845   else CkAbort("Recv'd late remote contribution!\n");
846 #endif
847 }
848
849 //////////// Reduction Manager Utilities /////////////
850
851 //Return the countAdjustment struct for the given redNo:
852 countAdjustment &CkReductionMgr::adj(int number)
853 {
854   number-=completedRedNo;
855   number--;
856   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
857   //Pad the adjustment vector with zeros until it's at least number long
858   while ((int)(adjVec.length())<=number)
859     adjVec.push_back(countAdjustment());
860   return adjVec[number];
861 }
862
863 //Combine (& free) the current message vector msgs.
864 CkReductionMsg *CkReductionMgr::reduceMessages(void)
865 {
866 #if CMK_BIGSIM_CHARM
867   _TRACE_BG_END_EXECUTE(1);
868   void* _bgParentLog = NULL;
869   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
870 #endif
871   CkReductionMsg *ret=NULL;
872
873   //Look through the vector for a valid reducer, swapping out placeholder messages
874   CkReduction::reducerType r=CkReduction::invalid;
875   int msgs_gcount=0;//Reduced gcount
876   int msgs_nSources=0;//Reduced nSources
877   int msgs_userFlag=-1;
878   CkCallback msgs_callback;
879   int i;
880   int nMsgs=0;
881   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
882   CkReductionMsg *m;
883   bool isMigratableContributor;
884
885   // Copy message queue into msgArr, skipping placeholders:
886   while (NULL!=(m=msgs.deq()))
887   {
888     msgs_gcount+=m->gcount;
889     if (m->sourceFlag!=0)
890     { //This is a real message from an element, not just a placeholder
891       msgArr[nMsgs++]=m;
892       msgs_nSources+=m->nSources();
893       r=m->reducer;
894       if (!m->callback.isInvalid())
895         msgs_callback=m->callback;
896       if (m->userFlag!=-1)
897         msgs_userFlag=m->userFlag;
898                         
899         isMigratableContributor=m->isMigratableContributor();
900 #if CMK_BIGSIM_CHARM
901         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
902 #endif
903     }
904     else
905     { //This is just a placeholder message-- forget it
906       delete m;
907     }
908   }
909
910   if (nMsgs==0||r==CkReduction::invalid)
911   //No valid reducer in the whole vector
912     ret=CkReductionMsg::buildNew(0,NULL);
913   else
914   {//Use the reducer to reduce the messages
915                 //if there is only one msg to be reduced just return that message
916     if(nMsgs == 1){
917         ret = msgArr[0];        
918     }else{
919         CkReduction::reducerFn f=CkReduction::reducerTable[r];
920         ret=(*f)(nMsgs,msgArr);
921     }
922     ret->reducer=r;
923   }
924
925
926
927 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
928
929 #if CRITICAL_PATH_DEBUG > 3
930   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
931 #endif
932
933   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
934   path.updateMax(UsrToEnv(ret));
935   // Combine the critical paths from all the reduction messages
936   for (i=0;i<nMsgs;i++){
937     if (msgArr[i]!=ret){
938       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
939       path.updateMax(UsrToEnv(msgArr[i]));
940     }
941   }
942   
943
944 #if CRITICAL_PATH_DEBUG > 3
945   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
946 #endif
947   
948   PathHistoryTableEntry tableEntry(path);
949   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
950   
951 #endif
952
953         //Go back through the vector, deleting old messages
954   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
955   delete [] msgArr;
956
957   //Set the message counts
958   ret->redNo=redNo;
959   ret->gcount=msgs_gcount;
960   ret->userFlag=msgs_userFlag;
961   ret->callback=msgs_callback;
962   ret->sourceFlag=msgs_nSources;
963         ret->setMigratableContributor(isMigratableContributor);
964   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
965
966   return ret;
967 }
968
969
970 //Checkpointing utilities
971 //pack-unpack method for CkReductionMsg
972 //if packing pack the message and then unpack and return it
973 //if unpacking allocate memory for it read it off disk and then unapck
974 //and return it
975 void CkReductionMgr::pup(PUP::er &p)
976 {
977 //We do not store the client function pointer or the client function parameter,
978 //it is the responsibility of the programmer to correctly restore these
979   CkGroupInitCallback::pup(p);
980   p(redNo);
981   p(completedRedNo);
982   p(inProgress); p(creating); p(startRequested);
983   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
984   p|msgs;
985   p|futureMsgs;
986   p|futureRemoteMsgs;
987   p|finalMsgs;
988   p|adjVec;
989   p|nodeProxy;
990   p|storedCallback;
991     // handle CkReductionClientBundle
992   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
993   {
994     CkReductionClientBundle *bd;
995     if (p.isUnpacking()) 
996       bd = new CkReductionClientBundle;
997     else
998       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
999     p|*bd;
1000     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1001   }
1002
1003   // subtle --- Gengbin
1004   // Group : CkReductionMgr
1005   // CkArray: CkReductionMgr
1006   // lcount/gcount in Group is set in Group constructor
1007   // lcount/gcount in CkArray is not, it is set when array elements are created
1008   // we can not pup because inserting array elems will add the counters again
1009 //  p|lcount;
1010 //  p|gcount;
1011 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1012 //  p|lcount;
1013 //  //  p|gcount;
1014 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1015 #endif
1016   if(p.isUnpacking()){
1017     thisProxy = thisgroup;
1018     maxStartRequest=0;
1019 #if GROUP_LEVEL_REDUCTION
1020 #ifdef BINOMIAL_TREE
1021     init_BinomialTree();
1022 #else
1023     init_BinaryTree();
1024 #endif
1025 #endif
1026   }
1027
1028   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1029 }
1030
1031
1032 //Callback for doing Reduction through NodeGroups added by Sayantan
1033
1034 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1035
1036         int total = m->gcount+adj(m->redNo).gcount;
1037         finalMsgs.enq(m);
1038         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1039         adj(m->redNo).mainRecvd = 1;
1040         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1041         endArrayReduction();
1042 }
1043
1044 void CkReductionMgr :: endArrayReduction(){
1045         CkReductionMsg *ret=NULL;
1046         int nMsgs=finalMsgs.length();
1047         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1048         //Look through the vector for a valid reducer, swapping out placeholder messages
1049         //CkPrintf("Length of Final Message %d \n",nMsgs);
1050         CkReduction::reducerType r=CkReduction::invalid;
1051         int msgs_gcount=0;//Reduced gcount
1052         int msgs_nSources=0;//Reduced nSources
1053         int msgs_userFlag=-1;
1054         CkCallback msgs_callback;
1055         CkCallback msgs_secondaryCallback;
1056         CkVec<CkReductionMsg *> tempMsgs;
1057         int i;
1058         int numMsgs = 0;
1059         for (i=0;i<nMsgs;i++)
1060         {
1061                 CkReductionMsg *m=finalMsgs.deq();
1062                 if(m->redNo == completedRedNo +1){
1063                         msgs_gcount+=m->gcount;
1064                         if (m->sourceFlag!=0)
1065                         { //This is a real message from an element, not just a placeholder
1066                                 msgs_nSources+=m->nSources();
1067                                 r=m->reducer;
1068                                 if (!m->callback.isInvalid())
1069                                 msgs_callback=m->callback;
1070                                 if(!m->secondaryCallback.isInvalid())
1071                                         msgs_secondaryCallback = m->secondaryCallback;
1072                                 if (m->userFlag!=-1)
1073                                         msgs_userFlag=m->userFlag;
1074                                 tempMsgs.push_back(m);
1075                         }
1076                 }else{
1077                         finalMsgs.enq(m);
1078                 }
1079
1080         }
1081         numMsgs = tempMsgs.length();
1082
1083         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1084
1085         if(numMsgs == 0){
1086                 return;
1087         }
1088         if(adj(completedRedNo+1).mainRecvd == 0){
1089                 for(i=0;i<numMsgs;i++){
1090                         finalMsgs.enq(tempMsgs[i]);
1091                 }
1092                 return;
1093         }
1094
1095 /*
1096         NOT NEEDED ANYMORE DONE at nodegroup level
1097         if(msgs_gcount  > msgs_nSources){
1098                 for(i=0;i<numMsgs;i++){
1099                         finalMsgs.enq(tempMsgs[i]);
1100                 }
1101                 return;
1102         }*/
1103
1104         if (nMsgs==0||r==CkReduction::invalid)
1105                 //No valid reducer in the whole vector
1106                 ret=CkReductionMsg::buildNew(0,NULL);
1107         else{//Use the reducer to reduce the messages
1108                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1109                 // has to be corrected elements from above need to be put into a temporary vector
1110                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1111                 ret=(*f)(numMsgs,msgArr);
1112                 ret->reducer=r;
1113
1114         }
1115
1116         
1117 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1118
1119 #if CRITICAL_PATH_DEBUG > 3
1120         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1121 #endif
1122
1123         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1124         path.updateMax(UsrToEnv(ret));
1125         // Combine the critical paths from all the reduction messages into the header for the new result
1126         for (i=0;i<numMsgs;i++){
1127           if (tempMsgs[i]!=ret){
1128             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1129             path.updateMax(UsrToEnv(tempMsgs[i]));
1130           } else {
1131             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1132           }
1133         }
1134         // Also consider the path which got us into this entry method
1135
1136 #if CRITICAL_PATH_DEBUG > 3
1137         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1138 #endif
1139
1140 #endif
1141   
1142
1143
1144
1145
1146         for(i = 0;i<numMsgs;i++){
1147                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1148         }
1149
1150         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1151         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1152
1153         ret->redNo=completedRedNo+1;
1154         ret->gcount=msgs_gcount;
1155         ret->userFlag=msgs_userFlag;
1156         ret->callback=msgs_callback;
1157         ret->secondaryCallback = msgs_secondaryCallback;
1158         ret->sourceFlag=msgs_nSources;
1159
1160         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1161
1162         CkSetRefNum(ret, ret->getUserFlag());
1163         if (!ret->secondaryCallback.isInvalid())
1164             ret->secondaryCallback.send(ret);
1165     else if (!storedCallback.isInvalid())
1166             storedCallback.send(ret);
1167     else{
1168       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1169             CkAbort("No reduction client!\n"
1170                     "You must register a client with either SetReductionClient or during contribute.\n");
1171     }
1172         completedRedNo++;
1173
1174         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1175
1176         for (i=1;i<(int)(adjVec.length());i++)
1177                 adjVec[i-1]=adjVec[i];
1178         adjVec.length()--;
1179         endArrayReduction();
1180 }
1181
1182 #if GROUP_LEVEL_REDUCTION
1183
1184 void CkReductionMgr::init_BinaryTree(){
1185         parent = (CkMyPe()-1)/TREE_WID;
1186         int firstkid = CkMyPe()*TREE_WID+1;
1187         numKids=CkNumPes()-firstkid;
1188         if (numKids>TREE_WID) numKids=TREE_WID;
1189         if (numKids<0) numKids=0;
1190
1191         for(int i=0;i<numKids;i++){
1192                 kids.push_back(firstkid+i);
1193                 newKids.push_back(firstkid+i);
1194         }
1195 }
1196
1197 void CkReductionMgr::init_BinomialTree(){
1198         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1199         /*upperSize = (unsigned )pow((double)2,depth);*/
1200         upperSize = (unsigned) 1 << depth;
1201         label = upperSize-CkMyPe()-1;
1202         int p=label;
1203         int count=0;
1204         while( p > 0){
1205                 if(p % 2 == 0)
1206                         break;
1207                 else{
1208                         p = p/2;
1209                         count++;
1210                 }
1211         }
1212         /*parent = label + rint(pow((double)2,count));*/
1213         parent = label + (1<<count);
1214         parent = upperSize -1 -parent;
1215         int temp;
1216         if(count != 0){
1217                 numKids = 0;
1218                 for(int i=0;i<count;i++){
1219                         /*temp = label - rint(pow((double)2,i));*/
1220                         temp = label - (1<<i);
1221                         temp = upperSize-1-temp;
1222                         if(temp <= CkNumPes()-1){
1223                                 kids.push_back(temp);
1224                                 numKids++;
1225                         }
1226                 }
1227         }else{
1228                 numKids = 0;
1229         //      kids = NULL;
1230         }
1231 }
1232
1233
1234 int CkReductionMgr::treeRoot(void)
1235 {
1236   return 0;
1237 }
1238 CmiBool CkReductionMgr::hasParent(void) //Root Node
1239 {
1240   return (CmiBool)(CkMyPe()!=treeRoot());
1241 }
1242 int CkReductionMgr::treeParent(void) //My parent Node
1243 {
1244   return parent;
1245 }
1246
1247 int CkReductionMgr::firstKid(void) //My first child Node
1248 {
1249   return CkMyPe()*TREE_WID+1;
1250 }
1251 int CkReductionMgr::treeKids(void)//Number of children in tree
1252 {
1253   return numKids;
1254 }
1255
1256 #endif
1257
1258 /////////////////////////////////////////////////////////////////////////
1259
1260 ////////////////////////////////
1261 //CkReductionMsg support
1262
1263 //ReductionMessage default private constructor-- does nothing
1264 CkReductionMsg::CkReductionMsg(){}
1265 CkReductionMsg::~CkReductionMsg(){}
1266
1267 //This define gives the distance from the start of the CkReductionMsg
1268 // object to the start of the user data area (just below last object field)
1269 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1270
1271 //"Constructor"-- builds and returns a new CkReductionMsg.
1272 //  the "data" array you specify will be copied into this object.
1273 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1274     CkReduction::reducerType reducer, CkReductionMsg *buf)
1275 {
1276   int len[1];
1277   len[0]=NdataSize;
1278   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1279   
1280   ret->dataSize=NdataSize;
1281   if (srcData!=NULL && !buf)
1282     memcpy(ret->data,srcData,NdataSize);
1283   ret->userFlag=-1;
1284   ret->reducer=reducer;
1285   //ret->ci=NULL;
1286   ret->sourceFlag=-1000;
1287   ret->gcount=0;
1288   ret->migratableContributor = true;
1289 #if CMK_BIGSIM_CHARM
1290   ret->log = NULL;
1291 #endif
1292   return ret;
1293 }
1294
1295 // Charm kernel message runtime support:
1296 void *
1297 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1298 {
1299   int totalsize=ARM_DATASTART+(*sz);
1300   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1301   CkReductionMsg *ret = (CkReductionMsg *)
1302     CkAllocMsg(msgnum,totalsize,priobits);
1303   ret->data=(void *)(&ret->dataStorage);
1304   return (void *) ret;
1305 }
1306
1307 void *
1308 CkReductionMsg::pack(CkReductionMsg* in)
1309 {
1310   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1311   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1312   in->data = NULL;
1313   return (void*) in;
1314 }
1315
1316 CkReductionMsg* CkReductionMsg::unpack(void *in)
1317 {
1318   CkReductionMsg *ret = (CkReductionMsg *)in;
1319   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1320   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1321   ret->data=(void *)(&ret->dataStorage);
1322   return ret;
1323 }
1324
1325
1326 /////////////////////////////////////////////////////////////////////////////////////
1327 ///////////////// Builtin Reducer Functions //////////////
1328 /* A simple reducer, like sum_int, looks like this:
1329 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1330 {
1331   int i,ret=0;
1332   for (i=0;i<nMsg;i++)
1333     ret+=*(int *)(msg[i]->getData());
1334   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1335 }
1336
1337 To keep the code small and easy to change, the implementations below
1338 are built with preprocessor macros.
1339 */
1340
1341 //////////////// simple reducers ///////////////////
1342 /*A define used to quickly and tersely construct simple reductions.
1343 The basic idea is to use the first message's data array as
1344 (pre-initialized!) scratch space for folding in the other messages.
1345  */
1346
1347 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1348 {
1349         CkAbort("Called the invalid reducer type 0.  This probably\n"
1350                 "means you forgot to initialize your custom reducer index.\n");
1351         return NULL;
1352 }
1353
1354 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1355 {
1356   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1357 }
1358
1359 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1360 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1361 {\
1362   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1363   int m,i;\
1364   int nElem=msg[0]->getLength()/sizeof(dataType);\
1365   dataType *ret=(dataType *)(msg[0]->getData());\
1366   for (m=1;m<nMsg;m++)\
1367   {\
1368     dataType *value=(dataType *)(msg[m]->getData());\
1369     for (i=0;i<nElem;i++)\
1370     {\
1371       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1372       loop\
1373     }\
1374   }\
1375   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1376   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1377 }
1378
1379 //Use this macro for reductions that have the same type for all inputs
1380 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1381   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1382   SIMPLE_REDUCTION(nameBase##_long,CmiInt8,"%d",loop) \
1383   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1384   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1385
1386
1387 //Compute the sum the numbers passed by each element.
1388 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1389
1390 //Compute the product of the numbers passed by each element.
1391 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1392
1393 //Compute the largest number passed by any element.
1394 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1395
1396 //Compute the smallest integer passed by any element.
1397 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1398
1399
1400 //Compute the logical AND of the integers passed by each element.
1401 // The resulting integer will be zero if any source integer is zero; else 1.
1402 SIMPLE_REDUCTION(logical_and,int,"%d",
1403         if (value[i]==0)
1404      ret[i]=0;
1405   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1406 )
1407
1408 //Compute the logical OR of the integers passed by each element.
1409 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1410 SIMPLE_REDUCTION(logical_or,int,"%d",
1411   if (value[i]!=0)
1412            ret[i]=1;
1413   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1414 )
1415
1416 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1417 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1418
1419 //Select one random message to pass on
1420 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1421   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1422 }
1423
1424 /////////////// concat ////////////////
1425 /*
1426 This reducer simply appends the data it recieves from each element,
1427 without any housekeeping data to separate them.
1428 */
1429 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1430 {
1431   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1432   //Figure out how big a message we'll need
1433   int i,retSize=0;
1434   for (i=0;i<nMsg;i++)
1435       retSize+=msg[i]->getSize();
1436
1437   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1438
1439   //Allocate a new message
1440   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1441
1442   //Copy the source message data into the return message
1443   char *cur=(char *)(ret->getData());
1444   for (i=0;i<nMsg;i++) {
1445     int messageBytes=msg[i]->getSize();
1446     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1447     cur+=messageBytes;
1448   }
1449   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1450   return ret;
1451 }
1452
1453 /////////////// set ////////////////
1454 /*
1455 This reducer appends the data it recieves from each element
1456 along with some housekeeping data indicating contribution boundaries.
1457 The message data is thus a list of reduction_set_element structures
1458 terminated by a dummy reduction_set_element with a sourceElement of -1.
1459 */
1460
1461 //This rounds an integer up to the nearest multiple of sizeof(double)
1462 static const int alignSize=sizeof(double);
1463 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1464
1465 //This gives the size (in bytes) of a reduction_set_element
1466 static int SET_SIZE(int dataSize)
1467 {return SET_ALIGN(sizeof(int)+dataSize);}
1468
1469 //This returns a pointer to the next reduction_set_element in the list
1470 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1471 {
1472   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1473   return (CkReduction::setElement *)next;
1474 }
1475
1476 //Combine the data passed by each element into an list of reduction_set_elements.
1477 // Each element may contribute arbitrary data (with arbitrary length).
1478 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1479 {
1480   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1481   //Figure out how big a message we'll need
1482   int i,retSize=0;
1483   for (i=0;i<nMsg;i++) {
1484     if (!msg[i]->isFromUser())
1485     //This message is composite-- it will just be copied over (less terminating -1)
1486       retSize+=(msg[i]->getSize()-sizeof(int));
1487     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1488       retSize+=SET_SIZE(msg[i]->getSize());
1489   }
1490   retSize+=sizeof(int);//Leave room for terminating -1.
1491
1492   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1493
1494   //Allocate a new message
1495   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1496
1497   //Copy the source message data into the return message
1498   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1499   for (i=0;i<nMsg;i++)
1500     if (!msg[i]->isFromUser())
1501     {//This message is composite-- just copy it over (less terminating -1)
1502                         int messageBytes=msg[i]->getSize()-sizeof(int);
1503                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1504                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1505                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1506     }
1507     else //This is a message from an element-- wrap it in a reduction_set_element
1508     {
1509       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1510       cur->dataSize=msg[i]->getSize();
1511       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1512       cur=SET_NEXT(cur);
1513     }
1514   cur->dataSize=-1;//Add a terminating -1.
1515   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1516   return ret;
1517 }
1518
1519 //Utility routine: get the next reduction_set_element in the list
1520 // if there is one, or return NULL if there are none.
1521 //To get all the elements, just keep feeding this procedure's output back to
1522 // its input until it returns NULL.
1523 CkReduction::setElement *CkReduction::setElement::next(void)
1524 {
1525   CkReduction::setElement *n=SET_NEXT(this);
1526   if (n->dataSize==-1)
1527     return NULL;//This is the end of the list
1528   else
1529     return n;//This is just another element
1530 }
1531
1532 /////////////////// Reduction Function Table /////////////////////
1533 CkReduction::CkReduction() {} //Dummy private constructor
1534
1535 //Add the given reducer to the list.  Returns the new reducer's
1536 // reducerType.  Must be called in the same order on every node.
1537 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1538 {
1539   reducerTable[nReducers]=fn;
1540   return (reducerType)nReducers++;
1541 }
1542
1543 /*Reducer table: maps reducerTypes to reducerFns.
1544 It's indexed by reducerType, so the order in this table
1545 must *exactly* match the reducerType enum declaration.
1546 The names don't have to match, but it helps.
1547 */
1548 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1549
1550 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1551     ::invalid_reducer,
1552     ::nop,
1553   //Compute the sum the numbers passed by each element.
1554     ::sum_int,::sum_long,::sum_float,::sum_double,
1555
1556   //Compute the product the numbers passed by each element.
1557     ::product_int,::product_long,::product_float,::product_double,
1558
1559   //Compute the largest number passed by any element.
1560     ::max_int,::max_long,::max_float,::max_double,
1561
1562   //Compute the smallest number passed by any element.
1563     ::min_int,::min_long,::min_float,::min_double,
1564
1565   //Compute the logical AND of the integers passed by each element.
1566   // The resulting integer will be zero if any source integer is zero.
1567     ::logical_and,
1568
1569   //Compute the logical OR of the integers passed by each element.
1570   // The resulting integer will be 1 if any source integer is nonzero.
1571     ::logical_or,
1572
1573     // Compute the logical bitvector AND of the integers passed by each element.
1574     ::bitvec_and,
1575
1576     // Compute the logical bitvector OR of the integers passed by each element.
1577     ::bitvec_or,
1578
1579     // Select one of the messages at random to pass on
1580     ::random,
1581
1582   //Concatenate the (arbitrary) data passed by each element
1583     ::concat,
1584
1585   //Combine the data passed by each element into an list of setElements.
1586   // Each element may contribute arbitrary data (with arbitrary length).
1587     ::set
1588 };
1589
1590
1591
1592
1593
1594
1595
1596
1597 /********** Code added by Sayantan *********************/
1598 /** Locking is a big problem in the nodegroup code for smp.
1599  So a few assumptions have had to be made. There is one lock
1600  called lockEverything. It protects all the data structures 
1601  of the nodegroup reduction mgr. I tried grabbing it separately 
1602  for each datastructure, modifying it and then releasing it and
1603  then grabbing it again, for the next change.
1604  That doesn't really help because the interleaved execution of 
1605  different threads makes the state of the reduction manager 
1606  inconsistent. 
1607  
1608  1. Grab lockEverything before calling finishreduction or startReduction
1609     or doRecvMsg
1610  2. lockEverything is grabbed only in entry methods reductionStarting
1611     or RecvMesg or  addcontribution.
1612  ****/
1613  
1614 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1615 NodeGroup::NodeGroup(void) {
1616   __nodelock=CmiCreateLock();
1617 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1618     mlogData->objID.type = TypeNodeGroup;
1619     mlogData->objID.data.group.onPE = CkMyNode();
1620 #endif
1621
1622 }
1623 NodeGroup::~NodeGroup() {
1624   CmiDestroyLock(__nodelock);
1625 }
1626 void NodeGroup::pup(PUP::er &p)
1627 {
1628   CkNodeReductionMgr::pup(p);
1629   p|reductionInfo;
1630 }
1631
1632 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1633
1634 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1635   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1636  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1637   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1638  }
1639
1640 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1641                                     ((CkNodeReductionMgr *)this),
1642                                     reductionInfo,false)
1643
1644 /* this contribute also adds up the count across all messages it receives.
1645   Useful for summing up number of array elements who have contributed ****/ 
1646 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1647         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1648
1649
1650
1651 //#define BINOMIAL_TREE
1652
1653 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1654   : thisProxy(thisgroup)
1655 {
1656 #ifdef BINOMIAL_TREE
1657   init_BinomialTree();
1658 #else
1659   init_BinaryTree();
1660 #endif
1661   storedCallback=NULL;
1662   redNo=0;
1663   inProgress=CmiFalse;
1664   
1665   startRequested=CmiFalse;
1666   gcount=CkNumNodes();
1667   lcount=1;
1668   nContrib=nRemote=0;
1669   lockEverything = CmiCreateLock();
1670
1671
1672   creating=CmiFalse;
1673   interrupt = 0;
1674   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1675         /*
1676                 FAULT_EVAC
1677         */
1678         blocked = false;
1679         maxModificationRedNo = INT_MAX;
1680         killed=0;
1681         additionalGCount = newAdditionalGCount = 0;
1682 }
1683
1684 void CkNodeReductionMgr::flushStates()
1685 {
1686  if(CkMyRank() == 0){
1687   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1688   redNo=0;
1689   inProgress=CmiFalse;
1690
1691   startRequested=CmiFalse;
1692   gcount=CkNumNodes();
1693   lcount=1;
1694   nContrib=nRemote=0;
1695
1696   creating=CmiFalse;
1697   interrupt = 0;
1698   while (!msgs.isEmpty()) { delete msgs.deq(); }
1699   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1700   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1701   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1702   }
1703 }
1704
1705 //////////// Reduction Manager Client API /////////////
1706
1707 //Add the given client function.  Overwrites any previous client.
1708 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1709 {
1710   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1711   if(cb->isInvalid()){
1712         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1713   }else{
1714         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1715   }
1716
1717   if (CkMyNode()!=0)
1718           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1719   delete storedCallback;
1720   storedCallback=cb;
1721 }
1722
1723
1724
1725 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1726 {
1727 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1728     Chare *oldObj =CpvAccess(_currentObj);
1729     CpvAccess(_currentObj) = this;
1730 #endif
1731
1732   //m->ci=ci;
1733   m->redNo=ci->redNo++;
1734   m->sourceFlag=-1;//A single contribution
1735   m->gcount=0;
1736   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1737   addContribution(m);
1738
1739 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1740     CpvAccess(_currentObj) = oldObj;
1741 #endif
1742 }
1743
1744
1745 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1746 {
1747 #if CMK_BIGSIM_CHARM
1748   _TRACE_BG_TLINE_END(&m->log);
1749 #endif
1750 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1751     Chare *oldObj =CpvAccess(_currentObj);
1752     CpvAccess(_currentObj) = this;
1753 #endif
1754   //m->ci=ci;
1755   m->redNo=ci->redNo++;
1756   m->gcount=count;
1757   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1758   addContribution(m);
1759   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1760
1761 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1762     CpvAccess(_currentObj) = oldObj;
1763 #endif
1764 }
1765
1766
1767 //////////// Reduction Manager Remote Entry Points /////////////
1768
1769 //Sent down the reduction tree (used by barren PEs)
1770 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1771 {
1772   CmiLock(lockEverything);
1773   if(blocked){
1774         delete m;
1775         CmiUnlock(lockEverything);
1776         return ;
1777   }
1778   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1779   if (isPresent(m->num) && !inProgress)
1780   {
1781     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1782     startReduction(m->num,srcNode);
1783     finishReduction();
1784   } else if (isFuture(m->num)){
1785         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1786   }
1787   else //is Past
1788     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1789   CmiUnlock(lockEverything);
1790   delete m;
1791
1792 }
1793
1794
1795 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1796         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1797         /*
1798                 FAULT_EVAC
1799         */
1800         if(blocked){
1801                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1802                 bufferedRemoteMsgs.enq(m);
1803                 return;
1804         }
1805         
1806         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1807             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1808             startReduction(m->redNo,CkMyNode());
1809             msgs.enq(m);
1810             nRemote++;
1811             finishReduction();
1812         }
1813         else {
1814             if (isFuture(m->redNo)) {
1815                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1816                     futureRemoteMsgs.enq(m);
1817             }else{
1818                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1819                    CkAbort("Recv'd late remote contribution!\n");
1820             }
1821         }
1822         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1823 }
1824
1825 //Sent up the reduction tree with reduced data
1826 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1827 {
1828 #if CMK_BIGSIM_CHARM
1829   _TRACE_BG_TLINE_END(&m->log);
1830 #endif
1831 #ifndef CMK_CPV_IS_SMP
1832 #if CMK_IMMEDIATE_MSG
1833         if(interrupt == 1){
1834                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1835                 CpvAccess(_qd)->process(-1);
1836                 CmiDelayImmediate();
1837                 return;
1838         }
1839 #endif  
1840 #endif
1841    interrupt = 1;       
1842    CmiLock(lockEverything);   
1843    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1844    doRecvMsg(m);
1845    CmiUnlock(lockEverything);    
1846    interrupt = 0;
1847    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1848 }
1849
1850 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1851 {
1852         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1853         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1854         if (inProgress){
1855                 DEBR((AA"This Node reduction is already in progress\n"AB));
1856                 return;//This reduction already started
1857         }
1858         if (creating) //Don't start yet-- we're creating elements
1859         {
1860                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1861                 startRequested=CmiTrue;
1862                 return;
1863         }
1864         
1865         //If none of these cases, we need to start the reduction--
1866         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1867         inProgress=CmiTrue;
1868         //Sent start requests to our kids (in case they don't already know)
1869         
1870         for (int k=0;k<treeKids();k++)
1871         {
1872 #ifdef BINOMIAL_TREE
1873                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1874                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1875 #else
1876                 if(kids[k] != srcNode){
1877                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1878                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1879                 }       
1880 #endif
1881         }
1882
1883         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1884         // in case, nodegroup does not has the attached red group,
1885         // it has to restart groups again
1886         startLocalGroupReductions(number);
1887 /*
1888         if (startLocalGroupReductions(number) == 0)
1889           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1890 */
1891 }
1892
1893 // restart local groups until succeed
1894 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1895   CmiLock(lockEverything);    
1896   if (startLocalGroupReductions(number) == 0)
1897     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1898   CmiUnlock(lockEverything);    
1899 }
1900
1901 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1902         /*
1903                 FAULT_EVAC
1904         */
1905         if(blocked){
1906                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1907                 bufferedMsgs.enq(m);
1908                 return;
1909         }
1910         
1911         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1912                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1913                 futureMsgs.enq(m);
1914         } else {// An ordinary contribution
1915                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1916                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1917                 startReduction(m->redNo,CkMyNode());
1918                 msgs.enq(m);
1919                 nContrib++;
1920                 finishReduction();
1921         }
1922 }
1923
1924 //Handle a message from one element for the reduction
1925 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1926 {
1927   interrupt = 1;
1928   CmiLock(lockEverything);
1929   doAddContribution(m);
1930   CmiUnlock(lockEverything);
1931   interrupt = 0;
1932 }
1933
1934 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1935         if(blocked){
1936                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1937                 bufferedMsgs.enq(m);
1938                 return;
1939         }
1940         
1941         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1942                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1943 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1944                 futureLateMigrantMsgs.enq(m);
1945         } else {// An ordinary contribution
1946                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1947 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1948                 msgs.enq(m);
1949                 finishReduction();
1950         }
1951 }
1952
1953
1954
1955
1956
1957 /** check if the nodegroup reduction is finished at this node. In that case send it
1958 up the reduction tree **/
1959
1960 void CkNodeReductionMgr::finishReduction(void)
1961 {
1962   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1963   /***Check if reduction is finished in the next few ifs***/
1964   if ((!inProgress) || creating){
1965         DEBR((AA"Either not in Progress or creating\n"AB));
1966         return;
1967   }
1968
1969   if (nContrib<(lcount)){
1970         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1971
1972          return;//Need more local messages
1973   }
1974   if (nRemote<treeKids()){
1975         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1976
1977         return;//Need more remote messages
1978   }
1979   if (nRemote>treeKids()){
1980
1981           interrupt = 0;
1982            CkAbort("Nodegrp Excess remote reduction message received!\n");
1983   }
1984
1985   DEBR((AA"Reducing node data...\n"AB));
1986
1987   /**reduce all messages received at this node **/
1988   CkReductionMsg *result=reduceMessages();
1989
1990   if (hasParent())
1991   {//Pass data up tree to parent
1992         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1993         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1994         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1995                 /*
1996                         FAULT_EVAC
1997                 */
1998                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1999             thisProxy[treeParent()].RecvMsg(result);
2000         }
2001
2002   }
2003   else
2004   {
2005                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2006                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
2007                         msgs.enq(result);
2008                         return;
2009                 }
2010                 result->gcount += additionalGCount;
2011           /** if the reduction is finished and I am the root of the reduction tree
2012           then call the reductionhandler and other stuff ***/
2013                 
2014
2015                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2016     CkSetRefNum(result, result->getUserFlag());
2017     if (!result->callback.isInvalid()){
2018       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2019             result->callback.send(result);
2020     }
2021     else if (storedCallback!=NULL){
2022       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2023             storedCallback->send(result);
2024     }
2025     else{
2026                 DEBR((AA"Invalid Callback \n"AB));
2027             CkAbort("No reduction client!\n"
2028                     "You must register a client with either SetReductionClient or during contribute.\n");
2029                 }
2030   }
2031
2032   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2033   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2034   redNo++;
2035         updateTree();
2036   int i;
2037   inProgress=CmiFalse;
2038   startRequested=CmiFalse;
2039   nRemote=nContrib=0;
2040
2041   //Look through the future queue for messages we can now handle
2042   int n=futureMsgs.length();
2043
2044   for (i=0;i<n;i++)
2045   {
2046     interrupt = 1;
2047
2048     CkReductionMsg *m=futureMsgs.deq();
2049
2050     interrupt = 0;
2051     if (m!=NULL){ //One of these addContributions may have finished us.
2052       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2053       doAddContribution(m);//<- if *still* early, puts it back in the queue
2054     }
2055   }
2056
2057   interrupt = 1;
2058
2059   n=futureRemoteMsgs.length();
2060
2061   interrupt = 0;
2062   for (i=0;i<n;i++)
2063   {
2064     interrupt = 1;
2065
2066     CkReductionMsg *m=futureRemoteMsgs.deq();
2067
2068     interrupt = 0;
2069     if (m!=NULL)
2070       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2071   }
2072   
2073   n = futureLateMigrantMsgs.length();
2074   for(i=0;i<n;i++){
2075     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2076     if(m != NULL){
2077       if(m->redNo == redNo){
2078         msgs.enq(m);
2079       }else{
2080         futureLateMigrantMsgs.enq(m);
2081       }
2082     }
2083   }
2084 }
2085
2086 //////////// Reduction Manager Utilities /////////////
2087
2088 void CkNodeReductionMgr::init_BinaryTree(){
2089         parent = (CkMyNode()-1)/TREE_WID;
2090         int firstkid = CkMyNode()*TREE_WID+1;
2091         numKids=CkNumNodes()-firstkid;
2092   if (numKids>TREE_WID) numKids=TREE_WID;
2093   if (numKids<0) numKids=0;
2094
2095         for(int i=0;i<numKids;i++){
2096                 kids.push_back(firstkid+i);
2097                 newKids.push_back(firstkid+i);
2098         }
2099 }
2100
2101 void CkNodeReductionMgr::init_BinomialTree(){
2102         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2103         /*upperSize = (unsigned )pow((double)2,depth);*/
2104         upperSize = (unsigned) 1 << depth;
2105         label = upperSize-CkMyNode()-1;
2106         int p=label;
2107         int count=0;
2108         while( p > 0){
2109                 if(p % 2 == 0)
2110                         break;
2111                 else{
2112                         p = p/2;
2113                         count++;
2114                 }
2115         }
2116         /*parent = label + rint(pow((double)2,count));*/
2117         parent = label + (1<<count);
2118         parent = upperSize -1 -parent;
2119         int temp;
2120         if(count != 0){
2121                 numKids = 0;
2122                 for(int i=0;i<count;i++){
2123                         /*temp = label - rint(pow((double)2,i));*/
2124                         temp = label - (1<<i);
2125                         temp = upperSize-1-temp;
2126                         if(temp <= CkNumNodes()-1){
2127                 //              kids[numKids] = temp;
2128                                 kids.push_back(temp);
2129                                 numKids++;
2130                         }
2131                 }
2132         }else{
2133                 numKids = 0;
2134         //      kids = NULL;
2135         }
2136 }
2137
2138
2139 int CkNodeReductionMgr::treeRoot(void)
2140 {
2141   return 0;
2142 }
2143 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2144 {
2145   return (CmiBool)(CkMyNode()!=treeRoot());
2146 }
2147 int CkNodeReductionMgr::treeParent(void) //My parent Node
2148 {
2149   return parent;
2150 }
2151
2152 int CkNodeReductionMgr::firstKid(void) //My first child Node
2153 {
2154   return CkMyNode()*TREE_WID+1;
2155 }
2156 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2157 {
2158 #ifdef BINOMIAL_TREE
2159         return numKids;
2160 #else
2161 /*  int nKids=CkNumNodes()-firstKid();
2162   if (nKids>TREE_WID) nKids=TREE_WID;
2163   if (nKids<0) nKids=0;
2164   return nKids;*/
2165         return numKids;
2166 #endif
2167 }
2168
2169 //Combine (& free) the current message vector msgs.
2170 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2171 {
2172 #if CMK_BIGSIM_CHARM
2173   _TRACE_BG_END_EXECUTE(1);
2174   void* _bgParentLog = NULL;
2175   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2176 #endif
2177   CkReductionMsg *ret=NULL;
2178
2179   //Look through the vector for a valid reducer, swapping out placeholder messages
2180   CkReduction::reducerType r=CkReduction::invalid;
2181   int msgs_gcount=0;//Reduced gcount
2182   int msgs_nSources=0;//Reduced nSources
2183   int msgs_userFlag=-1;
2184   CkCallback msgs_callback;
2185   CkCallback msgs_secondaryCallback;
2186   int i;
2187   int nMsgs=0;
2188   CkReductionMsg *m;
2189   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2190   bool isMigratableContributor;
2191         
2192
2193   while(NULL!=(m=msgs.deq()))
2194   {
2195     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2196     msgs_gcount+=m->gcount;
2197     if (m->sourceFlag!=0)
2198     { //This is a real message from an element, not just a placeholder
2199       msgArr[nMsgs++]=m;
2200       msgs_nSources+=m->nSources();
2201       r=m->reducer;
2202       if (!m->callback.isInvalid())
2203         msgs_callback=m->callback;
2204       if(!m->secondaryCallback.isInvalid()){
2205         msgs_secondaryCallback = m->secondaryCallback;
2206       }
2207       if (m->userFlag!=-1)
2208         msgs_userFlag=m->userFlag;
2209
2210         isMigratableContributor= m->isMigratableContributor();
2211 #if CMK_BIGSIM_CHARM
2212       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2213 #endif
2214                                 
2215     }
2216     else
2217     { //This is just a placeholder message-- replace it
2218       delete m;
2219     }
2220   }
2221
2222   if (nMsgs==0||r==CkReduction::invalid)
2223   //No valid reducer in the whole vector
2224     ret=CkReductionMsg::buildNew(0,NULL);
2225   else
2226   {//Use the reducer to reduce the messages
2227     if(nMsgs == 1){
2228         ret = msgArr[0];
2229     }else{
2230         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2231         ret=(*f)(nMsgs,msgArr);
2232     }
2233     ret->reducer=r;
2234   }
2235
2236         
2237 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2238 #if CRITICAL_PATH_DEBUG > 3
2239         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2240 #endif
2241         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2242         path.updateMax(UsrToEnv(ret));
2243         // Combine the critical paths from all the reduction messages into the header for the new result
2244         for (i=0;i<nMsgs;i++){
2245           if (msgArr[i]!=ret){
2246             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2247             path.updateMax(UsrToEnv(msgArr[i]));
2248           } else {
2249             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2250           }
2251         }
2252 #if CRITICAL_PATH_DEBUG > 3
2253         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2254 #endif
2255
2256 #endif
2257
2258
2259         //Go back through the vector, deleting old messages
2260   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2261   delete [] msgArr;
2262   //Set the message counts
2263   ret->redNo=redNo;
2264   ret->gcount=msgs_gcount;
2265   ret->userFlag=msgs_userFlag;
2266   ret->callback=msgs_callback;
2267   ret->secondaryCallback = msgs_secondaryCallback;
2268   ret->sourceFlag=msgs_nSources;
2269   ret->setMigratableContributor(isMigratableContributor);
2270   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2271 #if CMK_BIGSIM_CHARM
2272   _TRACE_BG_TLINE_END(&ret->log);
2273 #endif
2274
2275   return ret;
2276 }
2277
2278 void CkNodeReductionMgr::pup(PUP::er &p)
2279 {
2280 //We do not store the client function pointer or the client function parameter,
2281 //it is the responsibility of the programmer to correctly restore these
2282   IrrGroup::pup(p);
2283   p(redNo);
2284   p(inProgress); p(creating); p(startRequested);
2285   p(lcount);
2286   p(nContrib); p(nRemote);
2287   p(interrupt);
2288   p|msgs;
2289   p|futureMsgs;
2290   p|futureRemoteMsgs;
2291   p|futureLateMigrantMsgs;
2292   p|parent;
2293   p|additionalGCount;
2294   p|newAdditionalGCount;
2295   if(p.isUnpacking()) {
2296     gcount=CkNumNodes();
2297     thisProxy = thisgroup;
2298     lockEverything = CmiCreateLock();
2299 #ifdef BINOMIAL_TREE
2300     init_BinomialTree();
2301 #else
2302     init_BinaryTree();
2303 #endif          
2304   }
2305   p | blocked;
2306   p | maxModificationRedNo;
2307
2308 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2309   int isnull = (storedCallback == NULL);
2310   p | isnull;
2311   if (!isnull) {
2312     if (p.isUnpacking()) {
2313       storedCallback = new CkCallback;
2314     }
2315     p|*storedCallback;
2316   }
2317 #endif
2318
2319 }
2320
2321 /*
2322         FAULT_EVAC
2323         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2324         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2325         reduction tree. 
2326 */
2327 void CkNodeReductionMgr::evacuate(){
2328         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2329         if(treeKids() == 0){
2330         /*
2331                 if the node going down is a leaf
2332         */
2333                 oldleaf=true;
2334                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2335                 /*
2336                         Need to ask parent for the reduction number that it has seen. 
2337                         Since it is a leaf, the tree does not need to be rewired. 
2338                         We reuse the oldparent type of tree modification message to get 
2339                         the parent to block and tell us about the highest reduction number it has seen.
2340                         
2341                 */
2342                 int data[2];
2343                 data[0]=CkMyNode();
2344                 data[1]=getTotalGCount()+additionalGCount;
2345                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2346                 newParent = treeParent();
2347         }else{
2348                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2349                 oldleaf= false;
2350         /*
2351                 It is not a leaf. It needs to rewire the tree around itself.
2352                 It also needs to decide on a reduction No after which the new tree will be used
2353                 Till it decides on the new tree and the redNo at which it becomes valid,
2354                 all received messages will be buffered
2355         */
2356                 newParent = kids[0];
2357                 for(int i=numKids-1;i>=0;i--){
2358                         newKids.remove(i);
2359                 }
2360                 /*
2361                         Ask everybody for the highest reduction number they have seen and
2362                         also tell them about the new tree
2363                 */
2364                 /*
2365                         Tell parent about its new child;
2366                 */
2367                 int oldParentData[2];
2368                 oldParentData[0] = CkMyNode();
2369                 oldParentData[1] = newParent;
2370                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2371
2372                 /*
2373                         Tell the other children about their new parent
2374                 */
2375                 int childrenData=newParent;
2376                 for(int i=1;i<numKids;i++){
2377                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2378                 }
2379                 
2380                 /*
2381                         Tell newParent (1st child) about its new children,
2382                         the current node and its children except the newParent
2383                 */
2384                 int *newParentData = new int[numKids+2];
2385                 for(int i=1;i<numKids;i++){
2386                         newParentData[i] = kids[i];
2387                 }
2388                 newParentData[0] = CkMyNode();
2389                 newParentData[numKids] = parent;
2390                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2391                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2392         }
2393         readyDeletion = false;
2394         blocked = true;
2395         numModificationReplies = 0;
2396         tempModificationRedNo = findMaxRedNo();
2397 }
2398
2399 /*
2400         Depending on the code, use the data to change the tree
2401         1. OLDPARENT : replace the old child with a new one
2402         2. OLDCHILDREN: replace the parent
2403         3. NEWPARENT:  add the children and change the parent
2404         4. LEAFPARENT: delete the old child
2405 */
2406
2407 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2408         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2409         int sender;
2410         newKids = kids;
2411         readyDeletion = false;
2412         newAdditionalGCount = additionalGCount;
2413         switch(code){
2414                 case OLDPARENT: 
2415                         for(int i=0;i<numKids;i++){
2416                                 if(newKids[i] == data[0]){
2417                                         newKids[i] = data[1];
2418                                         break;
2419                                 }
2420                         }
2421                         sender = data[0];
2422                         newParent = parent;
2423                         break;
2424                 case OLDCHILDREN:
2425                         newParent = data[0];
2426                         sender = parent;
2427                         break;
2428                 case NEWPARENT:
2429                         for(int i=0;i<size-2;i++){
2430                                 newKids.push_back(data[i]);
2431                         }
2432                         newParent = data[size-2];
2433                         newAdditionalGCount += data[size-1];
2434                         sender = parent;
2435                         break;
2436                 case LEAFPARENT:
2437                         for(int i=0;i<numKids;i++){
2438                                 if(newKids[i] == data[0]){
2439                                         newKids.remove(i);
2440                                         break;
2441                                 }
2442                         }
2443                         sender = data[0];
2444                         newParent = parent;
2445                         newAdditionalGCount += data[1];
2446                         break;
2447         };
2448         blocked = true;
2449         int maxRedNo = findMaxRedNo();
2450         
2451         thisProxy[sender].collectMaxRedNo(maxRedNo);
2452 }
2453
2454 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2455         /*
2456                 Find out the maximum redNo that has been seen by 
2457                 the affected nodes
2458         */
2459         numModificationReplies++;
2460         if(maxRedNo > tempModificationRedNo){
2461                 tempModificationRedNo = maxRedNo;
2462         }
2463         if(numModificationReplies == numKids+1){
2464                 maxModificationRedNo = tempModificationRedNo;
2465                 /*
2466                         when all the affected nodes have replied, tell them the maximum.
2467                         Unblock yourself. deal with the buffered messages local and remote
2468                 */
2469                 if(maxModificationRedNo == -1){
2470                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2471                 }else{
2472                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2473                 }
2474                 thisProxy[parent].unblockNode(maxModificationRedNo);
2475                 for(int i=0;i<numKids;i++){
2476                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2477                 }
2478                 blocked = false;
2479                 updateTree();
2480                 clearBlockedMsgs();
2481         }
2482 }
2483
2484 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2485         maxModificationRedNo = maxRedNo;
2486         updateTree();
2487         blocked = false;
2488         clearBlockedMsgs();
2489 }
2490
2491
2492 void CkNodeReductionMgr::clearBlockedMsgs(){
2493         int len = bufferedMsgs.length();
2494         for(int i=0;i<len;i++){
2495                 CkReductionMsg *m = bufferedMsgs.deq();
2496                 doAddContribution(m);
2497         }
2498         len = bufferedRemoteMsgs.length();
2499         for(int i=0;i<len;i++){
2500                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2501                 doRecvMsg(m);
2502         }
2503
2504 }
2505 /*
2506         if the reduction number exceeds the maxModificationRedNo, change the tree
2507         to become the new one
2508 */
2509
2510 void CkNodeReductionMgr::updateTree(){
2511         if(redNo > maxModificationRedNo){
2512                 parent = newParent;
2513                 kids = newKids;
2514                 maxModificationRedNo = INT_MAX;
2515                 numKids = kids.size();
2516                 readyDeletion = true;
2517                 additionalGCount = newAdditionalGCount;
2518                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2519                 for(int i=0;i<(int)(newKids.size());i++){
2520                         DEBREVAC(("%d ",newKids[i]));
2521                 }
2522                 DEBREVAC(("\n"));
2523         //      startReduction(redNo,CkMyNode());
2524         }else{
2525                 if(maxModificationRedNo != INT_MAX){
2526                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2527                         startReduction(redNo,CkMyNode());
2528                         finishReduction();
2529                 }       
2530         }
2531 }
2532
2533
2534 void CkNodeReductionMgr::doneEvacuate(){
2535         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2536 /*      if(oldleaf){
2537                 
2538                         It used to be a leaf
2539                         Then as soon as future messages have been emptied you can 
2540                         send the parent a message telling them that they are not going
2541                         to receive anymore messages from this child
2542                 
2543                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2544                 while(futureMsgs.length() != 0){
2545                         int n = futureMsgs.length();
2546                         for(int i=0;i<n;i++){
2547                                 CkReductionMsg *m = futureMsgs.deq();
2548                                 if(isPresent(m->redNo)){
2549                                         msgs.enq(m);
2550                                 }else{
2551                                         futureMsgs.enq(m);
2552                                 }
2553                         }
2554                         CkReductionMsg *result = reduceMessages();
2555                         thisProxy[treeParent()].RecvMsg(result);
2556                         redNo++;
2557                 }
2558                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2559                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2560         }else{*/
2561                 if(readyDeletion){
2562                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2563                 }else{
2564                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2565                 }
2566 //      }
2567 }
2568
2569 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2570         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2571         for(int i=0;i<numKids;i++){
2572                 if(kids[i] == deletedChild){
2573                         kids.remove(i);
2574                         break;
2575                 }
2576         }
2577         numKids = kids.length();
2578         finishReduction();
2579 }
2580
2581 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2582         for(int i=0;i<(int)(newKids.length());i++){
2583                 if(newKids[i] == deletedChild){
2584                         newKids.remove(i);
2585                         break;
2586                 }
2587         }
2588         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2589         for(int i=0;i<(int)(newKids.size());i++){
2590                 DEBREVAC(("%d ",newKids[i]));
2591         }
2592         DEBREVAC(("\n"));
2593         finishReduction();
2594 }
2595
2596 int CkNodeReductionMgr::findMaxRedNo(){
2597         int max = redNo;
2598         for(int i=0;i<futureRemoteMsgs.length();i++){
2599                 if(futureRemoteMsgs[i]->redNo  > max){
2600                         max = futureRemoteMsgs[i]->redNo;
2601                 }
2602         }
2603         /*
2604                 if redNo is max (that is no future message) and the current reduction has not started
2605                 then tree can be changed before the reduction redNo can be started
2606         */ 
2607         if(redNo == max && msgs.length() == 0){
2608                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2609                 max--;
2610         }
2611         return max;
2612 }
2613
2614 // initnode call. check the size of reduction table
2615 void CkReductionMgr::sanitycheck()
2616 {
2617 #if CMK_ERROR_CHECKING
2618   int count = 0;
2619   while (CkReduction::reducerTable[count] != NULL) count++;
2620   CmiAssert(CkReduction::nReducers == count);
2621 #endif
2622 }
2623
2624 #include "CkReduction.def.h"