c7bbd4a3ed23e8ca72f97297e0177582549a734e
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #if 0
54 //Debugging messages:
55 // Reduction mananger internal information:
56 #define DEBUGRED 1
57 #define DEBR(x) CkPrintf x
58 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
59 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
60
61 #define DEBN(x) CkPrintf x
62 #define AAN "Red Node%d "
63 #define ABN ,CkMyNode()
64
65 // For status and data messages from the builtin reducer functions.
66 #define RED_DEB(x) //CkPrintf x
67 #define DEBREVAC(x) CkPrintf x
68 #else
69 //No debugging info-- empty defines
70 #define DEBUGRED 0
71 #define DEBR(x) //CkPrintf x
72 #define DEBN(x) //CkPrintf x
73 #define RED_DEB(x) //CkPrintf x
74 #define DEBREVAC(x) //CkPrintf x
75 #endif
76
77 #ifndef INT_MAX
78 #define INT_MAX 2147483647
79 #endif
80
81
82 Group::Group()
83 {
84         creatingContributors();
85         contributorStamped(&reductionInfo);
86         contributorCreated(&reductionInfo);
87         doneCreatingContributors();
88 #if DEBUGRED
89         CkPrintf("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr));
90 #endif                  
91         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
92         nodeProxy = nodetemp;
93 }
94
95 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
96                                     ((CkReductionMgr *)this),
97                                     reductionInfo,false);
98 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid));
99
100
101
102 CkGroupInitCallback::CkGroupInitCallback(void) {}
103 /*
104 The callback is just used to tell the caller that this group
105 has been constructed.  (Now they can safely call CkLocalBranch)
106 */
107 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
108 {
109   m->call();
110   delete m;
111 }
112
113 /*
114 The callback is just used to tell the caller that this group
115 is constructed and ready to process other calls.
116 */
117 CkGroupReadyCallback::CkGroupReadyCallback(void)
118 {
119   _isReady = 0;
120 }
121 void
122 CkGroupReadyCallback::callBuffered(void)
123 {
124   int n = _msgs.length();
125   for(int i=0;i<n;i++)
126   {
127     CkGroupCallbackMsg *msg = _msgs.deq();
128     msg->call();
129     delete msg;
130   }
131 }
132 void
133 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
134 {
135   if(_isReady) {
136     msg->call();
137     delete msg;
138   } else {
139     _msgs.enq(msg);
140   }
141 }
142
143 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
144         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
145 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
146 {
147         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
148         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
149         b->fn(b->param,m->getSize(),m->getData());
150         delete m;
151 }
152
153 ///////////////// Reduction Manager //////////////////
154 /*
155 One CkReductionMgr runs a non-overlapping set of reductions.
156 It collects messages from all local contributors, then sends
157 the reduced message up the reduction tree to node zero, where
158 they're passed to the user's client function.
159 */
160
161 CkReductionMgr::CkReductionMgr()//Constructor
162   : thisProxy(thisgroup)
163
164   redNo=0;
165   completedRedNo = -1;
166   inProgress=CmiFalse;
167   creating=CmiFalse;
168   startRequested=CmiFalse;
169   gcount=lcount=0;
170   nContrib=nRemote=0;
171   maxStartRequest=0;
172   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
173 }
174
175 void CkReductionMgr::flushStates()
176 {
177   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
178   redNo=0;
179   completedRedNo = -1;
180   inProgress=CmiFalse;
181   creating=CmiFalse;
182   gcount=lcount=0;
183   startRequested=CmiFalse;
184   nContrib=nRemote=0;
185   maxStartRequest=0;
186
187   while (!msgs.isEmpty()) { delete msgs.deq(); }
188   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
189   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
190   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
191
192   adjVec.length()=0;
193
194   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
195 }
196
197 //////////// Reduction Manager Client API /////////////
198
199 //Add the given client function.  Overwrites any previous client.
200 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
201 {
202   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
203
204   if (CkMyPe()!=0)
205           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
206   storedCallback=*cb;
207   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
208   nodeProxy.ckSetReductionClient(callback);
209 }
210
211 ///////////////////////////// Contributor ////////////////////////
212 //Contributors keep a copy of this structure:
213
214 /*Contributor migration support:
215 */
216 void contributorInfo::pup(PUP::er &p)
217 {
218   p(redNo);
219 }
220
221 ////////////////////// Contributor list maintainance: /////////////////
222 //These just set and clear the "creating" flag to prevent
223 // reductions from finishing early because not all elements
224 // have been created.
225 void CkReductionMgr::creatingContributors(void)
226 {
227   DEBR((AA"Creating contributors...\n"AB));
228   creating=CmiTrue;
229 }
230 void CkReductionMgr::doneCreatingContributors(void)
231 {
232   DEBR((AA"Done creating contributors...\n"AB));
233   creating=CmiFalse;
234   if (startRequested) startReduction(redNo,CkMyPe());
235   finishReduction();
236 }
237
238 //A new contributor will be created
239 void CkReductionMgr::contributorStamped(contributorInfo *ci)
240 {
241   DEBR((AA"Contributor %p stamped\n"AB,ci));
242   //There is another contributor
243   gcount++;
244   if (inProgress)
245   {
246     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
247     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
248   } else
249     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
250 }
251
252 //A new contributor was actually created
253 void CkReductionMgr::contributorCreated(contributorInfo *ci)
254 {
255   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
256   //We've got another contributor
257   lcount++;
258   //He may not need to contribute to some of our reductions:
259   for (int r=redNo;r<ci->redNo;r++)
260     adj(r).lcount--;//He won't be contributing to r here
261 }
262
263 /*Don't expect any more contributions from this one.
264 This is rather horrifying because we now have to make
265 sure the global element count accurately reflects all the
266 contributions the element made before it died-- these may stretch
267 far into the future.  The adj() vector is what saves us here.
268 */
269 void CkReductionMgr::contributorDied(contributorInfo *ci)
270 {
271 #if CMK_MEM_CHECKPOINT
272   // ignore from listener if it is during restart from crash
273   if (CkInRestarting()) return;
274 #endif
275   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
276   //We lost a contributor
277   gcount--;
278
279   if (ci->redNo<redNo)
280   {//Must have been migrating during reductions-- root is waiting for his
281   // contribution, which will never come.
282     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
283     for (int r=ci->redNo;r<redNo;r++)
284       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
285   }
286
287   //Add to the global count for all his future messages (wherever they are)
288   int r;
289   for (r=redNo;r<ci->redNo;r++)
290   {//He already contributed to this reduction, but won't show up in global count.
291     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
292     adj(r).gcount++;
293   }
294
295   lcount--;
296   //He's already contributed to several reductions here
297   for (r=redNo;r<ci->redNo;r++)
298     adj(r).lcount++;//He'll be contributing to r here
299
300   finishReduction();
301 }
302
303 //Migrating away (note that global count doesn't change)
304 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
305 {
306   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
307   lcount--;//We lost a local
308   //He's already contributed to several reductions here
309   for (int r=redNo;r<ci->redNo;r++)
310     adj(r).lcount++;//He'll be contributing to r here
311
312   finishReduction();
313 }
314
315 //Migrating in (note that global count doesn't change)
316 void CkReductionMgr::contributorArriving(contributorInfo *ci)
317 {
318   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
319   lcount++;//We gained a local
320 #if CMK_MEM_CHECKPOINT
321   // ignore from listener if it is during restart from crash
322   // because the ci may be old.
323   if (CkInRestarting()) return;
324 #endif
325   //He has already contributed (elsewhere) to several reductions:
326   for (int r=redNo;r<ci->redNo;r++)
327     adj(r).lcount--;//He won't be contributing to r here
328
329 }
330
331 //Contribute-- the given msg can contain any data.  The reducerType
332 // field of the message must be valid.
333 // Each contributor must contribute exactly once to the each reduction.
334 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
335 {
336   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
337   m->ci=ci;
338   m->redNo=ci->redNo++;
339   m->sourceFlag=-1;//A single contribution
340   m->gcount=0;
341   addContribution(m);
342 }
343
344 //////////// Reduction Manager Remote Entry Points /////////////
345 //Sent down the reduction tree (used by barren PEs)
346 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
347 {
348  if(CkMyPe()==0){
349         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
350         //delete m;
351         //return;
352  }
353  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
354  int srcPE = (UsrToEnv(m))->getSrcPe();
355   if (isPresent(m->num) && !inProgress)
356   {
357     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
358     startReduction(m->num,srcPE);
359     finishReduction();
360   } else if (isFuture(m->num)){
361 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
362           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
363           if(maxStartRequest < m->num){
364                   maxStartRequest = m->num;
365           }
366  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
367           
368     }
369   else //is Past
370     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
371   delete m;
372 }
373
374 //Sent to root of reduction tree with reduction contribution
375 // of migrants that missed the main reduction.
376 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
377 {
378         m->secondaryCallback = m->callback;
379   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
380   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
381   nodeMgr->LateMigrantMsg(m);
382 /*      int len = finalMsgs.length();
383         finalMsgs.enq(m);
384 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
385         endArrayReduction();*/
386 }
387
388 //A late migrating contributor will never contribute to this reduction
389 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
390 {
391   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
392   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
393  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
394   adj(m->num).gcount--;//He won't be contributing to this one.
395   finishReduction();
396   delete m;
397 }
398
399 //////////// Reduction Manager State /////////////
400 void CkReductionMgr::startReduction(int number,int srcPE)
401 {
402   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
403   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
404   if (inProgress){
405         DEBR((AA"This reduction is already in progress\n"AB));
406         return;//This reduction already started
407   }
408   if (creating) //Don't start yet-- we're creating elements
409   {
410     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
411     startRequested=CmiTrue;
412     return;
413   }
414
415 //If none of these cases, we need to start the reduction--
416   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
417   inProgress=CmiTrue;
418   //Sent start requests to our kids (in case they don't already know)
419  
420
421         /*
422                 FAULT_EVAC
423         */
424   if(!CmiNodeAlive(CkMyPe())){
425         return;
426   }
427         
428   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
429   int temp;
430         /*
431   //making it a broadcast done only by PE 0
432   if(!hasParent()){
433                 temp = completedRedNo+1;
434                 for(int i=temp;i<=number;i++){
435                         for(int j=0;j<CkNumPes();j++){
436                                 if(j != CkMyPe() && j != srcPE){
437                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
438                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
439                                         }
440                                 }
441                         }
442                 }       
443         }       else{
444                 temp = number;
445         }*/
446 /*  if(!hasParent()){
447                 temp = completedRedNo+1;
448         }       else{
449                 temp = number;
450         }
451         for(int i=temp;i<=number;i++){
452         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
453                 if(hasParent()){
454           // kick-start your parent too ...
455                         if(treeParent() != srcPE){
456                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
457                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
458                                 }       
459                         }       
460                 }
461           for (int k=0;k<treeKids();k++)
462           {
463                         if(firstKid()+k != srcPE){
464                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
465                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
466                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
467                                 }       
468                         }       
469         }
470         }
471         */
472 }       
473
474 /*Handle a message from one element for the reduction*/
475 void CkReductionMgr::addContribution(CkReductionMsg *m)
476 {
477   if (isPast(m->redNo))
478   {//We've moved on-- forward late contribution straight to root
479     DEBR((AA"Migrant %p gives late contribution for #%d!\n"AB,m->ci,m->redNo));
480    // if (!hasParent()) //Root moved on too soon-- should never happen
481    //   CkAbort("Late reduction contribution received at root!\n");
482     thisProxy[0].LateMigrantMsg(m);
483   }
484   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
485     DEBR((AA"Contributor %p gives early contribution-- for #%d\n"AB,m->ci,m->redNo));
486     futureMsgs.enq(m);
487   } else {// An ordinary contribution
488     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
489    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
490     startReduction(m->redNo,CkMyPe());
491     msgs.enq(m);
492     nContrib++;
493     finishReduction();
494   }
495 }
496
497 /**function checks if it has got all contributions that it is supposed to
498 get at this processor. If it is done it sends the reduced result to the local
499 nodegroup */
500 void CkReductionMgr::finishReduction(void)
501 {
502   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
503   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
504   if ((!inProgress) | creating){
505         DEBR((AA"Either not in Progress or creating\n"AB));
506         return;
507   }
508   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
509   if (nContrib<(lcount+adj(redNo).lcount)){
510         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
511          return;//Need more local messages
512   }
513   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
514   CkReductionMsg *result=reduceMessages();
515   result->redNo=redNo;
516   result->gcount+=gcount+adj(redNo).gcount;
517   result->secondaryCallback = result->callback;
518   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
519         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
520
521   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
522
523 #if DEBUGRED
524  // CkPrintf("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount);
525 #endif
526   
527   // Find our node reduction manager, and pass reduction to him:
528   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
529   nodeMgr->contributeArrayReduction(result);
530
531   //House Keeping Operations will have to check later what needs to be changed
532   redNo++;
533   //Shift the count adjustment vector down one slot (to match new redNo)
534   int i;
535
536   if(CkMyPe()!=0){
537         int i;
538         completedRedNo++;
539         for (i=1;i<adjVec.length();i++)
540            adjVec[i-1]=adjVec[i];
541         adjVec.length()--;  
542   }
543   inProgress=CmiFalse;
544   startRequested=CmiFalse;
545   nRemote=nContrib=0;
546
547   //Look through the future queue for messages we can now handle
548   int n=futureMsgs.length();
549   for (i=0;i<n;i++)
550   {
551     CkReductionMsg *m=futureMsgs.deq();
552     if (m!=NULL) //One of these addContributions may have finished us.
553       addContribution(m);//<- if *still* early, puts it back in the queue
554   }
555   if(maxStartRequest >= redNo){
556           startReduction(redNo,CkMyPe());
557           finishReduction();
558   }
559 }
560
561 //////////// Reduction Manager Utilities /////////////
562
563 //Return the countAdjustment struct for the given redNo:
564 countAdjustment &CkReductionMgr::adj(int number)
565 {
566   number-=completedRedNo;
567   number--;
568   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
569   //Pad the adjustment vector with zeros until it's at least number long
570   while (adjVec.length()<=number)
571     adjVec.push_back(countAdjustment());
572   return adjVec[number];
573 }
574
575 //Combine (& free) the current message vector msgs.
576 CkReductionMsg *CkReductionMgr::reduceMessages(void)
577 {
578   CkReductionMsg *ret=NULL;
579
580   //Look through the vector for a valid reducer, swapping out placeholder messages
581   CkReduction::reducerType r=CkReduction::invalid;
582   int msgs_gcount=0;//Reduced gcount
583   int msgs_nSources=0;//Reduced nSources
584   int msgs_userFlag=-1;
585   CkCallback msgs_callback;
586   int i;
587   int nMsgs=0;
588   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
589   CkReductionMsg *m;
590         bool isMigratableContributor;
591
592   // Copy message queue into msgArr, skipping placeholders:
593   while (NULL!=(m=msgs.deq()))
594   {
595     msgs_gcount+=m->gcount;
596     if (m->sourceFlag!=0)
597     { //This is a real message from an element, not just a placeholder
598       msgArr[nMsgs++]=m;
599       msgs_nSources+=m->nSources();
600       r=m->reducer;
601       if (!m->callback.isInvalid())
602         msgs_callback=m->callback;
603       if (m->userFlag!=-1)
604         msgs_userFlag=m->userFlag;
605                         
606                         isMigratableContributor=m->isMigratableContributor();
607     }
608     else
609     { //This is just a placeholder message-- forget it
610       delete m;
611     }
612   }
613
614   if (nMsgs==0||r==CkReduction::invalid)
615   //No valid reducer in the whole vector
616     ret=CkReductionMsg::buildNew(0,NULL);
617   else
618   {//Use the reducer to reduce the messages
619                 //if there is only one msg to be reduced just return that message
620                 if(nMsgs == 1){
621                         ret = msgArr[0];        
622                 }else{
623             CkReduction::reducerFn f=CkReduction::reducerTable[r];
624           ret=(*f)(nMsgs,msgArr);
625                 }
626     ret->reducer=r;
627   }
628
629         //Go back through the vector, deleting old messages
630   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
631         delete [] msgArr;
632
633   //Set the message counts
634   ret->redNo=redNo;
635   ret->gcount=msgs_gcount;
636   ret->userFlag=msgs_userFlag;
637   ret->callback=msgs_callback;
638   ret->sourceFlag=msgs_nSources;
639         ret->setMigratableContributor(isMigratableContributor);
640   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
641
642   return ret;
643 }
644
645
646 //Checkpointing utilities
647 //pack-unpack method for CkReductionMsg
648 //if packing pack the message and then unpack and return it
649 //if unpacking allocate memory for it read it off disk and then unapck
650 //and return it
651 void CkReductionMgr::pup(PUP::er &p)
652 {
653 //We do not store the client function pointer or the client function parameter,
654 //it is the responsibility of the programmer to correctly restore these
655   CkGroupInitCallback::pup(p);
656   p(redNo);
657   p(completedRedNo);
658   p(inProgress); p(creating); p(startRequested);
659   p(nContrib); p(nRemote);
660   p|msgs;
661   p|futureMsgs;
662   p|futureRemoteMsgs;
663   p|finalMsgs;
664   p|adjVec;
665   p|nodeProxy;
666   p|storedCallback;
667   if(p.isUnpacking()){
668     thisProxy = thisgroup;
669     lcount=0;
670     gcount=0;
671     maxStartRequest=0;
672   }
673 #if DEBUGRED
674   CkPrintf("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount);
675 #endif
676 }
677
678
679 //Callback for doing Reduction through NodeGroups added by Sayantan
680
681 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
682
683         int total = m->gcount+adj(m->redNo).gcount;
684         finalMsgs.enq(m);
685         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
686         adj(m->redNo).mainRecvd = 1;
687 #if DEBUGRED
688         CkPrintf("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer());
689 #endif  
690         endArrayReduction();
691 }
692
693 void CkReductionMgr :: endArrayReduction(){
694         CkReductionMsg *ret=NULL;
695         int nMsgs=finalMsgs.length();
696         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
697         //Look through the vector for a valid reducer, swapping out placeholder messages
698         //CkPrintf("Length of Final Message %d \n",nMsgs);
699         CkReduction::reducerType r=CkReduction::invalid;
700         int msgs_gcount=0;//Reduced gcount
701         int msgs_nSources=0;//Reduced nSources
702         int msgs_userFlag=-1;
703         CkCallback msgs_callback;
704         CkCallback msgs_secondaryCallback;
705         CkVec<CkReductionMsg *> tempMsgs;
706         int i;
707         int numMsgs = 0;
708         for (i=0;i<nMsgs;i++)
709         {
710                 CkReductionMsg *m=finalMsgs.deq();
711                 if(m->redNo == completedRedNo +1){
712                         msgs_gcount+=m->gcount;
713                         if (m->sourceFlag!=0)
714                         { //This is a real message from an element, not just a placeholder
715                                 msgs_nSources+=m->nSources();
716                                 r=m->reducer;
717                                 if (!m->callback.isInvalid())
718                                 msgs_callback=m->callback;
719                                 if(!m->secondaryCallback.isInvalid())
720                                         msgs_secondaryCallback = m->secondaryCallback;
721                                 if (m->userFlag!=-1)
722                                         msgs_userFlag=m->userFlag;
723                                 tempMsgs.push_back(m);
724                         }
725                 }else{
726                         finalMsgs.enq(m);
727                 }
728
729         }
730         numMsgs = tempMsgs.length();
731 #if DEBUGRED
732         CkPrintf("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd);
733 #endif  
734         if(numMsgs == 0){
735                 return;
736         }
737         if(adj(completedRedNo+1).mainRecvd == 0){
738                 for(i=0;i<numMsgs;i++){
739                         finalMsgs.enq(tempMsgs[i]);
740                 }
741                 return;
742         }
743
744 /*
745         NOT NEEDED ANYMORE DONE at nodegroup level
746         if(msgs_gcount  > msgs_nSources){
747                 for(i=0;i<numMsgs;i++){
748                         finalMsgs.enq(tempMsgs[i]);
749                 }
750                 return;
751         }*/
752
753         if (nMsgs==0||r==CkReduction::invalid)
754                 //No valid reducer in the whole vector
755                 ret=CkReductionMsg::buildNew(0,NULL);
756         else{//Use the reducer to reduce the messages
757                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
758                 // has to be corrected elements from above need to be put into a temporary vector
759                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
760                 ret=(*f)(numMsgs,msgArr);
761                 ret->reducer=r;
762
763         }
764
765         for(i = 0;i<numMsgs;i++){
766                 if (tempMsgs[i] != ret) delete tempMsgs[i];
767         }
768
769         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
770         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
771
772         ret->redNo=completedRedNo+1;
773         ret->gcount=msgs_gcount;
774         ret->userFlag=msgs_userFlag;
775         ret->callback=msgs_callback;
776         ret->secondaryCallback = msgs_secondaryCallback;
777         ret->sourceFlag=msgs_nSources;
778
779 #if DEBUGRED
780         CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer());
781 #endif
782         if (!ret->secondaryCallback.isInvalid())
783             ret->secondaryCallback.send(ret);
784     else if (!storedCallback.isInvalid())
785             storedCallback.send(ret);
786     else{
787 #if DEBUGRED
788             CkPrintf("No reduction client for group %d \n",thisgroup.idx);
789 #endif
790             CkAbort("No reduction client!\n"
791                     "You must register a client with either SetReductionClient or during contribute.\n");
792     }
793         completedRedNo++;
794 #if DEBUGRED
795        CkPrintf("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer());
796 #endif
797         for (i=1;i<adjVec.length();i++)
798                 adjVec[i-1]=adjVec[i];
799         adjVec.length()--;
800         endArrayReduction();
801 }
802
803
804 /////////////////////////////////////////////////////////////////////////
805
806 ////////////////////////////////
807 //CkReductionMsg support
808
809 //ReductionMessage default private constructor-- does nothing
810 CkReductionMsg::CkReductionMsg(){}
811 CkReductionMsg::~CkReductionMsg(){}
812
813 //This define gives the distance from the start of the CkReductionMsg
814 // object to the start of the user data area (just below last object field)
815 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
816
817 //"Constructor"-- builds and returns a new CkReductionMsg.
818 //  the "data" array you specify will be copied into this object.
819 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
820     CkReduction::reducerType reducer, CkReductionMsg *buf)
821 {
822   int len[1];
823   len[0]=NdataSize;
824   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
825   
826   ret->dataSize=NdataSize;
827   if (srcData!=NULL && !buf)
828     memcpy(ret->data,srcData,NdataSize);
829   ret->userFlag=-1;
830   ret->reducer=reducer;
831   ret->ci=NULL;
832   ret->sourceFlag=-1000;
833   ret->gcount=0;
834   ret->migratableContributor = true;
835   return ret;
836 }
837
838 // Charm kernel message runtime support:
839 void *
840 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
841 {
842   int totalsize=ARM_DATASTART+(*sz);
843   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
844   CkReductionMsg *ret = (CkReductionMsg *)
845     CkAllocMsg(msgnum,totalsize,priobits);
846   ret->data=(void *)(&ret->dataStorage);
847   return (void *) ret;
848 }
849
850 void *
851 CkReductionMsg::pack(CkReductionMsg* in)
852 {
853   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
854   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
855   in->data = NULL;
856   return (void*) in;
857 }
858
859 CkReductionMsg* CkReductionMsg::unpack(void *in)
860 {
861   CkReductionMsg *ret = (CkReductionMsg *)in;
862   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
863   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
864   ret->data=(void *)(&ret->dataStorage);
865   return ret;
866 }
867
868
869 /////////////////////////////////////////////////////////////////////////////////////
870 ///////////////// Builtin Reducer Functions //////////////
871 /* A simple reducer, like sum_int, looks like this:
872 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
873 {
874   int i,ret=0;
875   for (i=0;i<nMsg;i++)
876     ret+=*(int *)(msg[i]->getData());
877   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
878 }
879
880 To keep the code small and easy to change, the implementations below
881 are built with preprocessor macros.
882 */
883
884 //////////////// simple reducers ///////////////////
885 /*A define used to quickly and tersely construct simple reductions.
886 The basic idea is to use the first message's data array as
887 (pre-initialized!) scratch space for folding in the other messages.
888  */
889
890 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
891 {
892         CkAbort("Called the invalid reducer type 0.  This probably\n"
893                 "means you forgot to initialize your custom reducer index.\n");
894         return NULL;
895 }
896
897 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
898 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
899 {\
900   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
901   int m,i;\
902   int nElem=msg[0]->getLength()/sizeof(dataType);\
903   dataType *ret=(dataType *)(msg[0]->getData());\
904   for (m=1;m<nMsg;m++)\
905   {\
906     dataType *value=(dataType *)(msg[m]->getData());\
907     for (i=0;i<nElem;i++)\
908     {\
909       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
910       loop\
911     }\
912   }\
913   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
914   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
915 }
916
917 //Use this macro for reductions that have the same type for all inputs
918 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
919   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
920   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
921   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
922
923
924 //Compute the sum the numbers passed by each element.
925 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
926
927 //Compute the product of the numbers passed by each element.
928 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
929
930 //Compute the largest number passed by any element.
931 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
932
933 //Compute the smallest integer passed by any element.
934 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
935
936
937 //Compute the logical AND of the integers passed by each element.
938 // The resulting integer will be zero if any source integer is zero; else 1.
939 SIMPLE_REDUCTION(logical_and,int,"%d",
940         if (value[i]==0)
941      ret[i]=0;
942   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
943 )
944
945 //Compute the logical OR of the integers passed by each element.
946 // The resulting integer will be 1 if any source integer is nonzero; else 0.
947 SIMPLE_REDUCTION(logical_or,int,"%d",
948   if (value[i]!=0)
949            ret[i]=1;
950   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
951 )
952
953 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
954 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
955
956 /////////////// concat ////////////////
957 /*
958 This reducer simply appends the data it recieves from each element,
959 without any housekeeping data to separate them.
960 */
961 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
962 {
963   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
964   //Figure out how big a message we'll need
965   int i,retSize=0;
966   for (i=0;i<nMsg;i++)
967       retSize+=msg[i]->getSize();
968
969   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
970
971   //Allocate a new message
972   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
973
974   //Copy the source message data into the return message
975   char *cur=(char *)(ret->getData());
976   for (i=0;i<nMsg;i++) {
977     int messageBytes=msg[i]->getSize();
978     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
979     cur+=messageBytes;
980   }
981   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
982   return ret;
983 }
984
985 /////////////// set ////////////////
986 /*
987 This reducer appends the data it recieves from each element
988 along with some housekeeping data indicating contribution boundaries.
989 The message data is thus a list of reduction_set_element structures
990 terminated by a dummy reduction_set_element with a sourceElement of -1.
991 */
992
993 //This rounds an integer up to the nearest multiple of sizeof(double)
994 static const int alignSize=sizeof(double);
995 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
996
997 //This gives the size (in bytes) of a reduction_set_element
998 static int SET_SIZE(int dataSize)
999 {return SET_ALIGN(sizeof(int)+dataSize);}
1000
1001 //This returns a pointer to the next reduction_set_element in the list
1002 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1003 {
1004   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1005   return (CkReduction::setElement *)next;
1006 }
1007
1008 //Combine the data passed by each element into an list of reduction_set_elements.
1009 // Each element may contribute arbitrary data (with arbitrary length).
1010 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1011 {
1012   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1013   //Figure out how big a message we'll need
1014   int i,retSize=0;
1015   for (i=0;i<nMsg;i++) {
1016     if (!msg[i]->isFromUser())
1017     //This message is composite-- it will just be copied over (less terminating -1)
1018       retSize+=(msg[i]->getSize()-sizeof(int));
1019     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1020       retSize+=SET_SIZE(msg[i]->getSize());
1021   }
1022   retSize+=sizeof(int);//Leave room for terminating -1.
1023
1024   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1025
1026   //Allocate a new message
1027   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1028
1029   //Copy the source message data into the return message
1030   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1031   for (i=0;i<nMsg;i++)
1032     if (!msg[i]->isFromUser())
1033     {//This message is composite-- just copy it over (less terminating -1)
1034                         int messageBytes=msg[i]->getSize()-sizeof(int);
1035                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1036                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1037                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1038     }
1039     else //This is a message from an element-- wrap it in a reduction_set_element
1040     {
1041       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1042       cur->dataSize=msg[i]->getSize();
1043       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1044       cur=SET_NEXT(cur);
1045     }
1046   cur->dataSize=-1;//Add a terminating -1.
1047   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1048   return ret;
1049 }
1050
1051 //Utility routine: get the next reduction_set_element in the list
1052 // if there is one, or return NULL if there are none.
1053 //To get all the elements, just keep feeding this procedure's output back to
1054 // its input until it returns NULL.
1055 CkReduction::setElement *CkReduction::setElement::next(void)
1056 {
1057   CkReduction::setElement *n=SET_NEXT(this);
1058   if (n->dataSize==-1)
1059     return NULL;//This is the end of the list
1060   else
1061     return n;//This is just another element
1062 }
1063
1064 /////////////////// Reduction Function Table /////////////////////
1065 CkReduction::CkReduction() {} //Dummy private constructor
1066
1067 //Add the given reducer to the list.  Returns the new reducer's
1068 // reducerType.  Must be called in the same order on every node.
1069 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1070 {
1071   reducerTable[nReducers]=fn;
1072   return (reducerType)nReducers++;
1073 }
1074
1075 /*Reducer table: maps reducerTypes to reducerFns.
1076 It's indexed by reducerType, so the order in this table
1077 must *exactly* match the reducerType enum declaration.
1078 The names don't have to match, but it helps.
1079 */
1080 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1081
1082 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1083     ::invalid_reducer,
1084   //Compute the sum the numbers passed by each element.
1085     ::sum_int,::sum_float,::sum_double,
1086
1087   //Compute the product the numbers passed by each element.
1088     ::product_int,::product_float,::product_double,
1089
1090   //Compute the largest number passed by any element.
1091     ::max_int,::max_float,::max_double,
1092
1093   //Compute the smallest number passed by any element.
1094     ::min_int,::min_float,::min_double,
1095
1096   //Compute the logical AND of the integers passed by each element.
1097   // The resulting integer will be zero if any source integer is zero.
1098     ::logical_and,
1099
1100   //Compute the logical OR of the integers passed by each element.
1101   // The resulting integer will be 1 if any source integer is nonzero.
1102     ::logical_or,
1103
1104     // Compute the logical bitvector AND of the integers passed by each element.
1105     ::bitvec_and,
1106
1107     // Compute the logical bitvector OR of the integers passed by each element.
1108     ::bitvec_or,
1109
1110   //Concatenate the (arbitrary) data passed by each element
1111     ::concat,
1112
1113   //Combine the data passed by each element into an list of setElements.
1114   // Each element may contribute arbitrary data (with arbitrary length).
1115     ::set
1116 };
1117
1118
1119
1120
1121
1122
1123
1124
1125 /********** Code added by Sayantan *********************/
1126 /** Locking is a big problem in the nodegroup code for smp.
1127  So a few assumptions have had to be made. There is one lock
1128  called lockEverything. It protects all the data structures 
1129  of the nodegroup reduction mgr. I tried grabbing it separately 
1130  for each datastructure, modifying it and then releasing it and
1131  then grabbing it again, for the next change.
1132  That doesn't really help because the interleaved execution of 
1133  different threads makes the state of the reduction manager 
1134  inconsistent. 
1135  
1136  1. Grab lockEverything before calling finishreduction or startReduction
1137     or doRecvMsg
1138  2. lockEverything is grabbed only in entry methods reductionStarting
1139     or RecvMesg or  addcontribution.
1140  ****/
1141  
1142 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1143 NodeGroup::NodeGroup(void) {
1144   __nodelock=CmiCreateLock();
1145
1146
1147 }
1148 NodeGroup::~NodeGroup() {
1149   CmiDestroyLock(__nodelock);
1150 }
1151 void NodeGroup::pup(PUP::er &p)
1152 {
1153   CkNodeReductionMgr::pup(p);
1154   p|reductionInfo;
1155 }
1156
1157 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1158
1159 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1160   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1161  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1162   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1163  }
1164
1165 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1166                                     ((CkNodeReductionMgr *)this),
1167                                     reductionInfo,false);
1168
1169 /* this contribute also adds up the count across all messages it receives.
1170   Useful for summing up number of array elements who have contributed ****/ 
1171 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1172         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1173
1174
1175
1176 //#define BINOMIAL_TREE
1177
1178 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1179   : thisProxy(thisgroup)
1180 {
1181 #ifdef BINOMIAL_TREE
1182   init_BinomialTree();
1183 #else
1184         init_BinaryTree();
1185 #endif
1186   storedCallback=NULL;
1187   redNo=0;
1188   inProgress=CmiFalse;
1189   
1190   startRequested=CmiFalse;
1191   gcount=CkNumNodes();
1192   lcount=1;
1193   nContrib=nRemote=0;
1194   lockEverything = CmiCreateLock();
1195
1196
1197   creating=CmiFalse;
1198   interrupt = 0;
1199   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1200         /*
1201                 FAULT_EVAC
1202         */
1203         blocked = false;
1204         maxModificationRedNo = INT_MAX;
1205         killed=0;
1206         additionalGCount = newAdditionalGCount = 0;
1207 }
1208
1209 void CkNodeReductionMgr::flushStates()
1210 {
1211  if(CkMyRank() == 0){
1212   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1213   redNo=0;
1214   inProgress=CmiFalse;
1215
1216   startRequested=CmiFalse;
1217   gcount=CkNumNodes();
1218   lcount=1;
1219   nContrib=nRemote=0;
1220
1221   creating=CmiFalse;
1222   interrupt = 0;
1223   while (!msgs.isEmpty()) { delete msgs.deq(); }
1224   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1225   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1226   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1227   }
1228 }
1229
1230 //////////// Reduction Manager Client API /////////////
1231
1232 //Add the given client function.  Overwrites any previous client.
1233 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1234 {
1235   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1236   if(cb->isInvalid()){
1237         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1238   }else{
1239         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1240   }
1241
1242   if (CkMyNode()!=0)
1243           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1244   delete storedCallback;
1245   storedCallback=cb;
1246 }
1247
1248
1249
1250 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1251 {
1252
1253   m->ci=ci;
1254   m->redNo=ci->redNo++;
1255   m->sourceFlag=-1;//A single contribution
1256   m->gcount=0;
1257 #if DEBUGRED
1258         CkPrintf("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo);
1259 #endif
1260   addContribution(m);
1261
1262 }
1263
1264
1265 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1266 {
1267   m->ci=ci;
1268   m->redNo=ci->redNo++;
1269   m->gcount=count;
1270 #if DEBUGRED
1271  CkPrintf("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1272 #endif
1273   addContribution(m);
1274 #if DEBUGRED
1275   CkPrintf("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1276 #endif
1277
1278 }
1279
1280
1281 //////////// Reduction Manager Remote Entry Points /////////////
1282
1283 //Sent down the reduction tree (used by barren PEs)
1284 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1285 {
1286   CmiLock(lockEverything);
1287         if(blocked){
1288                 delete m;
1289         CmiUnlock(lockEverything);
1290                 return ;
1291         }
1292         int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1293   if (isPresent(m->num) && !inProgress)
1294   {
1295     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1296     startReduction(m->num,srcNode);
1297     finishReduction();
1298   } else if (isFuture(m->num)){
1299         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1300   }
1301   else //is Past
1302     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1303   CmiUnlock(lockEverything);
1304   delete m;
1305
1306 }
1307
1308
1309 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1310 #if DEBUGRED
1311         CkPrintf("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1312 #endif
1313         /*
1314                 FAULT_EVAC
1315         */
1316         if(blocked){
1317                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1318                 bufferedRemoteMsgs.enq(m);
1319                 return;
1320         }
1321         
1322         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1323             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1324             startReduction(m->redNo,CkMyNode());
1325             msgs.enq(m);
1326             nRemote++;
1327             finishReduction();
1328         }
1329         else {
1330             if (isFuture(m->redNo)) {
1331                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1332                     futureRemoteMsgs.enq(m);
1333             }else{
1334                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1335                    CkAbort("Recv'd late remote contribution!\n");
1336             }
1337   }
1338 #if DEBUGRED        
1339        CkPrintf("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1340 #endif       
1341 }
1342
1343 //Sent up the reduction tree with reduced data
1344 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1345 {
1346 #ifndef CMK_CPV_IS_SMP
1347 #if CMK_IMMEDIATE_MSG
1348         if(interrupt == 1){
1349                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1350                 CpvAccess(_qd)->process(-1);
1351                 CmiDelayImmediate();
1352                 return;
1353         }
1354 #endif  
1355 #endif
1356    interrupt = 1;       
1357    CmiLock(lockEverything);   
1358 #if DEBUGRED   
1359    CkPrintf("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1360 #endif   
1361    doRecvMsg(m);
1362    CmiUnlock(lockEverything);    
1363    interrupt = 0;
1364 #if DEBUGRED  
1365    CkPrintf("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1366 #endif 
1367 }
1368
1369 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1370 {
1371         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1372         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1373         if (inProgress){
1374                 DEBR((AA"This Node reduction is already in progress\n"AB));
1375                 return;//This reduction already started
1376         }
1377         if (creating) //Don't start yet-- we're creating elements
1378         {
1379                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1380                 startRequested=CmiTrue;
1381                 return;
1382         }
1383         
1384         //If none of these cases, we need to start the reduction--
1385         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1386         inProgress=CmiTrue;
1387         //Sent start requests to our kids (in case they don't already know)
1388
1389         for (int k=0;k<treeKids();k++)
1390         {
1391 #ifdef BINOMIAL_TREE
1392                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1393                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1394 #else
1395                 if(kids[k] != srcNode){
1396                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1397                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1398                 }       
1399 #endif
1400         }
1401         startLocalGroupReductions(number);
1402 }
1403
1404 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1405         /*
1406                 FAULT_EVAC
1407         */
1408         if(blocked){
1409                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1410                 bufferedMsgs.enq(m);
1411                 return;
1412         }
1413         
1414         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1415                 DEBR((AA"Contributor %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1416                 futureMsgs.enq(m);
1417         } else {// An ordinary contribution
1418                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1419                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1420                 startReduction(m->redNo,CkMyNode());
1421                 msgs.enq(m);
1422                 nContrib++;
1423                 finishReduction();
1424         }
1425 }
1426
1427 //Handle a message from one element for the reduction
1428 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1429 {
1430    interrupt = 1;
1431    CmiLock(lockEverything);
1432    doAddContribution(m);
1433   CmiUnlock(lockEverything);
1434   interrupt = 0;
1435 }
1436
1437 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1438         if(blocked){
1439                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1440                 bufferedMsgs.enq(m);
1441                 return;
1442         }
1443         
1444         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1445                 DEBR((AA"Latemigrant %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1446 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1447                 futureLateMigrantMsgs.enq(m);
1448         } else {// An ordinary contribution
1449                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1450 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1451                 msgs.enq(m);
1452                 finishReduction();
1453         }
1454 };
1455
1456
1457
1458
1459
1460 /** check if the nodegroup reduction is finished at this node. In that case send it
1461 up the reduction tree **/
1462
1463 void CkNodeReductionMgr::finishReduction(void)
1464 {
1465   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1466   /***Check if reduction is finished in the next few ifs***/
1467   if ((!inProgress) | creating){
1468         DEBR((AA"Either not in Progress or creating\n"AB));
1469         return;
1470   }
1471
1472   if (nContrib<(lcount)){
1473         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1474
1475          return;//Need more local messages
1476   }
1477   if (nRemote<treeKids()){
1478         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1479
1480         return;//Need more remote messages
1481   }
1482   if (nRemote>treeKids()){
1483
1484           interrupt = 0;
1485            CkAbort("Nodegrp Excess remote reduction message received!\n");
1486   }
1487
1488   DEBR((AA"Reducing node data...\n"AB));
1489
1490   /**reduce all messages received at this node **/
1491   CkReductionMsg *result=reduceMessages();
1492
1493   if (hasParent())
1494   {//Pass data up tree to parent
1495         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1496         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1497 #if DEBUGRED
1498         CkPrintf("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib);
1499 #endif
1500                 /*
1501                         FAULT_EVAC
1502                 */
1503                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1504             thisProxy[treeParent()].RecvMsg(result);
1505                 }
1506
1507   }
1508   else
1509   {
1510                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1511 #if DEBUGRED
1512                         CkPrintf("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor());
1513 #endif                  
1514                         msgs.enq(result);
1515                         return;
1516                 }
1517                 result->gcount += additionalGCount;
1518           /** if the reduction is finished and I am the root of the reduction tree
1519           then call the reductionhandler and other stuff ***/
1520                 
1521
1522 #if DEBUGRED
1523    CkPrintf("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer());
1524 #endif
1525     if (!result->callback.isInvalid()){
1526 #if DEBUGRED
1527             CkPrintf("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe());
1528 #endif      
1529             result->callback.send(result);
1530     }
1531     else if (storedCallback!=NULL){
1532 #if DEBUGRED
1533             CkPrintf("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe());
1534 #endif
1535             storedCallback->send(result);
1536     }
1537     else{
1538                 DEBR((AA"Invalid Callback \n"AB));
1539             CkAbort("No reduction client!\n"
1540                     "You must register a client with either SetReductionClient or during contribute.\n");
1541                 }
1542   }
1543
1544   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
1545   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
1546   redNo++;
1547         updateTree();
1548   int i;
1549   inProgress=CmiFalse;
1550   startRequested=CmiFalse;
1551   nRemote=nContrib=0;
1552
1553   //Look through the future queue for messages we can now handle
1554   int n=futureMsgs.length();
1555
1556   for (i=0;i<n;i++)
1557   {
1558     interrupt = 1;
1559
1560     CkReductionMsg *m=futureMsgs.deq();
1561
1562     interrupt = 0;
1563     if (m!=NULL){ //One of these addContributions may have finished us.
1564 #if DEBUGRED
1565                 CkPrintf("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1566 #endif
1567       doAddContribution(m);//<- if *still* early, puts it back in the queue
1568     }
1569   }
1570
1571   interrupt = 1;
1572
1573   n=futureRemoteMsgs.length();
1574
1575   interrupt = 0;
1576   for (i=0;i<n;i++)
1577   {
1578     interrupt = 1;
1579
1580     CkReductionMsg *m=futureRemoteMsgs.deq();
1581
1582     interrupt = 0;
1583     if (m!=NULL)
1584       doRecvMsg(m);//<- if *still* early, puts it back in the queue
1585   }
1586   
1587   n = futureLateMigrantMsgs.length();
1588   for(i=0;i<n;i++){
1589     CkReductionMsg *m = futureLateMigrantMsgs.deq();
1590     if(m != NULL){
1591       if(m->redNo == redNo){
1592         msgs.enq(m);
1593       }else{
1594         futureLateMigrantMsgs.enq(m);
1595       }
1596     }
1597   }
1598 }
1599
1600 //////////// Reduction Manager Utilities /////////////
1601
1602 void CkNodeReductionMgr::init_BinaryTree(){
1603         parent = (CkMyNode()-1)/TREE_WID;
1604         int firstkid = CkMyNode()*TREE_WID+1;
1605         numKids=CkNumNodes()-firstkid;
1606   if (numKids>TREE_WID) numKids=TREE_WID;
1607   if (numKids<0) numKids=0;
1608
1609         for(int i=0;i<numKids;i++){
1610                 kids.push_back(firstkid+i);
1611                 newKids.push_back(firstkid+i);
1612         }
1613 }
1614
1615 void CkNodeReductionMgr::init_BinomialTree(){
1616         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
1617         /*upperSize = (unsigned )pow((double)2,depth);*/
1618         upperSize = (unsigned) 1 << depth;
1619         label = upperSize-CkMyNode()-1;
1620         int p=label;
1621         int count=0;
1622         while( p > 0){
1623                 if(p % 2 == 0)
1624                         break;
1625                 else{
1626                         p = p/2;
1627                         count++;
1628                 }
1629         }
1630         /*parent = label + rint(pow((double)2,count));*/
1631         parent = label + (1<<count);
1632         parent = upperSize -1 -parent;
1633         int temp;
1634         if(count != 0){
1635                 numKids = 0;
1636                 for(int i=0;i<count;i++){
1637                         /*temp = label - rint(pow((double)2,i));*/
1638                         temp = label - (1<<i);
1639                         temp = upperSize-1-temp;
1640                         if(temp <= CkNumNodes()-1){
1641                 //              kids[numKids] = temp;
1642                                 kids.push_back(temp);
1643                                 numKids++;
1644                         }
1645                 }
1646         }else{
1647                 numKids = 0;
1648         //      kids = NULL;
1649         }
1650 }
1651
1652
1653 int CkNodeReductionMgr::treeRoot(void)
1654 {
1655   return 0;
1656 }
1657 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
1658 {
1659   return (CmiBool)(CkMyNode()!=treeRoot());
1660 }
1661 int CkNodeReductionMgr::treeParent(void) //My parent Node
1662 {
1663 #ifdef BINOMIAL_TREE
1664         return parent;
1665 #else
1666   return parent;
1667 #endif
1668 }
1669
1670 int CkNodeReductionMgr::firstKid(void) //My first child Node
1671 {
1672   return CkMyNode()*TREE_WID+1;
1673 }
1674 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
1675 {
1676 #ifdef BINOMIAL_TREE
1677         return numKids;
1678 #else
1679 /*  int nKids=CkNumNodes()-firstKid();
1680   if (nKids>TREE_WID) nKids=TREE_WID;
1681   if (nKids<0) nKids=0;
1682   return nKids;*/
1683         return numKids;
1684 #endif
1685 }
1686
1687 //Combine (& free) the current message vector msgs.
1688 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
1689 {
1690   CkReductionMsg *ret=NULL;
1691
1692   //Look through the vector for a valid reducer, swapping out placeholder messages
1693   CkReduction::reducerType r=CkReduction::invalid;
1694   int msgs_gcount=0;//Reduced gcount
1695   int msgs_nSources=0;//Reduced nSources
1696   int msgs_userFlag=-1;
1697   CkCallback msgs_callback;
1698   CkCallback msgs_secondaryCallback;
1699   int i;
1700   int nMsgs=0;
1701   CkReductionMsg *m;
1702   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
1703         bool isMigratableContributor;
1704         
1705
1706   while(NULL!=(m=msgs.deq()))
1707   {
1708     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
1709     msgs_gcount+=m->gcount;
1710     if (m->sourceFlag!=0)
1711     { //This is a real message from an element, not just a placeholder
1712       msgArr[nMsgs++]=m;
1713       msgs_nSources+=m->nSources();
1714       r=m->reducer;
1715       if (!m->callback.isInvalid())
1716         msgs_callback=m->callback;
1717       if(!m->secondaryCallback.isInvalid()){
1718         msgs_secondaryCallback = m->secondaryCallback;
1719       }
1720       if (m->userFlag!=-1)
1721         msgs_userFlag=m->userFlag;
1722
1723                         isMigratableContributor= m->isMigratableContributor();
1724                                 
1725     }
1726     else
1727     { //This is just a placeholder message-- replace it
1728       delete m;
1729     }
1730   }
1731
1732   if (nMsgs==0||r==CkReduction::invalid)
1733   //No valid reducer in the whole vector
1734     ret=CkReductionMsg::buildNew(0,NULL);
1735   else
1736   {//Use the reducer to reduce the messages
1737                 if(nMsgs == 1){
1738                         ret = msgArr[0];
1739                 }else{
1740             CkReduction::reducerFn f=CkReduction::reducerTable[r];
1741           ret=(*f)(nMsgs,msgArr);
1742                 }
1743     ret->reducer=r;
1744   }
1745         
1746         //Go back through the vector, deleting old messages
1747   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
1748   delete [] msgArr;
1749   //Set the message counts
1750   ret->redNo=redNo;
1751   ret->gcount=msgs_gcount;
1752   ret->userFlag=msgs_userFlag;
1753   ret->callback=msgs_callback;
1754   ret->secondaryCallback = msgs_secondaryCallback;
1755   ret->sourceFlag=msgs_nSources;
1756         ret->setMigratableContributor(isMigratableContributor);
1757   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
1758
1759   return ret;
1760 }
1761
1762 void CkNodeReductionMgr::pup(PUP::er &p)
1763 {
1764 //We do not store the client function pointer or the client function parameter,
1765 //it is the responsibility of the programmer to correctly restore these
1766   IrrGroup::pup(p);
1767   p(redNo);
1768   p(inProgress); p(creating); p(startRequested);
1769   p(lcount);
1770   p(nContrib); p(nRemote);
1771   p(interrupt);
1772   p|msgs;
1773   p|futureMsgs;
1774   p|futureRemoteMsgs;
1775   p|futureLateMigrantMsgs;
1776         p|parent;
1777   if(p.isUnpacking()) {
1778     gcount=CkNumNodes();
1779     thisProxy = thisgroup;
1780     lockEverything = CmiCreateLock();
1781 #ifdef BINOMIAL_TREE
1782                 init_BinomialTree();
1783 #else
1784                 init_BinaryTree();
1785 #endif          
1786   }
1787         p | blocked;
1788         p | maxModificationRedNo;
1789 }
1790
1791 /*
1792         FAULT_EVAC
1793         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
1794         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
1795         reduction tree. 
1796 */
1797 void CkNodeReductionMgr::evacuate(){
1798         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
1799         if(treeKids() == 0){
1800         /*
1801                 if the node going down is a leaf
1802         */
1803                 oldleaf=true;
1804                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
1805                 /*
1806                         Need to ask parent for the reduction number that it has seen. 
1807                         Since it is a leaf, the tree does not need to be rewired. 
1808                         We reuse the oldparent type of tree modification message to get 
1809                         the parent to block and tell us about the highest reduction number it has seen.
1810                         
1811                 */
1812                 int data[2];
1813                 data[0]=CkMyNode();
1814                 data[1]=getTotalGCount()+additionalGCount;
1815                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
1816                 newParent = treeParent();
1817         }else{
1818                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
1819                 oldleaf= false;
1820         /*
1821                 It is not a leaf. It needs to rewire the tree around itself.
1822                 It also needs to decide on a reduction No after which the new tree will be used
1823                 Till it decides on the new tree and the redNo at which it becomes valid,
1824                 all received messages will be buffered
1825         */
1826                 newParent = kids[0];
1827                 for(int i=numKids-1;i>=0;i--){
1828                         newKids.remove(i);
1829                 }
1830                 /*
1831                         Ask everybody for the highest reduction number they have seen and
1832                         also tell them about the new tree
1833                 */
1834                 /*
1835                         Tell parent about its new child;
1836                 */
1837                 int oldParentData[2];
1838                 oldParentData[0] = CkMyNode();
1839                 oldParentData[1] = newParent;
1840                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
1841
1842                 /*
1843                         Tell the other children about their new parent
1844                 */
1845                 int childrenData=newParent;
1846                 for(int i=1;i<numKids;i++){
1847                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
1848                 }
1849                 
1850                 /*
1851                         Tell newParent (1st child) about its new children,
1852                         the current node and its children except the newParent
1853                 */
1854                 int *newParentData = new int[numKids+2];
1855                 for(int i=1;i<numKids;i++){
1856                         newParentData[i] = kids[i];
1857                 }
1858                 newParentData[0] = CkMyNode();
1859                 newParentData[numKids] = parent;
1860                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
1861                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
1862         }
1863         readyDeletion = false;
1864         blocked = true;
1865         numModificationReplies = 0;
1866         tempModificationRedNo = findMaxRedNo();
1867 }
1868
1869 /*
1870         Depending on the code, use the data to change the tree
1871         1. OLDPARENT : replace the old child with a new one
1872         2. OLDCHILDREN: replace the parent
1873         3. NEWPARENT:  add the children and change the parent
1874         4. LEAFPARENT: delete the old child
1875 */
1876
1877 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
1878         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
1879         int sender;
1880         newKids = kids;
1881         readyDeletion = false;
1882         newAdditionalGCount = additionalGCount;
1883         switch(code){
1884                 case OLDPARENT: 
1885                         for(int i=0;i<numKids;i++){
1886                                 if(newKids[i] == data[0]){
1887                                         newKids[i] = data[1];
1888                                         break;
1889                                 }
1890                         }
1891                         sender = data[0];
1892                         newParent = parent;
1893                         break;
1894                 case OLDCHILDREN:
1895                         newParent = data[0];
1896                         sender = parent;
1897                         break;
1898                 case NEWPARENT:
1899                         for(int i=0;i<size-2;i++){
1900                                 newKids.push_back(data[i]);
1901                         }
1902                         newParent = data[size-2];
1903                         newAdditionalGCount += data[size-1];
1904                         sender = parent;
1905                         break;
1906                 case LEAFPARENT:
1907                         for(int i=0;i<numKids;i++){
1908                                 if(newKids[i] == data[0]){
1909                                         newKids.remove(i);
1910                                         break;
1911                                 }
1912                         }
1913                         sender = data[0];
1914                         newParent = parent;
1915                         newAdditionalGCount += data[1];
1916                         break;
1917         };
1918         blocked = true;
1919         int maxRedNo = findMaxRedNo();
1920         
1921         thisProxy[sender].collectMaxRedNo(maxRedNo);
1922 }
1923
1924 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
1925         /*
1926                 Find out the maximum redNo that has been seen by 
1927                 the affected nodes
1928         */
1929         numModificationReplies++;
1930         if(maxRedNo > tempModificationRedNo){
1931                 tempModificationRedNo = maxRedNo;
1932         }
1933         if(numModificationReplies == numKids+1){
1934                 maxModificationRedNo = tempModificationRedNo;
1935                 /*
1936                         when all the affected nodes have replied, tell them the maximum.
1937                         Unblock yourself. deal with the buffered messages local and remote
1938                 */
1939                 if(maxModificationRedNo == -1){
1940                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
1941                 }else{
1942                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
1943                 }
1944                 thisProxy[parent].unblockNode(maxModificationRedNo);
1945                 for(int i=0;i<numKids;i++){
1946                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
1947                 }
1948                 blocked = false;
1949                 updateTree();
1950                 clearBlockedMsgs();
1951         }
1952 };
1953
1954 void CkNodeReductionMgr::unblockNode(int maxRedNo){
1955         maxModificationRedNo = maxRedNo;
1956         updateTree();
1957         blocked = false;
1958         clearBlockedMsgs();
1959 }
1960
1961
1962 void CkNodeReductionMgr::clearBlockedMsgs(){
1963         int len = bufferedMsgs.length();
1964         for(int i=0;i<len;i++){
1965                 CkReductionMsg *m = bufferedMsgs.deq();
1966                 doAddContribution(m);
1967         }
1968         len = bufferedRemoteMsgs.length();
1969         for(int i=0;i<len;i++){
1970                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
1971                 doRecvMsg(m);
1972         }
1973
1974 }
1975 /*
1976         if the reduction number exceeds the maxModificationRedNo, change the tree
1977         to become the new one
1978 */
1979
1980 void CkNodeReductionMgr::updateTree(){
1981         if(redNo > maxModificationRedNo){
1982                 parent = newParent;
1983                 kids = newKids;
1984                 maxModificationRedNo = INT_MAX;
1985                 numKids = kids.size();
1986                 readyDeletion = true;
1987                 additionalGCount = newAdditionalGCount;
1988                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
1989                 for(int i=0;i<newKids.size();i++){
1990                         DEBREVAC(("%d ",newKids[i]));
1991                 }
1992                 DEBREVAC(("\n"));
1993         //      startReduction(redNo,CkMyNode());
1994         }else{
1995                 if(maxModificationRedNo != INT_MAX){
1996                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
1997                         startReduction(redNo,CkMyNode());
1998                         finishReduction();
1999                 }       
2000         }
2001 }
2002
2003
2004 void CkNodeReductionMgr::doneEvacuate(){
2005         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2006 /*      if(oldleaf){
2007                 
2008                         It used to be a leaf
2009                         Then as soon as future messages have been emptied you can 
2010                         send the parent a message telling them that they are not going
2011                         to receive anymore messages from this child
2012                 
2013                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2014                 while(futureMsgs.length() != 0){
2015                         int n = futureMsgs.length();
2016                         for(int i=0;i<n;i++){
2017                                 CkReductionMsg *m = futureMsgs.deq();
2018                                 if(isPresent(m->redNo)){
2019                                         msgs.enq(m);
2020                                 }else{
2021                                         futureMsgs.enq(m);
2022                                 }
2023                         }
2024                         CkReductionMsg *result = reduceMessages();
2025                         thisProxy[treeParent()].RecvMsg(result);
2026                         redNo++;
2027                 }
2028                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2029                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2030         }else{*/
2031                 if(readyDeletion){
2032                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2033                 }else{
2034                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2035                 }
2036 //      }
2037 }
2038
2039 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2040         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2041         for(int i=0;i<numKids;i++){
2042                 if(kids[i] == deletedChild){
2043                         kids.remove(i);
2044                         break;
2045                 }
2046         }
2047         numKids = kids.length();
2048         finishReduction();
2049 }
2050
2051 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2052         for(int i=0;i<newKids.length();i++){
2053                 if(newKids[i] == deletedChild){
2054                         newKids.remove(i);
2055                         break;
2056                 }
2057         }
2058         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2059         for(int i=0;i<newKids.size();i++){
2060                 DEBREVAC(("%d ",newKids[i]));
2061         }
2062         DEBREVAC(("\n"));
2063         finishReduction();
2064 }
2065
2066 int CkNodeReductionMgr::findMaxRedNo(){
2067         int max = redNo;
2068         for(int i=0;i<futureRemoteMsgs.length();i++){
2069                 if(futureRemoteMsgs[i]->redNo  > max){
2070                         max = futureRemoteMsgs[i]->redNo;
2071                 }
2072         }
2073         /*
2074                 if redNo is max (that is no future message) and the current reduction has not started
2075                 then tree can be changed before the reduction redNo can be started
2076         */ 
2077         if(redNo == max && msgs.length() == 0){
2078                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2079                 max--;
2080         }
2081         return max;
2082 }
2083
2084 #include "CkReduction.def.h"