Minor: disable children notification in node-level if it is set
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207   disableNotifyChildrenStart = CmiFalse;
208   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
209 }
210
211 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
212 {
213   redNo=0;
214   completedRedNo = -1;
215   inProgress=CmiFalse;
216   creating=CmiFalse;
217   startRequested=CmiFalse;
218   gcount=lcount=0;
219   nContrib=nRemote=0;
220   maxStartRequest=0;
221   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
222 }
223
224 void CkReductionMgr::flushStates(int isgroup)
225 {
226   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
227   redNo=0;
228   completedRedNo = -1;
229   inProgress=CmiFalse;
230   creating=CmiFalse;
231   if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
232   startRequested=CmiFalse;
233   nContrib=nRemote=0;
234   maxStartRequest=0;
235
236   while (!msgs.isEmpty()) { delete msgs.deq(); }
237   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
238   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
239   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
240
241   adjVec.length()=0;
242
243 #if ! GROUP_LEVEL_REDUCTION
244   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
245 #endif
246 }
247
248 //////////// Reduction Manager Client API /////////////
249
250 //Add the given client function.  Overwrites any previous client.
251 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
252 {
253   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
254
255   if (CkMyPe()!=0)
256           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
257   storedCallback=*cb;
258 #if ! GROUP_LEVEL_REDUCTION
259   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
260   nodeProxy.ckSetReductionClient(callback);
261 #endif
262 }
263
264 ///////////////////////////// Contributor ////////////////////////
265 //Contributors keep a copy of this structure:
266
267 /*Contributor migration support:
268 */
269 void contributorInfo::pup(PUP::er &p)
270 {
271   p(redNo);
272 }
273
274 ////////////////////// Contributor list maintainance: /////////////////
275 //These just set and clear the "creating" flag to prevent
276 // reductions from finishing early because not all elements
277 // have been created.
278 void CkReductionMgr::creatingContributors(void)
279 {
280   DEBR((AA"Creating contributors...\n"AB));
281   creating=CmiTrue;
282 }
283 void CkReductionMgr::doneCreatingContributors(void)
284 {
285   DEBR((AA"Done creating contributors...\n"AB));
286   creating=CmiFalse;
287   if (startRequested) startReduction(redNo,CkMyPe());
288   finishReduction();
289 }
290
291 //A new contributor will be created
292 void CkReductionMgr::contributorStamped(contributorInfo *ci)
293 {
294   DEBR((AA"Contributor %p stamped\n"AB,ci));
295   //There is another contributor
296   gcount++;
297   if (inProgress)
298   {
299     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
300     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
301   } else
302     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
303 }
304
305 //A new contributor was actually created
306 void CkReductionMgr::contributorCreated(contributorInfo *ci)
307 {
308   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
309   //We've got another contributor
310   lcount++;
311   //He may not need to contribute to some of our reductions:
312   for (int r=redNo;r<ci->redNo;r++)
313     adj(r).lcount--;//He won't be contributing to r here
314 }
315
316 /*Don't expect any more contributions from this one.
317 This is rather horrifying because we now have to make
318 sure the global element count accurately reflects all the
319 contributions the element made before it died-- these may stretch
320 far into the future.  The adj() vector is what saves us here.
321 */
322 void CkReductionMgr::contributorDied(contributorInfo *ci)
323 {
324 #if CMK_MEM_CHECKPOINT
325   // ignore from listener if it is during restart from crash
326   if (CkInRestarting()) return;
327 #endif
328   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
329   //We lost a contributor
330   gcount--;
331
332   if (ci->redNo<redNo)
333   {//Must have been migrating during reductions-- root is waiting for his
334   // contribution, which will never come.
335     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
336     for (int r=ci->redNo;r<redNo;r++)
337       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
338   }
339
340   //Add to the global count for all his future messages (wherever they are)
341   int r;
342   for (r=redNo;r<ci->redNo;r++)
343   {//He already contributed to this reduction, but won't show up in global count.
344     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
345     adj(r).gcount++;
346   }
347
348   lcount--;
349   //He's already contributed to several reductions here
350   for (r=redNo;r<ci->redNo;r++)
351     adj(r).lcount++;//He'll be contributing to r here
352
353   finishReduction();
354 }
355
356 //Migrating away (note that global count doesn't change)
357 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
358 {
359   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
360   lcount--;//We lost a local
361   //He's already contributed to several reductions here
362   for (int r=redNo;r<ci->redNo;r++)
363     adj(r).lcount++;//He'll be contributing to r here
364
365   finishReduction();
366 }
367
368 //Migrating in (note that global count doesn't change)
369 void CkReductionMgr::contributorArriving(contributorInfo *ci)
370 {
371   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
372   lcount++;//We gained a local
373 #if CMK_MEM_CHECKPOINT
374   // ignore from listener if it is during restart from crash
375   // because the ci may be old.
376   if (CkInRestarting()) return;
377 #endif
378   //He has already contributed (elsewhere) to several reductions:
379   for (int r=redNo;r<ci->redNo;r++)
380     adj(r).lcount--;//He won't be contributing to r here
381
382 }
383
384 //Contribute-- the given msg can contain any data.  The reducerType
385 // field of the message must be valid.
386 // Each contributor must contribute exactly once to the each reduction.
387 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
388 {
389 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
390     Chare *oldObj =CpvAccess(_currentObj);
391     CpvAccess(_currentObj) = this;
392 #endif
393
394 #if CMK_BIGSIM_CHARM
395   _TRACE_BG_TLINE_END(&(m->log));
396 #endif
397   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
398   //m->ci=ci;
399   m->redNo=ci->redNo++;
400   m->sourceFlag=-1;//A single contribution
401   m->gcount=0;
402
403 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
404     if(lcount == 0){
405         m->sourceProcessorCount = 1;
406     }else{
407         m->sourceProcessorCount = lcount;
408     }
409     m->fromPE = CmiMyPe();
410     Chare *oldObj =CpvAccess(_currentObj);
411     char currentObjName[100];
412     DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
413     thisProxy[0].contributeViaMessage(m);
414 #else
415   addContribution(m);
416 #endif
417
418 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
419     CpvAccess(_currentObj) = oldObj;
420 #endif
421 }
422
423 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
424 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
425     CmiAssert(CmiMyPe() == 0);
426     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
427     if(redNo == 0){
428         if(perProcessorCounts[m->fromPE] == -1){
429             processorCount++;
430             perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
431             DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
432         }else{
433             if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
434                 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
435                 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
436             }
437         }
438         if(processorCount == CmiNumPes()){
439             totalCount = 0;
440             for(int i=0;i<CmiNumPes();i++){
441                 CmiAssert(perProcessorCounts[i] != -1);
442                 totalCount += perProcessorCounts[i];
443             }
444             DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
445         }
446         if(m->sourceProcessorCount == 0){
447             if(processorCount == CmiNumPes()){
448                 finishReduction();
449             }
450             return;
451         }
452     }
453     addContribution(m);
454 }
455 #else
456 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
457 #endif
458
459 //////////// Reduction Manager Remote Entry Points /////////////
460 //Sent down the reduction tree (used by barren PEs)
461 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
462 {
463  if(CkMyPe()==0){
464         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
465         //delete m;
466         //return;
467  }
468  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
469  int srcPE = (UsrToEnv(m))->getSrcPe();
470 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
471   if (isPresent(m->num) && !inProgress)
472   {
473     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
474     startReduction(m->num,srcPE);
475     finishReduction();
476   } else if (isFuture(m->num)){
477 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
478           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
479           if(maxStartRequest < m->num){
480                   maxStartRequest = m->num;
481           }
482  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
483           
484     }
485   else //is Past
486     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
487   delete m;
488 #else
489     if(redNo == 0){
490         if(lcount == 0){
491             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
492             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
493             dummy->fromPE = CmiMyPe();
494             dummy->sourceProcessorCount = 0;
495             dummy->redNo = 0;
496             thisProxy[0].contributeViaMessage(dummy);
497         }
498     }
499 #endif
500 }
501
502 //Sent to root of reduction tree with reduction contribution
503 // of migrants that missed the main reduction.
504 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
505 {
506 #if GROUP_LEVEL_REDUCTION
507 #if CMK_BIGSIM_CHARM
508   _TRACE_BG_TLINE_END(&(m->log));
509 #endif
510   addContribution(m);
511 #else
512   m->secondaryCallback = m->callback;
513   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
514   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
515   nodeMgr->LateMigrantMsg(m);
516 /*      int len = finalMsgs.length();
517         finalMsgs.enq(m);
518 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
519         endArrayReduction();*/
520 #endif
521 }
522
523 //A late migrating contributor will never contribute to this reduction
524 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
525 {
526   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
527   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
528  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
529   adj(m->num).gcount--;//He won't be contributing to this one.
530   finishReduction();
531   delete m;
532 }
533
534 //////////// Reduction Manager State /////////////
535 void CkReductionMgr::startReduction(int number,int srcPE)
536 {
537   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
538   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
539   if (inProgress){
540         DEBR((AA"This reduction is already in progress\n"AB));
541         return;//This reduction already started
542   }
543   if (creating) //Don't start yet-- we're creating elements
544   {
545     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
546     startRequested=CmiTrue;
547     return;
548   }
549
550 //If none of these cases, we need to start the reduction--
551   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
552   inProgress=CmiTrue;
553  
554
555         /*
556                 FAULT_EVAC
557         */
558   if(!CmiNodeAlive(CkMyPe())){
559         return;
560   }
561
562   if(disableNotifyChildrenStart) return;
563  
564 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_) 
565   if(CmiMyPe() == 0 && redNo == 0){
566             for(int j=0;j<CkNumPes();j++){
567                 if(j != CkMyPe() && j != srcPE){
568                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
569                 }
570             }
571             if(lcount == 0){
572                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
573                 dummy->fromPE = CmiMyPe();
574                 dummy->sourceProcessorCount = 0;
575                 dummy->redNo = 0;
576                 thisProxy[0].contributeViaMessage(dummy);
577             }
578     }   else{
579         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
580     }
581
582
583 #else
584   //Sent start requests to our kids (in case they don't already know)
585 #if GROUP_LEVEL_REDUCTION
586   for (int k=0;k<treeKids();k++)
587   {
588     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
589     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
590   }
591 #else
592   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
593 #endif
594 #endif
595         
596         /*
597   int temp;
598   //making it a broadcast done only by PE 0
599   if(!hasParent()){
600                 temp = completedRedNo+1;
601                 for(int i=temp;i<=number;i++){
602                         for(int j=0;j<CkNumPes();j++){
603                                 if(j != CkMyPe() && j != srcPE){
604                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
605                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
606                                         }
607                                 }
608                         }
609                 }       
610         }       else{
611                 temp = number;
612         }*/
613 /*  if(!hasParent()){
614                 temp = completedRedNo+1;
615         }       else{
616                 temp = number;
617         }
618         for(int i=temp;i<=number;i++){
619         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
620                 if(hasParent()){
621           // kick-start your parent too ...
622                         if(treeParent() != srcPE){
623                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
624 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
625     CpvAccess(_currentObj) = oldObj;
626 #endif
627                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
628                                 }       
629                         }       
630                 }
631           for (int k=0;k<treeKids();k++)
632           {
633                         if(firstKid()+k != srcPE){
634                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
635                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
636                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
637                                 }       
638                         }       
639         }
640         }
641         */
642 }       
643
644 /*Handle a message from one element for the reduction*/
645 void CkReductionMgr::addContribution(CkReductionMsg *m)
646 {
647   if (isPast(m->redNo))
648   {
649 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
650         CmiAbort("this version should not have late migrations");
651 #else
652         //We've moved on-- forward late contribution straight to root
653     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
654         // if (!hasParent()) //Root moved on too soon-- should never happen
655         //   CkAbort("Late reduction contribution received at root!\n");
656     thisProxy[0].LateMigrantMsg(m);
657 #endif
658   }
659   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
660     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
661     futureMsgs.enq(m);
662   } else {// An ordinary contribution
663     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
664    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
665     startReduction(m->redNo,CkMyPe());
666     msgs.enq(m);
667     nContrib++;
668     finishReduction();
669   }
670 }
671
672 /**function checks if it has got all contributions that it is supposed to
673 get at this processor. If it is done it sends the reduced result to the local
674 nodegroup */
675 void CkReductionMgr::finishReduction(void)
676 {
677   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
678   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
679   if ((!inProgress) || creating){
680         DEBR((AA"Either not in Progress or creating\n"AB));
681         return;
682   }
683   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
684 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
685
686   if (nContrib<(lcount+adj(redNo).lcount)){
687          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
688          return;//Need more local messages
689   }
690 #if GROUP_LEVEL_REDUCTION
691   if (nRemote<treeKids()) return;//Need more remote messages
692 #endif
693  
694 #else
695
696     if(CkMyPe() != 0){
697         if(redNo != 0){
698             return;
699         }else{
700             CmiAssert(lcount == 0);
701             CkReductionMsg *dummy = reduceMessages();
702             dummy->fromPE = CmiMyPe();
703             dummy->sourceProcessorCount = 0;
704             thisProxy[0].contributeViaMessage(dummy);
705             return;
706         }
707     }
708     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
709         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
710          return;//Need more local messages
711   }else{
712         DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
713     }
714
715
716 #endif
717
718   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
719   CkReductionMsg *result=reduceMessages();
720   result->redNo=redNo;
721 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
722
723 #if GROUP_LEVEL_REDUCTION
724   if (hasParent())
725   {//Pass data up tree to parent
726     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
727     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
728     result->gcount+=gcount+adj(redNo).gcount;
729     thisProxy[treeParent()].RecvMsg(result);
730   }
731   else 
732   {//We are root-- pass data to client
733     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
734     int totalElements=result->gcount+gcount+adj(redNo).gcount;
735     if (totalElements>result->nSources()) 
736     {
737       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
738       msgs.enq(result);
739       return; // Wait for migrants to contribute
740     } else if (totalElements<result->nSources()) {
741       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
742       CkAbort("ERROR! Too many contributions at root!\n");
743     }
744     DEBR((AA"Passing result to client function\n"AB));
745     CkSetRefNum(result, result->getUserFlag());
746     if (!result->callback.isInvalid())
747             result->callback.send(result);
748     else if (!storedCallback.isInvalid())
749             storedCallback.send(result);
750     else
751             CkAbort("No reduction client!\n"
752                     "You must register a client with either SetReductionClient or during contribute.\n");
753   }
754
755 #else
756   result->gcount+=gcount+adj(redNo).gcount;
757
758   result->secondaryCallback = result->callback;
759   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
760         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
761
762   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
763
764  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
765   
766   // Find our node reduction manager, and pass reduction to him:
767   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
768   nodeMgr->contributeArrayReduction(result);
769 #endif
770 #else                // _FAULT_MLOG_
771   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
772
773     CkSetRefNum(result, result->getUserFlag());
774     if (!result->callback.isInvalid())
775         result->callback.send(result);
776     else if (!storedCallback.isInvalid())
777         storedCallback.send(result);
778     else{
779       DEBR(("No reduction client for group %d \n",thisgroup.idx));
780         CkAbort("No reduction client!\n"
781             "You must register a client with either SetReductionClient or during contribute.\n");
782     }
783
784     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
785
786 #endif               // _FAULT_MLOG_
787
788   //House Keeping Operations will have to check later what needs to be changed
789   redNo++;
790   //Shift the count adjustment vector down one slot (to match new redNo)
791   int i;
792 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_) && !GROUP_LEVEL_REDUCTION
793     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
794   if(CkMyPe()!=0)
795 #endif
796   {
797         completedRedNo++;
798         for (i=1;i<(int)(adjVec.length());i++){
799            adjVec[i-1]=adjVec[i];
800         }
801         adjVec.length()--;  
802   }
803
804   inProgress=CmiFalse;
805   startRequested=CmiFalse;
806   nRemote=nContrib=0;
807
808   //Look through the future queue for messages we can now handle
809   int n=futureMsgs.length();
810   for (i=0;i<n;i++)
811   {
812     CkReductionMsg *m=futureMsgs.deq();
813     if (m!=NULL) //One of these addContributions may have finished us.
814       addContribution(m);//<- if *still* early, puts it back in the queue
815   }
816 #if GROUP_LEVEL_REDUCTION
817   n=futureRemoteMsgs.length();
818   for (i=0;i<n;i++)
819   {
820     CkReductionMsg *m=futureRemoteMsgs.deq();
821     if (m!=NULL) {
822       RecvMsg(m);//<- if *still* early, puts it back in the queue
823     }
824   }
825 #endif
826
827 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
828   if(maxStartRequest >= redNo){
829           startReduction(redNo,CkMyPe());
830           finishReduction();
831   }
832  
833 #endif
834 }
835
836 //Sent up the reduction tree with reduced data
837   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
838 {
839 #if GROUP_LEVEL_REDUCTION
840 #if CMK_BIGSIM_CHARM
841   _TRACE_BG_TLINE_END(&m->log);
842 #endif
843   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
844     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
845     startReduction(m->redNo, CkMyPe());
846     msgs.enq(m);
847     nRemote++;
848     finishReduction();
849   }
850   else if (isFuture(m->redNo)) {
851     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
852     futureRemoteMsgs.enq(m);
853   } 
854   else CkAbort("Recv'd late remote contribution!\n");
855 #endif
856 }
857
858 //////////// Reduction Manager Utilities /////////////
859
860 //Return the countAdjustment struct for the given redNo:
861 countAdjustment &CkReductionMgr::adj(int number)
862 {
863   number-=completedRedNo;
864   number--;
865   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
866   //Pad the adjustment vector with zeros until it's at least number long
867   while ((int)(adjVec.length())<=number)
868     adjVec.push_back(countAdjustment());
869   return adjVec[number];
870 }
871
872 //Combine (& free) the current message vector msgs.
873 CkReductionMsg *CkReductionMgr::reduceMessages(void)
874 {
875 #if CMK_BIGSIM_CHARM
876   _TRACE_BG_END_EXECUTE(1);
877   void* _bgParentLog = NULL;
878   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
879 #endif
880   CkReductionMsg *ret=NULL;
881
882   //Look through the vector for a valid reducer, swapping out placeholder messages
883   CkReduction::reducerType r=CkReduction::invalid;
884   int msgs_gcount=0;//Reduced gcount
885   int msgs_nSources=0;//Reduced nSources
886   int msgs_userFlag=-1;
887   CkCallback msgs_callback;
888   int i;
889   int nMsgs=0;
890   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
891   CkReductionMsg *m;
892   bool isMigratableContributor;
893
894   // Copy message queue into msgArr, skipping placeholders:
895   while (NULL!=(m=msgs.deq()))
896   {
897     msgs_gcount+=m->gcount;
898     if (m->sourceFlag!=0)
899     { //This is a real message from an element, not just a placeholder
900       msgArr[nMsgs++]=m;
901       msgs_nSources+=m->nSources();
902       r=m->reducer;
903       if (!m->callback.isInvalid())
904         msgs_callback=m->callback;
905       if (m->userFlag!=-1)
906         msgs_userFlag=m->userFlag;
907                         
908         isMigratableContributor=m->isMigratableContributor();
909 #if CMK_BIGSIM_CHARM
910         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
911 #endif
912     }
913     else
914     { //This is just a placeholder message-- forget it
915       delete m;
916     }
917   }
918
919   if (nMsgs==0||r==CkReduction::invalid)
920   //No valid reducer in the whole vector
921     ret=CkReductionMsg::buildNew(0,NULL);
922   else
923   {//Use the reducer to reduce the messages
924                 //if there is only one msg to be reduced just return that message
925     if(nMsgs == 1){
926         ret = msgArr[0];        
927     }else{
928         CkReduction::reducerFn f=CkReduction::reducerTable[r];
929         ret=(*f)(nMsgs,msgArr);
930     }
931     ret->reducer=r;
932   }
933
934
935
936 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
937
938 #if CRITICAL_PATH_DEBUG > 3
939   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
940 #endif
941
942   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
943   path.updateMax(UsrToEnv(ret));
944   // Combine the critical paths from all the reduction messages
945   for (i=0;i<nMsgs;i++){
946     if (msgArr[i]!=ret){
947       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
948       path.updateMax(UsrToEnv(msgArr[i]));
949     }
950   }
951   
952
953 #if CRITICAL_PATH_DEBUG > 3
954   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
955 #endif
956   
957   PathHistoryTableEntry tableEntry(path);
958   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
959   
960 #endif
961
962         //Go back through the vector, deleting old messages
963   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
964   delete [] msgArr;
965
966   //Set the message counts
967   ret->redNo=redNo;
968   ret->gcount=msgs_gcount;
969   ret->userFlag=msgs_userFlag;
970   ret->callback=msgs_callback;
971   ret->sourceFlag=msgs_nSources;
972         ret->setMigratableContributor(isMigratableContributor);
973   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
974
975   return ret;
976 }
977
978
979 //Checkpointing utilities
980 //pack-unpack method for CkReductionMsg
981 //if packing pack the message and then unpack and return it
982 //if unpacking allocate memory for it read it off disk and then unapck
983 //and return it
984 void CkReductionMgr::pup(PUP::er &p)
985 {
986 //We do not store the client function pointer or the client function parameter,
987 //it is the responsibility of the programmer to correctly restore these
988   CkGroupInitCallback::pup(p);
989   p(redNo);
990   p(completedRedNo);
991   p(inProgress); p(creating); p(startRequested);
992   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
993   p|msgs;
994   p|futureMsgs;
995   p|futureRemoteMsgs;
996   p|finalMsgs;
997   p|adjVec;
998   p|nodeProxy;
999   p|storedCallback;
1000     // handle CkReductionClientBundle
1001   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
1002   {
1003     CkReductionClientBundle *bd;
1004     if (p.isUnpacking()) 
1005       bd = new CkReductionClientBundle;
1006     else
1007       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
1008     p|*bd;
1009     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1010   }
1011
1012   // subtle --- Gengbin
1013   // Group : CkReductionMgr
1014   // CkArray: CkReductionMgr
1015   // lcount/gcount in Group is set in Group constructor
1016   // lcount/gcount in CkArray is not, it is set when array elements are created
1017   // we can not pup because inserting array elems will add the counters again
1018 //  p|lcount;
1019 //  p|gcount;
1020 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
1021 //  p|lcount;
1022 //  //  p|gcount;
1023 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1024 #endif
1025   if(p.isUnpacking()){
1026     thisProxy = thisgroup;
1027     maxStartRequest=0;
1028 #if GROUP_LEVEL_REDUCTION
1029 #ifdef BINOMIAL_TREE
1030     init_BinomialTree();
1031 #else
1032     init_BinaryTree();
1033 #endif
1034 #endif
1035   }
1036
1037   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1038 }
1039
1040
1041 //Callback for doing Reduction through NodeGroups added by Sayantan
1042
1043 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1044
1045         int total = m->gcount+adj(m->redNo).gcount;
1046         finalMsgs.enq(m);
1047         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1048         adj(m->redNo).mainRecvd = 1;
1049         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1050         endArrayReduction();
1051 }
1052
1053 void CkReductionMgr :: endArrayReduction(){
1054         CkReductionMsg *ret=NULL;
1055         int nMsgs=finalMsgs.length();
1056         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1057         //Look through the vector for a valid reducer, swapping out placeholder messages
1058         //CkPrintf("Length of Final Message %d \n",nMsgs);
1059         CkReduction::reducerType r=CkReduction::invalid;
1060         int msgs_gcount=0;//Reduced gcount
1061         int msgs_nSources=0;//Reduced nSources
1062         int msgs_userFlag=-1;
1063         CkCallback msgs_callback;
1064         CkCallback msgs_secondaryCallback;
1065         CkVec<CkReductionMsg *> tempMsgs;
1066         int i;
1067         int numMsgs = 0;
1068         for (i=0;i<nMsgs;i++)
1069         {
1070                 CkReductionMsg *m=finalMsgs.deq();
1071                 if(m->redNo == completedRedNo +1){
1072                         msgs_gcount+=m->gcount;
1073                         if (m->sourceFlag!=0)
1074                         { //This is a real message from an element, not just a placeholder
1075                                 msgs_nSources+=m->nSources();
1076                                 r=m->reducer;
1077                                 if (!m->callback.isInvalid())
1078                                 msgs_callback=m->callback;
1079                                 if(!m->secondaryCallback.isInvalid())
1080                                         msgs_secondaryCallback = m->secondaryCallback;
1081                                 if (m->userFlag!=-1)
1082                                         msgs_userFlag=m->userFlag;
1083                                 tempMsgs.push_back(m);
1084                         }
1085                 }else{
1086                         finalMsgs.enq(m);
1087                 }
1088
1089         }
1090         numMsgs = tempMsgs.length();
1091
1092         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1093
1094         if(numMsgs == 0){
1095                 return;
1096         }
1097         if(adj(completedRedNo+1).mainRecvd == 0){
1098                 for(i=0;i<numMsgs;i++){
1099                         finalMsgs.enq(tempMsgs[i]);
1100                 }
1101                 return;
1102         }
1103
1104 /*
1105         NOT NEEDED ANYMORE DONE at nodegroup level
1106         if(msgs_gcount  > msgs_nSources){
1107                 for(i=0;i<numMsgs;i++){
1108                         finalMsgs.enq(tempMsgs[i]);
1109                 }
1110                 return;
1111         }*/
1112
1113         if (nMsgs==0||r==CkReduction::invalid)
1114                 //No valid reducer in the whole vector
1115                 ret=CkReductionMsg::buildNew(0,NULL);
1116         else{//Use the reducer to reduce the messages
1117                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1118                 // has to be corrected elements from above need to be put into a temporary vector
1119                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1120                 ret=(*f)(numMsgs,msgArr);
1121                 ret->reducer=r;
1122
1123         }
1124
1125         
1126 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1127
1128 #if CRITICAL_PATH_DEBUG > 3
1129         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1130 #endif
1131
1132         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1133         path.updateMax(UsrToEnv(ret));
1134         // Combine the critical paths from all the reduction messages into the header for the new result
1135         for (i=0;i<numMsgs;i++){
1136           if (tempMsgs[i]!=ret){
1137             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1138             path.updateMax(UsrToEnv(tempMsgs[i]));
1139           } else {
1140             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1141           }
1142         }
1143         // Also consider the path which got us into this entry method
1144
1145 #if CRITICAL_PATH_DEBUG > 3
1146         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1147 #endif
1148
1149 #endif
1150   
1151
1152
1153
1154
1155         for(i = 0;i<numMsgs;i++){
1156                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1157         }
1158
1159         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1160         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1161
1162         ret->redNo=completedRedNo+1;
1163         ret->gcount=msgs_gcount;
1164         ret->userFlag=msgs_userFlag;
1165         ret->callback=msgs_callback;
1166         ret->secondaryCallback = msgs_secondaryCallback;
1167         ret->sourceFlag=msgs_nSources;
1168
1169         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1170
1171         CkSetRefNum(ret, ret->getUserFlag());
1172         if (!ret->secondaryCallback.isInvalid())
1173             ret->secondaryCallback.send(ret);
1174     else if (!storedCallback.isInvalid())
1175             storedCallback.send(ret);
1176     else{
1177       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1178             CkAbort("No reduction client!\n"
1179                     "You must register a client with either SetReductionClient or during contribute.\n");
1180     }
1181         completedRedNo++;
1182
1183         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1184
1185         for (i=1;i<(int)(adjVec.length());i++)
1186                 adjVec[i-1]=adjVec[i];
1187         adjVec.length()--;
1188         endArrayReduction();
1189 }
1190
1191 #if GROUP_LEVEL_REDUCTION
1192
1193 void CkReductionMgr::init_BinaryTree(){
1194         parent = (CkMyPe()-1)/TREE_WID;
1195         int firstkid = CkMyPe()*TREE_WID+1;
1196         numKids=CkNumPes()-firstkid;
1197         if (numKids>TREE_WID) numKids=TREE_WID;
1198         if (numKids<0) numKids=0;
1199
1200         for(int i=0;i<numKids;i++){
1201                 kids.push_back(firstkid+i);
1202                 newKids.push_back(firstkid+i);
1203         }
1204 }
1205
1206 void CkReductionMgr::init_BinomialTree(){
1207         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1208         /*upperSize = (unsigned )pow((double)2,depth);*/
1209         upperSize = (unsigned) 1 << depth;
1210         label = upperSize-CkMyPe()-1;
1211         int p=label;
1212         int count=0;
1213         while( p > 0){
1214                 if(p % 2 == 0)
1215                         break;
1216                 else{
1217                         p = p/2;
1218                         count++;
1219                 }
1220         }
1221         /*parent = label + rint(pow((double)2,count));*/
1222         parent = label + (1<<count);
1223         parent = upperSize -1 -parent;
1224         int temp;
1225         if(count != 0){
1226                 numKids = 0;
1227                 for(int i=0;i<count;i++){
1228                         /*temp = label - rint(pow((double)2,i));*/
1229                         temp = label - (1<<i);
1230                         temp = upperSize-1-temp;
1231                         if(temp <= CkNumPes()-1){
1232                                 kids.push_back(temp);
1233                                 numKids++;
1234                         }
1235                 }
1236         }else{
1237                 numKids = 0;
1238         //      kids = NULL;
1239         }
1240 }
1241
1242
1243 int CkReductionMgr::treeRoot(void)
1244 {
1245   return 0;
1246 }
1247 CmiBool CkReductionMgr::hasParent(void) //Root Node
1248 {
1249   return (CmiBool)(CkMyPe()!=treeRoot());
1250 }
1251 int CkReductionMgr::treeParent(void) //My parent Node
1252 {
1253   return parent;
1254 }
1255
1256 int CkReductionMgr::firstKid(void) //My first child Node
1257 {
1258   return CkMyPe()*TREE_WID+1;
1259 }
1260 int CkReductionMgr::treeKids(void)//Number of children in tree
1261 {
1262   return numKids;
1263 }
1264
1265 #endif
1266
1267 /////////////////////////////////////////////////////////////////////////
1268
1269 ////////////////////////////////
1270 //CkReductionMsg support
1271
1272 //ReductionMessage default private constructor-- does nothing
1273 CkReductionMsg::CkReductionMsg(){}
1274 CkReductionMsg::~CkReductionMsg(){}
1275
1276 //This define gives the distance from the start of the CkReductionMsg
1277 // object to the start of the user data area (just below last object field)
1278 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1279
1280 //"Constructor"-- builds and returns a new CkReductionMsg.
1281 //  the "data" array you specify will be copied into this object.
1282 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1283     CkReduction::reducerType reducer, CkReductionMsg *buf)
1284 {
1285   int len[1];
1286   len[0]=NdataSize;
1287   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1288   
1289   ret->dataSize=NdataSize;
1290   if (srcData!=NULL && !buf)
1291     memcpy(ret->data,srcData,NdataSize);
1292   ret->userFlag=-1;
1293   ret->reducer=reducer;
1294   //ret->ci=NULL;
1295   ret->sourceFlag=-1000;
1296   ret->gcount=0;
1297   ret->migratableContributor = true;
1298 #if CMK_BIGSIM_CHARM
1299   ret->log = NULL;
1300 #endif
1301   return ret;
1302 }
1303
1304 // Charm kernel message runtime support:
1305 void *
1306 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1307 {
1308   int totalsize=ARM_DATASTART+(*sz);
1309   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1310   CkReductionMsg *ret = (CkReductionMsg *)
1311     CkAllocMsg(msgnum,totalsize,priobits);
1312   ret->data=(void *)(&ret->dataStorage);
1313   return (void *) ret;
1314 }
1315
1316 void *
1317 CkReductionMsg::pack(CkReductionMsg* in)
1318 {
1319   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1320   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1321   in->data = NULL;
1322   return (void*) in;
1323 }
1324
1325 CkReductionMsg* CkReductionMsg::unpack(void *in)
1326 {
1327   CkReductionMsg *ret = (CkReductionMsg *)in;
1328   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1329   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1330   ret->data=(void *)(&ret->dataStorage);
1331   return ret;
1332 }
1333
1334
1335 /////////////////////////////////////////////////////////////////////////////////////
1336 ///////////////// Builtin Reducer Functions //////////////
1337 /* A simple reducer, like sum_int, looks like this:
1338 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1339 {
1340   int i,ret=0;
1341   for (i=0;i<nMsg;i++)
1342     ret+=*(int *)(msg[i]->getData());
1343   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1344 }
1345
1346 To keep the code small and easy to change, the implementations below
1347 are built with preprocessor macros.
1348 */
1349
1350 //////////////// simple reducers ///////////////////
1351 /*A define used to quickly and tersely construct simple reductions.
1352 The basic idea is to use the first message's data array as
1353 (pre-initialized!) scratch space for folding in the other messages.
1354  */
1355
1356 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1357 {
1358         CkAbort("Called the invalid reducer type 0.  This probably\n"
1359                 "means you forgot to initialize your custom reducer index.\n");
1360         return NULL;
1361 }
1362
1363 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1364 {
1365   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1366 }
1367
1368 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1369 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1370 {\
1371   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1372   int m,i;\
1373   int nElem=msg[0]->getLength()/sizeof(dataType);\
1374   dataType *ret=(dataType *)(msg[0]->getData());\
1375   for (m=1;m<nMsg;m++)\
1376   {\
1377     dataType *value=(dataType *)(msg[m]->getData());\
1378     for (i=0;i<nElem;i++)\
1379     {\
1380       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1381       loop\
1382     }\
1383   }\
1384   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1385   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1386 }
1387
1388 //Use this macro for reductions that have the same type for all inputs
1389 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1390   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1391   SIMPLE_REDUCTION(nameBase##_long,CmiInt8,"%d",loop) \
1392   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1393   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1394
1395
1396 //Compute the sum the numbers passed by each element.
1397 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1398
1399 //Compute the product of the numbers passed by each element.
1400 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1401
1402 //Compute the largest number passed by any element.
1403 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1404
1405 //Compute the smallest integer passed by any element.
1406 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1407
1408
1409 //Compute the logical AND of the integers passed by each element.
1410 // The resulting integer will be zero if any source integer is zero; else 1.
1411 SIMPLE_REDUCTION(logical_and,int,"%d",
1412         if (value[i]==0)
1413      ret[i]=0;
1414   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1415 )
1416
1417 //Compute the logical OR of the integers passed by each element.
1418 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1419 SIMPLE_REDUCTION(logical_or,int,"%d",
1420   if (value[i]!=0)
1421            ret[i]=1;
1422   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1423 )
1424
1425 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1426 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1427
1428 //Select one random message to pass on
1429 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1430   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1431 }
1432
1433 /////////////// concat ////////////////
1434 /*
1435 This reducer simply appends the data it recieves from each element,
1436 without any housekeeping data to separate them.
1437 */
1438 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1439 {
1440   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1441   //Figure out how big a message we'll need
1442   int i,retSize=0;
1443   for (i=0;i<nMsg;i++)
1444       retSize+=msg[i]->getSize();
1445
1446   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1447
1448   //Allocate a new message
1449   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1450
1451   //Copy the source message data into the return message
1452   char *cur=(char *)(ret->getData());
1453   for (i=0;i<nMsg;i++) {
1454     int messageBytes=msg[i]->getSize();
1455     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1456     cur+=messageBytes;
1457   }
1458   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1459   return ret;
1460 }
1461
1462 /////////////// set ////////////////
1463 /*
1464 This reducer appends the data it recieves from each element
1465 along with some housekeeping data indicating contribution boundaries.
1466 The message data is thus a list of reduction_set_element structures
1467 terminated by a dummy reduction_set_element with a sourceElement of -1.
1468 */
1469
1470 //This rounds an integer up to the nearest multiple of sizeof(double)
1471 static const int alignSize=sizeof(double);
1472 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1473
1474 //This gives the size (in bytes) of a reduction_set_element
1475 static int SET_SIZE(int dataSize)
1476 {return SET_ALIGN(sizeof(int)+dataSize);}
1477
1478 //This returns a pointer to the next reduction_set_element in the list
1479 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1480 {
1481   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1482   return (CkReduction::setElement *)next;
1483 }
1484
1485 //Combine the data passed by each element into an list of reduction_set_elements.
1486 // Each element may contribute arbitrary data (with arbitrary length).
1487 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1488 {
1489   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1490   //Figure out how big a message we'll need
1491   int i,retSize=0;
1492   for (i=0;i<nMsg;i++) {
1493     if (!msg[i]->isFromUser())
1494     //This message is composite-- it will just be copied over (less terminating -1)
1495       retSize+=(msg[i]->getSize()-sizeof(int));
1496     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1497       retSize+=SET_SIZE(msg[i]->getSize());
1498   }
1499   retSize+=sizeof(int);//Leave room for terminating -1.
1500
1501   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1502
1503   //Allocate a new message
1504   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1505
1506   //Copy the source message data into the return message
1507   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1508   for (i=0;i<nMsg;i++)
1509     if (!msg[i]->isFromUser())
1510     {//This message is composite-- just copy it over (less terminating -1)
1511                         int messageBytes=msg[i]->getSize()-sizeof(int);
1512                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1513                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1514                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1515     }
1516     else //This is a message from an element-- wrap it in a reduction_set_element
1517     {
1518       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1519       cur->dataSize=msg[i]->getSize();
1520       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1521       cur=SET_NEXT(cur);
1522     }
1523   cur->dataSize=-1;//Add a terminating -1.
1524   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1525   return ret;
1526 }
1527
1528 //Utility routine: get the next reduction_set_element in the list
1529 // if there is one, or return NULL if there are none.
1530 //To get all the elements, just keep feeding this procedure's output back to
1531 // its input until it returns NULL.
1532 CkReduction::setElement *CkReduction::setElement::next(void)
1533 {
1534   CkReduction::setElement *n=SET_NEXT(this);
1535   if (n->dataSize==-1)
1536     return NULL;//This is the end of the list
1537   else
1538     return n;//This is just another element
1539 }
1540
1541 /////////////////// Reduction Function Table /////////////////////
1542 CkReduction::CkReduction() {} //Dummy private constructor
1543
1544 //Add the given reducer to the list.  Returns the new reducer's
1545 // reducerType.  Must be called in the same order on every node.
1546 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1547 {
1548   reducerTable[nReducers]=fn;
1549   return (reducerType)nReducers++;
1550 }
1551
1552 /*Reducer table: maps reducerTypes to reducerFns.
1553 It's indexed by reducerType, so the order in this table
1554 must *exactly* match the reducerType enum declaration.
1555 The names don't have to match, but it helps.
1556 */
1557 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1558
1559 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1560     ::invalid_reducer,
1561     ::nop,
1562   //Compute the sum the numbers passed by each element.
1563     ::sum_int,::sum_long,::sum_float,::sum_double,
1564
1565   //Compute the product the numbers passed by each element.
1566     ::product_int,::product_long,::product_float,::product_double,
1567
1568   //Compute the largest number passed by any element.
1569     ::max_int,::max_long,::max_float,::max_double,
1570
1571   //Compute the smallest number passed by any element.
1572     ::min_int,::min_long,::min_float,::min_double,
1573
1574   //Compute the logical AND of the integers passed by each element.
1575   // The resulting integer will be zero if any source integer is zero.
1576     ::logical_and,
1577
1578   //Compute the logical OR of the integers passed by each element.
1579   // The resulting integer will be 1 if any source integer is nonzero.
1580     ::logical_or,
1581
1582     // Compute the logical bitvector AND of the integers passed by each element.
1583     ::bitvec_and,
1584
1585     // Compute the logical bitvector OR of the integers passed by each element.
1586     ::bitvec_or,
1587
1588     // Select one of the messages at random to pass on
1589     ::random,
1590
1591   //Concatenate the (arbitrary) data passed by each element
1592     ::concat,
1593
1594   //Combine the data passed by each element into an list of setElements.
1595   // Each element may contribute arbitrary data (with arbitrary length).
1596     ::set
1597 };
1598
1599
1600
1601
1602
1603
1604
1605
1606 /********** Code added by Sayantan *********************/
1607 /** Locking is a big problem in the nodegroup code for smp.
1608  So a few assumptions have had to be made. There is one lock
1609  called lockEverything. It protects all the data structures 
1610  of the nodegroup reduction mgr. I tried grabbing it separately 
1611  for each datastructure, modifying it and then releasing it and
1612  then grabbing it again, for the next change.
1613  That doesn't really help because the interleaved execution of 
1614  different threads makes the state of the reduction manager 
1615  inconsistent. 
1616  
1617  1. Grab lockEverything before calling finishreduction or startReduction
1618     or doRecvMsg
1619  2. lockEverything is grabbed only in entry methods reductionStarting
1620     or RecvMesg or  addcontribution.
1621  ****/
1622  
1623 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1624 NodeGroup::NodeGroup(void) {
1625   __nodelock=CmiCreateLock();
1626 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1627     mlogData->objID.type = TypeNodeGroup;
1628     mlogData->objID.data.group.onPE = CkMyNode();
1629 #endif
1630
1631 }
1632 NodeGroup::~NodeGroup() {
1633   CmiDestroyLock(__nodelock);
1634 }
1635 void NodeGroup::pup(PUP::er &p)
1636 {
1637   CkNodeReductionMgr::pup(p);
1638   p|reductionInfo;
1639 }
1640
1641 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1642
1643 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1644   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1645  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1646   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1647  }
1648
1649 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1650                                     ((CkNodeReductionMgr *)this),
1651                                     reductionInfo,false)
1652
1653 /* this contribute also adds up the count across all messages it receives.
1654   Useful for summing up number of array elements who have contributed ****/ 
1655 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1656         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1657
1658
1659
1660 //#define BINOMIAL_TREE
1661
1662 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1663   : thisProxy(thisgroup)
1664 {
1665 #ifdef BINOMIAL_TREE
1666   init_BinomialTree();
1667 #else
1668   init_BinaryTree();
1669 #endif
1670   storedCallback=NULL;
1671   redNo=0;
1672   inProgress=CmiFalse;
1673   
1674   startRequested=CmiFalse;
1675   gcount=CkNumNodes();
1676   lcount=1;
1677   nContrib=nRemote=0;
1678   lockEverything = CmiCreateLock();
1679
1680
1681   creating=CmiFalse;
1682   interrupt = 0;
1683   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1684         /*
1685                 FAULT_EVAC
1686         */
1687         blocked = false;
1688         maxModificationRedNo = INT_MAX;
1689         killed=0;
1690         additionalGCount = newAdditionalGCount = 0;
1691 }
1692
1693 void CkNodeReductionMgr::flushStates()
1694 {
1695  if(CkMyRank() == 0){
1696   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1697   redNo=0;
1698   inProgress=CmiFalse;
1699
1700   startRequested=CmiFalse;
1701   gcount=CkNumNodes();
1702   lcount=1;
1703   nContrib=nRemote=0;
1704
1705   creating=CmiFalse;
1706   interrupt = 0;
1707   while (!msgs.isEmpty()) { delete msgs.deq(); }
1708   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1709   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1710   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1711   }
1712 }
1713
1714 //////////// Reduction Manager Client API /////////////
1715
1716 //Add the given client function.  Overwrites any previous client.
1717 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1718 {
1719   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1720   if(cb->isInvalid()){
1721         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1722   }else{
1723         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1724   }
1725
1726   if (CkMyNode()!=0)
1727           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1728   delete storedCallback;
1729   storedCallback=cb;
1730 }
1731
1732
1733
1734 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1735 {
1736 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1737     Chare *oldObj =CpvAccess(_currentObj);
1738     CpvAccess(_currentObj) = this;
1739 #endif
1740
1741   //m->ci=ci;
1742   m->redNo=ci->redNo++;
1743   m->sourceFlag=-1;//A single contribution
1744   m->gcount=0;
1745   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1746   addContribution(m);
1747
1748 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1749     CpvAccess(_currentObj) = oldObj;
1750 #endif
1751 }
1752
1753
1754 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1755 {
1756 #if CMK_BIGSIM_CHARM
1757   _TRACE_BG_TLINE_END(&m->log);
1758 #endif
1759 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1760     Chare *oldObj =CpvAccess(_currentObj);
1761     CpvAccess(_currentObj) = this;
1762 #endif
1763   //m->ci=ci;
1764   m->redNo=ci->redNo++;
1765   m->gcount=count;
1766   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1767   addContribution(m);
1768   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1769
1770 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1771     CpvAccess(_currentObj) = oldObj;
1772 #endif
1773 }
1774
1775
1776 //////////// Reduction Manager Remote Entry Points /////////////
1777
1778 //Sent down the reduction tree (used by barren PEs)
1779 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1780 {
1781   CmiLock(lockEverything);
1782   if(blocked){
1783         delete m;
1784         CmiUnlock(lockEverything);
1785         return ;
1786   }
1787   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1788   if (isPresent(m->num) && !inProgress)
1789   {
1790     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1791     startReduction(m->num,srcNode);
1792     finishReduction();
1793   } else if (isFuture(m->num)){
1794         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1795   }
1796   else //is Past
1797     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1798   CmiUnlock(lockEverything);
1799   delete m;
1800
1801 }
1802
1803
1804 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1805         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1806         /*
1807                 FAULT_EVAC
1808         */
1809         if(blocked){
1810                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1811                 bufferedRemoteMsgs.enq(m);
1812                 return;
1813         }
1814         
1815         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1816             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1817             startReduction(m->redNo,CkMyNode());
1818             msgs.enq(m);
1819             nRemote++;
1820             finishReduction();
1821         }
1822         else {
1823             if (isFuture(m->redNo)) {
1824                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1825                     futureRemoteMsgs.enq(m);
1826             }else{
1827                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1828                    CkAbort("Recv'd late remote contribution!\n");
1829             }
1830         }
1831         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1832 }
1833
1834 //Sent up the reduction tree with reduced data
1835 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1836 {
1837 #if CMK_BIGSIM_CHARM
1838   _TRACE_BG_TLINE_END(&m->log);
1839 #endif
1840 #ifndef CMK_CPV_IS_SMP
1841 #if CMK_IMMEDIATE_MSG
1842         if(interrupt == 1){
1843                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1844                 CpvAccess(_qd)->process(-1);
1845                 CmiDelayImmediate();
1846                 return;
1847         }
1848 #endif  
1849 #endif
1850    interrupt = 1;       
1851    CmiLock(lockEverything);   
1852    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1853    doRecvMsg(m);
1854    CmiUnlock(lockEverything);    
1855    interrupt = 0;
1856    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1857 }
1858
1859 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1860 {
1861         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1862         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1863         if (inProgress){
1864                 DEBR((AA"This Node reduction is already in progress\n"AB));
1865                 return;//This reduction already started
1866         }
1867         if (creating) //Don't start yet-- we're creating elements
1868         {
1869                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1870                 startRequested=CmiTrue;
1871                 return;
1872         }
1873         
1874         //If none of these cases, we need to start the reduction--
1875         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1876         inProgress=CmiTrue;
1877
1878         if(!_isNotifyChildInRed) return;
1879
1880         //Sent start requests to our kids (in case they don't already know)
1881         
1882         for (int k=0;k<treeKids();k++)
1883         {
1884 #ifdef BINOMIAL_TREE
1885                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1886                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1887 #else
1888                 if(kids[k] != srcNode){
1889                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1890                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1891                 }       
1892 #endif
1893         }
1894
1895         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1896         // in case, nodegroup does not has the attached red group,
1897         // it has to restart groups again
1898         startLocalGroupReductions(number);
1899 /*
1900         if (startLocalGroupReductions(number) == 0)
1901           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1902 */
1903 }
1904
1905 // restart local groups until succeed
1906 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1907   CmiLock(lockEverything);    
1908   if (startLocalGroupReductions(number) == 0)
1909     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1910   CmiUnlock(lockEverything);    
1911 }
1912
1913 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1914         /*
1915                 FAULT_EVAC
1916         */
1917         if(blocked){
1918                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1919                 bufferedMsgs.enq(m);
1920                 return;
1921         }
1922         
1923         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1924                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1925                 futureMsgs.enq(m);
1926         } else {// An ordinary contribution
1927                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1928                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1929                 startReduction(m->redNo,CkMyNode());
1930                 msgs.enq(m);
1931                 nContrib++;
1932                 finishReduction();
1933         }
1934 }
1935
1936 //Handle a message from one element for the reduction
1937 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1938 {
1939   interrupt = 1;
1940   CmiLock(lockEverything);
1941   doAddContribution(m);
1942   CmiUnlock(lockEverything);
1943   interrupt = 0;
1944 }
1945
1946 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1947         if(blocked){
1948                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1949                 bufferedMsgs.enq(m);
1950                 return;
1951         }
1952         
1953         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1954                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1955 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1956                 futureLateMigrantMsgs.enq(m);
1957         } else {// An ordinary contribution
1958                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1959 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1960                 msgs.enq(m);
1961                 finishReduction();
1962         }
1963 }
1964
1965
1966
1967
1968
1969 /** check if the nodegroup reduction is finished at this node. In that case send it
1970 up the reduction tree **/
1971
1972 void CkNodeReductionMgr::finishReduction(void)
1973 {
1974   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1975   /***Check if reduction is finished in the next few ifs***/
1976   if ((!inProgress) || creating){
1977         DEBR((AA"Either not in Progress or creating\n"AB));
1978         return;
1979   }
1980
1981   if (nContrib<(lcount)){
1982         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1983
1984          return;//Need more local messages
1985   }
1986   if (nRemote<treeKids()){
1987         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1988
1989         return;//Need more remote messages
1990   }
1991   if (nRemote>treeKids()){
1992
1993           interrupt = 0;
1994            CkAbort("Nodegrp Excess remote reduction message received!\n");
1995   }
1996
1997   DEBR((AA"Reducing node data...\n"AB));
1998
1999   /**reduce all messages received at this node **/
2000   CkReductionMsg *result=reduceMessages();
2001
2002   if (hasParent())
2003   {//Pass data up tree to parent
2004         if(CmiNodeAlive(CkMyNode()) || killed == 0){
2005         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
2006         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
2007                 /*
2008                         FAULT_EVAC
2009                 */
2010                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
2011             thisProxy[treeParent()].RecvMsg(result);
2012         }
2013
2014   }
2015   else
2016   {
2017                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2018                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
2019                         msgs.enq(result);
2020                         return;
2021                 }
2022                 result->gcount += additionalGCount;
2023           /** if the reduction is finished and I am the root of the reduction tree
2024           then call the reductionhandler and other stuff ***/
2025                 
2026
2027                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2028     CkSetRefNum(result, result->getUserFlag());
2029     if (!result->callback.isInvalid()){
2030       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2031             result->callback.send(result);
2032     }
2033     else if (storedCallback!=NULL){
2034       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2035             storedCallback->send(result);
2036     }
2037     else{
2038                 DEBR((AA"Invalid Callback \n"AB));
2039             CkAbort("No reduction client!\n"
2040                     "You must register a client with either SetReductionClient or during contribute.\n");
2041                 }
2042   }
2043
2044   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2045   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2046   redNo++;
2047         updateTree();
2048   int i;
2049   inProgress=CmiFalse;
2050   startRequested=CmiFalse;
2051   nRemote=nContrib=0;
2052
2053   //Look through the future queue for messages we can now handle
2054   int n=futureMsgs.length();
2055
2056   for (i=0;i<n;i++)
2057   {
2058     interrupt = 1;
2059
2060     CkReductionMsg *m=futureMsgs.deq();
2061
2062     interrupt = 0;
2063     if (m!=NULL){ //One of these addContributions may have finished us.
2064       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2065       doAddContribution(m);//<- if *still* early, puts it back in the queue
2066     }
2067   }
2068
2069   interrupt = 1;
2070
2071   n=futureRemoteMsgs.length();
2072
2073   interrupt = 0;
2074   for (i=0;i<n;i++)
2075   {
2076     interrupt = 1;
2077
2078     CkReductionMsg *m=futureRemoteMsgs.deq();
2079
2080     interrupt = 0;
2081     if (m!=NULL)
2082       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2083   }
2084   
2085   n = futureLateMigrantMsgs.length();
2086   for(i=0;i<n;i++){
2087     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2088     if(m != NULL){
2089       if(m->redNo == redNo){
2090         msgs.enq(m);
2091       }else{
2092         futureLateMigrantMsgs.enq(m);
2093       }
2094     }
2095   }
2096 }
2097
2098 //////////// Reduction Manager Utilities /////////////
2099
2100 void CkNodeReductionMgr::init_BinaryTree(){
2101         parent = (CkMyNode()-1)/TREE_WID;
2102         int firstkid = CkMyNode()*TREE_WID+1;
2103         numKids=CkNumNodes()-firstkid;
2104   if (numKids>TREE_WID) numKids=TREE_WID;
2105   if (numKids<0) numKids=0;
2106
2107         for(int i=0;i<numKids;i++){
2108                 kids.push_back(firstkid+i);
2109                 newKids.push_back(firstkid+i);
2110         }
2111 }
2112
2113 void CkNodeReductionMgr::init_BinomialTree(){
2114         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2115         /*upperSize = (unsigned )pow((double)2,depth);*/
2116         upperSize = (unsigned) 1 << depth;
2117         label = upperSize-CkMyNode()-1;
2118         int p=label;
2119         int count=0;
2120         while( p > 0){
2121                 if(p % 2 == 0)
2122                         break;
2123                 else{
2124                         p = p/2;
2125                         count++;
2126                 }
2127         }
2128         /*parent = label + rint(pow((double)2,count));*/
2129         parent = label + (1<<count);
2130         parent = upperSize -1 -parent;
2131         int temp;
2132         if(count != 0){
2133                 numKids = 0;
2134                 for(int i=0;i<count;i++){
2135                         /*temp = label - rint(pow((double)2,i));*/
2136                         temp = label - (1<<i);
2137                         temp = upperSize-1-temp;
2138                         if(temp <= CkNumNodes()-1){
2139                 //              kids[numKids] = temp;
2140                                 kids.push_back(temp);
2141                                 numKids++;
2142                         }
2143                 }
2144         }else{
2145                 numKids = 0;
2146         //      kids = NULL;
2147         }
2148 }
2149
2150
2151 int CkNodeReductionMgr::treeRoot(void)
2152 {
2153   return 0;
2154 }
2155 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2156 {
2157   return (CmiBool)(CkMyNode()!=treeRoot());
2158 }
2159 int CkNodeReductionMgr::treeParent(void) //My parent Node
2160 {
2161   return parent;
2162 }
2163
2164 int CkNodeReductionMgr::firstKid(void) //My first child Node
2165 {
2166   return CkMyNode()*TREE_WID+1;
2167 }
2168 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2169 {
2170 #ifdef BINOMIAL_TREE
2171         return numKids;
2172 #else
2173 /*  int nKids=CkNumNodes()-firstKid();
2174   if (nKids>TREE_WID) nKids=TREE_WID;
2175   if (nKids<0) nKids=0;
2176   return nKids;*/
2177         return numKids;
2178 #endif
2179 }
2180
2181 //Combine (& free) the current message vector msgs.
2182 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2183 {
2184 #if CMK_BIGSIM_CHARM
2185   _TRACE_BG_END_EXECUTE(1);
2186   void* _bgParentLog = NULL;
2187   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2188 #endif
2189   CkReductionMsg *ret=NULL;
2190
2191   //Look through the vector for a valid reducer, swapping out placeholder messages
2192   CkReduction::reducerType r=CkReduction::invalid;
2193   int msgs_gcount=0;//Reduced gcount
2194   int msgs_nSources=0;//Reduced nSources
2195   int msgs_userFlag=-1;
2196   CkCallback msgs_callback;
2197   CkCallback msgs_secondaryCallback;
2198   int i;
2199   int nMsgs=0;
2200   CkReductionMsg *m;
2201   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2202   bool isMigratableContributor;
2203         
2204
2205   while(NULL!=(m=msgs.deq()))
2206   {
2207     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2208     msgs_gcount+=m->gcount;
2209     if (m->sourceFlag!=0)
2210     { //This is a real message from an element, not just a placeholder
2211       msgArr[nMsgs++]=m;
2212       msgs_nSources+=m->nSources();
2213       r=m->reducer;
2214       if (!m->callback.isInvalid())
2215         msgs_callback=m->callback;
2216       if(!m->secondaryCallback.isInvalid()){
2217         msgs_secondaryCallback = m->secondaryCallback;
2218       }
2219       if (m->userFlag!=-1)
2220         msgs_userFlag=m->userFlag;
2221
2222         isMigratableContributor= m->isMigratableContributor();
2223 #if CMK_BIGSIM_CHARM
2224       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2225 #endif
2226                                 
2227     }
2228     else
2229     { //This is just a placeholder message-- replace it
2230       delete m;
2231     }
2232   }
2233
2234   if (nMsgs==0||r==CkReduction::invalid)
2235   //No valid reducer in the whole vector
2236     ret=CkReductionMsg::buildNew(0,NULL);
2237   else
2238   {//Use the reducer to reduce the messages
2239     if(nMsgs == 1){
2240         ret = msgArr[0];
2241     }else{
2242         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2243         ret=(*f)(nMsgs,msgArr);
2244     }
2245     ret->reducer=r;
2246   }
2247
2248         
2249 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2250 #if CRITICAL_PATH_DEBUG > 3
2251         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2252 #endif
2253         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2254         path.updateMax(UsrToEnv(ret));
2255         // Combine the critical paths from all the reduction messages into the header for the new result
2256         for (i=0;i<nMsgs;i++){
2257           if (msgArr[i]!=ret){
2258             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2259             path.updateMax(UsrToEnv(msgArr[i]));
2260           } else {
2261             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2262           }
2263         }
2264 #if CRITICAL_PATH_DEBUG > 3
2265         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2266 #endif
2267
2268 #endif
2269
2270
2271         //Go back through the vector, deleting old messages
2272   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2273   delete [] msgArr;
2274   //Set the message counts
2275   ret->redNo=redNo;
2276   ret->gcount=msgs_gcount;
2277   ret->userFlag=msgs_userFlag;
2278   ret->callback=msgs_callback;
2279   ret->secondaryCallback = msgs_secondaryCallback;
2280   ret->sourceFlag=msgs_nSources;
2281   ret->setMigratableContributor(isMigratableContributor);
2282   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2283 #if CMK_BIGSIM_CHARM
2284   _TRACE_BG_TLINE_END(&ret->log);
2285 #endif
2286
2287   return ret;
2288 }
2289
2290 void CkNodeReductionMgr::pup(PUP::er &p)
2291 {
2292 //We do not store the client function pointer or the client function parameter,
2293 //it is the responsibility of the programmer to correctly restore these
2294   IrrGroup::pup(p);
2295   p(redNo);
2296   p(inProgress); p(creating); p(startRequested);
2297   p(lcount);
2298   p(nContrib); p(nRemote);
2299   p(interrupt);
2300   p|msgs;
2301   p|futureMsgs;
2302   p|futureRemoteMsgs;
2303   p|futureLateMigrantMsgs;
2304   p|parent;
2305   p|additionalGCount;
2306   p|newAdditionalGCount;
2307   if(p.isUnpacking()) {
2308     gcount=CkNumNodes();
2309     thisProxy = thisgroup;
2310     lockEverything = CmiCreateLock();
2311 #ifdef BINOMIAL_TREE
2312     init_BinomialTree();
2313 #else
2314     init_BinaryTree();
2315 #endif          
2316   }
2317   p | blocked;
2318   p | maxModificationRedNo;
2319
2320 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2321   int isnull = (storedCallback == NULL);
2322   p | isnull;
2323   if (!isnull) {
2324     if (p.isUnpacking()) {
2325       storedCallback = new CkCallback;
2326     }
2327     p|*storedCallback;
2328   }
2329 #endif
2330
2331 }
2332
2333 /*
2334         FAULT_EVAC
2335         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2336         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2337         reduction tree. 
2338 */
2339 void CkNodeReductionMgr::evacuate(){
2340         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2341         if(treeKids() == 0){
2342         /*
2343                 if the node going down is a leaf
2344         */
2345                 oldleaf=true;
2346                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2347                 /*
2348                         Need to ask parent for the reduction number that it has seen. 
2349                         Since it is a leaf, the tree does not need to be rewired. 
2350                         We reuse the oldparent type of tree modification message to get 
2351                         the parent to block and tell us about the highest reduction number it has seen.
2352                         
2353                 */
2354                 int data[2];
2355                 data[0]=CkMyNode();
2356                 data[1]=getTotalGCount()+additionalGCount;
2357                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2358                 newParent = treeParent();
2359         }else{
2360                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2361                 oldleaf= false;
2362         /*
2363                 It is not a leaf. It needs to rewire the tree around itself.
2364                 It also needs to decide on a reduction No after which the new tree will be used
2365                 Till it decides on the new tree and the redNo at which it becomes valid,
2366                 all received messages will be buffered
2367         */
2368                 newParent = kids[0];
2369                 for(int i=numKids-1;i>=0;i--){
2370                         newKids.remove(i);
2371                 }
2372                 /*
2373                         Ask everybody for the highest reduction number they have seen and
2374                         also tell them about the new tree
2375                 */
2376                 /*
2377                         Tell parent about its new child;
2378                 */
2379                 int oldParentData[2];
2380                 oldParentData[0] = CkMyNode();
2381                 oldParentData[1] = newParent;
2382                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2383
2384                 /*
2385                         Tell the other children about their new parent
2386                 */
2387                 int childrenData=newParent;
2388                 for(int i=1;i<numKids;i++){
2389                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2390                 }
2391                 
2392                 /*
2393                         Tell newParent (1st child) about its new children,
2394                         the current node and its children except the newParent
2395                 */
2396                 int *newParentData = new int[numKids+2];
2397                 for(int i=1;i<numKids;i++){
2398                         newParentData[i] = kids[i];
2399                 }
2400                 newParentData[0] = CkMyNode();
2401                 newParentData[numKids] = parent;
2402                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2403                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2404         }
2405         readyDeletion = false;
2406         blocked = true;
2407         numModificationReplies = 0;
2408         tempModificationRedNo = findMaxRedNo();
2409 }
2410
2411 /*
2412         Depending on the code, use the data to change the tree
2413         1. OLDPARENT : replace the old child with a new one
2414         2. OLDCHILDREN: replace the parent
2415         3. NEWPARENT:  add the children and change the parent
2416         4. LEAFPARENT: delete the old child
2417 */
2418
2419 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2420         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2421         int sender;
2422         newKids = kids;
2423         readyDeletion = false;
2424         newAdditionalGCount = additionalGCount;
2425         switch(code){
2426                 case OLDPARENT: 
2427                         for(int i=0;i<numKids;i++){
2428                                 if(newKids[i] == data[0]){
2429                                         newKids[i] = data[1];
2430                                         break;
2431                                 }
2432                         }
2433                         sender = data[0];
2434                         newParent = parent;
2435                         break;
2436                 case OLDCHILDREN:
2437                         newParent = data[0];
2438                         sender = parent;
2439                         break;
2440                 case NEWPARENT:
2441                         for(int i=0;i<size-2;i++){
2442                                 newKids.push_back(data[i]);
2443                         }
2444                         newParent = data[size-2];
2445                         newAdditionalGCount += data[size-1];
2446                         sender = parent;
2447                         break;
2448                 case LEAFPARENT:
2449                         for(int i=0;i<numKids;i++){
2450                                 if(newKids[i] == data[0]){
2451                                         newKids.remove(i);
2452                                         break;
2453                                 }
2454                         }
2455                         sender = data[0];
2456                         newParent = parent;
2457                         newAdditionalGCount += data[1];
2458                         break;
2459         };
2460         blocked = true;
2461         int maxRedNo = findMaxRedNo();
2462         
2463         thisProxy[sender].collectMaxRedNo(maxRedNo);
2464 }
2465
2466 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2467         /*
2468                 Find out the maximum redNo that has been seen by 
2469                 the affected nodes
2470         */
2471         numModificationReplies++;
2472         if(maxRedNo > tempModificationRedNo){
2473                 tempModificationRedNo = maxRedNo;
2474         }
2475         if(numModificationReplies == numKids+1){
2476                 maxModificationRedNo = tempModificationRedNo;
2477                 /*
2478                         when all the affected nodes have replied, tell them the maximum.
2479                         Unblock yourself. deal with the buffered messages local and remote
2480                 */
2481                 if(maxModificationRedNo == -1){
2482                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2483                 }else{
2484                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2485                 }
2486                 thisProxy[parent].unblockNode(maxModificationRedNo);
2487                 for(int i=0;i<numKids;i++){
2488                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2489                 }
2490                 blocked = false;
2491                 updateTree();
2492                 clearBlockedMsgs();
2493         }
2494 }
2495
2496 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2497         maxModificationRedNo = maxRedNo;
2498         updateTree();
2499         blocked = false;
2500         clearBlockedMsgs();
2501 }
2502
2503
2504 void CkNodeReductionMgr::clearBlockedMsgs(){
2505         int len = bufferedMsgs.length();
2506         for(int i=0;i<len;i++){
2507                 CkReductionMsg *m = bufferedMsgs.deq();
2508                 doAddContribution(m);
2509         }
2510         len = bufferedRemoteMsgs.length();
2511         for(int i=0;i<len;i++){
2512                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2513                 doRecvMsg(m);
2514         }
2515
2516 }
2517 /*
2518         if the reduction number exceeds the maxModificationRedNo, change the tree
2519         to become the new one
2520 */
2521
2522 void CkNodeReductionMgr::updateTree(){
2523         if(redNo > maxModificationRedNo){
2524                 parent = newParent;
2525                 kids = newKids;
2526                 maxModificationRedNo = INT_MAX;
2527                 numKids = kids.size();
2528                 readyDeletion = true;
2529                 additionalGCount = newAdditionalGCount;
2530                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2531                 for(int i=0;i<(int)(newKids.size());i++){
2532                         DEBREVAC(("%d ",newKids[i]));
2533                 }
2534                 DEBREVAC(("\n"));
2535         //      startReduction(redNo,CkMyNode());
2536         }else{
2537                 if(maxModificationRedNo != INT_MAX){
2538                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2539                         startReduction(redNo,CkMyNode());
2540                         finishReduction();
2541                 }       
2542         }
2543 }
2544
2545
2546 void CkNodeReductionMgr::doneEvacuate(){
2547         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2548 /*      if(oldleaf){
2549                 
2550                         It used to be a leaf
2551                         Then as soon as future messages have been emptied you can 
2552                         send the parent a message telling them that they are not going
2553                         to receive anymore messages from this child
2554                 
2555                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2556                 while(futureMsgs.length() != 0){
2557                         int n = futureMsgs.length();
2558                         for(int i=0;i<n;i++){
2559                                 CkReductionMsg *m = futureMsgs.deq();
2560                                 if(isPresent(m->redNo)){
2561                                         msgs.enq(m);
2562                                 }else{
2563                                         futureMsgs.enq(m);
2564                                 }
2565                         }
2566                         CkReductionMsg *result = reduceMessages();
2567                         thisProxy[treeParent()].RecvMsg(result);
2568                         redNo++;
2569                 }
2570                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2571                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2572         }else{*/
2573                 if(readyDeletion){
2574                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2575                 }else{
2576                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2577                 }
2578 //      }
2579 }
2580
2581 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2582         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2583         for(int i=0;i<numKids;i++){
2584                 if(kids[i] == deletedChild){
2585                         kids.remove(i);
2586                         break;
2587                 }
2588         }
2589         numKids = kids.length();
2590         finishReduction();
2591 }
2592
2593 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2594         for(int i=0;i<(int)(newKids.length());i++){
2595                 if(newKids[i] == deletedChild){
2596                         newKids.remove(i);
2597                         break;
2598                 }
2599         }
2600         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2601         for(int i=0;i<(int)(newKids.size());i++){
2602                 DEBREVAC(("%d ",newKids[i]));
2603         }
2604         DEBREVAC(("\n"));
2605         finishReduction();
2606 }
2607
2608 int CkNodeReductionMgr::findMaxRedNo(){
2609         int max = redNo;
2610         for(int i=0;i<futureRemoteMsgs.length();i++){
2611                 if(futureRemoteMsgs[i]->redNo  > max){
2612                         max = futureRemoteMsgs[i]->redNo;
2613                 }
2614         }
2615         /*
2616                 if redNo is max (that is no future message) and the current reduction has not started
2617                 then tree can be changed before the reduction redNo can be started
2618         */ 
2619         if(redNo == max && msgs.length() == 0){
2620                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2621                 max--;
2622         }
2623         return max;
2624 }
2625
2626 // initnode call. check the size of reduction table
2627 void CkReductionMgr::sanitycheck()
2628 {
2629 #if CMK_ERROR_CHECKING
2630   int count = 0;
2631   while (CkReduction::reducerTable[count] != NULL) count++;
2632   CmiAssert(CkReduction::nReducers == count);
2633 #endif
2634 }
2635
2636 #include "CkReduction.def.h"