c09fea060a4402e12d737d3cb0e7bb70772f191e
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
54 #include "pathHistory.h"
55 #endif
56
57 #if CMK_DEBUG_REDUCTIONS
58 //Debugging messages:
59 // Reduction mananger internal information:
60 #define DEBR(x) CkPrintf x
61 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
62 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
63
64 #define DEBN(x) CkPrintf x
65 #define AAN "Red Node%d "
66 #define ABN ,CkMyNode()
67
68 // For status and data messages from the builtin reducer functions.
69 #define RED_DEB(x) //CkPrintf x
70 #define DEBREVAC(x) CkPrintf x
71 #else
72 //No debugging info-- empty defines
73 #define DEBR(x) // CkPrintf x
74 #define DEBRMLOG(x) CkPrintf x
75 #define AA
76 #define AB
77 #define DEBN(x) //CkPrintf x
78 #define RED_DEB(x) //CkPrintf x
79 #define DEBREVAC(x) //CkPrintf x
80 #endif
81
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
85
86 extern int _inrestart;
87
88 Group::Group()
89 {
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
91
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
96         DEBR(("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr)));
97 #if !GROUP_LEVEL_REDUCTION
98         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
99         nodeProxy = nodetemp;
100 #endif
101 }
102
103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
104 {
105         creatingContributors();
106         contributorStamped(&reductionInfo);
107         contributorCreated(&reductionInfo);
108         doneCreatingContributors();
109 }
110
111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
112                                     ((CkReductionMgr *)this),
113                                     reductionInfo,false)
114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
115
116
117
118 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 /*
120 The callback is just used to tell the caller that this group
121 has been constructed.  (Now they can safely call CkLocalBranch)
122 */
123 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124 {
125   m->call();
126   delete m;
127 }
128
129 /*
130 The callback is just used to tell the caller that this group
131 is constructed and ready to process other calls.
132 */
133 CkGroupReadyCallback::CkGroupReadyCallback(void)
134 {
135   _isReady = 0;
136 }
137 void
138 CkGroupReadyCallback::callBuffered(void)
139 {
140   int n = _msgs.length();
141   for(int i=0;i<n;i++)
142   {
143     CkGroupCallbackMsg *msg = _msgs.deq();
144     msg->call();
145     delete msg;
146   }
147 }
148 void
149 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150 {
151   if(_isReady) {
152     msg->call();
153     delete msg;
154   } else {
155     _msgs.enq(msg);
156   }
157 }
158
159 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
160         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
161 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 {
163         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
164         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
165         b->fn(b->param,m->getSize(),m->getData());
166         delete m;
167 }
168
169 ///////////////// Reduction Manager //////////////////
170 /*
171 One CkReductionMgr runs a non-overlapping set of reductions.
172 It collects messages from all local contributors, then sends
173 the reduced message up the reduction tree to node zero, where
174 they're passed to the user's client function.
175 */
176
177 CkReductionMgr::CkReductionMgr()//Constructor
178   : thisProxy(thisgroup)
179
180 #if GROUP_LEVEL_REDUCTION
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_BinaryTree();
185 #endif
186 #endif
187   redNo=0;
188   completedRedNo = -1;
189   inProgress=CmiFalse;
190   creating=CmiFalse;
191   startRequested=CmiFalse;
192   gcount=lcount=0;
193   nContrib=nRemote=0;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196     if(CkMyPe() != 0){
197         perProcessorCounts = NULL;
198     }else{
199         perProcessorCounts = new int[CmiNumPes()];
200         for(int i=0;i<CmiNumPes();i++){
201             perProcessorCounts[i] = -1;
202         }
203     }
204     totalCount = 0;
205     processorCount = 0;
206 #endif
207   disableNotifyChildrenStart = CmiFalse;
208   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
209 }
210
211 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
212 {
213   redNo=0;
214   completedRedNo = -1;
215   inProgress=CmiFalse;
216   creating=CmiFalse;
217   startRequested=CmiFalse;
218   gcount=lcount=0;
219   nContrib=nRemote=0;
220   maxStartRequest=0;
221   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
222 }
223
224 void CkReductionMgr::flushStates(int isgroup)
225 {
226   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
227   redNo=0;
228   completedRedNo = -1;
229   inProgress=CmiFalse;
230   creating=CmiFalse;
231   if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
232   startRequested=CmiFalse;
233   nContrib=nRemote=0;
234   maxStartRequest=0;
235
236   while (!msgs.isEmpty()) { delete msgs.deq(); }
237   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
238   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
239   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
240
241   adjVec.length()=0;
242
243 #if ! GROUP_LEVEL_REDUCTION
244   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
245 #endif
246 }
247
248 //////////// Reduction Manager Client API /////////////
249
250 //Add the given client function.  Overwrites any previous client.
251 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
252 {
253   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
254
255   if (CkMyPe()!=0)
256           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
257   storedCallback=*cb;
258 #if ! GROUP_LEVEL_REDUCTION
259   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
260   nodeProxy.ckSetReductionClient(callback);
261 #endif
262 }
263
264 ///////////////////////////// Contributor ////////////////////////
265 //Contributors keep a copy of this structure:
266
267 /*Contributor migration support:
268 */
269 void contributorInfo::pup(PUP::er &p)
270 {
271   p(redNo);
272 }
273
274 ////////////////////// Contributor list maintainance: /////////////////
275 //These just set and clear the "creating" flag to prevent
276 // reductions from finishing early because not all elements
277 // have been created.
278 void CkReductionMgr::creatingContributors(void)
279 {
280   DEBR((AA"Creating contributors...\n"AB));
281   creating=CmiTrue;
282 }
283 void CkReductionMgr::doneCreatingContributors(void)
284 {
285   DEBR((AA"Done creating contributors...\n"AB));
286   creating=CmiFalse;
287   if (startRequested) startReduction(redNo,CkMyPe());
288   finishReduction();
289 }
290
291 //A new contributor will be created
292 void CkReductionMgr::contributorStamped(contributorInfo *ci)
293 {
294   DEBR((AA"Contributor %p stamped\n"AB,ci));
295   //There is another contributor
296   gcount++;
297   if (inProgress)
298   {
299     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
300     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
301   } else
302     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
303 }
304
305 //A new contributor was actually created
306 void CkReductionMgr::contributorCreated(contributorInfo *ci)
307 {
308   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
309   //We've got another contributor
310   lcount++;
311   //He may not need to contribute to some of our reductions:
312   for (int r=redNo;r<ci->redNo;r++)
313     adj(r).lcount--;//He won't be contributing to r here
314 }
315
316 /*Don't expect any more contributions from this one.
317 This is rather horrifying because we now have to make
318 sure the global element count accurately reflects all the
319 contributions the element made before it died-- these may stretch
320 far into the future.  The adj() vector is what saves us here.
321 */
322 void CkReductionMgr::contributorDied(contributorInfo *ci)
323 {
324 #if CMK_MEM_CHECKPOINT
325   // ignore from listener if it is during restart from crash
326   if (CkInRestarting()) return;
327 #endif
328   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
329   //We lost a contributor
330   gcount--;
331
332   if (ci->redNo<redNo)
333   {//Must have been migrating during reductions-- root is waiting for his
334   // contribution, which will never come.
335     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
336     for (int r=ci->redNo;r<redNo;r++)
337       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
338   }
339
340   //Add to the global count for all his future messages (wherever they are)
341   int r;
342   for (r=redNo;r<ci->redNo;r++)
343   {//He already contributed to this reduction, but won't show up in global count.
344     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
345     adj(r).gcount++;
346   }
347
348   lcount--;
349   //He's already contributed to several reductions here
350   for (r=redNo;r<ci->redNo;r++)
351     adj(r).lcount++;//He'll be contributing to r here
352
353   finishReduction();
354 }
355
356 //Migrating away (note that global count doesn't change)
357 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
358 {
359   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
360   lcount--;//We lost a local
361   //He's already contributed to several reductions here
362   for (int r=redNo;r<ci->redNo;r++)
363     adj(r).lcount++;//He'll be contributing to r here
364
365   finishReduction();
366 }
367
368 //Migrating in (note that global count doesn't change)
369 void CkReductionMgr::contributorArriving(contributorInfo *ci)
370 {
371   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
372   lcount++;//We gained a local
373 #if CMK_MEM_CHECKPOINT
374   // ignore from listener if it is during restart from crash
375   // because the ci may be old.
376   if (CkInRestarting()) return;
377 #endif
378   //He has already contributed (elsewhere) to several reductions:
379   for (int r=redNo;r<ci->redNo;r++)
380     adj(r).lcount--;//He won't be contributing to r here
381
382 }
383
384 //Contribute-- the given msg can contain any data.  The reducerType
385 // field of the message must be valid.
386 // Each contributor must contribute exactly once to the each reduction.
387 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
388 {
389 #if CMK_BIGSIM_CHARM
390   _TRACE_BG_TLINE_END(&(m->log));
391 #endif
392   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
393   //m->ci=ci;
394   m->redNo=ci->redNo++;
395   m->sourceFlag=-1;//A single contribution
396   m->gcount=0;
397
398 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
399     if(lcount == 0){
400         m->sourceProcessorCount = 1;
401     }else{
402         m->sourceProcessorCount = lcount;
403     }
404     m->fromPE = CmiMyPe();
405     Chare *oldObj =CpvAccess(_currentObj);
406     char currentObjName[100];
407     DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
408     thisProxy[0].contributeViaMessage(m);
409 #else
410   addContribution(m);
411 #endif
412 }
413
414 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
415 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
416     CmiAssert(CmiMyPe() == 0);
417     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
418     if(redNo == 0){
419         if(perProcessorCounts[m->fromPE] == -1){
420             processorCount++;
421             perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
422             DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
423         }else{
424             if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
425                 DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
426                 perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
427             }
428         }
429         if(processorCount == CmiNumPes()){
430             totalCount = 0;
431             for(int i=0;i<CmiNumPes();i++){
432                 CmiAssert(perProcessorCounts[i] != -1);
433                 totalCount += perProcessorCounts[i];
434             }
435             DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
436         }
437         if(m->sourceProcessorCount == 0){
438             if(processorCount == CmiNumPes()){
439                 finishReduction();
440             }
441             return;
442         }
443     }
444     addContribution(m);
445 }
446 #else
447 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
448 #endif
449
450 //////////// Reduction Manager Remote Entry Points /////////////
451 //Sent down the reduction tree (used by barren PEs)
452 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
453 {
454  if(CkMyPe()==0){
455         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
456         //delete m;
457         //return;
458  }
459  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
460  int srcPE = (UsrToEnv(m))->getSrcPe();
461 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_)) 
462   if (isPresent(m->num) && !inProgress)
463   {
464     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
465     startReduction(m->num,srcPE);
466     finishReduction();
467   } else if (isFuture(m->num)){
468 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
469           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
470           if(maxStartRequest < m->num){
471                   maxStartRequest = m->num;
472           }
473  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
474           
475     }
476   else //is Past
477     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
478   delete m;
479 #else
480     if(redNo == 0){
481         if(lcount == 0){
482             DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
483             CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
484             dummy->fromPE = CmiMyPe();
485             dummy->sourceProcessorCount = 0;
486             dummy->redNo = 0;
487             thisProxy[0].contributeViaMessage(dummy);
488         }
489     }
490 #endif
491 }
492
493 //Sent to root of reduction tree with reduction contribution
494 // of migrants that missed the main reduction.
495 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
496 {
497 #if GROUP_LEVEL_REDUCTION
498 #if CMK_BIGSIM_CHARM
499   _TRACE_BG_TLINE_END(&(m->log));
500 #endif
501   addContribution(m);
502 #else
503   m->secondaryCallback = m->callback;
504   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
505   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
506   nodeMgr->LateMigrantMsg(m);
507 /*      int len = finalMsgs.length();
508         finalMsgs.enq(m);
509 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
510         endArrayReduction();*/
511 #endif
512 }
513
514 //A late migrating contributor will never contribute to this reduction
515 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
516 {
517   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
518   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
519  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
520   adj(m->num).gcount--;//He won't be contributing to this one.
521   finishReduction();
522   delete m;
523 }
524
525 //////////// Reduction Manager State /////////////
526 void CkReductionMgr::startReduction(int number,int srcPE)
527 {
528   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
529   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
530   if (inProgress){
531         DEBR((AA"This reduction is already in progress\n"AB));
532         return;//This reduction already started
533   }
534   if (creating) //Don't start yet-- we're creating elements
535   {
536     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
537     startRequested=CmiTrue;
538     return;
539   }
540
541 //If none of these cases, we need to start the reduction--
542   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
543   inProgress=CmiTrue;
544  
545
546         /*
547                 FAULT_EVAC
548         */
549   if(!CmiNodeAlive(CkMyPe())){
550         return;
551   }
552
553   if(disableNotifyChildrenStart) return;
554   
555 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
556   if(CmiMyPe() == 0 && redNo == 0){
557             for(int j=0;j<CkNumPes();j++){
558                 if(j != CkMyPe() && j != srcPE){
559                         thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
560                 }
561             }
562             if(lcount == 0){
563                 CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
564                 dummy->fromPE = CmiMyPe();
565                 dummy->sourceProcessorCount = 0;
566                 dummy->redNo = 0;
567                 thisProxy[0].contributeViaMessage(dummy);
568             }
569     }   else{
570         thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
571     }
572
573
574 #else
575   //Sent start requests to our kids (in case they don't already know)
576 #if GROUP_LEVEL_REDUCTION
577   for (int k=0;k<treeKids();k++)
578   {
579     DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
580     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
581   }
582 #else
583   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
584 #endif
585 #endif
586         
587         /*
588   int temp;
589   //making it a broadcast done only by PE 0
590   if(!hasParent()){
591                 temp = completedRedNo+1;
592                 for(int i=temp;i<=number;i++){
593                         for(int j=0;j<CkNumPes();j++){
594                                 if(j != CkMyPe() && j != srcPE){
595                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
596                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
597                                         }
598                                 }
599                         }
600                 }       
601         }       else{
602                 temp = number;
603         }*/
604 /*  if(!hasParent()){
605                 temp = completedRedNo+1;
606         }       else{
607                 temp = number;
608         }
609         for(int i=temp;i<=number;i++){
610         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
611                 if(hasParent()){
612           // kick-start your parent too ...
613                         if(treeParent() != srcPE){
614                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
615 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
616     CpvAccess(_currentObj) = oldObj;
617 #endif
618                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
619                                 }       
620                         }       
621                 }
622           for (int k=0;k<treeKids();k++)
623           {
624                         if(firstKid()+k != srcPE){
625                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
626                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
627                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
628                                 }       
629                         }       
630         }
631         }
632         */
633 }       
634
635 /*Handle a message from one element for the reduction*/
636 void CkReductionMgr::addContribution(CkReductionMsg *m)
637 {
638   if (isPast(m->redNo))
639   {
640 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
641         CmiAbort("this version should not have late migrations");
642 #else
643         //We've moved on-- forward late contribution straight to root
644     DEBR((AA"Migrant gives late contribution for #%d!\n"AB,m->redNo));
645         // if (!hasParent()) //Root moved on too soon-- should never happen
646         //   CkAbort("Late reduction contribution received at root!\n");
647     thisProxy[0].LateMigrantMsg(m);
648 #endif
649   }
650   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
651     DEBR((AA"Contributor gives early contribution-- for #%d\n"AB,m->redNo));
652     futureMsgs.enq(m);
653   } else {// An ordinary contribution
654     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
655    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
656     startReduction(m->redNo,CkMyPe());
657     msgs.enq(m);
658     nContrib++;
659     finishReduction();
660   }
661 }
662
663 /**function checks if it has got all contributions that it is supposed to
664 get at this processor. If it is done it sends the reduced result to the local
665 nodegroup */
666 void CkReductionMgr::finishReduction(void)
667 {
668   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
669   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
670   if ((!inProgress) || creating){
671         DEBR((AA"Either not in Progress or creating\n"AB));
672         return;
673   }
674   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
675 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
676
677   if (nContrib<(lcount+adj(redNo).lcount)){
678          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
679          return;//Need more local messages
680   }
681 #if GROUP_LEVEL_REDUCTION
682   if (nRemote<treeKids()) return;//Need more remote messages
683 #endif
684  
685 #else
686
687     if(CkMyPe() != 0){
688         if(redNo != 0){
689             return;
690         }else{
691             CmiAssert(lcount == 0);
692             CkReductionMsg *dummy = reduceMessages();
693             dummy->fromPE = CmiMyPe();
694             dummy->sourceProcessorCount = 0;
695             thisProxy[0].contributeViaMessage(dummy);
696             return;
697         }
698     }
699     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
700         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
701          return;//Need more local messages
702   }else{
703         DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
704     }
705
706
707 #endif
708
709   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
710   CkReductionMsg *result=reduceMessages();
711   result->redNo=redNo;
712 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
713
714 #if GROUP_LEVEL_REDUCTION
715   if (hasParent())
716   {//Pass data up tree to parent
717     DEBR((AA"Passing reduced data up to parent node %d.\n"AB,treeParent()));
718     DEBR((AA"Message gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
719     result->gcount+=gcount+adj(redNo).gcount;
720     thisProxy[treeParent()].RecvMsg(result);
721   }
722   else 
723   {//We are root-- pass data to client
724     DEBR((AA"Final gcount is %d+%d+%d.\n"AB,result->gcount,gcount,adj(redNo).gcount));
725     int totalElements=result->gcount+gcount+adj(redNo).gcount;
726     if (totalElements>result->nSources()) 
727     {
728       DEBR((AA"Only got %d of %d contributions (c'mon, migrators!)\n"AB,result->nSources(),totalElements));
729       msgs.enq(result);
730       return; // Wait for migrants to contribute
731     } else if (totalElements<result->nSources()) {
732       DEBR((AA"Got %d of %d contributions\n"AB,result->nSources(),totalElements));
733       CkAbort("ERROR! Too many contributions at root!\n");
734     }
735     DEBR((AA"Passing result to client function\n"AB));
736     CkSetRefNum(result, result->getUserFlag());
737     if (!result->callback.isInvalid())
738             result->callback.send(result);
739     else if (!storedCallback.isInvalid())
740             storedCallback.send(result);
741     else
742             CkAbort("No reduction client!\n"
743                     "You must register a client with either SetReductionClient or during contribute.\n");
744   }
745
746 #else
747   result->gcount+=gcount+adj(redNo).gcount;
748
749   result->secondaryCallback = result->callback;
750   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
751         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
752
753   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
754
755  // DEBR(("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount));
756   
757   // Find our node reduction manager, and pass reduction to him:
758   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
759   nodeMgr->contributeArrayReduction(result);
760 #endif
761 #else                // _FAULT_MLOG_
762   DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer()));
763
764     CkSetRefNum(result, result->getUserFlag());
765     if (!result->callback.isInvalid())
766         result->callback.send(result);
767     else if (!storedCallback.isInvalid())
768         storedCallback.send(result);
769     else{
770       DEBR(("No reduction client for group %d \n",thisgroup.idx));
771         CkAbort("No reduction client!\n"
772             "You must register a client with either SetReductionClient or during contribute.\n");
773     }
774
775     DEBR(("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer()));
776
777 #endif               // _FAULT_MLOG_
778
779   //House Keeping Operations will have to check later what needs to be changed
780   redNo++;
781   //Shift the count adjustment vector down one slot (to match new redNo)
782   int i;
783 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
784         if(CkMyPe()!=0){
785 #else
786     {
787 #endif
788 //      int i;
789         completedRedNo++;
790         for (i=1;i<(int)(adjVec.length());i++){
791            adjVec[i-1]=adjVec[i];
792         }
793         adjVec.length()--;  
794   }
795
796   inProgress=CmiFalse;
797   startRequested=CmiFalse;
798   nRemote=nContrib=0;
799
800   //Look through the future queue for messages we can now handle
801   int n=futureMsgs.length();
802   for (i=0;i<n;i++)
803   {
804     CkReductionMsg *m=futureMsgs.deq();
805     if (m!=NULL) //One of these addContributions may have finished us.
806       addContribution(m);//<- if *still* early, puts it back in the queue
807   }
808 #if GROUP_LEVEL_REDUCTION
809   n=futureRemoteMsgs.length();
810   for (i=0;i<n;i++)
811   {
812     CkReductionMsg *m=futureRemoteMsgs.deq();
813     if (m!=NULL) {
814       RecvMsg(m);//<- if *still* early, puts it back in the queue
815     }
816   }
817 #endif
818
819 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
820   if(maxStartRequest >= redNo){
821           startReduction(redNo,CkMyPe());
822           finishReduction();
823   }
824  
825 #endif
826 }
827
828 //Sent up the reduction tree with reduced data
829   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
830 {
831 #if GROUP_LEVEL_REDUCTION
832 #if CMK_BIGSIM_CHARM
833   _TRACE_BG_TLINE_END(&m->log);
834 #endif
835   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
836     DEBR((AA"Recv'd remote contribution %d for #%d\n"AB,nRemote,m->redNo));
837     startReduction(m->redNo, CkMyPe());
838     msgs.enq(m);
839     nRemote++;
840     finishReduction();
841   }
842   else if (isFuture(m->redNo)) {
843     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
844     futureRemoteMsgs.enq(m);
845   } 
846   else CkAbort("Recv'd late remote contribution!\n");
847 #endif
848 }
849
850 //////////// Reduction Manager Utilities /////////////
851
852 //Return the countAdjustment struct for the given redNo:
853 countAdjustment &CkReductionMgr::adj(int number)
854 {
855   number-=completedRedNo;
856   number--;
857   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
858   //Pad the adjustment vector with zeros until it's at least number long
859   while ((int)(adjVec.length())<=number)
860     adjVec.push_back(countAdjustment());
861   return adjVec[number];
862 }
863
864 //Combine (& free) the current message vector msgs.
865 CkReductionMsg *CkReductionMgr::reduceMessages(void)
866 {
867 #if CMK_BIGSIM_CHARM
868   _TRACE_BG_END_EXECUTE(1);
869   void* _bgParentLog = NULL;
870   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
871 #endif
872   CkReductionMsg *ret=NULL;
873
874   //Look through the vector for a valid reducer, swapping out placeholder messages
875   CkReduction::reducerType r=CkReduction::invalid;
876   int msgs_gcount=0;//Reduced gcount
877   int msgs_nSources=0;//Reduced nSources
878   int msgs_userFlag=-1;
879   CkCallback msgs_callback;
880   int i;
881   int nMsgs=0;
882   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
883   CkReductionMsg *m;
884   bool isMigratableContributor;
885
886   // Copy message queue into msgArr, skipping placeholders:
887   while (NULL!=(m=msgs.deq()))
888   {
889     msgs_gcount+=m->gcount;
890     if (m->sourceFlag!=0)
891     { //This is a real message from an element, not just a placeholder
892       msgArr[nMsgs++]=m;
893       msgs_nSources+=m->nSources();
894       r=m->reducer;
895       if (!m->callback.isInvalid())
896         msgs_callback=m->callback;
897       if (m->userFlag!=-1)
898         msgs_userFlag=m->userFlag;
899                         
900         isMigratableContributor=m->isMigratableContributor();
901 #if CMK_BIGSIM_CHARM
902         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
903 #endif
904     }
905     else
906     { //This is just a placeholder message-- forget it
907       delete m;
908     }
909   }
910
911   if (nMsgs==0||r==CkReduction::invalid)
912   //No valid reducer in the whole vector
913     ret=CkReductionMsg::buildNew(0,NULL);
914   else
915   {//Use the reducer to reduce the messages
916                 //if there is only one msg to be reduced just return that message
917     if(nMsgs == 1){
918         ret = msgArr[0];        
919     }else{
920         CkReduction::reducerFn f=CkReduction::reducerTable[r];
921         ret=(*f)(nMsgs,msgArr);
922     }
923     ret->reducer=r;
924   }
925
926
927
928 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
929
930 #if CRITICAL_PATH_DEBUG > 3
931   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
932 #endif
933
934   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
935   path.updateMax(UsrToEnv(ret));
936   // Combine the critical paths from all the reduction messages
937   for (i=0;i<nMsgs;i++){
938     if (msgArr[i]!=ret){
939       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
940       path.updateMax(UsrToEnv(msgArr[i]));
941     }
942   }
943   
944
945 #if CRITICAL_PATH_DEBUG > 3
946   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
947 #endif
948   
949   PathHistoryTableEntry tableEntry(path);
950   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
951   
952 #endif
953
954         //Go back through the vector, deleting old messages
955   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
956         delete [] msgArr;
957
958   //Set the message counts
959   ret->redNo=redNo;
960   ret->gcount=msgs_gcount;
961   ret->userFlag=msgs_userFlag;
962   ret->callback=msgs_callback;
963   ret->sourceFlag=msgs_nSources;
964         ret->setMigratableContributor(isMigratableContributor);
965   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
966
967   return ret;
968 }
969
970
971 //Checkpointing utilities
972 //pack-unpack method for CkReductionMsg
973 //if packing pack the message and then unpack and return it
974 //if unpacking allocate memory for it read it off disk and then unapck
975 //and return it
976 void CkReductionMgr::pup(PUP::er &p)
977 {
978 //We do not store the client function pointer or the client function parameter,
979 //it is the responsibility of the programmer to correctly restore these
980   CkGroupInitCallback::pup(p);
981   p(redNo);
982   p(completedRedNo);
983   p(inProgress); p(creating); p(startRequested);
984   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
985   p|msgs;
986   p|futureMsgs;
987   p|futureRemoteMsgs;
988   p|finalMsgs;
989   p|adjVec;
990   p|nodeProxy;
991   p|storedCallback;
992     // handle CkReductionClientBundle
993   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
994   {
995     CkReductionClientBundle *bd;
996     if (p.isUnpacking()) 
997       bd = new CkReductionClientBundle;
998     else
999       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
1000     p|*bd;
1001     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1002   }
1003
1004   // subtle --- Gengbin
1005   // Group : CkReductionMgr
1006   // CkArray: CkReductionMgr
1007   // lcount/gcount in Group is set in Group constructor
1008   // lcount/gcount in CkArray is not, it is set when array elements are created
1009   // we can not pup because inserting array elems will add the counters again
1010 //  p|lcount;
1011 //  p|gcount;
1012 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1013 //  p|lcount;
1014 //  //  p|gcount;
1015 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1016 #endif
1017   if(p.isUnpacking()){
1018     thisProxy = thisgroup;
1019     maxStartRequest=0;
1020 #if GROUP_LEVEL_REDUCTION
1021 #ifdef BINOMIAL_TREE
1022     init_BinomialTree();
1023 #else
1024     init_BinaryTree();
1025 #endif
1026 #endif
1027   }
1028
1029   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1030 }
1031
1032
1033 //Callback for doing Reduction through NodeGroups added by Sayantan
1034
1035 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
1036
1037         int total = m->gcount+adj(m->redNo).gcount;
1038         finalMsgs.enq(m);
1039         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
1040         adj(m->redNo).mainRecvd = 1;
1041         DEBR(("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer()));
1042         endArrayReduction();
1043 }
1044
1045 void CkReductionMgr :: endArrayReduction(){
1046         CkReductionMsg *ret=NULL;
1047         int nMsgs=finalMsgs.length();
1048         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
1049         //Look through the vector for a valid reducer, swapping out placeholder messages
1050         //CkPrintf("Length of Final Message %d \n",nMsgs);
1051         CkReduction::reducerType r=CkReduction::invalid;
1052         int msgs_gcount=0;//Reduced gcount
1053         int msgs_nSources=0;//Reduced nSources
1054         int msgs_userFlag=-1;
1055         CkCallback msgs_callback;
1056         CkCallback msgs_secondaryCallback;
1057         CkVec<CkReductionMsg *> tempMsgs;
1058         int i;
1059         int numMsgs = 0;
1060         for (i=0;i<nMsgs;i++)
1061         {
1062                 CkReductionMsg *m=finalMsgs.deq();
1063                 if(m->redNo == completedRedNo +1){
1064                         msgs_gcount+=m->gcount;
1065                         if (m->sourceFlag!=0)
1066                         { //This is a real message from an element, not just a placeholder
1067                                 msgs_nSources+=m->nSources();
1068                                 r=m->reducer;
1069                                 if (!m->callback.isInvalid())
1070                                 msgs_callback=m->callback;
1071                                 if(!m->secondaryCallback.isInvalid())
1072                                         msgs_secondaryCallback = m->secondaryCallback;
1073                                 if (m->userFlag!=-1)
1074                                         msgs_userFlag=m->userFlag;
1075                                 tempMsgs.push_back(m);
1076                         }
1077                 }else{
1078                         finalMsgs.enq(m);
1079                 }
1080
1081         }
1082         numMsgs = tempMsgs.length();
1083
1084         DEBR(("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd));
1085
1086         if(numMsgs == 0){
1087                 return;
1088         }
1089         if(adj(completedRedNo+1).mainRecvd == 0){
1090                 for(i=0;i<numMsgs;i++){
1091                         finalMsgs.enq(tempMsgs[i]);
1092                 }
1093                 return;
1094         }
1095
1096 /*
1097         NOT NEEDED ANYMORE DONE at nodegroup level
1098         if(msgs_gcount  > msgs_nSources){
1099                 for(i=0;i<numMsgs;i++){
1100                         finalMsgs.enq(tempMsgs[i]);
1101                 }
1102                 return;
1103         }*/
1104
1105         if (nMsgs==0||r==CkReduction::invalid)
1106                 //No valid reducer in the whole vector
1107                 ret=CkReductionMsg::buildNew(0,NULL);
1108         else{//Use the reducer to reduce the messages
1109                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
1110                 // has to be corrected elements from above need to be put into a temporary vector
1111                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
1112                 ret=(*f)(numMsgs,msgArr);
1113                 ret->reducer=r;
1114
1115         }
1116
1117         
1118 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1119
1120 #if CRITICAL_PATH_DEBUG > 3
1121         CkPrintf("[%d] combining critical path information from messages in CkReductionMgr::endArrayReduction(). numMsgs=%d\n", CkMyPe(), numMsgs);
1122 #endif
1123
1124         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
1125         path.updateMax(UsrToEnv(ret));
1126         // Combine the critical paths from all the reduction messages into the header for the new result
1127         for (i=0;i<numMsgs;i++){
1128           if (tempMsgs[i]!=ret){
1129             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1130             path.updateMax(UsrToEnv(tempMsgs[i]));
1131           } else {
1132             //  CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(tempMsgs[i])->pathHistory.getTime() );
1133           }
1134         }
1135         // Also consider the path which got us into this entry method
1136
1137 #if CRITICAL_PATH_DEBUG > 3
1138         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
1139 #endif
1140
1141 #endif
1142   
1143
1144
1145
1146
1147         for(i = 0;i<numMsgs;i++){
1148                 if (tempMsgs[i] != ret) delete tempMsgs[i];
1149         }
1150
1151         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
1152         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
1153
1154         ret->redNo=completedRedNo+1;
1155         ret->gcount=msgs_gcount;
1156         ret->userFlag=msgs_userFlag;
1157         ret->callback=msgs_callback;
1158         ret->secondaryCallback = msgs_secondaryCallback;
1159         ret->sourceFlag=msgs_nSources;
1160
1161         DEBR(("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer()));
1162
1163         CkSetRefNum(ret, ret->getUserFlag());
1164         if (!ret->secondaryCallback.isInvalid())
1165             ret->secondaryCallback.send(ret);
1166     else if (!storedCallback.isInvalid())
1167             storedCallback.send(ret);
1168     else{
1169       DEBR(("No reduction client for group %d \n",thisgroup.idx));
1170             CkAbort("No reduction client!\n"
1171                     "You must register a client with either SetReductionClient or during contribute.\n");
1172     }
1173         completedRedNo++;
1174
1175         DEBR(("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer()));
1176
1177         for (i=1;i<(int)(adjVec.length());i++)
1178                 adjVec[i-1]=adjVec[i];
1179         adjVec.length()--;
1180         endArrayReduction();
1181 }
1182
1183 #if GROUP_LEVEL_REDUCTION
1184
1185 void CkReductionMgr::init_BinaryTree(){
1186         parent = (CkMyPe()-1)/TREE_WID;
1187         int firstkid = CkMyPe()*TREE_WID+1;
1188         numKids=CkNumPes()-firstkid;
1189         if (numKids>TREE_WID) numKids=TREE_WID;
1190         if (numKids<0) numKids=0;
1191
1192         for(int i=0;i<numKids;i++){
1193                 kids.push_back(firstkid+i);
1194                 newKids.push_back(firstkid+i);
1195         }
1196 }
1197
1198 void CkReductionMgr::init_BinomialTree(){
1199         int depth = (int )ceil((log((double )CkNumPes())/log((double)2)));
1200         /*upperSize = (unsigned )pow((double)2,depth);*/
1201         upperSize = (unsigned) 1 << depth;
1202         label = upperSize-CkMyPe()-1;
1203         int p=label;
1204         int count=0;
1205         while( p > 0){
1206                 if(p % 2 == 0)
1207                         break;
1208                 else{
1209                         p = p/2;
1210                         count++;
1211                 }
1212         }
1213         /*parent = label + rint(pow((double)2,count));*/
1214         parent = label + (1<<count);
1215         parent = upperSize -1 -parent;
1216         int temp;
1217         if(count != 0){
1218                 numKids = 0;
1219                 for(int i=0;i<count;i++){
1220                         /*temp = label - rint(pow((double)2,i));*/
1221                         temp = label - (1<<i);
1222                         temp = upperSize-1-temp;
1223                         if(temp <= CkNumPes()-1){
1224                                 kids.push_back(temp);
1225                                 numKids++;
1226                         }
1227                 }
1228         }else{
1229                 numKids = 0;
1230         //      kids = NULL;
1231         }
1232 }
1233
1234
1235 int CkReductionMgr::treeRoot(void)
1236 {
1237   return 0;
1238 }
1239 CmiBool CkReductionMgr::hasParent(void) //Root Node
1240 {
1241   return (CmiBool)(CkMyPe()!=treeRoot());
1242 }
1243 int CkReductionMgr::treeParent(void) //My parent Node
1244 {
1245   return parent;
1246 }
1247
1248 int CkReductionMgr::firstKid(void) //My first child Node
1249 {
1250   return CkMyPe()*TREE_WID+1;
1251 }
1252 int CkReductionMgr::treeKids(void)//Number of children in tree
1253 {
1254   return numKids;
1255 }
1256
1257 #endif
1258
1259 /////////////////////////////////////////////////////////////////////////
1260
1261 ////////////////////////////////
1262 //CkReductionMsg support
1263
1264 //ReductionMessage default private constructor-- does nothing
1265 CkReductionMsg::CkReductionMsg(){}
1266 CkReductionMsg::~CkReductionMsg(){}
1267
1268 //This define gives the distance from the start of the CkReductionMsg
1269 // object to the start of the user data area (just below last object field)
1270 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1271
1272 //"Constructor"-- builds and returns a new CkReductionMsg.
1273 //  the "data" array you specify will be copied into this object.
1274 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1275     CkReduction::reducerType reducer, CkReductionMsg *buf)
1276 {
1277   int len[1];
1278   len[0]=NdataSize;
1279   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
1280   
1281   ret->dataSize=NdataSize;
1282   if (srcData!=NULL && !buf)
1283     memcpy(ret->data,srcData,NdataSize);
1284   ret->userFlag=-1;
1285   ret->reducer=reducer;
1286   //ret->ci=NULL;
1287   ret->sourceFlag=-1000;
1288   ret->gcount=0;
1289   ret->migratableContributor = true;
1290 #if CMK_BIGSIM_CHARM
1291   ret->log = NULL;
1292 #endif
1293   return ret;
1294 }
1295
1296 // Charm kernel message runtime support:
1297 void *
1298 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1299 {
1300   int totalsize=ARM_DATASTART+(*sz);
1301   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1302   CkReductionMsg *ret = (CkReductionMsg *)
1303     CkAllocMsg(msgnum,totalsize,priobits);
1304   ret->data=(void *)(&ret->dataStorage);
1305   return (void *) ret;
1306 }
1307
1308 void *
1309 CkReductionMsg::pack(CkReductionMsg* in)
1310 {
1311   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1312   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1313   in->data = NULL;
1314   return (void*) in;
1315 }
1316
1317 CkReductionMsg* CkReductionMsg::unpack(void *in)
1318 {
1319   CkReductionMsg *ret = (CkReductionMsg *)in;
1320   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1321   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1322   ret->data=(void *)(&ret->dataStorage);
1323   return ret;
1324 }
1325
1326
1327 /////////////////////////////////////////////////////////////////////////////////////
1328 ///////////////// Builtin Reducer Functions //////////////
1329 /* A simple reducer, like sum_int, looks like this:
1330 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1331 {
1332   int i,ret=0;
1333   for (i=0;i<nMsg;i++)
1334     ret+=*(int *)(msg[i]->getData());
1335   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1336 }
1337
1338 To keep the code small and easy to change, the implementations below
1339 are built with preprocessor macros.
1340 */
1341
1342 //////////////// simple reducers ///////////////////
1343 /*A define used to quickly and tersely construct simple reductions.
1344 The basic idea is to use the first message's data array as
1345 (pre-initialized!) scratch space for folding in the other messages.
1346  */
1347
1348 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1349 {
1350         CkAbort("Called the invalid reducer type 0.  This probably\n"
1351                 "means you forgot to initialize your custom reducer index.\n");
1352         return NULL;
1353 }
1354
1355 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1356 {
1357   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1358 }
1359
1360 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1361 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1362 {\
1363   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1364   int m,i;\
1365   int nElem=msg[0]->getLength()/sizeof(dataType);\
1366   dataType *ret=(dataType *)(msg[0]->getData());\
1367   for (m=1;m<nMsg;m++)\
1368   {\
1369     dataType *value=(dataType *)(msg[m]->getData());\
1370     for (i=0;i<nElem;i++)\
1371     {\
1372       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
1373       loop\
1374     }\
1375   }\
1376   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1377   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1378 }
1379
1380 //Use this macro for reductions that have the same type for all inputs
1381 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1382   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1383   SIMPLE_REDUCTION(nameBase##_long,CmiInt8,"%d",loop) \
1384   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1385   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1386
1387
1388 //Compute the sum the numbers passed by each element.
1389 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1390
1391 //Compute the product of the numbers passed by each element.
1392 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1393
1394 //Compute the largest number passed by any element.
1395 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1396
1397 //Compute the smallest integer passed by any element.
1398 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1399
1400
1401 //Compute the logical AND of the integers passed by each element.
1402 // The resulting integer will be zero if any source integer is zero; else 1.
1403 SIMPLE_REDUCTION(logical_and,int,"%d",
1404         if (value[i]==0)
1405      ret[i]=0;
1406   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1407 )
1408
1409 //Compute the logical OR of the integers passed by each element.
1410 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1411 SIMPLE_REDUCTION(logical_or,int,"%d",
1412   if (value[i]!=0)
1413            ret[i]=1;
1414   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1415 )
1416
1417 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1418 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1419
1420 //Select one random message to pass on
1421 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1422   return CkReductionMsg::buildNew(msg[0]->getLength(),(void *)msg[0]->getData(), CkReduction::invalid, msg[0]);
1423 }
1424
1425 /////////////// concat ////////////////
1426 /*
1427 This reducer simply appends the data it recieves from each element,
1428 without any housekeeping data to separate them.
1429 */
1430 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1431 {
1432   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1433   //Figure out how big a message we'll need
1434   int i,retSize=0;
1435   for (i=0;i<nMsg;i++)
1436       retSize+=msg[i]->getSize();
1437
1438   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1439
1440   //Allocate a new message
1441   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1442
1443   //Copy the source message data into the return message
1444   char *cur=(char *)(ret->getData());
1445   for (i=0;i<nMsg;i++) {
1446     int messageBytes=msg[i]->getSize();
1447     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1448     cur+=messageBytes;
1449   }
1450   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1451   return ret;
1452 }
1453
1454 /////////////// set ////////////////
1455 /*
1456 This reducer appends the data it recieves from each element
1457 along with some housekeeping data indicating contribution boundaries.
1458 The message data is thus a list of reduction_set_element structures
1459 terminated by a dummy reduction_set_element with a sourceElement of -1.
1460 */
1461
1462 //This rounds an integer up to the nearest multiple of sizeof(double)
1463 static const int alignSize=sizeof(double);
1464 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1465
1466 //This gives the size (in bytes) of a reduction_set_element
1467 static int SET_SIZE(int dataSize)
1468 {return SET_ALIGN(sizeof(int)+dataSize);}
1469
1470 //This returns a pointer to the next reduction_set_element in the list
1471 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1472 {
1473   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1474   return (CkReduction::setElement *)next;
1475 }
1476
1477 //Combine the data passed by each element into an list of reduction_set_elements.
1478 // Each element may contribute arbitrary data (with arbitrary length).
1479 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1480 {
1481   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1482   //Figure out how big a message we'll need
1483   int i,retSize=0;
1484   for (i=0;i<nMsg;i++) {
1485     if (!msg[i]->isFromUser())
1486     //This message is composite-- it will just be copied over (less terminating -1)
1487       retSize+=(msg[i]->getSize()-sizeof(int));
1488     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1489       retSize+=SET_SIZE(msg[i]->getSize());
1490   }
1491   retSize+=sizeof(int);//Leave room for terminating -1.
1492
1493   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1494
1495   //Allocate a new message
1496   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1497
1498   //Copy the source message data into the return message
1499   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1500   for (i=0;i<nMsg;i++)
1501     if (!msg[i]->isFromUser())
1502     {//This message is composite-- just copy it over (less terminating -1)
1503                         int messageBytes=msg[i]->getSize()-sizeof(int);
1504                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1505                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1506                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1507     }
1508     else //This is a message from an element-- wrap it in a reduction_set_element
1509     {
1510       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1511       cur->dataSize=msg[i]->getSize();
1512       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1513       cur=SET_NEXT(cur);
1514     }
1515   cur->dataSize=-1;//Add a terminating -1.
1516   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1517   return ret;
1518 }
1519
1520 //Utility routine: get the next reduction_set_element in the list
1521 // if there is one, or return NULL if there are none.
1522 //To get all the elements, just keep feeding this procedure's output back to
1523 // its input until it returns NULL.
1524 CkReduction::setElement *CkReduction::setElement::next(void)
1525 {
1526   CkReduction::setElement *n=SET_NEXT(this);
1527   if (n->dataSize==-1)
1528     return NULL;//This is the end of the list
1529   else
1530     return n;//This is just another element
1531 }
1532
1533 /////////////////// Reduction Function Table /////////////////////
1534 CkReduction::CkReduction() {} //Dummy private constructor
1535
1536 //Add the given reducer to the list.  Returns the new reducer's
1537 // reducerType.  Must be called in the same order on every node.
1538 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1539 {
1540   reducerTable[nReducers]=fn;
1541   return (reducerType)nReducers++;
1542 }
1543
1544 /*Reducer table: maps reducerTypes to reducerFns.
1545 It's indexed by reducerType, so the order in this table
1546 must *exactly* match the reducerType enum declaration.
1547 The names don't have to match, but it helps.
1548 */
1549 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1550
1551 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1552     ::invalid_reducer,
1553     ::nop,
1554   //Compute the sum the numbers passed by each element.
1555     ::sum_int,::sum_long,::sum_float,::sum_double,
1556
1557   //Compute the product the numbers passed by each element.
1558     ::product_int,::product_long,::product_float,::product_double,
1559
1560   //Compute the largest number passed by any element.
1561     ::max_int,::max_long,::max_float,::max_double,
1562
1563   //Compute the smallest number passed by any element.
1564     ::min_int,::min_long,::min_float,::min_double,
1565
1566   //Compute the logical AND of the integers passed by each element.
1567   // The resulting integer will be zero if any source integer is zero.
1568     ::logical_and,
1569
1570   //Compute the logical OR of the integers passed by each element.
1571   // The resulting integer will be 1 if any source integer is nonzero.
1572     ::logical_or,
1573
1574     // Compute the logical bitvector AND of the integers passed by each element.
1575     ::bitvec_and,
1576
1577     // Compute the logical bitvector OR of the integers passed by each element.
1578     ::bitvec_or,
1579
1580     // Select one of the messages at random to pass on
1581     ::random,
1582
1583   //Concatenate the (arbitrary) data passed by each element
1584     ::concat,
1585
1586   //Combine the data passed by each element into an list of setElements.
1587   // Each element may contribute arbitrary data (with arbitrary length).
1588     ::set
1589 };
1590
1591
1592
1593
1594
1595
1596
1597
1598 /********** Code added by Sayantan *********************/
1599 /** Locking is a big problem in the nodegroup code for smp.
1600  So a few assumptions have had to be made. There is one lock
1601  called lockEverything. It protects all the data structures 
1602  of the nodegroup reduction mgr. I tried grabbing it separately 
1603  for each datastructure, modifying it and then releasing it and
1604  then grabbing it again, for the next change.
1605  That doesn't really help because the interleaved execution of 
1606  different threads makes the state of the reduction manager 
1607  inconsistent. 
1608  
1609  1. Grab lockEverything before calling finishreduction or startReduction
1610     or doRecvMsg
1611  2. lockEverything is grabbed only in entry methods reductionStarting
1612     or RecvMesg or  addcontribution.
1613  ****/
1614  
1615 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1616 NodeGroup::NodeGroup(void) {
1617   __nodelock=CmiCreateLock();
1618 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1619     mlogData->objID.type = TypeNodeGroup;
1620     mlogData->objID.data.group.onPE = CkMyNode();
1621 #endif
1622
1623 }
1624 NodeGroup::~NodeGroup() {
1625   CmiDestroyLock(__nodelock);
1626 }
1627 void NodeGroup::pup(PUP::er &p)
1628 {
1629   CkNodeReductionMgr::pup(p);
1630   p|reductionInfo;
1631 }
1632
1633 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1634
1635 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1636   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1637  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1638   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1639  }
1640
1641 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1642                                     ((CkNodeReductionMgr *)this),
1643                                     reductionInfo,false)
1644
1645 /* this contribute also adds up the count across all messages it receives.
1646   Useful for summing up number of array elements who have contributed ****/ 
1647 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1648         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1649
1650
1651
1652 //#define BINOMIAL_TREE
1653
1654 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1655   : thisProxy(thisgroup)
1656 {
1657 #ifdef BINOMIAL_TREE
1658   init_BinomialTree();
1659 #else
1660   init_BinaryTree();
1661 #endif
1662   storedCallback=NULL;
1663   redNo=0;
1664   inProgress=CmiFalse;
1665   
1666   startRequested=CmiFalse;
1667   gcount=CkNumNodes();
1668   lcount=1;
1669   nContrib=nRemote=0;
1670   lockEverything = CmiCreateLock();
1671
1672
1673   creating=CmiFalse;
1674   interrupt = 0;
1675   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1676         /*
1677                 FAULT_EVAC
1678         */
1679         blocked = false;
1680         maxModificationRedNo = INT_MAX;
1681         killed=0;
1682         additionalGCount = newAdditionalGCount = 0;
1683 }
1684
1685 void CkNodeReductionMgr::flushStates()
1686 {
1687  if(CkMyRank() == 0){
1688   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1689   redNo=0;
1690   inProgress=CmiFalse;
1691
1692   startRequested=CmiFalse;
1693   gcount=CkNumNodes();
1694   lcount=1;
1695   nContrib=nRemote=0;
1696
1697   creating=CmiFalse;
1698   interrupt = 0;
1699   while (!msgs.isEmpty()) { delete msgs.deq(); }
1700   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1701   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1702   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1703   }
1704 }
1705
1706 //////////// Reduction Manager Client API /////////////
1707
1708 //Add the given client function.  Overwrites any previous client.
1709 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1710 {
1711   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1712   if(cb->isInvalid()){
1713         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1714   }else{
1715         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1716   }
1717
1718   if (CkMyNode()!=0)
1719           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1720   delete storedCallback;
1721   storedCallback=cb;
1722 }
1723
1724
1725
1726 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1727 {
1728 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1729     Chare *oldObj =CpvAccess(_currentObj);
1730     CpvAccess(_currentObj) = this;
1731 #endif
1732
1733   //m->ci=ci;
1734   m->redNo=ci->redNo++;
1735   m->sourceFlag=-1;//A single contribution
1736   m->gcount=0;
1737   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
1738   addContribution(m);
1739
1740 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1741     CpvAccess(_currentObj) = oldObj;
1742 #endif
1743 }
1744
1745
1746 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1747 {
1748 #if CMK_BIGSIM_CHARM
1749   _TRACE_BG_TLINE_END(&m->log);
1750 #endif
1751 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1752     Chare *oldObj =CpvAccess(_currentObj);
1753     CpvAccess(_currentObj) = this;
1754 #endif
1755   //m->ci=ci;
1756   m->redNo=ci->redNo++;
1757   m->gcount=count;
1758   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1759   addContribution(m);
1760   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
1761
1762 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1763     CpvAccess(_currentObj) = oldObj;
1764 #endif
1765 }
1766
1767
1768 //////////// Reduction Manager Remote Entry Points /////////////
1769
1770 //Sent down the reduction tree (used by barren PEs)
1771 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1772 {
1773   CmiLock(lockEverything);
1774   if(blocked){
1775         delete m;
1776         CmiUnlock(lockEverything);
1777         return ;
1778   }
1779   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1780   if (isPresent(m->num) && !inProgress)
1781   {
1782     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1783     startReduction(m->num,srcNode);
1784     finishReduction();
1785   } else if (isFuture(m->num)){
1786         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1787   }
1788   else //is Past
1789     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1790   CmiUnlock(lockEverything);
1791   delete m;
1792
1793 }
1794
1795
1796 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1797         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1798         /*
1799                 FAULT_EVAC
1800         */
1801         if(blocked){
1802                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1803                 bufferedRemoteMsgs.enq(m);
1804                 return;
1805         }
1806         
1807         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1808             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1809             startReduction(m->redNo,CkMyNode());
1810             msgs.enq(m);
1811             nRemote++;
1812             finishReduction();
1813         }
1814         else {
1815             if (isFuture(m->redNo)) {
1816                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1817                     futureRemoteMsgs.enq(m);
1818             }else{
1819                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1820                    CkAbort("Recv'd late remote contribution!\n");
1821             }
1822         }
1823         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1824 }
1825
1826 //Sent up the reduction tree with reduced data
1827 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1828 {
1829 #if CMK_BIGSIM_CHARM
1830   _TRACE_BG_TLINE_END(&m->log);
1831 #endif
1832 #ifndef CMK_CPV_IS_SMP
1833 #if CMK_IMMEDIATE_MSG
1834         if(interrupt == 1){
1835                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1836                 CpvAccess(_qd)->process(-1);
1837                 CmiDelayImmediate();
1838                 return;
1839         }
1840 #endif  
1841 #endif
1842    interrupt = 1;       
1843    CmiLock(lockEverything);   
1844    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1845    doRecvMsg(m);
1846    CmiUnlock(lockEverything);    
1847    interrupt = 0;
1848    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
1849 }
1850
1851 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1852 {
1853         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1854         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1855         if (inProgress){
1856                 DEBR((AA"This Node reduction is already in progress\n"AB));
1857                 return;//This reduction already started
1858         }
1859         if (creating) //Don't start yet-- we're creating elements
1860         {
1861                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1862                 startRequested=CmiTrue;
1863                 return;
1864         }
1865         
1866         //If none of these cases, we need to start the reduction--
1867         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1868         inProgress=CmiTrue;
1869         //Sent start requests to our kids (in case they don't already know)
1870         
1871         for (int k=0;k<treeKids();k++)
1872         {
1873 #ifdef BINOMIAL_TREE
1874                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1875                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1876 #else
1877                 if(kids[k] != srcNode){
1878                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1879                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1880                 }       
1881 #endif
1882         }
1883
1884         DEBR((AA"Asking all local groups to start #%d\n"AB,redNo));
1885         // in case, nodegroup does not has the attached red group,
1886         // it has to restart groups again
1887         startLocalGroupReductions(number);
1888 /*
1889         if (startLocalGroupReductions(number) == 0)
1890           thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1891 */
1892 }
1893
1894 // restart local groups until succeed
1895 void CkNodeReductionMgr::restartLocalGroupReductions(int number) {
1896   CmiLock(lockEverything);    
1897   if (startLocalGroupReductions(number) == 0)
1898     thisProxy[CkMyNode()].restartLocalGroupReductions(number);
1899   CmiUnlock(lockEverything);    
1900 }
1901
1902 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1903         /*
1904                 FAULT_EVAC
1905         */
1906         if(blocked){
1907                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1908                 bufferedMsgs.enq(m);
1909                 return;
1910         }
1911         
1912         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1913                 DEBR((AA"Contributor gives early node contribution-- for #%d\n"AB,m->redNo));
1914                 futureMsgs.enq(m);
1915         } else {// An ordinary contribution
1916                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1917                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1918                 startReduction(m->redNo,CkMyNode());
1919                 msgs.enq(m);
1920                 nContrib++;
1921                 finishReduction();
1922         }
1923 }
1924
1925 //Handle a message from one element for the reduction
1926 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1927 {
1928   interrupt = 1;
1929   CmiLock(lockEverything);
1930   doAddContribution(m);
1931   CmiUnlock(lockEverything);
1932   interrupt = 0;
1933 }
1934
1935 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1936         if(blocked){
1937                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1938                 bufferedMsgs.enq(m);
1939                 return;
1940         }
1941         
1942         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1943                 DEBR((AA"Latemigrant gives early node contribution-- for #%d\n"AB,m->redNo));
1944 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1945                 futureLateMigrantMsgs.enq(m);
1946         } else {// An ordinary contribution
1947                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1948 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1949                 msgs.enq(m);
1950                 finishReduction();
1951         }
1952 }
1953
1954
1955
1956
1957
1958 /** check if the nodegroup reduction is finished at this node. In that case send it
1959 up the reduction tree **/
1960
1961 void CkNodeReductionMgr::finishReduction(void)
1962 {
1963   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1964   /***Check if reduction is finished in the next few ifs***/
1965   if ((!inProgress) || creating){
1966         DEBR((AA"Either not in Progress or creating\n"AB));
1967         return;
1968   }
1969
1970   if (nContrib<(lcount)){
1971         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1972
1973          return;//Need more local messages
1974   }
1975   if (nRemote<treeKids()){
1976         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1977
1978         return;//Need more remote messages
1979   }
1980   if (nRemote>treeKids()){
1981
1982           interrupt = 0;
1983            CkAbort("Nodegrp Excess remote reduction message received!\n");
1984   }
1985
1986   DEBR((AA"Reducing node data...\n"AB));
1987
1988   /**reduce all messages received at this node **/
1989   CkReductionMsg *result=reduceMessages();
1990
1991   if (hasParent())
1992   {//Pass data up tree to parent
1993         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1994         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1995         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
1996                 /*
1997                         FAULT_EVAC
1998                 */
1999                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
2000             thisProxy[treeParent()].RecvMsg(result);
2001         }
2002
2003   }
2004   else
2005   {
2006                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2007                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
2008                         msgs.enq(result);
2009                         return;
2010                 }
2011                 result->gcount += additionalGCount;
2012           /** if the reduction is finished and I am the root of the reduction tree
2013           then call the reductionhandler and other stuff ***/
2014                 
2015
2016                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2017     CkSetRefNum(result, result->getUserFlag());
2018     if (!result->callback.isInvalid()){
2019       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2020             result->callback.send(result);
2021     }
2022     else if (storedCallback!=NULL){
2023       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2024             storedCallback->send(result);
2025     }
2026     else{
2027                 DEBR((AA"Invalid Callback \n"AB));
2028             CkAbort("No reduction client!\n"
2029                     "You must register a client with either SetReductionClient or during contribute.\n");
2030                 }
2031   }
2032
2033   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
2034   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2035   redNo++;
2036         updateTree();
2037   int i;
2038   inProgress=CmiFalse;
2039   startRequested=CmiFalse;
2040   nRemote=nContrib=0;
2041
2042   //Look through the future queue for messages we can now handle
2043   int n=futureMsgs.length();
2044
2045   for (i=0;i<n;i++)
2046   {
2047     interrupt = 1;
2048
2049     CkReductionMsg *m=futureMsgs.deq();
2050
2051     interrupt = 0;
2052     if (m!=NULL){ //One of these addContributions may have finished us.
2053       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2054       doAddContribution(m);//<- if *still* early, puts it back in the queue
2055     }
2056   }
2057
2058   interrupt = 1;
2059
2060   n=futureRemoteMsgs.length();
2061
2062   interrupt = 0;
2063   for (i=0;i<n;i++)
2064   {
2065     interrupt = 1;
2066
2067     CkReductionMsg *m=futureRemoteMsgs.deq();
2068
2069     interrupt = 0;
2070     if (m!=NULL)
2071       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2072   }
2073   
2074   n = futureLateMigrantMsgs.length();
2075   for(i=0;i<n;i++){
2076     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2077     if(m != NULL){
2078       if(m->redNo == redNo){
2079         msgs.enq(m);
2080       }else{
2081         futureLateMigrantMsgs.enq(m);
2082       }
2083     }
2084   }
2085 }
2086
2087 //////////// Reduction Manager Utilities /////////////
2088
2089 void CkNodeReductionMgr::init_BinaryTree(){
2090         parent = (CkMyNode()-1)/TREE_WID;
2091         int firstkid = CkMyNode()*TREE_WID+1;
2092         numKids=CkNumNodes()-firstkid;
2093   if (numKids>TREE_WID) numKids=TREE_WID;
2094   if (numKids<0) numKids=0;
2095
2096         for(int i=0;i<numKids;i++){
2097                 kids.push_back(firstkid+i);
2098                 newKids.push_back(firstkid+i);
2099         }
2100 }
2101
2102 void CkNodeReductionMgr::init_BinomialTree(){
2103         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2104         /*upperSize = (unsigned )pow((double)2,depth);*/
2105         upperSize = (unsigned) 1 << depth;
2106         label = upperSize-CkMyNode()-1;
2107         int p=label;
2108         int count=0;
2109         while( p > 0){
2110                 if(p % 2 == 0)
2111                         break;
2112                 else{
2113                         p = p/2;
2114                         count++;
2115                 }
2116         }
2117         /*parent = label + rint(pow((double)2,count));*/
2118         parent = label + (1<<count);
2119         parent = upperSize -1 -parent;
2120         int temp;
2121         if(count != 0){
2122                 numKids = 0;
2123                 for(int i=0;i<count;i++){
2124                         /*temp = label - rint(pow((double)2,i));*/
2125                         temp = label - (1<<i);
2126                         temp = upperSize-1-temp;
2127                         if(temp <= CkNumNodes()-1){
2128                 //              kids[numKids] = temp;
2129                                 kids.push_back(temp);
2130                                 numKids++;
2131                         }
2132                 }
2133         }else{
2134                 numKids = 0;
2135         //      kids = NULL;
2136         }
2137 }
2138
2139
2140 int CkNodeReductionMgr::treeRoot(void)
2141 {
2142   return 0;
2143 }
2144 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
2145 {
2146   return (CmiBool)(CkMyNode()!=treeRoot());
2147 }
2148 int CkNodeReductionMgr::treeParent(void) //My parent Node
2149 {
2150   return parent;
2151 }
2152
2153 int CkNodeReductionMgr::firstKid(void) //My first child Node
2154 {
2155   return CkMyNode()*TREE_WID+1;
2156 }
2157 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2158 {
2159 #ifdef BINOMIAL_TREE
2160         return numKids;
2161 #else
2162 /*  int nKids=CkNumNodes()-firstKid();
2163   if (nKids>TREE_WID) nKids=TREE_WID;
2164   if (nKids<0) nKids=0;
2165   return nKids;*/
2166         return numKids;
2167 #endif
2168 }
2169
2170 //Combine (& free) the current message vector msgs.
2171 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2172 {
2173 #if CMK_BIGSIM_CHARM
2174   _TRACE_BG_END_EXECUTE(1);
2175   void* _bgParentLog = NULL;
2176   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2177 #endif
2178   CkReductionMsg *ret=NULL;
2179
2180   //Look through the vector for a valid reducer, swapping out placeholder messages
2181   CkReduction::reducerType r=CkReduction::invalid;
2182   int msgs_gcount=0;//Reduced gcount
2183   int msgs_nSources=0;//Reduced nSources
2184   int msgs_userFlag=-1;
2185   CkCallback msgs_callback;
2186   CkCallback msgs_secondaryCallback;
2187   int i;
2188   int nMsgs=0;
2189   CkReductionMsg *m;
2190   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2191   bool isMigratableContributor;
2192         
2193
2194   while(NULL!=(m=msgs.deq()))
2195   {
2196     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
2197     msgs_gcount+=m->gcount;
2198     if (m->sourceFlag!=0)
2199     { //This is a real message from an element, not just a placeholder
2200       msgArr[nMsgs++]=m;
2201       msgs_nSources+=m->nSources();
2202       r=m->reducer;
2203       if (!m->callback.isInvalid())
2204         msgs_callback=m->callback;
2205       if(!m->secondaryCallback.isInvalid()){
2206         msgs_secondaryCallback = m->secondaryCallback;
2207       }
2208       if (m->userFlag!=-1)
2209         msgs_userFlag=m->userFlag;
2210
2211         isMigratableContributor= m->isMigratableContributor();
2212 #if CMK_BIGSIM_CHARM
2213       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2214 #endif
2215                                 
2216     }
2217     else
2218     { //This is just a placeholder message-- replace it
2219       delete m;
2220     }
2221   }
2222
2223   if (nMsgs==0||r==CkReduction::invalid)
2224   //No valid reducer in the whole vector
2225     ret=CkReductionMsg::buildNew(0,NULL);
2226   else
2227   {//Use the reducer to reduce the messages
2228     if(nMsgs == 1){
2229         ret = msgArr[0];
2230     }else{
2231         CkReduction::reducerFn f=CkReduction::reducerTable[r];
2232         ret=(*f)(nMsgs,msgArr);
2233     }
2234     ret->reducer=r;
2235   }
2236
2237         
2238 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
2239 #if CRITICAL_PATH_DEBUG > 3
2240         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2241 #endif
2242         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2243         path.updateMax(UsrToEnv(ret));
2244         // Combine the critical paths from all the reduction messages into the header for the new result
2245         for (i=0;i<nMsgs;i++){
2246           if (msgArr[i]!=ret){
2247             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2248             path.updateMax(UsrToEnv(msgArr[i]));
2249           } else {
2250             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2251           }
2252         }
2253 #if CRITICAL_PATH_DEBUG > 3
2254         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2255 #endif
2256
2257 #endif
2258
2259
2260         //Go back through the vector, deleting old messages
2261   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2262   delete [] msgArr;
2263   //Set the message counts
2264   ret->redNo=redNo;
2265   ret->gcount=msgs_gcount;
2266   ret->userFlag=msgs_userFlag;
2267   ret->callback=msgs_callback;
2268   ret->secondaryCallback = msgs_secondaryCallback;
2269   ret->sourceFlag=msgs_nSources;
2270   ret->setMigratableContributor(isMigratableContributor);
2271   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
2272 #if CMK_BIGSIM_CHARM
2273   _TRACE_BG_TLINE_END(&ret->log);
2274 #endif
2275
2276   return ret;
2277 }
2278
2279 void CkNodeReductionMgr::pup(PUP::er &p)
2280 {
2281 //We do not store the client function pointer or the client function parameter,
2282 //it is the responsibility of the programmer to correctly restore these
2283   IrrGroup::pup(p);
2284   p(redNo);
2285   p(inProgress); p(creating); p(startRequested);
2286   p(lcount);
2287   p(nContrib); p(nRemote);
2288   p(interrupt);
2289   p|msgs;
2290   p|futureMsgs;
2291   p|futureRemoteMsgs;
2292   p|futureLateMigrantMsgs;
2293   p|parent;
2294   p|additionalGCount;
2295   p|newAdditionalGCount;
2296   if(p.isUnpacking()) {
2297     gcount=CkNumNodes();
2298     thisProxy = thisgroup;
2299     lockEverything = CmiCreateLock();
2300 #ifdef BINOMIAL_TREE
2301     init_BinomialTree();
2302 #else
2303     init_BinaryTree();
2304 #endif          
2305   }
2306   p | blocked;
2307   p | maxModificationRedNo;
2308
2309 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2310   int isnull = (storedCallback == NULL);
2311   p | isnull;
2312   if (!isnull) {
2313     if (p.isUnpacking()) {
2314       storedCallback = new CkCallback;
2315     }
2316     p|*storedCallback;
2317   }
2318 #endif
2319
2320 }
2321
2322 /*
2323         FAULT_EVAC
2324         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2325         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2326         reduction tree. 
2327 */
2328 void CkNodeReductionMgr::evacuate(){
2329         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2330         if(treeKids() == 0){
2331         /*
2332                 if the node going down is a leaf
2333         */
2334                 oldleaf=true;
2335                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2336                 /*
2337                         Need to ask parent for the reduction number that it has seen. 
2338                         Since it is a leaf, the tree does not need to be rewired. 
2339                         We reuse the oldparent type of tree modification message to get 
2340                         the parent to block and tell us about the highest reduction number it has seen.
2341                         
2342                 */
2343                 int data[2];
2344                 data[0]=CkMyNode();
2345                 data[1]=getTotalGCount()+additionalGCount;
2346                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2347                 newParent = treeParent();
2348         }else{
2349                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2350                 oldleaf= false;
2351         /*
2352                 It is not a leaf. It needs to rewire the tree around itself.
2353                 It also needs to decide on a reduction No after which the new tree will be used
2354                 Till it decides on the new tree and the redNo at which it becomes valid,
2355                 all received messages will be buffered
2356         */
2357                 newParent = kids[0];
2358                 for(int i=numKids-1;i>=0;i--){
2359                         newKids.remove(i);
2360                 }
2361                 /*
2362                         Ask everybody for the highest reduction number they have seen and
2363                         also tell them about the new tree
2364                 */
2365                 /*
2366                         Tell parent about its new child;
2367                 */
2368                 int oldParentData[2];
2369                 oldParentData[0] = CkMyNode();
2370                 oldParentData[1] = newParent;
2371                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2372
2373                 /*
2374                         Tell the other children about their new parent
2375                 */
2376                 int childrenData=newParent;
2377                 for(int i=1;i<numKids;i++){
2378                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2379                 }
2380                 
2381                 /*
2382                         Tell newParent (1st child) about its new children,
2383                         the current node and its children except the newParent
2384                 */
2385                 int *newParentData = new int[numKids+2];
2386                 for(int i=1;i<numKids;i++){
2387                         newParentData[i] = kids[i];
2388                 }
2389                 newParentData[0] = CkMyNode();
2390                 newParentData[numKids] = parent;
2391                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2392                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2393         }
2394         readyDeletion = false;
2395         blocked = true;
2396         numModificationReplies = 0;
2397         tempModificationRedNo = findMaxRedNo();
2398 }
2399
2400 /*
2401         Depending on the code, use the data to change the tree
2402         1. OLDPARENT : replace the old child with a new one
2403         2. OLDCHILDREN: replace the parent
2404         3. NEWPARENT:  add the children and change the parent
2405         4. LEAFPARENT: delete the old child
2406 */
2407
2408 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2409         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2410         int sender;
2411         newKids = kids;
2412         readyDeletion = false;
2413         newAdditionalGCount = additionalGCount;
2414         switch(code){
2415                 case OLDPARENT: 
2416                         for(int i=0;i<numKids;i++){
2417                                 if(newKids[i] == data[0]){
2418                                         newKids[i] = data[1];
2419                                         break;
2420                                 }
2421                         }
2422                         sender = data[0];
2423                         newParent = parent;
2424                         break;
2425                 case OLDCHILDREN:
2426                         newParent = data[0];
2427                         sender = parent;
2428                         break;
2429                 case NEWPARENT:
2430                         for(int i=0;i<size-2;i++){
2431                                 newKids.push_back(data[i]);
2432                         }
2433                         newParent = data[size-2];
2434                         newAdditionalGCount += data[size-1];
2435                         sender = parent;
2436                         break;
2437                 case LEAFPARENT:
2438                         for(int i=0;i<numKids;i++){
2439                                 if(newKids[i] == data[0]){
2440                                         newKids.remove(i);
2441                                         break;
2442                                 }
2443                         }
2444                         sender = data[0];
2445                         newParent = parent;
2446                         newAdditionalGCount += data[1];
2447                         break;
2448         };
2449         blocked = true;
2450         int maxRedNo = findMaxRedNo();
2451         
2452         thisProxy[sender].collectMaxRedNo(maxRedNo);
2453 }
2454
2455 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2456         /*
2457                 Find out the maximum redNo that has been seen by 
2458                 the affected nodes
2459         */
2460         numModificationReplies++;
2461         if(maxRedNo > tempModificationRedNo){
2462                 tempModificationRedNo = maxRedNo;
2463         }
2464         if(numModificationReplies == numKids+1){
2465                 maxModificationRedNo = tempModificationRedNo;
2466                 /*
2467                         when all the affected nodes have replied, tell them the maximum.
2468                         Unblock yourself. deal with the buffered messages local and remote
2469                 */
2470                 if(maxModificationRedNo == -1){
2471                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2472                 }else{
2473                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2474                 }
2475                 thisProxy[parent].unblockNode(maxModificationRedNo);
2476                 for(int i=0;i<numKids;i++){
2477                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2478                 }
2479                 blocked = false;
2480                 updateTree();
2481                 clearBlockedMsgs();
2482         }
2483 }
2484
2485 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2486         maxModificationRedNo = maxRedNo;
2487         updateTree();
2488         blocked = false;
2489         clearBlockedMsgs();
2490 }
2491
2492
2493 void CkNodeReductionMgr::clearBlockedMsgs(){
2494         int len = bufferedMsgs.length();
2495         for(int i=0;i<len;i++){
2496                 CkReductionMsg *m = bufferedMsgs.deq();
2497                 doAddContribution(m);
2498         }
2499         len = bufferedRemoteMsgs.length();
2500         for(int i=0;i<len;i++){
2501                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2502                 doRecvMsg(m);
2503         }
2504
2505 }
2506 /*
2507         if the reduction number exceeds the maxModificationRedNo, change the tree
2508         to become the new one
2509 */
2510
2511 void CkNodeReductionMgr::updateTree(){
2512         if(redNo > maxModificationRedNo){
2513                 parent = newParent;
2514                 kids = newKids;
2515                 maxModificationRedNo = INT_MAX;
2516                 numKids = kids.size();
2517                 readyDeletion = true;
2518                 additionalGCount = newAdditionalGCount;
2519                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2520                 for(int i=0;i<(int)(newKids.size());i++){
2521                         DEBREVAC(("%d ",newKids[i]));
2522                 }
2523                 DEBREVAC(("\n"));
2524         //      startReduction(redNo,CkMyNode());
2525         }else{
2526                 if(maxModificationRedNo != INT_MAX){
2527                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2528                         startReduction(redNo,CkMyNode());
2529                         finishReduction();
2530                 }       
2531         }
2532 }
2533
2534
2535 void CkNodeReductionMgr::doneEvacuate(){
2536         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2537 /*      if(oldleaf){
2538                 
2539                         It used to be a leaf
2540                         Then as soon as future messages have been emptied you can 
2541                         send the parent a message telling them that they are not going
2542                         to receive anymore messages from this child
2543                 
2544                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2545                 while(futureMsgs.length() != 0){
2546                         int n = futureMsgs.length();
2547                         for(int i=0;i<n;i++){
2548                                 CkReductionMsg *m = futureMsgs.deq();
2549                                 if(isPresent(m->redNo)){
2550                                         msgs.enq(m);
2551                                 }else{
2552                                         futureMsgs.enq(m);
2553                                 }
2554                         }
2555                         CkReductionMsg *result = reduceMessages();
2556                         thisProxy[treeParent()].RecvMsg(result);
2557                         redNo++;
2558                 }
2559                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2560                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2561         }else{*/
2562                 if(readyDeletion){
2563                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2564                 }else{
2565                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2566                 }
2567 //      }
2568 }
2569
2570 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2571         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2572         for(int i=0;i<numKids;i++){
2573                 if(kids[i] == deletedChild){
2574                         kids.remove(i);
2575                         break;
2576                 }
2577         }
2578         numKids = kids.length();
2579         finishReduction();
2580 }
2581
2582 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2583         for(int i=0;i<(int)(newKids.length());i++){
2584                 if(newKids[i] == deletedChild){
2585                         newKids.remove(i);
2586                         break;
2587                 }
2588         }
2589         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2590         for(int i=0;i<(int)(newKids.size());i++){
2591                 DEBREVAC(("%d ",newKids[i]));
2592         }
2593         DEBREVAC(("\n"));
2594         finishReduction();
2595 }
2596
2597 int CkNodeReductionMgr::findMaxRedNo(){
2598         int max = redNo;
2599         for(int i=0;i<futureRemoteMsgs.length();i++){
2600                 if(futureRemoteMsgs[i]->redNo  > max){
2601                         max = futureRemoteMsgs[i]->redNo;
2602                 }
2603         }
2604         /*
2605                 if redNo is max (that is no future message) and the current reduction has not started
2606                 then tree can be changed before the reduction redNo can be started
2607         */ 
2608         if(redNo == max && msgs.length() == 0){
2609                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2610                 max--;
2611         }
2612         return max;
2613 }
2614
2615 // initnode call. check the size of reduction table
2616 void CkReductionMgr::sanitycheck()
2617 {
2618 #if CMK_ERROR_CHECKING
2619   int count = 0;
2620   while (CkReduction::reducerTable[count] != NULL) count++;
2621   CmiAssert(CkReduction::nReducers == count);
2622 #endif
2623 }
2624
2625 #include "CkReduction.def.h"