Initializing number of emigrant recovering objects variable.
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207 #if defined(_FAULT_CAUSAL_)
208         numImmigrantRecObjs = 0;
209         numEmigrantRecObjs = 0;
210 #endif
211   disableNotifyChildrenStart = CmiFalse;
212   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
213 }
214
215 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
216 {
217   redNo=0;
218   completedRedNo = -1;
219   inProgress=CmiFalse;
220   creating=CmiFalse;
221   startRequested=CmiFalse;
222   gcount=lcount=0;
223   nContrib=nRemote=0;
224   maxStartRequest=0;
225   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
226 #if defined(_FAULT_CAUSAL_)
227         numImmigrantRecObjs = 0;
228         numEmigrantRecObjs = 0;
229 #endif
230
231 }
232
233 void CkReductionMgr::flushStates(int isgroup)
234 {
235   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
236   redNo=0;
237   completedRedNo = -1;
238   inProgress=CmiFalse;
239   creating=CmiFalse;
240   if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
241   startRequested=CmiFalse;
242   nContrib=nRemote=0;
243   maxStartRequest=0;
244
245   while (!msgs.isEmpty()) { delete msgs.deq(); }
246   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
247   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
248   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
249
250   adjVec.length()=0;
251
252 #if ! GROUP_LEVEL_REDUCTION
253   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
254 #endif
255 }
256
257 //////////// Reduction Manager Client API /////////////
258
259 //Add the given client function.  Overwrites any previous client.
260 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
261 {
262   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
263
264   if (CkMyPe()!=0)
265           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
266   storedCallback=*cb;
267 #if ! GROUP_LEVEL_REDUCTION
268   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
269   nodeProxy.ckSetReductionClient(callback);
270 #endif
271 }
272
273 ///////////////////////////// Contributor ////////////////////////
274 //Contributors keep a copy of this structure:
275
276 /*Contributor migration support:
277 */
278 void contributorInfo::pup(PUP::er &p)
279 {
280   p(redNo);
281 }
282
283 ////////////////////// Contributor list maintainance: /////////////////
284 //These just set and clear the "creating" flag to prevent
285 // reductions from finishing early because not all elements
286 // have been created.
287 void CkReductionMgr::creatingContributors(void)
288 {
289   DEBR((AA"Creating contributors...\n"AB));
290   creating=CmiTrue;
291 }
292 void CkReductionMgr::doneCreatingContributors(void)
293 {
294   DEBR((AA"Done creating contributors...\n"AB));
295   creating=CmiFalse;
296   if (startRequested) startReduction(redNo,CkMyPe());
297   finishReduction();
298 }
299
300 //A new contributor will be created
301 void CkReductionMgr::contributorStamped(contributorInfo *ci)
302 {
303   DEBR((AA"Contributor %p stamped\n"AB,ci));
304   //There is another contributor
305   gcount++;
306   if (inProgress)
307   {
308     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
309     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
310   } else
311     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
312 }
313
314 //A new contributor was actually created
315 void CkReductionMgr::contributorCreated(contributorInfo *ci)
316 {
317   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
318   //We've got another contributor
319   lcount++;
320   //He may not need to contribute to some of our reductions:
321   for (int r=redNo;r<ci->redNo;r++)
322     adj(r).lcount--;//He won't be contributing to r here
323 }
324
325 /*Don't expect any more contributions from this one.
326 This is rather horrifying because we now have to make
327 sure the global element count accurately reflects all the
328 contributions the element made before it died-- these may stretch
329 far into the future.  The adj() vector is what saves us here.
330 */
331 void CkReductionMgr::contributorDied(contributorInfo *ci)
332 {
333 #if CMK_MEM_CHECKPOINT
334   // ignore from listener if it is during restart from crash
335   if (CkInRestarting()) return;
336 #endif
337   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
338   //We lost a contributor
339   gcount--;
340
341   if (ci->redNo<redNo)
342   {//Must have been migrating during reductions-- root is waiting for his
343   // contribution, which will never come.
344     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
345     for (int r=ci->redNo;r<redNo;r++)
346       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
347   }
348
349   //Add to the global count for all his future messages (wherever they are)
350   int r;
351   for (r=redNo;r<ci->redNo;r++)
352   {//He already contributed to this reduction, but won't show up in global count.
353     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
354     adj(r).gcount++;
355   }
356
357   lcount--;
358   //He's already contributed to several reductions here
359   for (r=redNo;r<ci->redNo;r++)
360     adj(r).lcount++;//He'll be contributing to r here
361
362   finishReduction();
363 }
364
365 //Migrating away (note that global count doesn't change)
366 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
367 {
368   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
369   lcount--;//We lost a local
370   //He's already contributed to several reductions here
371   for (int r=redNo;r<ci->redNo;r++)
372     adj(r).lcount++;//He'll be contributing to r here
373
374   finishReduction();
375 }
376
377 //Migrating in (note that global count doesn't change)
378 void CkReductionMgr::contributorArriving(contributorInfo *ci)
379 {
380   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
381   lcount++;//We gained a local
382 #if CMK_MEM_CHECKPOINT
383   // ignore from listener if it is during restart from crash
384   // because the ci may be old.
385   if (CkInRestarting()) return;
386 #endif
387   //He has already contributed (elsewhere) to several reductions:
388   for (int r=redNo;r<ci->redNo;r++)
389     adj(r).lcount--;//He won't be contributing to r here
390
391 }
392
393 //Contribute-- the given msg can contain any data.  The reducerType
394 // field of the message must be valid.
395 // Each contributor must contribute exactly once to the each reduction.
396 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
397 {
398 #if CMK_BIGSIM_CHARM
399   _TRACE_BG_TLINE_END(&(m->log));
400 #endif
401   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
402   //m->ci=ci;
403   m->redNo=ci->redNo++;
404   m->sourceFlag=-1;//A single contribution
405   m->gcount=0;
406
407 #if defined(_FAULT_CAUSAL_)
408
409         // if object is an immigrant recovery object, we send the contribution to the source PE
410         if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
411                 
412                 // turning on the message-logging bypass flag
413                 envelope *env = UsrToEnv(m);
414                 env->flags = env->flags | CK_BYPASS_DET_MLOG;
415         thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
416                 return;
417         }
418
419     Chare *oldObj = CpvAccess(_currentObj);
420     CpvAccess(_currentObj) = this;
421
422         // adding contribution
423         addContribution(m);
424
425     CpvAccess(_currentObj) = oldObj;
426 #else
427   addContribution(m);
428 #endif
429 }
430
431 #if defined(_FAULT_CAUSAL_)
432 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
433         //if(CkMyPe() == 2) CkPrintf("[%d] ---> Contributing Via Message\n",CkMyPe());
434         
435         // turning off bypassing flag
436         envelope *env = UsrToEnv(m);
437         env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
438
439         // adding contribution
440     addContribution(m);
441 }
442 #else
443 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
444 #endif
445
446 //////////// Reduction Manager Remote Entry Points /////////////
447 //Sent down the reduction tree (used by barren PEs)
448 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
449 {
450  if(CkMyPe()==0){
451         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
452         //delete m;
453         //return;
454  }
455  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
456  int srcPE = (UsrToEnv(m))->getSrcPe();
457 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
458   if (isPresent(m->num) && !inProgress)
459   {
460     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
461     startReduction(m->num,srcPE);
462     finishReduction();
463   } else if (isFuture(m->num)){
464 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
465           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
466           if(maxStartRequest < m->num){
467                   maxStartRequest = m->num;
468           }
469  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
470           
471     }
472   else //is Past
473     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
474   delete m;
475 #else
476     if(redNo == 0){
477         if(lcount == 0){
478             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
479             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
480             dummy->fromPE = CmiMyPe();
481             dummy->sourceProcessorCount = 0;
482             dummy->redNo = 0;
483             thisProxy[0].contributeViaMessage(dummy);
484         }
485     }
486 #endif
487 }
488
489 //Sent to root of reduction tree with reduction contribution
490 // of migrants that missed the main reduction.
491 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
492 {
493 #if GROUP_LEVEL_REDUCTION
494 #if CMK_BIGSIM_CHARM
495   _TRACE_BG_TLINE_END(&(m->log));
496 #endif
497   addContribution(m);
498 #else
499   m->secondaryCallback = m->callback;
500   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
501   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
502   nodeMgr->LateMigrantMsg(m);
503 /*      int len = finalMsgs.length();
504         finalMsgs.enq(m);
505 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
506         endArrayReduction();*/
507 #endif
508 }
509
510 //A late migrating contributor will never contribute to this reduction
511 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
512 {
513   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
514   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
515  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
516   adj(m->num).gcount--;//He won't be contributing to this one.
517   finishReduction();
518   delete m;
519 }
520
521 //////////// Reduction Manager State /////////////
522 void CkReductionMgr::startReduction(int number,int srcPE)
523 {
524   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
525   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
526   if (inProgress){
527         DEBR((AA"This reduction is already in progress\n"AB));
528         return;//This reduction already started
529   }
530   if (creating) //Don't start yet-- we're creating elements
531   {
532     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
533     startRequested=CmiTrue;
534     return;
535   }
536
537 //If none of these cases, we need to start the reduction--
538   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
539   inProgress=CmiTrue;
540  
541
542         /*
543                 FAULT_EVAC
544         */
545   if(!CmiNodeAlive(CkMyPe())){
546         return;
547   }
548
549   if(disableNotifyChildrenStart) return;
550  
551 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_) 
552   if(CmiMyPe() == 0 && redNo == 0){
553             for(int j=0;j<CkNumPes();j++){
554                 if(j != CkMyPe() && j != srcPE){
555                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
556                 }
557             }
558             if(lcount == 0){
559                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
560                 dummy->fromPE = CmiMyPe();
561                 dummy->sourceProcessorCount = 0;
562                 dummy->redNo = 0;
563                 thisProxy[0].contributeViaMessage(dummy);
564             }
565     }   else{
566         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
567     }
568
569
570 #else
571   //Sent start requests to our kids (in case they don't already know)
572 #if GROUP_LEVEL_REDUCTION
573   for (int k=0;k<treeKids();k++)
574   {
575     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
576     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
577   }
578 #else
579   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
580 #endif
581 #endif
582         
583         /*
584   int temp;
585   //making it a broadcast done only by PE 0
586   if(!hasParent()){
587                 temp = completedRedNo+1;
588                 for(int i=temp;i<=number;i++){
589                         for(int j=0;j<CkNumPes();j++){
590                                 if(j != CkMyPe() && j != srcPE){
591                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
592                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
593                                         }
594                                 }
595                         }
596                 }       
597         }       else{
598                 temp = number;
599         }*/
600 /*  if(!hasParent()){
601                 temp = completedRedNo+1;
602         }       else{
603                 temp = number;
604         }
605         for(int i=temp;i<=number;i++){
606         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
607                 if(hasParent()){
608           // kick-start your parent too ...
609                         if(treeParent() != srcPE){
610                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
611 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
612     CpvAccess(_currentObj) = oldObj;
613 #endif
614                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
615                                 }       
616                         }       
617                 }
618           for (int k=0;k<treeKids();k++)
619           {
620                         if(firstKid()+k != srcPE){
621                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
622                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
623                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
624                                 }       
625                         }       
626         }
627         }
628         */
629 }       
630
631 /*Handle a message from one element for the reduction*/
632 void CkReductionMgr::addContribution(CkReductionMsg *m)
633 {
634   if (isPast(m->redNo))
635   {
636 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
637         CmiAbort("this version should not have late migrations");
638 #else
639         //We've moved on-- forward late contribution straight to root
640     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
641         // if (!hasParent()) //Root moved on too soon-- should never happen
642         //   CkAbort("Late reduction contribution received at root!\n");
643     thisProxy[0].LateMigrantMsg(m);
644 #endif
645   }
646   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
647     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
648     futureMsgs.enq(m);
649   } else {// An ordinary contribution
650     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
651    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
652     startReduction(m->redNo,CkMyPe());
653     msgs.enq(m);
654     nContrib++;
655     finishReduction();
656   }
657 }
658
659 /**function checks if it has got all contributions that it is supposed to
660 get at this processor. If it is done it sends the reduced result to the local
661 nodegroup */
662 void CkReductionMgr::finishReduction(void)
663 {
664   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
665   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
666   if ((!inProgress) || creating){
667         DEBR((AA"Either not in Progress or creating\n"AB));
668         return;
669   }
670   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
671 #if (defined(_FAULT_CAUSAL_))
672         if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
673         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
674                 return;//Need more local messages
675         }
676 #else
677   if (nContrib<(lcount+adj(redNo).lcount)){
678          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
679          return;//Need more local messages
680   }
681 #endif
682
683 #if GROUP_LEVEL_REDUCTION
684   if (nRemote<treeKids())  return;//Need more remote messages
685         
686 #endif
687  
688   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
689   CkReductionMsg *result=reduceMessages();
690   result->redNo=redNo;
691 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
692
693 #if GROUP_LEVEL_REDUCTION
694   if (hasParent())
695   {//Pass data up tree to parent
696     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
697     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
698 #if (defined(_FAULT_CAUSAL_))
699     result->gcount+=gcount+adj(redNo).gcount;
700 #else
701     result->gcount+=gcount+adj(redNo).gcount;
702 #endif
703     thisProxy[treeParent()].RecvMsg(result);
704   }
705   else 
706   {//We are root-- pass data to client
707     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
708     int totalElements=result->gcount+gcount+adj(redNo).gcount;
709     if (totalElements>result->nSources()) 
710     {
711       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
712       msgs.enq(result);
713       return; // Wait for migrants to contribute
714     } else if (totalElements<result->nSources()) {
715       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
716       CkAbort("ERROR! Too many contributions at root!\n");
717     }
718     DEBR((AA"Passing result to client function\n"AB));
719     CkSetRefNum(result, result->getUserFlag());
720     if (!result->callback.isInvalid())
721             result->callback.send(result);
722     else if (!storedCallback.isInvalid())
723             storedCallback.send(result);
724     else
725             CkAbort("No reduction client!\n"
726                     "You must register a client with either SetReductionClient or during contribute.\n");
727   }
728
729 #else
730   result->gcount+=gcount+adj(redNo).gcount;
731
732   result->secondaryCallback = result->callback;
733   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
734         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
735
736   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
737
738  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
739   
740   // Find our node reduction manager, and pass reduction to him:
741   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
742   nodeMgr->contributeArrayReduction(result);
743 #endif
744 #else                // _FAULT_MLOG_
745   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
746
747     CkSetRefNum(result, result->getUserFlag());
748     if (!result->callback.isInvalid())
749         result->callback.send(result);
750     else if (!storedCallback.isInvalid())
751         storedCallback.send(result);
752     else{
753       DEBR(("No reduction client for group %d \n",thisgroup.idx));
754         CkAbort("No reduction client!\n"
755             "You must register a client with either SetReductionClient or during contribute.\n");
756     }
757
758     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
759
760 #endif               // _FAULT_MLOG_
761
762   //House Keeping Operations will have to check later what needs to be changed
763   redNo++;
764   //Shift the count adjustment vector down one slot (to match new redNo)
765   int i;
766 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_) && !GROUP_LEVEL_REDUCTION
767     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
768   if(CkMyPe()!=0)
769 #endif
770   {
771         completedRedNo++;
772         for (i=1;i<(int)(adjVec.length());i++){
773            adjVec[i-1]=adjVec[i];
774         }
775         adjVec.length()--;  
776   }
777
778   inProgress=CmiFalse;
779   startRequested=CmiFalse;
780   nRemote=nContrib=0;
781
782   //Look through the future queue for messages we can now handle
783   int n=futureMsgs.length();
784   for (i=0;i<n;i++)
785   {
786     CkReductionMsg *m=futureMsgs.deq();
787     if (m!=NULL) //One of these addContributions may have finished us.
788       addContribution(m);//<- if *still* early, puts it back in the queue
789   }
790 #if GROUP_LEVEL_REDUCTION
791   n=futureRemoteMsgs.length();
792   for (i=0;i<n;i++)
793   {
794     CkReductionMsg *m=futureRemoteMsgs.deq();
795     if (m!=NULL) {
796       RecvMsg(m);//<- if *still* early, puts it back in the queue
797     }
798   }
799 #endif
800
801 #if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
802   if(maxStartRequest >= redNo){
803           startReduction(redNo,CkMyPe());
804           finishReduction();
805   }
806  
807 #endif
808
809 }
810
811 //Sent up the reduction tree with reduced data
812   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
813 {
814 #if GROUP_LEVEL_REDUCTION
815 #if CMK_BIGSIM_CHARM
816   _TRACE_BG_TLINE_END(&m->log);
817 #endif
818   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
819     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
820     startReduction(m->redNo, CkMyPe());
821     msgs.enq(m);
822     nRemote++;
823     finishReduction();
824   }
825   else if (isFuture(m->redNo)) {
826     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
827     futureRemoteMsgs.enq(m);
828   } 
829   else CkAbort("Recv'd late remote contribution!\n");
830 #endif
831 }
832
833 //////////// Reduction Manager Utilities /////////////
834
835 //Return the countAdjustment struct for the given redNo:
836 countAdjustment &CkReductionMgr::adj(int number)
837 {
838   number-=completedRedNo;
839   number--;
840   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
841   //Pad the adjustment vector with zeros until it's at least number long
842   while ((int)(adjVec.length())<=number)
843     adjVec.push_back(countAdjustment());
844   return adjVec[number];
845 }
846
847 //Combine (& free) the current message vector msgs.
848 CkReductionMsg *CkReductionMgr::reduceMessages(void)
849 {
850 #if CMK_BIGSIM_CHARM
851   _TRACE_BG_END_EXECUTE(1);
852   void* _bgParentLog = NULL;
853   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
854 #endif
855   CkReductionMsg *ret=NULL;
856
857   //Look through the vector for a valid reducer, swapping out placeholder messages
858   CkReduction::reducerType r=CkReduction::invalid;
859   int msgs_gcount=0;//Reduced gcount
860   int msgs_nSources=0;//Reduced nSources
861   int msgs_userFlag=-1;
862   CkCallback msgs_callback;
863   int i;
864   int nMsgs=0;
865   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
866   CkReductionMsg *m;
867   bool isMigratableContributor;
868
869   // Copy message queue into msgArr, skipping placeholders:
870   while (NULL!=(m=msgs.deq()))
871   {
872     msgs_gcount+=m->gcount;
873     if (m->sourceFlag!=0)
874     { //This is a real message from an element, not just a placeholder
875       msgArr[nMsgs++]=m;
876       msgs_nSources+=m->nSources();
877       r=m->reducer;
878       if (!m->callback.isInvalid())
879         msgs_callback=m->callback;
880       if (m->userFlag!=-1)
881         msgs_userFlag=m->userFlag;
882                         
883         isMigratableContributor=m->isMigratableContributor();
884 #if CMK_BIGSIM_CHARM
885         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
886 #endif
887     }
888     else
889     { //This is just a placeholder message-- forget it
890       delete m;
891     }
892   }
893
894   if (nMsgs==0||r==CkReduction::invalid)
895   //No valid reducer in the whole vector
896     ret=CkReductionMsg::buildNew(0,NULL);
897   else
898   {//Use the reducer to reduce the messages
899                 //if there is only one msg to be reduced just return that message
900     if(nMsgs == 1){
901         ret = msgArr[0];        
902     }else{
903         CkReduction::reducerFn f=CkReduction::reducerTable[r];
904         ret=(*f)(nMsgs,msgArr);
905     }
906     ret->reducer=r;
907   }
908
909
910
911 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
912
913 #if CRITICAL_PATH_DEBUG > 3
914   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
915 #endif
916
917   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
918   path.updateMax(UsrToEnv(ret));
919   // Combine the critical paths from all the reduction messages
920   for (i=0;i<nMsgs;i++){
921     if (msgArr[i]!=ret){
922       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
923       path.updateMax(UsrToEnv(msgArr[i]));
924     }
925   }
926   
927
928 #if CRITICAL_PATH_DEBUG > 3
929   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
930 #endif
931   
932   PathHistoryTableEntry tableEntry(path);
933   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
934   
935 #endif
936
937         //Go back through the vector, deleting old messages
938   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
939   delete [] msgArr;
940
941   //Set the message counts
942   ret->redNo=redNo;
943   ret->gcount=msgs_gcount;
944   ret->userFlag=msgs_userFlag;
945   ret->callback=msgs_callback;
946   ret->sourceFlag=msgs_nSources;
947         ret->setMigratableContributor(isMigratableContributor);
948   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
949
950   return ret;
951 }
952
953
954 //Checkpointing utilities
955 //pack-unpack method for CkReductionMsg
956 //if packing pack the message and then unpack and return it
957 //if unpacking allocate memory for it read it off disk and then unapck
958 //and return it
959 void CkReductionMgr::pup(PUP::er &p)
960 {
961 //We do not store the client function pointer or the client function parameter,
962 //it is the responsibility of the programmer to correctly restore these
963   CkGroupInitCallback::pup(p);
964   p(redNo);
965   p(completedRedNo);
966   p(inProgress); p(creating); p(startRequested);
967   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
968   p|msgs;
969   p|futureMsgs;
970   p|futureRemoteMsgs;
971   p|finalMsgs;
972   p|adjVec;
973   p|nodeProxy;
974   p|storedCallback;
975     // handle CkReductionClientBundle
976   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
977   {
978     CkReductionClientBundle *bd;
979     if (p.isUnpacking()) 
980       bd = new CkReductionClientBundle;
981     else
982       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
983     p|*bd;
984     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
985   }
986
987   // subtle --- Gengbin
988   // Group : CkReductionMgr
989   // CkArray: CkReductionMgr
990   // lcount/gcount in Group is set in Group constructor
991   // lcount/gcount in CkArray is not, it is set when array elements are created
992   // we can not pup because inserting array elems will add the counters again
993 //  p|lcount;
994 //  p|gcount;
995 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
996 //  p|lcount;
997 //  //  p|gcount;
998 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
999 #endif
1000   if(p.isUnpacking()){
1001     thisProxy = thisgroup;
1002     maxStartRequest=0;
1003 #if GROUP_LEVEL_REDUCTION
1004 #ifdef BINOMIAL_TREE
1005     init_BinomialTree();
1006 #else
1007     init_BinaryTree();
1008 #endif
1009 #endif
1010   }
1011
1012   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1013 }
1014
1015
1016 //Callback for doing Reduction through NodeGroups added by Sayantan
1017
1018 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1019
1020         int total = m->gcount+adj(m->redNo).gcount;
1021         finalMsgs.enq(m);
1022         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1023         adj(m->redNo).mainRecvd = 1;
1024         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1025         endArrayReduction();
1026 }
1027
1028 void CkReductionMgr :: endArrayReduction(){
1029         CkReductionMsg *ret=NULL;
1030         int nMsgs=finalMsgs.length();
1031         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1032         //Look through the vector for a valid reducer, swapping out placeholder messages
1033         //CkPrintf("Length of Final Message %d \n",nMsgs);
1034         CkReduction::reducerType r=CkReduction::invalid;
1035         int msgs_gcount=0;//Reduced gcount
1036         int msgs_nSources=0;//Reduced nSources
1037         int msgs_userFlag=-1;
1038         CkCallback msgs_callback;
1039         CkCallback msgs_secondaryCallback;
1040         CkVec<CkReductionMsg *> tempMsgs;
1041         int i;
1042         int numMsgs = 0;
1043         for (i=0;i<nMsgs;i++)
1044         {
1045                 CkReductionMsg *m=finalMsgs.deq();
1046                 if(m->redNo == completedRedNo +1){
1047                         msgs_gcount+=m->gcount;
1048                         if (m->sourceFlag!=0)
1049                         { //This is a real message from an element, not just a placeholder
1050                                 msgs_nSources+=m->nSources();
1051                                 r=m->reducer;
1052                                 if (!m->callback.isInvalid())
1053                                 msgs_callback=m->callback;
1054                                 if(!m->secondaryCallback.isInvalid())
1055                                         msgs_secondaryCallback = m->secondaryCallback;
1056                                 if (m->userFlag!=-1)
1057                                         msgs_userFlag=m->userFlag;
1058                                 tempMsgs.push_back(m);
1059                         }
1060                 }else{
1061                         finalMsgs.enq(m);
1062                 }
1063
1064         }
1065         numMsgs = tempMsgs.length();
1066
1067         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1068
1069         if(numMsgs == 0){
1070                 return;
1071         }
1072         if(adj(completedRedNo+1).mainRecvd == 0){
1073                 for(i=0;i<numMsgs;i++){
1074                         finalMsgs.enq(tempMsgs[i]);
1075                 }
1076                 return;
1077         }
1078
1079 /*
1080         NOT NEEDED ANYMORE DONE at nodegroup level
1081         if(msgs_gcount  > msgs_nSources){
1082                 for(i=0;i<numMsgs;i++){
1083                         finalMsgs.enq(tempMsgs[i]);
1084                 }
1085                 return;
1086         }*/
1087
1088         if (nMsgs==0||r==CkReduction::invalid)
1089                 //No valid reducer in the whole vector
1090                 ret=CkReductionMsg::buildNew(0,NULL);
1091         else{//Use the reducer to reduce the messages
1092                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1093                 // has to be corrected elements from above need to be put into a temporary vector
1094                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1095                 ret=(*f)(numMsgs,msgArr);
1096                 ret->reducer=r;
1097
1098         }
1099
1100         
1101 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1102
1103 #if CRITICAL_PATH_DEBUG > 3
1104         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1105 #endif
1106
1107         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1108         path.updateMax(UsrToEnv(ret));
1109         // Combine the critical paths from all the reduction messages into the header for the new result
1110         for (i=0;i<numMsgs;i++){
1111           if (tempMsgs[i]!=ret){
1112             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1113             path.updateMax(UsrToEnv(tempMsgs[i]));
1114           } else {
1115             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1116           }
1117         }
1118         // Also consider the path which got us into this entry method
1119
1120 #if CRITICAL_PATH_DEBUG > 3
1121         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1122 #endif
1123
1124 #endif
1125   
1126
1127
1128
1129
1130         for(i = 0;i<numMsgs;i++){
1131                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1132         }
1133
1134         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1135         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1136
1137         ret->redNo=completedRedNo+1;
1138         ret->gcount=msgs_gcount;
1139         ret->userFlag=msgs_userFlag;
1140         ret->callback=msgs_callback;
1141         ret->secondaryCallback = msgs_secondaryCallback;
1142         ret->sourceFlag=msgs_nSources;
1143
1144         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1145
1146         CkSetRefNum(ret, ret->getUserFlag());
1147         if (!ret->secondaryCallback.isInvalid())
1148             ret->secondaryCallback.send(ret);
1149     else if (!storedCallback.isInvalid())
1150             storedCallback.send(ret);
1151     else{
1152       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1153             CkAbort("No reduction client!\n"
1154                     "You must register a client with either SetReductionClient or during contribute.\n");
1155     }
1156         completedRedNo++;
1157
1158         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1159
1160         for (i=1;i<(int)(adjVec.length());i++)
1161                 adjVec[i-1]=adjVec[i];
1162         adjVec.length()--;
1163         endArrayReduction();
1164 }
1165
1166 #if GROUP_LEVEL_REDUCTION
1167
1168 void CkReductionMgr::init_BinaryTree(){
1169         parent = (CkMyPe()-1)/TREE_WID;
1170         int firstkid = CkMyPe()*TREE_WID+1;
1171         numKids=CkNumPes()-firstkid;
1172         if (numKids>TREE_WID) numKids=TREE_WID;
1173         if (numKids<0) numKids=0;
1174
1175         for(int i=0;i<numKids;i++){
1176                 kids.push_back(firstkid+i);
1177                 newKids.push_back(firstkid+i);
1178         }
1179 }
1180
1181 void CkReductionMgr::init_BinomialTree(){
1182         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1183         /*upperSize = (unsigned )pow((double)2,depth);*/
1184         upperSize = (unsigned) 1 << depth;
1185         label = upperSize-CkMyPe()-1;
1186         int p=label;
1187         int count=0;
1188         while( p > 0){
1189                 if(p % 2 == 0)
1190                         break;
1191                 else{
1192                         p = p/2;
1193                         count++;
1194                 }
1195         }
1196         /*parent = label + rint(pow((double)2,count));*/
1197         parent = label + (1<<count);
1198         parent = upperSize -1 -parent;
1199         int temp;
1200         if(count != 0){
1201                 numKids = 0;
1202                 for(int i=0;i<count;i++){
1203                         /*temp = label - rint(pow((double)2,i));*/
1204                         temp = label - (1<<i);
1205                         temp = upperSize-1-temp;
1206                         if(temp <= CkNumPes()-1){
1207                                 kids.push_back(temp);
1208                                 numKids++;
1209                         }
1210                 }
1211         }else{
1212                 numKids = 0;
1213         //      kids = NULL;
1214         }
1215 }
1216
1217
1218 int CkReductionMgr::treeRoot(void)
1219 {
1220   return 0;
1221 }
1222 CmiBool CkReductionMgr::hasParent(void) //Root Node
1223 {
1224   return (CmiBool)(CkMyPe()!=treeRoot());
1225 }
1226 int CkReductionMgr::treeParent(void) //My parent Node
1227 {
1228   return parent;
1229 }
1230
1231 int CkReductionMgr::firstKid(void) //My first child Node
1232 {
1233   return CkMyPe()*TREE_WID+1;
1234 }
1235 int CkReductionMgr::treeKids(void)//Number of children in tree
1236 {
1237   return numKids;
1238 }
1239
1240 #endif
1241
1242 /////////////////////////////////////////////////////////////////////////
1243
1244 ////////////////////////////////
1245 //CkReductionMsg support
1246
1247 //ReductionMessage default private constructor-- does nothing
1248 CkReductionMsg::CkReductionMsg(){}
1249 CkReductionMsg::~CkReductionMsg(){}
1250
1251 //This define gives the distance from the start of the CkReductionMsg
1252 // object to the start of the user data area (just below last object field)
1253 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1254
1255 //"Constructor"-- builds and returns a new CkReductionMsg.
1256 //  the "data" array you specify will be copied into this object.
1257 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1258     CkReduction::reducerType reducer, CkReductionMsg *buf)
1259 {
1260   int len[1];
1261   len[0]=NdataSize;
1262   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1263   
1264   ret->dataSize=NdataSize;
1265   if (srcData!=NULL && !buf)
1266     memcpy(ret->data,srcData,NdataSize);
1267   ret->userFlag=-1;
1268   ret->reducer=reducer;
1269   //ret->ci=NULL;
1270   ret->sourceFlag=-1000;
1271   ret->gcount=0;
1272   ret->migratableContributor = true;
1273 #if CMK_BIGSIM_CHARM
1274   ret->log = NULL;
1275 #endif
1276   return ret;
1277 }
1278
1279 // Charm kernel message runtime support:
1280 void *
1281 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1282 {
1283   int totalsize=ARM_DATASTART+(*sz);
1284   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1285   CkReductionMsg *ret = (CkReductionMsg *)
1286     CkAllocMsg(msgnum,totalsize,priobits);
1287   ret->data=(void *)(&ret->dataStorage);
1288   return (void *) ret;
1289 }
1290
1291 void *
1292 CkReductionMsg::pack(CkReductionMsg* in)
1293 {
1294   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1295   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1296   in->data = NULL;
1297   return (void*) in;
1298 }
1299
1300 CkReductionMsg* CkReductionMsg::unpack(void *in)
1301 {
1302   CkReductionMsg *ret = (CkReductionMsg *)in;
1303   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1304   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1305   ret->data=(void *)(&ret->dataStorage);
1306   return ret;
1307 }
1308
1309
1310 /////////////////////////////////////////////////////////////////////////////////////
1311 ///////////////// Builtin Reducer Functions //////////////
1312 /* A simple reducer, like sum_int, looks like this:
1313 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1314 {
1315   int i,ret=0;
1316   for (i=0;i<nMsg;i++)
1317     ret+=*(int *)(msg[i]->getData());
1318   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1319 }
1320
1321 To keep the code small and easy to change, the implementations below
1322 are built with preprocessor macros.
1323 */
1324
1325 //////////////// simple reducers ///////////////////
1326 /*A define used to quickly and tersely construct simple reductions.
1327 The basic idea is to use the first message's data array as
1328 (pre-initialized!) scratch space for folding in the other messages.
1329  */
1330
1331 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1332 {
1333         CkAbort("Called the invalid reducer type 0.  This probably\n"
1334                 "means you forgot to initialize your custom reducer index.\n");
1335         return NULL;
1336 }
1337
1338 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1339 {
1340   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1341 }
1342
1343 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1344 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1345 {\
1346   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1347   int m,i;\
1348   int nElem=msg[0]->getLength()/sizeof(dataType);\
1349   dataType *ret=(dataType *)(msg[0]->getData());\
1350   for (m=1;m<nMsg;m++)\
1351   {\
1352     dataType *value=(dataType *)(msg[m]->getData());\
1353     for (i=0;i<nElem;i++)\
1354     {\
1355       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1356       loop\
1357     }\
1358   }\
1359   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1360   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1361 }
1362
1363 //Use this macro for reductions that have the same type for all inputs
1364 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1365   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1366   SIMPLE_REDUCTION(nameBase##_long,CmiInt8,"%d",loop) \
1367   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1368   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1369
1370
1371 //Compute the sum the numbers passed by each element.
1372 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1373
1374 //Compute the product of the numbers passed by each element.
1375 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1376
1377 //Compute the largest number passed by any element.
1378 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1379
1380 //Compute the smallest integer passed by any element.
1381 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1382
1383
1384 //Compute the logical AND of the integers passed by each element.
1385 // The resulting integer will be zero if any source integer is zero; else 1.
1386 SIMPLE_REDUCTION(logical_and,int,"%d",
1387         if (value[i]==0)
1388      ret[i]=0;
1389   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1390 )
1391
1392 //Compute the logical OR of the integers passed by each element.
1393 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1394 SIMPLE_REDUCTION(logical_or,int,"%d",
1395   if (value[i]!=0)
1396            ret[i]=1;
1397   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1398 )
1399
1400 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1401 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1402
1403 //Select one random message to pass on
1404 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1405   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1406 }
1407
1408 /////////////// concat ////////////////
1409 /*
1410 This reducer simply appends the data it recieves from each element,
1411 without any housekeeping data to separate them.
1412 */
1413 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1414 {
1415   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1416   //Figure out how big a message we'll need
1417   int i,retSize=0;
1418   for (i=0;i<nMsg;i++)
1419       retSize+=msg[i]->getSize();
1420
1421   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1422
1423   //Allocate a new message
1424   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1425
1426   //Copy the source message data into the return message
1427   char *cur=(char *)(ret->getData());
1428   for (i=0;i<nMsg;i++) {
1429     int messageBytes=msg[i]->getSize();
1430     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1431     cur+=messageBytes;
1432   }
1433   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1434   return ret;
1435 }
1436
1437 /////////////// set ////////////////
1438 /*
1439 This reducer appends the data it recieves from each element
1440 along with some housekeeping data indicating contribution boundaries.
1441 The message data is thus a list of reduction_set_element structures
1442 terminated by a dummy reduction_set_element with a sourceElement of -1.
1443 */
1444
1445 //This rounds an integer up to the nearest multiple of sizeof(double)
1446 static const int alignSize=sizeof(double);
1447 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1448
1449 //This gives the size (in bytes) of a reduction_set_element
1450 static int SET_SIZE(int dataSize)
1451 {return SET_ALIGN(sizeof(int)+dataSize);}
1452
1453 //This returns a pointer to the next reduction_set_element in the list
1454 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1455 {
1456   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1457   return (CkReduction::setElement *)next;
1458 }
1459
1460 //Combine the data passed by each element into an list of reduction_set_elements.
1461 // Each element may contribute arbitrary data (with arbitrary length).
1462 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1463 {
1464   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1465   //Figure out how big a message we'll need
1466   int i,retSize=0;
1467   for (i=0;i<nMsg;i++) {
1468     if (!msg[i]->isFromUser())
1469     //This message is composite-- it will just be copied over (less terminating -1)
1470       retSize+=(msg[i]->getSize()-sizeof(int));
1471     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1472       retSize+=SET_SIZE(msg[i]->getSize());
1473   }
1474   retSize+=sizeof(int);//Leave room for terminating -1.
1475
1476   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1477
1478   //Allocate a new message
1479   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1480
1481   //Copy the source message data into the return message
1482   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1483   for (i=0;i<nMsg;i++)
1484     if (!msg[i]->isFromUser())
1485     {//This message is composite-- just copy it over (less terminating -1)
1486                         int messageBytes=msg[i]->getSize()-sizeof(int);
1487                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1488                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1489                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1490     }
1491     else //This is a message from an element-- wrap it in a reduction_set_element
1492     {
1493       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1494       cur->dataSize=msg[i]->getSize();
1495       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1496       cur=SET_NEXT(cur);
1497     }
1498   cur->dataSize=-1;//Add a terminating -1.
1499   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1500   return ret;
1501 }
1502
1503 //Utility routine: get the next reduction_set_element in the list
1504 // if there is one, or return NULL if there are none.
1505 //To get all the elements, just keep feeding this procedure's output back to
1506 // its input until it returns NULL.
1507 CkReduction::setElement *CkReduction::setElement::next(void)
1508 {
1509   CkReduction::setElement *n=SET_NEXT(this);
1510   if (n->dataSize==-1)
1511     return NULL;//This is the end of the list
1512   else
1513     return n;//This is just another element
1514 }
1515
1516 /////////////////// Reduction Function Table /////////////////////
1517 CkReduction::CkReduction() {} //Dummy private constructor
1518
1519 //Add the given reducer to the list.  Returns the new reducer's
1520 // reducerType.  Must be called in the same order on every node.
1521 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1522 {
1523   reducerTable[nReducers]=fn;
1524   return (reducerType)nReducers++;
1525 }
1526
1527 /*Reducer table: maps reducerTypes to reducerFns.
1528 It's indexed by reducerType, so the order in this table
1529 must *exactly* match the reducerType enum declaration.
1530 The names don't have to match, but it helps.
1531 */
1532 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1533
1534 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1535     ::invalid_reducer,
1536     ::nop,
1537   //Compute the sum the numbers passed by each element.
1538     ::sum_int,::sum_long,::sum_float,::sum_double,
1539
1540   //Compute the product the numbers passed by each element.
1541     ::product_int,::product_long,::product_float,::product_double,
1542
1543   //Compute the largest number passed by any element.
1544     ::max_int,::max_long,::max_float,::max_double,
1545
1546   //Compute the smallest number passed by any element.
1547     ::min_int,::min_long,::min_float,::min_double,
1548
1549   //Compute the logical AND of the integers passed by each element.
1550   // The resulting integer will be zero if any source integer is zero.
1551     ::logical_and,
1552
1553   //Compute the logical OR of the integers passed by each element.
1554   // The resulting integer will be 1 if any source integer is nonzero.
1555     ::logical_or,
1556
1557     // Compute the logical bitvector AND of the integers passed by each element.
1558     ::bitvec_and,
1559
1560     // Compute the logical bitvector OR of the integers passed by each element.
1561     ::bitvec_or,
1562
1563     // Select one of the messages at random to pass on
1564     ::random,
1565
1566   //Concatenate the (arbitrary) data passed by each element
1567     ::concat,
1568
1569   //Combine the data passed by each element into an list of setElements.
1570   // Each element may contribute arbitrary data (with arbitrary length).
1571     ::set
1572 };
1573
1574
1575
1576
1577
1578
1579
1580
1581 /********** Code added by Sayantan *********************/
1582 /** Locking is a big problem in the nodegroup code for smp.
1583  So a few assumptions have had to be made. There is one lock
1584  called lockEverything. It protects all the data structures 
1585  of the nodegroup reduction mgr. I tried grabbing it separately 
1586  for each datastructure, modifying it and then releasing it and
1587  then grabbing it again, for the next change.
1588  That doesn't really help because the interleaved execution of 
1589  different threads makes the state of the reduction manager 
1590  inconsistent. 
1591  
1592  1. Grab lockEverything before calling finishreduction or startReduction
1593     or doRecvMsg
1594  2. lockEverything is grabbed only in entry methods reductionStarting
1595     or RecvMesg or  addcontribution.
1596  ****/
1597  
1598 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1599 NodeGroup::NodeGroup(void) {
1600   __nodelock=CmiCreateLock();
1601 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1602     mlogData->objID.type = TypeNodeGroup;
1603     mlogData->objID.data.group.onPE = CkMyNode();
1604 #endif
1605
1606 }
1607 NodeGroup::~NodeGroup() {
1608   CmiDestroyLock(__nodelock);
1609 }
1610 void NodeGroup::pup(PUP::er &p)
1611 {
1612   CkNodeReductionMgr::pup(p);
1613   p|reductionInfo;
1614 }
1615
1616 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1617
1618 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1619   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1620  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1621   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1622  }
1623
1624 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1625                                     ((CkNodeReductionMgr *)this),
1626                                     reductionInfo,false)
1627
1628 /* this contribute also adds up the count across all messages it receives.
1629   Useful for summing up number of array elements who have contributed ****/ 
1630 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1631         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1632
1633
1634
1635 //#define BINOMIAL_TREE
1636
1637 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1638   : thisProxy(thisgroup)
1639 {
1640 #ifdef BINOMIAL_TREE
1641   init_BinomialTree();
1642 #else
1643   init_BinaryTree();
1644 #endif
1645   storedCallback=NULL;
1646   redNo=0;
1647   inProgress=CmiFalse;
1648   
1649   startRequested=CmiFalse;
1650   gcount=CkNumNodes();
1651   lcount=1;
1652   nContrib=nRemote=0;
1653   lockEverything = CmiCreateLock();
1654
1655
1656   creating=CmiFalse;
1657   interrupt = 0;
1658   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1659         /*
1660                 FAULT_EVAC
1661         */
1662         blocked = false;
1663         maxModificationRedNo = INT_MAX;
1664         killed=0;
1665         additionalGCount = newAdditionalGCount = 0;
1666 }
1667
1668 void CkNodeReductionMgr::flushStates()
1669 {
1670  if(CkMyRank() == 0){
1671   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1672   redNo=0;
1673   inProgress=CmiFalse;
1674
1675   startRequested=CmiFalse;
1676   gcount=CkNumNodes();
1677   lcount=1;
1678   nContrib=nRemote=0;
1679
1680   creating=CmiFalse;
1681   interrupt = 0;
1682   while (!msgs.isEmpty()) { delete msgs.deq(); }
1683   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1684   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1685   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1686   }
1687 }
1688
1689 //////////// Reduction Manager Client API /////////////
1690
1691 //Add the given client function.  Overwrites any previous client.
1692 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1693 {
1694   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1695   if(cb->isInvalid()){
1696         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1697   }else{
1698         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1699   }
1700
1701   if (CkMyNode()!=0)
1702           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1703   delete storedCallback;
1704   storedCallback=cb;
1705 }
1706
1707
1708
1709 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1710 {
1711 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1712     Chare *oldObj =CpvAccess(_currentObj);
1713     CpvAccess(_currentObj) = this;
1714 #endif
1715
1716   //m->ci=ci;
1717   m->redNo=ci->redNo++;
1718   m->sourceFlag=-1;//A single contribution
1719   m->gcount=0;
1720   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1721   addContribution(m);
1722
1723 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1724     CpvAccess(_currentObj) = oldObj;
1725 #endif
1726 }
1727
1728
1729 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1730 {
1731 #if CMK_BIGSIM_CHARM
1732   _TRACE_BG_TLINE_END(&m->log);
1733 #endif
1734 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1735     Chare *oldObj =CpvAccess(_currentObj);
1736     CpvAccess(_currentObj) = this;
1737 #endif
1738   //m->ci=ci;
1739   m->redNo=ci->redNo++;
1740   m->gcount=count;
1741   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1742   addContribution(m);
1743   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1744
1745 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1746     CpvAccess(_currentObj) = oldObj;
1747 #endif
1748 }
1749
1750
1751 //////////// Reduction Manager Remote Entry Points /////////////
1752
1753 //Sent down the reduction tree (used by barren PEs)
1754 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1755 {
1756   CmiLock(lockEverything);
1757   if(blocked){
1758         delete m;
1759         CmiUnlock(lockEverything);
1760         return ;
1761   }
1762   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1763   if (isPresent(m->num) && !inProgress)
1764   {
1765     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1766     startReduction(m->num,srcNode);
1767     finishReduction();
1768   } else if (isFuture(m->num)){
1769         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1770   }
1771   else //is Past
1772     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1773   CmiUnlock(lockEverything);
1774   delete m;
1775
1776 }
1777
1778
1779 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1780         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1781         /*
1782                 FAULT_EVAC
1783         */
1784         if(blocked){
1785                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1786                 bufferedRemoteMsgs.enq(m);
1787                 return;
1788         }
1789         
1790         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1791             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1792             startReduction(m->redNo,CkMyNode());
1793             msgs.enq(m);
1794             nRemote++;
1795             finishReduction();
1796         }
1797         else {
1798             if (isFuture(m->redNo)) {
1799                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1800                     futureRemoteMsgs.enq(m);
1801             }else{
1802                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1803                    CkAbort("Recv'd late remote contribution!\n");
1804             }
1805         }
1806         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1807 }
1808
1809 //Sent up the reduction tree with reduced data
1810 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1811 {
1812 #if CMK_BIGSIM_CHARM
1813   _TRACE_BG_TLINE_END(&m->log);
1814 #endif
1815 #ifndef CMK_CPV_IS_SMP
1816 #if CMK_IMMEDIATE_MSG
1817         if(interrupt == 1){
1818                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1819                 CpvAccess(_qd)->process(-1);
1820                 CmiDelayImmediate();
1821                 return;
1822         }
1823 #endif  
1824 #endif
1825    interrupt = 1;       
1826    CmiLock(lockEverything);   
1827    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1828    doRecvMsg(m);
1829    CmiUnlock(lockEverything);    
1830    interrupt = 0;
1831    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1832 }
1833
1834 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1835 {
1836         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1837         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1838         if (inProgress){
1839                 DEBR((AA"This Node reduction is already in progress\n"AB));
1840                 return;//This reduction already started
1841         }
1842         if (creating) //Don't start yet-- we're creating elements
1843         {
1844                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1845                 startRequested=CmiTrue;
1846                 return;
1847         }
1848         
1849         //If none of these cases, we need to start the reduction--
1850         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1851         inProgress=CmiTrue;
1852
1853         if(!_isNotifyChildInRed) return;
1854
1855         //Sent start requests to our kids (in case they don't already know)
1856         
1857         for (int k=0;k<treeKids();k++)
1858         {
1859 #ifdef BINOMIAL_TREE
1860                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1861                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1862 #else
1863                 if(kids[k] != srcNode){
1864                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1865                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1866                 }       
1867 #endif
1868         }
1869
1870         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1871         // in case, nodegroup does not has the attached red group,
1872         // it has to restart groups again
1873         startLocalGroupReductions(number);
1874 /*
1875         if (startLocalGroupReductions(number) == 0)
1876           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1877 */
1878 }
1879
1880 // restart local groups until succeed
1881 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1882   CmiLock(lockEverything);    
1883   if (startLocalGroupReductions(number) == 0)
1884     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1885   CmiUnlock(lockEverything);    
1886 }
1887
1888 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1889         /*
1890                 FAULT_EVAC
1891         */
1892         if(blocked){
1893                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1894                 bufferedMsgs.enq(m);
1895                 return;
1896         }
1897         
1898         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1899                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1900                 futureMsgs.enq(m);
1901         } else {// An ordinary contribution
1902                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1903                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1904                 startReduction(m->redNo,CkMyNode());
1905                 msgs.enq(m);
1906                 nContrib++;
1907                 finishReduction();
1908         }
1909 }
1910
1911 //Handle a message from one element for the reduction
1912 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1913 {
1914   interrupt = 1;
1915   CmiLock(lockEverything);
1916   doAddContribution(m);
1917   CmiUnlock(lockEverything);
1918   interrupt = 0;
1919 }
1920
1921 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1922         if(blocked){
1923                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1924                 bufferedMsgs.enq(m);
1925                 return;
1926         }
1927         
1928         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1929                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1930 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1931                 futureLateMigrantMsgs.enq(m);
1932         } else {// An ordinary contribution
1933                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1934 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1935                 msgs.enq(m);
1936                 finishReduction();
1937         }
1938 }
1939
1940
1941
1942
1943
1944 /** check if the nodegroup reduction is finished at this node. In that case send it
1945 up the reduction tree **/
1946
1947 void CkNodeReductionMgr::finishReduction(void)
1948 {
1949   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1950   /***Check if reduction is finished in the next few ifs***/
1951   if ((!inProgress) || creating){
1952         DEBR((AA"Either not in Progress or creating\n"AB));
1953         return;
1954   }
1955
1956   if (nContrib<(lcount)){
1957         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1958
1959          return;//Need more local messages
1960   }
1961   if (nRemote<treeKids()){
1962         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1963
1964         return;//Need more remote messages
1965   }
1966   if (nRemote>treeKids()){
1967
1968           interrupt = 0;
1969            CkAbort("Nodegrp Excess remote reduction message received!\n");
1970   }
1971
1972   DEBR((AA"Reducing node data...\n"AB));
1973
1974   /**reduce all messages received at this node **/
1975   CkReductionMsg *result=reduceMessages();
1976
1977   if (hasParent())
1978   {//Pass data up tree to parent
1979         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1980         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1981         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1982                 /*
1983                         FAULT_EVAC
1984                 */
1985                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1986             thisProxy[treeParent()].RecvMsg(result);
1987         }
1988
1989   }
1990   else
1991   {
1992                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1993                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
1994                         msgs.enq(result);
1995                         return;
1996                 }
1997                 result->gcount += additionalGCount;
1998           /** if the reduction is finished and I am the root of the reduction tree
1999           then call the reductionhandler and other stuff ***/
2000                 
2001
2002                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2003     CkSetRefNum(result, result->getUserFlag());
2004     if (!result->callback.isInvalid()){
2005       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2006             result->callback.send(result);
2007     }
2008     else if (storedCallback!=NULL){
2009       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2010             storedCallback->send(result);
2011     }
2012     else{
2013                 DEBR((AA"Invalid Callback \n"AB));
2014             CkAbort("No reduction client!\n"
2015                     "You must register a client with either SetReductionClient or during contribute.\n");
2016                 }
2017   }
2018
2019   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2020   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2021   redNo++;
2022         updateTree();
2023   int i;
2024   inProgress=CmiFalse;
2025   startRequested=CmiFalse;
2026   nRemote=nContrib=0;
2027
2028   //Look through the future queue for messages we can now handle
2029   int n=futureMsgs.length();
2030
2031   for (i=0;i<n;i++)
2032   {
2033     interrupt = 1;
2034
2035     CkReductionMsg *m=futureMsgs.deq();
2036
2037     interrupt = 0;
2038     if (m!=NULL){ //One of these addContributions may have finished us.
2039       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2040       doAddContribution(m);//<- if *still* early, puts it back in the queue
2041     }
2042   }
2043
2044   interrupt = 1;
2045
2046   n=futureRemoteMsgs.length();
2047
2048   interrupt = 0;
2049   for (i=0;i<n;i++)
2050   {
2051     interrupt = 1;
2052
2053     CkReductionMsg *m=futureRemoteMsgs.deq();
2054
2055     interrupt = 0;
2056     if (m!=NULL)
2057       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2058   }
2059   
2060   n = futureLateMigrantMsgs.length();
2061   for(i=0;i<n;i++){
2062     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2063     if(m != NULL){
2064       if(m->redNo == redNo){
2065         msgs.enq(m);
2066       }else{
2067         futureLateMigrantMsgs.enq(m);
2068       }
2069     }
2070   }
2071 }
2072
2073 //////////// Reduction Manager Utilities /////////////
2074
2075 void CkNodeReductionMgr::init_BinaryTree(){
2076         parent = (CkMyNode()-1)/TREE_WID;
2077         int firstkid = CkMyNode()*TREE_WID+1;
2078         numKids=CkNumNodes()-firstkid;
2079   if (numKids>TREE_WID) numKids=TREE_WID;
2080   if (numKids<0) numKids=0;
2081
2082         for(int i=0;i<numKids;i++){
2083                 kids.push_back(firstkid+i);
2084                 newKids.push_back(firstkid+i);
2085         }
2086 }
2087
2088 void CkNodeReductionMgr::init_BinomialTree(){
2089         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2090         /*upperSize = (unsigned )pow((double)2,depth);*/
2091         upperSize = (unsigned) 1 << depth;
2092         label = upperSize-CkMyNode()-1;
2093         int p=label;
2094         int count=0;
2095         while( p > 0){
2096                 if(p % 2 == 0)
2097                         break;
2098                 else{
2099                         p = p/2;
2100                         count++;
2101                 }
2102         }
2103         /*parent = label + rint(pow((double)2,count));*/
2104         parent = label + (1<<count);
2105         parent = upperSize -1 -parent;
2106         int temp;
2107         if(count != 0){
2108                 numKids = 0;
2109                 for(int i=0;i<count;i++){
2110                         /*temp = label - rint(pow((double)2,i));*/
2111                         temp = label - (1<<i);
2112                         temp = upperSize-1-temp;
2113                         if(temp <= CkNumNodes()-1){
2114                 //              kids[numKids] = temp;
2115                                 kids.push_back(temp);
2116                                 numKids++;
2117                         }
2118                 }
2119         }else{
2120                 numKids = 0;
2121         //      kids = NULL;
2122         }
2123 }
2124
2125
2126 int CkNodeReductionMgr::treeRoot(void)
2127 {
2128   return 0;
2129 }
2130 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2131 {
2132   return (CmiBool)(CkMyNode()!=treeRoot());
2133 }
2134 int CkNodeReductionMgr::treeParent(void) //My parent Node
2135 {
2136   return parent;
2137 }
2138
2139 int CkNodeReductionMgr::firstKid(void) //My first child Node
2140 {
2141   return CkMyNode()*TREE_WID+1;
2142 }
2143 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2144 {
2145 #ifdef BINOMIAL_TREE
2146         return numKids;
2147 #else
2148 /*  int nKids=CkNumNodes()-firstKid();
2149   if (nKids>TREE_WID) nKids=TREE_WID;
2150   if (nKids<0) nKids=0;
2151   return nKids;*/
2152         return numKids;
2153 #endif
2154 }
2155
2156 //Combine (& free) the current message vector msgs.
2157 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2158 {
2159 #if CMK_BIGSIM_CHARM
2160   _TRACE_BG_END_EXECUTE(1);
2161   void* _bgParentLog = NULL;
2162   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2163 #endif
2164   CkReductionMsg *ret=NULL;
2165
2166   //Look through the vector for a valid reducer, swapping out placeholder messages
2167   CkReduction::reducerType r=CkReduction::invalid;
2168   int msgs_gcount=0;//Reduced gcount
2169   int msgs_nSources=0;//Reduced nSources
2170   int msgs_userFlag=-1;
2171   CkCallback msgs_callback;
2172   CkCallback msgs_secondaryCallback;
2173   int i;
2174   int nMsgs=0;
2175   CkReductionMsg *m;
2176   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2177   bool isMigratableContributor;
2178         
2179
2180   while(NULL!=(m=msgs.deq()))
2181   {
2182     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2183     msgs_gcount+=m->gcount;
2184     if (m->sourceFlag!=0)
2185     { //This is a real message from an element, not just a placeholder
2186       msgArr[nMsgs++]=m;
2187       msgs_nSources+=m->nSources();
2188       r=m->reducer;
2189       if (!m->callback.isInvalid())
2190         msgs_callback=m->callback;
2191       if(!m->secondaryCallback.isInvalid()){
2192         msgs_secondaryCallback = m->secondaryCallback;
2193       }
2194       if (m->userFlag!=-1)
2195         msgs_userFlag=m->userFlag;
2196
2197         isMigratableContributor= m->isMigratableContributor();
2198 #if CMK_BIGSIM_CHARM
2199       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2200 #endif
2201                                 
2202     }
2203     else
2204     { //This is just a placeholder message-- replace it
2205       delete m;
2206     }
2207   }
2208
2209   if (nMsgs==0||r==CkReduction::invalid)
2210   //No valid reducer in the whole vector
2211     ret=CkReductionMsg::buildNew(0,NULL);
2212   else
2213   {//Use the reducer to reduce the messages
2214     if(nMsgs == 1){
2215         ret = msgArr[0];
2216     }else{
2217         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2218         ret=(*f)(nMsgs,msgArr);
2219     }
2220     ret->reducer=r;
2221   }
2222
2223         
2224 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2225 #if CRITICAL_PATH_DEBUG > 3
2226         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2227 #endif
2228         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2229         path.updateMax(UsrToEnv(ret));
2230         // Combine the critical paths from all the reduction messages into the header for the new result
2231         for (i=0;i<nMsgs;i++){
2232           if (msgArr[i]!=ret){
2233             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2234             path.updateMax(UsrToEnv(msgArr[i]));
2235           } else {
2236             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2237           }
2238         }
2239 #if CRITICAL_PATH_DEBUG > 3
2240         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2241 #endif
2242
2243 #endif
2244
2245
2246         //Go back through the vector, deleting old messages
2247   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2248   delete [] msgArr;
2249   //Set the message counts
2250   ret->redNo=redNo;
2251   ret->gcount=msgs_gcount;
2252   ret->userFlag=msgs_userFlag;
2253   ret->callback=msgs_callback;
2254   ret->secondaryCallback = msgs_secondaryCallback;
2255   ret->sourceFlag=msgs_nSources;
2256   ret->setMigratableContributor(isMigratableContributor);
2257   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2258 #if CMK_BIGSIM_CHARM
2259   _TRACE_BG_TLINE_END(&ret->log);
2260 #endif
2261
2262   return ret;
2263 }
2264
2265 void CkNodeReductionMgr::pup(PUP::er &p)
2266 {
2267 //We do not store the client function pointer or the client function parameter,
2268 //it is the responsibility of the programmer to correctly restore these
2269   IrrGroup::pup(p);
2270   p(redNo);
2271   p(inProgress); p(creating); p(startRequested);
2272   p(lcount);
2273   p(nContrib); p(nRemote);
2274   p(interrupt);
2275   p|msgs;
2276   p|futureMsgs;
2277   p|futureRemoteMsgs;
2278   p|futureLateMigrantMsgs;
2279   p|parent;
2280   p|additionalGCount;
2281   p|newAdditionalGCount;
2282   if(p.isUnpacking()) {
2283     gcount=CkNumNodes();
2284     thisProxy = thisgroup;
2285     lockEverything = CmiCreateLock();
2286 #ifdef BINOMIAL_TREE
2287     init_BinomialTree();
2288 #else
2289     init_BinaryTree();
2290 #endif          
2291   }
2292   p | blocked;
2293   p | maxModificationRedNo;
2294
2295 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2296   int isnull = (storedCallback == NULL);
2297   p | isnull;
2298   if (!isnull) {
2299     if (p.isUnpacking()) {
2300       storedCallback = new CkCallback;
2301     }
2302     p|*storedCallback;
2303   }
2304 #endif
2305
2306 }
2307
2308 /*
2309         FAULT_EVAC
2310         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2311         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2312         reduction tree. 
2313 */
2314 void CkNodeReductionMgr::evacuate(){
2315         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2316         if(treeKids() == 0){
2317         /*
2318                 if the node going down is a leaf
2319         */
2320                 oldleaf=true;
2321                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2322                 /*
2323                         Need to ask parent for the reduction number that it has seen. 
2324                         Since it is a leaf, the tree does not need to be rewired. 
2325                         We reuse the oldparent type of tree modification message to get 
2326                         the parent to block and tell us about the highest reduction number it has seen.
2327                         
2328                 */
2329                 int data[2];
2330                 data[0]=CkMyNode();
2331                 data[1]=getTotalGCount()+additionalGCount;
2332                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2333                 newParent = treeParent();
2334         }else{
2335                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2336                 oldleaf= false;
2337         /*
2338                 It is not a leaf. It needs to rewire the tree around itself.
2339                 It also needs to decide on a reduction No after which the new tree will be used
2340                 Till it decides on the new tree and the redNo at which it becomes valid,
2341                 all received messages will be buffered
2342         */
2343                 newParent = kids[0];
2344                 for(int i=numKids-1;i>=0;i--){
2345                         newKids.remove(i);
2346                 }
2347                 /*
2348                         Ask everybody for the highest reduction number they have seen and
2349                         also tell them about the new tree
2350                 */
2351                 /*
2352                         Tell parent about its new child;
2353                 */
2354                 int oldParentData[2];
2355                 oldParentData[0] = CkMyNode();
2356                 oldParentData[1] = newParent;
2357                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2358
2359                 /*
2360                         Tell the other children about their new parent
2361                 */
2362                 int childrenData=newParent;
2363                 for(int i=1;i<numKids;i++){
2364                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2365                 }
2366                 
2367                 /*
2368                         Tell newParent (1st child) about its new children,
2369                         the current node and its children except the newParent
2370                 */
2371                 int *newParentData = new int[numKids+2];
2372                 for(int i=1;i<numKids;i++){
2373                         newParentData[i] = kids[i];
2374                 }
2375                 newParentData[0] = CkMyNode();
2376                 newParentData[numKids] = parent;
2377                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2378                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2379         }
2380         readyDeletion = false;
2381         blocked = true;
2382         numModificationReplies = 0;
2383         tempModificationRedNo = findMaxRedNo();
2384 }
2385
2386 /*
2387         Depending on the code, use the data to change the tree
2388         1. OLDPARENT : replace the old child with a new one
2389         2. OLDCHILDREN: replace the parent
2390         3. NEWPARENT:  add the children and change the parent
2391         4. LEAFPARENT: delete the old child
2392 */
2393
2394 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2395         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2396         int sender;
2397         newKids = kids;
2398         readyDeletion = false;
2399         newAdditionalGCount = additionalGCount;
2400         switch(code){
2401                 case OLDPARENT: 
2402                         for(int i=0;i<numKids;i++){
2403                                 if(newKids[i] == data[0]){
2404                                         newKids[i] = data[1];
2405                                         break;
2406                                 }
2407                         }
2408                         sender = data[0];
2409                         newParent = parent;
2410                         break;
2411                 case OLDCHILDREN:
2412                         newParent = data[0];
2413                         sender = parent;
2414                         break;
2415                 case NEWPARENT:
2416                         for(int i=0;i<size-2;i++){
2417                                 newKids.push_back(data[i]);
2418                         }
2419                         newParent = data[size-2];
2420                         newAdditionalGCount += data[size-1];
2421                         sender = parent;
2422                         break;
2423                 case LEAFPARENT:
2424                         for(int i=0;i<numKids;i++){
2425                                 if(newKids[i] == data[0]){
2426                                         newKids.remove(i);
2427                                         break;
2428                                 }
2429                         }
2430                         sender = data[0];
2431                         newParent = parent;
2432                         newAdditionalGCount += data[1];
2433                         break;
2434         };
2435         blocked = true;
2436         int maxRedNo = findMaxRedNo();
2437         
2438         thisProxy[sender].collectMaxRedNo(maxRedNo);
2439 }
2440
2441 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2442         /*
2443                 Find out the maximum redNo that has been seen by 
2444                 the affected nodes
2445         */
2446         numModificationReplies++;
2447         if(maxRedNo > tempModificationRedNo){
2448                 tempModificationRedNo = maxRedNo;
2449         }
2450         if(numModificationReplies == numKids+1){
2451                 maxModificationRedNo = tempModificationRedNo;
2452                 /*
2453                         when all the affected nodes have replied, tell them the maximum.
2454                         Unblock yourself. deal with the buffered messages local and remote
2455                 */
2456                 if(maxModificationRedNo == -1){
2457                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2458                 }else{
2459                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2460                 }
2461                 thisProxy[parent].unblockNode(maxModificationRedNo);
2462                 for(int i=0;i<numKids;i++){
2463                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2464                 }
2465                 blocked = false;
2466                 updateTree();
2467                 clearBlockedMsgs();
2468         }
2469 }
2470
2471 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2472         maxModificationRedNo = maxRedNo;
2473         updateTree();
2474         blocked = false;
2475         clearBlockedMsgs();
2476 }
2477
2478
2479 void CkNodeReductionMgr::clearBlockedMsgs(){
2480         int len = bufferedMsgs.length();
2481         for(int i=0;i<len;i++){
2482                 CkReductionMsg *m = bufferedMsgs.deq();
2483                 doAddContribution(m);
2484         }
2485         len = bufferedRemoteMsgs.length();
2486         for(int i=0;i<len;i++){
2487                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2488                 doRecvMsg(m);
2489         }
2490
2491 }
2492 /*
2493         if the reduction number exceeds the maxModificationRedNo, change the tree
2494         to become the new one
2495 */
2496
2497 void CkNodeReductionMgr::updateTree(){
2498         if(redNo > maxModificationRedNo){
2499                 parent = newParent;
2500                 kids = newKids;
2501                 maxModificationRedNo = INT_MAX;
2502                 numKids = kids.size();
2503                 readyDeletion = true;
2504                 additionalGCount = newAdditionalGCount;
2505                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2506                 for(int i=0;i<(int)(newKids.size());i++){
2507                         DEBREVAC(("%d ",newKids[i]));
2508                 }
2509                 DEBREVAC(("\n"));
2510         //      startReduction(redNo,CkMyNode());
2511         }else{
2512                 if(maxModificationRedNo != INT_MAX){
2513                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2514                         startReduction(redNo,CkMyNode());
2515                         finishReduction();
2516                 }       
2517         }
2518 }
2519
2520
2521 void CkNodeReductionMgr::doneEvacuate(){
2522         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2523 /*      if(oldleaf){
2524                 
2525                         It used to be a leaf
2526                         Then as soon as future messages have been emptied you can 
2527                         send the parent a message telling them that they are not going
2528                         to receive anymore messages from this child
2529                 
2530                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2531                 while(futureMsgs.length() != 0){
2532                         int n = futureMsgs.length();
2533                         for(int i=0;i<n;i++){
2534                                 CkReductionMsg *m = futureMsgs.deq();
2535                                 if(isPresent(m->redNo)){
2536                                         msgs.enq(m);
2537                                 }else{
2538                                         futureMsgs.enq(m);
2539                                 }
2540                         }
2541                         CkReductionMsg *result = reduceMessages();
2542                         thisProxy[treeParent()].RecvMsg(result);
2543                         redNo++;
2544                 }
2545                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2546                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2547         }else{*/
2548                 if(readyDeletion){
2549                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2550                 }else{
2551                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2552                 }
2553 //      }
2554 }
2555
2556 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2557         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2558         for(int i=0;i<numKids;i++){
2559                 if(kids[i] == deletedChild){
2560                         kids.remove(i);
2561                         break;
2562                 }
2563         }
2564         numKids = kids.length();
2565         finishReduction();
2566 }
2567
2568 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2569         for(int i=0;i<(int)(newKids.length());i++){
2570                 if(newKids[i] == deletedChild){
2571                         newKids.remove(i);
2572                         break;
2573                 }
2574         }
2575         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2576         for(int i=0;i<(int)(newKids.size());i++){
2577                 DEBREVAC(("%d ",newKids[i]));
2578         }
2579         DEBREVAC(("\n"));
2580         finishReduction();
2581 }
2582
2583 int CkNodeReductionMgr::findMaxRedNo(){
2584         int max = redNo;
2585         for(int i=0;i<futureRemoteMsgs.length();i++){
2586                 if(futureRemoteMsgs[i]->redNo  > max){
2587                         max = futureRemoteMsgs[i]->redNo;
2588                 }
2589         }
2590         /*
2591                 if redNo is max (that is no future message) and the current reduction has not started
2592                 then tree can be changed before the reduction redNo can be started
2593         */ 
2594         if(redNo == max && msgs.length() == 0){
2595                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2596                 max--;
2597         }
2598         return max;
2599 }
2600
2601 // initnode call. check the size of reduction table
2602 void CkReductionMgr::sanitycheck()
2603 {
2604 #if CMK_ERROR_CHECKING
2605   int count = 0;
2606   while (CkReduction::reducerTable[count] != NULL) count++;
2607   CmiAssert(CkReduction::nReducers == count);
2608 #endif
2609 }
2610
2611 #include "CkReduction.def.h"