a better fix.
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #if 0
54 //Debugging messages:
55 // Reduction mananger internal information:
56 #define DEBUGRED 1
57 #define DEBR(x) CkPrintf x
58 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
59 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
60
61 #define DEBN(x) CkPrintf x
62 #define AAN "Red Node%d "
63 #define ABN ,CkMyNode()
64
65 // For status and data messages from the builtin reducer functions.
66 #define RED_DEB(x) //CkPrintf x
67 #define DEBREVAC(x) CkPrintf x
68 #else
69 //No debugging info-- empty defines
70 #define DEBUGRED 0
71 #define DEBR(x) //CkPrintf x
72 #define DEBN(x) //CkPrintf x
73 #define RED_DEB(x) //CkPrintf x
74 #define DEBREVAC(x) //CkPrintf x
75 #endif
76
77 #ifndef INT_MAX
78 #define INT_MAX 2147483647
79 #endif
80
81 extern int _inrestart;
82
83 Group::Group()
84 {
85         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
86
87         creatingContributors();
88         contributorStamped(&reductionInfo);
89         contributorCreated(&reductionInfo);
90         doneCreatingContributors();
91 #if DEBUGRED
92         CkPrintf("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr));
93 #endif                  
94         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
95         nodeProxy = nodetemp;
96 }
97
98 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg)
99 {
100         creatingContributors();
101         contributorStamped(&reductionInfo);
102         contributorCreated(&reductionInfo);
103         doneCreatingContributors();
104 }
105
106 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
107                                     ((CkReductionMgr *)this),
108                                     reductionInfo,false);
109 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid));
110
111
112
113 CkGroupInitCallback::CkGroupInitCallback(void) {}
114 /*
115 The callback is just used to tell the caller that this group
116 has been constructed.  (Now they can safely call CkLocalBranch)
117 */
118 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
119 {
120   m->call();
121   delete m;
122 }
123
124 /*
125 The callback is just used to tell the caller that this group
126 is constructed and ready to process other calls.
127 */
128 CkGroupReadyCallback::CkGroupReadyCallback(void)
129 {
130   _isReady = 0;
131 }
132 void
133 CkGroupReadyCallback::callBuffered(void)
134 {
135   int n = _msgs.length();
136   for(int i=0;i<n;i++)
137   {
138     CkGroupCallbackMsg *msg = _msgs.deq();
139     msg->call();
140     delete msg;
141   }
142 }
143 void
144 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
145 {
146   if(_isReady) {
147     msg->call();
148     delete msg;
149   } else {
150     _msgs.enq(msg);
151   }
152 }
153
154 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
155         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
156 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
157 {
158         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
159         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
160         b->fn(b->param,m->getSize(),m->getData());
161         delete m;
162 }
163
164 ///////////////// Reduction Manager //////////////////
165 /*
166 One CkReductionMgr runs a non-overlapping set of reductions.
167 It collects messages from all local contributors, then sends
168 the reduced message up the reduction tree to node zero, where
169 they're passed to the user's client function.
170 */
171
172 CkReductionMgr::CkReductionMgr()//Constructor
173   : thisProxy(thisgroup)
174
175   redNo=0;
176   completedRedNo = -1;
177   inProgress=CmiFalse;
178   creating=CmiFalse;
179   startRequested=CmiFalse;
180   gcount=lcount=0;
181   nContrib=nRemote=0;
182   maxStartRequest=0;
183   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
184 }
185
186 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
187 {
188   gcount=lcount=0;
189 }
190
191 void CkReductionMgr::flushStates()
192 {
193   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
194   redNo=0;
195   completedRedNo = -1;
196   inProgress=CmiFalse;
197   creating=CmiFalse;
198   gcount=lcount=0;
199   startRequested=CmiFalse;
200   nContrib=nRemote=0;
201   maxStartRequest=0;
202
203   while (!msgs.isEmpty()) { delete msgs.deq(); }
204   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
205   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
206   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
207
208   adjVec.length()=0;
209
210   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
211 }
212
213 //////////// Reduction Manager Client API /////////////
214
215 //Add the given client function.  Overwrites any previous client.
216 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
217 {
218   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
219
220   if (CkMyPe()!=0)
221           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
222   storedCallback=*cb;
223   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
224   nodeProxy.ckSetReductionClient(callback);
225 }
226
227 ///////////////////////////// Contributor ////////////////////////
228 //Contributors keep a copy of this structure:
229
230 /*Contributor migration support:
231 */
232 void contributorInfo::pup(PUP::er &p)
233 {
234   p(redNo);
235 }
236
237 ////////////////////// Contributor list maintainance: /////////////////
238 //These just set and clear the "creating" flag to prevent
239 // reductions from finishing early because not all elements
240 // have been created.
241 void CkReductionMgr::creatingContributors(void)
242 {
243   DEBR((AA"Creating contributors...\n"AB));
244   creating=CmiTrue;
245 }
246 void CkReductionMgr::doneCreatingContributors(void)
247 {
248   DEBR((AA"Done creating contributors...\n"AB));
249   creating=CmiFalse;
250   if (startRequested) startReduction(redNo,CkMyPe());
251   finishReduction();
252 }
253
254 //A new contributor will be created
255 void CkReductionMgr::contributorStamped(contributorInfo *ci)
256 {
257   DEBR((AA"Contributor %p stamped\n"AB,ci));
258   //There is another contributor
259   gcount++;
260   if (inProgress)
261   {
262     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
263     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
264   } else
265     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
266 }
267
268 //A new contributor was actually created
269 void CkReductionMgr::contributorCreated(contributorInfo *ci)
270 {
271   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
272   //We've got another contributor
273   lcount++;
274   //He may not need to contribute to some of our reductions:
275   for (int r=redNo;r<ci->redNo;r++)
276     adj(r).lcount--;//He won't be contributing to r here
277 }
278
279 /*Don't expect any more contributions from this one.
280 This is rather horrifying because we now have to make
281 sure the global element count accurately reflects all the
282 contributions the element made before it died-- these may stretch
283 far into the future.  The adj() vector is what saves us here.
284 */
285 void CkReductionMgr::contributorDied(contributorInfo *ci)
286 {
287 #if CMK_MEM_CHECKPOINT
288   // ignore from listener if it is during restart from crash
289   if (CkInRestarting()) return;
290 #endif
291   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
292   //We lost a contributor
293   gcount--;
294
295   if (ci->redNo<redNo)
296   {//Must have been migrating during reductions-- root is waiting for his
297   // contribution, which will never come.
298     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
299     for (int r=ci->redNo;r<redNo;r++)
300       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
301   }
302
303   //Add to the global count for all his future messages (wherever they are)
304   int r;
305   for (r=redNo;r<ci->redNo;r++)
306   {//He already contributed to this reduction, but won't show up in global count.
307     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
308     adj(r).gcount++;
309   }
310
311   lcount--;
312   //He's already contributed to several reductions here
313   for (r=redNo;r<ci->redNo;r++)
314     adj(r).lcount++;//He'll be contributing to r here
315
316   finishReduction();
317 }
318
319 //Migrating away (note that global count doesn't change)
320 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
321 {
322   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
323   lcount--;//We lost a local
324   //He's already contributed to several reductions here
325   for (int r=redNo;r<ci->redNo;r++)
326     adj(r).lcount++;//He'll be contributing to r here
327
328   finishReduction();
329 }
330
331 //Migrating in (note that global count doesn't change)
332 void CkReductionMgr::contributorArriving(contributorInfo *ci)
333 {
334   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
335   lcount++;//We gained a local
336 #if CMK_MEM_CHECKPOINT
337   // ignore from listener if it is during restart from crash
338   // because the ci may be old.
339   if (CkInRestarting()) return;
340 #endif
341   //He has already contributed (elsewhere) to several reductions:
342   for (int r=redNo;r<ci->redNo;r++)
343     adj(r).lcount--;//He won't be contributing to r here
344
345 }
346
347 //Contribute-- the given msg can contain any data.  The reducerType
348 // field of the message must be valid.
349 // Each contributor must contribute exactly once to the each reduction.
350 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
351 {
352   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
353   m->ci=ci;
354   m->redNo=ci->redNo++;
355   m->sourceFlag=-1;//A single contribution
356   m->gcount=0;
357   addContribution(m);
358 }
359
360 //////////// Reduction Manager Remote Entry Points /////////////
361 //Sent down the reduction tree (used by barren PEs)
362 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
363 {
364  if(CkMyPe()==0){
365         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
366         //delete m;
367         //return;
368  }
369  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
370  int srcPE = (UsrToEnv(m))->getSrcPe();
371   if (isPresent(m->num) && !inProgress)
372   {
373     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
374     startReduction(m->num,srcPE);
375     finishReduction();
376   } else if (isFuture(m->num)){
377 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
378           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
379           if(maxStartRequest < m->num){
380                   maxStartRequest = m->num;
381           }
382  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
383           
384     }
385   else //is Past
386     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
387   delete m;
388 }
389
390 //Sent to root of reduction tree with reduction contribution
391 // of migrants that missed the main reduction.
392 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
393 {
394         m->secondaryCallback = m->callback;
395   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
396   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
397   nodeMgr->LateMigrantMsg(m);
398 /*      int len = finalMsgs.length();
399         finalMsgs.enq(m);
400 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
401         endArrayReduction();*/
402 }
403
404 //A late migrating contributor will never contribute to this reduction
405 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
406 {
407   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
408   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
409  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
410   adj(m->num).gcount--;//He won't be contributing to this one.
411   finishReduction();
412   delete m;
413 }
414
415 //////////// Reduction Manager State /////////////
416 void CkReductionMgr::startReduction(int number,int srcPE)
417 {
418   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
419   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
420   if (inProgress){
421         DEBR((AA"This reduction is already in progress\n"AB));
422         return;//This reduction already started
423   }
424   if (creating) //Don't start yet-- we're creating elements
425   {
426     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
427     startRequested=CmiTrue;
428     return;
429   }
430
431 //If none of these cases, we need to start the reduction--
432   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
433   inProgress=CmiTrue;
434   //Sent start requests to our kids (in case they don't already know)
435  
436
437         /*
438                 FAULT_EVAC
439         */
440   if(!CmiNodeAlive(CkMyPe())){
441         return;
442   }
443         
444   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
445   int temp;
446         /*
447   //making it a broadcast done only by PE 0
448   if(!hasParent()){
449                 temp = completedRedNo+1;
450                 for(int i=temp;i<=number;i++){
451                         for(int j=0;j<CkNumPes();j++){
452                                 if(j != CkMyPe() && j != srcPE){
453                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
454                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
455                                         }
456                                 }
457                         }
458                 }       
459         }       else{
460                 temp = number;
461         }*/
462 /*  if(!hasParent()){
463                 temp = completedRedNo+1;
464         }       else{
465                 temp = number;
466         }
467         for(int i=temp;i<=number;i++){
468         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
469                 if(hasParent()){
470           // kick-start your parent too ...
471                         if(treeParent() != srcPE){
472                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
473                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
474                                 }       
475                         }       
476                 }
477           for (int k=0;k<treeKids();k++)
478           {
479                         if(firstKid()+k != srcPE){
480                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
481                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
482                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
483                                 }       
484                         }       
485         }
486         }
487         */
488 }       
489
490 /*Handle a message from one element for the reduction*/
491 void CkReductionMgr::addContribution(CkReductionMsg *m)
492 {
493   if (isPast(m->redNo))
494   {//We've moved on-- forward late contribution straight to root
495     DEBR((AA"Migrant %p gives late contribution for #%d!\n"AB,m->ci,m->redNo));
496    // if (!hasParent()) //Root moved on too soon-- should never happen
497    //   CkAbort("Late reduction contribution received at root!\n");
498     thisProxy[0].LateMigrantMsg(m);
499   }
500   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
501     DEBR((AA"Contributor %p gives early contribution-- for #%d\n"AB,m->ci,m->redNo));
502     futureMsgs.enq(m);
503   } else {// An ordinary contribution
504     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
505    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
506     startReduction(m->redNo,CkMyPe());
507     msgs.enq(m);
508     nContrib++;
509     finishReduction();
510   }
511 }
512
513 /**function checks if it has got all contributions that it is supposed to
514 get at this processor. If it is done it sends the reduced result to the local
515 nodegroup */
516 void CkReductionMgr::finishReduction(void)
517 {
518   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
519   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
520   if ((!inProgress) | creating){
521         DEBR((AA"Either not in Progress or creating\n"AB));
522         return;
523   }
524   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
525   if (nContrib<(lcount+adj(redNo).lcount)){
526         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
527          return;//Need more local messages
528   }
529   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
530   CkReductionMsg *result=reduceMessages();
531   result->redNo=redNo;
532   result->gcount+=gcount+adj(redNo).gcount;
533   result->secondaryCallback = result->callback;
534   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
535         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
536
537   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
538
539 #if DEBUGRED
540  // CkPrintf("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount);
541 #endif
542   
543   // Find our node reduction manager, and pass reduction to him:
544   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
545   nodeMgr->contributeArrayReduction(result);
546
547   //House Keeping Operations will have to check later what needs to be changed
548   redNo++;
549   //Shift the count adjustment vector down one slot (to match new redNo)
550   int i;
551
552   if(CkMyPe()!=0){
553         int i;
554         completedRedNo++;
555         for (i=1;i<adjVec.length();i++)
556            adjVec[i-1]=adjVec[i];
557         adjVec.length()--;  
558   }
559   inProgress=CmiFalse;
560   startRequested=CmiFalse;
561   nRemote=nContrib=0;
562
563   //Look through the future queue for messages we can now handle
564   int n=futureMsgs.length();
565   for (i=0;i<n;i++)
566   {
567     CkReductionMsg *m=futureMsgs.deq();
568     if (m!=NULL) //One of these addContributions may have finished us.
569       addContribution(m);//<- if *still* early, puts it back in the queue
570   }
571   if(maxStartRequest >= redNo){
572           startReduction(redNo,CkMyPe());
573           finishReduction();
574   }
575 }
576
577 //////////// Reduction Manager Utilities /////////////
578
579 //Return the countAdjustment struct for the given redNo:
580 countAdjustment &CkReductionMgr::adj(int number)
581 {
582   number-=completedRedNo;
583   number--;
584   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
585   //Pad the adjustment vector with zeros until it's at least number long
586   while (adjVec.length()<=number)
587     adjVec.push_back(countAdjustment());
588   return adjVec[number];
589 }
590
591 //Combine (& free) the current message vector msgs.
592 CkReductionMsg *CkReductionMgr::reduceMessages(void)
593 {
594   CkReductionMsg *ret=NULL;
595
596   //Look through the vector for a valid reducer, swapping out placeholder messages
597   CkReduction::reducerType r=CkReduction::invalid;
598   int msgs_gcount=0;//Reduced gcount
599   int msgs_nSources=0;//Reduced nSources
600   int msgs_userFlag=-1;
601   CkCallback msgs_callback;
602   int i;
603   int nMsgs=0;
604   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
605   CkReductionMsg *m;
606         bool isMigratableContributor;
607
608   // Copy message queue into msgArr, skipping placeholders:
609   while (NULL!=(m=msgs.deq()))
610   {
611     msgs_gcount+=m->gcount;
612     if (m->sourceFlag!=0)
613     { //This is a real message from an element, not just a placeholder
614       msgArr[nMsgs++]=m;
615       msgs_nSources+=m->nSources();
616       r=m->reducer;
617       if (!m->callback.isInvalid())
618         msgs_callback=m->callback;
619       if (m->userFlag!=-1)
620         msgs_userFlag=m->userFlag;
621                         
622                         isMigratableContributor=m->isMigratableContributor();
623     }
624     else
625     { //This is just a placeholder message-- forget it
626       delete m;
627     }
628   }
629
630   if (nMsgs==0||r==CkReduction::invalid)
631   //No valid reducer in the whole vector
632     ret=CkReductionMsg::buildNew(0,NULL);
633   else
634   {//Use the reducer to reduce the messages
635                 //if there is only one msg to be reduced just return that message
636                 if(nMsgs == 1){
637                         ret = msgArr[0];        
638                 }else{
639             CkReduction::reducerFn f=CkReduction::reducerTable[r];
640           ret=(*f)(nMsgs,msgArr);
641                 }
642     ret->reducer=r;
643   }
644
645         //Go back through the vector, deleting old messages
646   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
647         delete [] msgArr;
648
649   //Set the message counts
650   ret->redNo=redNo;
651   ret->gcount=msgs_gcount;
652   ret->userFlag=msgs_userFlag;
653   ret->callback=msgs_callback;
654   ret->sourceFlag=msgs_nSources;
655         ret->setMigratableContributor(isMigratableContributor);
656   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
657
658   return ret;
659 }
660
661
662 //Checkpointing utilities
663 //pack-unpack method for CkReductionMsg
664 //if packing pack the message and then unpack and return it
665 //if unpacking allocate memory for it read it off disk and then unapck
666 //and return it
667 void CkReductionMgr::pup(PUP::er &p)
668 {
669 //We do not store the client function pointer or the client function parameter,
670 //it is the responsibility of the programmer to correctly restore these
671   CkGroupInitCallback::pup(p);
672   p(redNo);
673   p(completedRedNo);
674   p(inProgress); p(creating); p(startRequested);
675   p(nContrib); p(nRemote);
676   p|msgs;
677   p|futureMsgs;
678   p|futureRemoteMsgs;
679   p|finalMsgs;
680   p|adjVec;
681   p|nodeProxy;
682   p|storedCallback;
683
684   // subtle --- Gengbin
685   // Group : CkReductionMgr
686   // CkArray: CkReductionMgr
687   // lcount/gcount in Group is set in Group constructor
688   // lcount/gcount in CkArray is not, it is set when array elements are created
689   // we can not pup because inserting array elems will add the counters again
690 //  p|lcount;
691 //  p|gcount;
692
693   if(p.isUnpacking()){
694     thisProxy = thisgroup;
695     maxStartRequest=0;
696   }
697 #if DEBUGRED
698   CkPrintf("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount);
699 #endif
700 }
701
702
703 //Callback for doing Reduction through NodeGroups added by Sayantan
704
705 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
706
707         int total = m->gcount+adj(m->redNo).gcount;
708         finalMsgs.enq(m);
709         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
710         adj(m->redNo).mainRecvd = 1;
711 #if DEBUGRED
712         CkPrintf("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer());
713 #endif  
714         endArrayReduction();
715 }
716
717 void CkReductionMgr :: endArrayReduction(){
718         CkReductionMsg *ret=NULL;
719         int nMsgs=finalMsgs.length();
720         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
721         //Look through the vector for a valid reducer, swapping out placeholder messages
722         //CkPrintf("Length of Final Message %d \n",nMsgs);
723         CkReduction::reducerType r=CkReduction::invalid;
724         int msgs_gcount=0;//Reduced gcount
725         int msgs_nSources=0;//Reduced nSources
726         int msgs_userFlag=-1;
727         CkCallback msgs_callback;
728         CkCallback msgs_secondaryCallback;
729         CkVec<CkReductionMsg *> tempMsgs;
730         int i;
731         int numMsgs = 0;
732         for (i=0;i<nMsgs;i++)
733         {
734                 CkReductionMsg *m=finalMsgs.deq();
735                 if(m->redNo == completedRedNo +1){
736                         msgs_gcount+=m->gcount;
737                         if (m->sourceFlag!=0)
738                         { //This is a real message from an element, not just a placeholder
739                                 msgs_nSources+=m->nSources();
740                                 r=m->reducer;
741                                 if (!m->callback.isInvalid())
742                                 msgs_callback=m->callback;
743                                 if(!m->secondaryCallback.isInvalid())
744                                         msgs_secondaryCallback = m->secondaryCallback;
745                                 if (m->userFlag!=-1)
746                                         msgs_userFlag=m->userFlag;
747                                 tempMsgs.push_back(m);
748                         }
749                 }else{
750                         finalMsgs.enq(m);
751                 }
752
753         }
754         numMsgs = tempMsgs.length();
755 #if DEBUGRED
756         CkPrintf("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd);
757 #endif  
758         if(numMsgs == 0){
759                 return;
760         }
761         if(adj(completedRedNo+1).mainRecvd == 0){
762                 for(i=0;i<numMsgs;i++){
763                         finalMsgs.enq(tempMsgs[i]);
764                 }
765                 return;
766         }
767
768 /*
769         NOT NEEDED ANYMORE DONE at nodegroup level
770         if(msgs_gcount  > msgs_nSources){
771                 for(i=0;i<numMsgs;i++){
772                         finalMsgs.enq(tempMsgs[i]);
773                 }
774                 return;
775         }*/
776
777         if (nMsgs==0||r==CkReduction::invalid)
778                 //No valid reducer in the whole vector
779                 ret=CkReductionMsg::buildNew(0,NULL);
780         else{//Use the reducer to reduce the messages
781                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
782                 // has to be corrected elements from above need to be put into a temporary vector
783                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
784                 ret=(*f)(numMsgs,msgArr);
785                 ret->reducer=r;
786
787         }
788
789         for(i = 0;i<numMsgs;i++){
790                 if (tempMsgs[i] != ret) delete tempMsgs[i];
791         }
792
793         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
794         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
795
796         ret->redNo=completedRedNo+1;
797         ret->gcount=msgs_gcount;
798         ret->userFlag=msgs_userFlag;
799         ret->callback=msgs_callback;
800         ret->secondaryCallback = msgs_secondaryCallback;
801         ret->sourceFlag=msgs_nSources;
802
803 #if DEBUGRED
804         CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer());
805 #endif
806         if (!ret->secondaryCallback.isInvalid())
807             ret->secondaryCallback.send(ret);
808     else if (!storedCallback.isInvalid())
809             storedCallback.send(ret);
810     else{
811 #if DEBUGRED
812             CkPrintf("No reduction client for group %d \n",thisgroup.idx);
813 #endif
814             CkAbort("No reduction client!\n"
815                     "You must register a client with either SetReductionClient or during contribute.\n");
816     }
817         completedRedNo++;
818 #if DEBUGRED
819        CkPrintf("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer());
820 #endif
821         for (i=1;i<adjVec.length();i++)
822                 adjVec[i-1]=adjVec[i];
823         adjVec.length()--;
824         endArrayReduction();
825 }
826
827
828 /////////////////////////////////////////////////////////////////////////
829
830 ////////////////////////////////
831 //CkReductionMsg support
832
833 //ReductionMessage default private constructor-- does nothing
834 CkReductionMsg::CkReductionMsg(){}
835 CkReductionMsg::~CkReductionMsg(){}
836
837 //This define gives the distance from the start of the CkReductionMsg
838 // object to the start of the user data area (just below last object field)
839 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
840
841 //"Constructor"-- builds and returns a new CkReductionMsg.
842 //  the "data" array you specify will be copied into this object.
843 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
844     CkReduction::reducerType reducer, CkReductionMsg *buf)
845 {
846   int len[1];
847   len[0]=NdataSize;
848   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
849   
850   ret->dataSize=NdataSize;
851   if (srcData!=NULL && !buf)
852     memcpy(ret->data,srcData,NdataSize);
853   ret->userFlag=-1;
854   ret->reducer=reducer;
855   ret->ci=NULL;
856   ret->sourceFlag=-1000;
857   ret->gcount=0;
858   ret->migratableContributor = true;
859   return ret;
860 }
861
862 // Charm kernel message runtime support:
863 void *
864 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
865 {
866   int totalsize=ARM_DATASTART+(*sz);
867   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
868   CkReductionMsg *ret = (CkReductionMsg *)
869     CkAllocMsg(msgnum,totalsize,priobits);
870   ret->data=(void *)(&ret->dataStorage);
871   return (void *) ret;
872 }
873
874 void *
875 CkReductionMsg::pack(CkReductionMsg* in)
876 {
877   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
878   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
879   in->data = NULL;
880   return (void*) in;
881 }
882
883 CkReductionMsg* CkReductionMsg::unpack(void *in)
884 {
885   CkReductionMsg *ret = (CkReductionMsg *)in;
886   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
887   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
888   ret->data=(void *)(&ret->dataStorage);
889   return ret;
890 }
891
892
893 /////////////////////////////////////////////////////////////////////////////////////
894 ///////////////// Builtin Reducer Functions //////////////
895 /* A simple reducer, like sum_int, looks like this:
896 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
897 {
898   int i,ret=0;
899   for (i=0;i<nMsg;i++)
900     ret+=*(int *)(msg[i]->getData());
901   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
902 }
903
904 To keep the code small and easy to change, the implementations below
905 are built with preprocessor macros.
906 */
907
908 //////////////// simple reducers ///////////////////
909 /*A define used to quickly and tersely construct simple reductions.
910 The basic idea is to use the first message's data array as
911 (pre-initialized!) scratch space for folding in the other messages.
912  */
913
914 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
915 {
916         CkAbort("Called the invalid reducer type 0.  This probably\n"
917                 "means you forgot to initialize your custom reducer index.\n");
918         return NULL;
919 }
920
921 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
922 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
923 {\
924   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
925   int m,i;\
926   int nElem=msg[0]->getLength()/sizeof(dataType);\
927   dataType *ret=(dataType *)(msg[0]->getData());\
928   for (m=1;m<nMsg;m++)\
929   {\
930     dataType *value=(dataType *)(msg[m]->getData());\
931     for (i=0;i<nElem;i++)\
932     {\
933       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
934       loop\
935     }\
936   }\
937   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
938   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
939 }
940
941 //Use this macro for reductions that have the same type for all inputs
942 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
943   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
944   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
945   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
946
947
948 //Compute the sum the numbers passed by each element.
949 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
950
951 //Compute the product of the numbers passed by each element.
952 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
953
954 //Compute the largest number passed by any element.
955 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
956
957 //Compute the smallest integer passed by any element.
958 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
959
960
961 //Compute the logical AND of the integers passed by each element.
962 // The resulting integer will be zero if any source integer is zero; else 1.
963 SIMPLE_REDUCTION(logical_and,int,"%d",
964         if (value[i]==0)
965      ret[i]=0;
966   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
967 )
968
969 //Compute the logical OR of the integers passed by each element.
970 // The resulting integer will be 1 if any source integer is nonzero; else 0.
971 SIMPLE_REDUCTION(logical_or,int,"%d",
972   if (value[i]!=0)
973            ret[i]=1;
974   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
975 )
976
977 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
978 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
979
980 /////////////// concat ////////////////
981 /*
982 This reducer simply appends the data it recieves from each element,
983 without any housekeeping data to separate them.
984 */
985 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
986 {
987   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
988   //Figure out how big a message we'll need
989   int i,retSize=0;
990   for (i=0;i<nMsg;i++)
991       retSize+=msg[i]->getSize();
992
993   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
994
995   //Allocate a new message
996   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
997
998   //Copy the source message data into the return message
999   char *cur=(char *)(ret->getData());
1000   for (i=0;i<nMsg;i++) {
1001     int messageBytes=msg[i]->getSize();
1002     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1003     cur+=messageBytes;
1004   }
1005   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1006   return ret;
1007 }
1008
1009 /////////////// set ////////////////
1010 /*
1011 This reducer appends the data it recieves from each element
1012 along with some housekeeping data indicating contribution boundaries.
1013 The message data is thus a list of reduction_set_element structures
1014 terminated by a dummy reduction_set_element with a sourceElement of -1.
1015 */
1016
1017 //This rounds an integer up to the nearest multiple of sizeof(double)
1018 static const int alignSize=sizeof(double);
1019 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1020
1021 //This gives the size (in bytes) of a reduction_set_element
1022 static int SET_SIZE(int dataSize)
1023 {return SET_ALIGN(sizeof(int)+dataSize);}
1024
1025 //This returns a pointer to the next reduction_set_element in the list
1026 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1027 {
1028   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1029   return (CkReduction::setElement *)next;
1030 }
1031
1032 //Combine the data passed by each element into an list of reduction_set_elements.
1033 // Each element may contribute arbitrary data (with arbitrary length).
1034 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1035 {
1036   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1037   //Figure out how big a message we'll need
1038   int i,retSize=0;
1039   for (i=0;i<nMsg;i++) {
1040     if (!msg[i]->isFromUser())
1041     //This message is composite-- it will just be copied over (less terminating -1)
1042       retSize+=(msg[i]->getSize()-sizeof(int));
1043     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1044       retSize+=SET_SIZE(msg[i]->getSize());
1045   }
1046   retSize+=sizeof(int);//Leave room for terminating -1.
1047
1048   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1049
1050   //Allocate a new message
1051   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1052
1053   //Copy the source message data into the return message
1054   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1055   for (i=0;i<nMsg;i++)
1056     if (!msg[i]->isFromUser())
1057     {//This message is composite-- just copy it over (less terminating -1)
1058                         int messageBytes=msg[i]->getSize()-sizeof(int);
1059                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1060                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1061                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1062     }
1063     else //This is a message from an element-- wrap it in a reduction_set_element
1064     {
1065       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1066       cur->dataSize=msg[i]->getSize();
1067       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1068       cur=SET_NEXT(cur);
1069     }
1070   cur->dataSize=-1;//Add a terminating -1.
1071   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1072   return ret;
1073 }
1074
1075 //Utility routine: get the next reduction_set_element in the list
1076 // if there is one, or return NULL if there are none.
1077 //To get all the elements, just keep feeding this procedure's output back to
1078 // its input until it returns NULL.
1079 CkReduction::setElement *CkReduction::setElement::next(void)
1080 {
1081   CkReduction::setElement *n=SET_NEXT(this);
1082   if (n->dataSize==-1)
1083     return NULL;//This is the end of the list
1084   else
1085     return n;//This is just another element
1086 }
1087
1088 /////////////////// Reduction Function Table /////////////////////
1089 CkReduction::CkReduction() {} //Dummy private constructor
1090
1091 //Add the given reducer to the list.  Returns the new reducer's
1092 // reducerType.  Must be called in the same order on every node.
1093 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1094 {
1095   reducerTable[nReducers]=fn;
1096   return (reducerType)nReducers++;
1097 }
1098
1099 /*Reducer table: maps reducerTypes to reducerFns.
1100 It's indexed by reducerType, so the order in this table
1101 must *exactly* match the reducerType enum declaration.
1102 The names don't have to match, but it helps.
1103 */
1104 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1105
1106 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1107     ::invalid_reducer,
1108   //Compute the sum the numbers passed by each element.
1109     ::sum_int,::sum_float,::sum_double,
1110
1111   //Compute the product the numbers passed by each element.
1112     ::product_int,::product_float,::product_double,
1113
1114   //Compute the largest number passed by any element.
1115     ::max_int,::max_float,::max_double,
1116
1117   //Compute the smallest number passed by any element.
1118     ::min_int,::min_float,::min_double,
1119
1120   //Compute the logical AND of the integers passed by each element.
1121   // The resulting integer will be zero if any source integer is zero.
1122     ::logical_and,
1123
1124   //Compute the logical OR of the integers passed by each element.
1125   // The resulting integer will be 1 if any source integer is nonzero.
1126     ::logical_or,
1127
1128     // Compute the logical bitvector AND of the integers passed by each element.
1129     ::bitvec_and,
1130
1131     // Compute the logical bitvector OR of the integers passed by each element.
1132     ::bitvec_or,
1133
1134   //Concatenate the (arbitrary) data passed by each element
1135     ::concat,
1136
1137   //Combine the data passed by each element into an list of setElements.
1138   // Each element may contribute arbitrary data (with arbitrary length).
1139     ::set
1140 };
1141
1142
1143
1144
1145
1146
1147
1148
1149 /********** Code added by Sayantan *********************/
1150 /** Locking is a big problem in the nodegroup code for smp.
1151  So a few assumptions have had to be made. There is one lock
1152  called lockEverything. It protects all the data structures 
1153  of the nodegroup reduction mgr. I tried grabbing it separately 
1154  for each datastructure, modifying it and then releasing it and
1155  then grabbing it again, for the next change.
1156  That doesn't really help because the interleaved execution of 
1157  different threads makes the state of the reduction manager 
1158  inconsistent. 
1159  
1160  1. Grab lockEverything before calling finishreduction or startReduction
1161     or doRecvMsg
1162  2. lockEverything is grabbed only in entry methods reductionStarting
1163     or RecvMesg or  addcontribution.
1164  ****/
1165  
1166 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1167 NodeGroup::NodeGroup(void) {
1168   __nodelock=CmiCreateLock();
1169
1170
1171 }
1172 NodeGroup::~NodeGroup() {
1173   CmiDestroyLock(__nodelock);
1174 }
1175 void NodeGroup::pup(PUP::er &p)
1176 {
1177   CkNodeReductionMgr::pup(p);
1178   p|reductionInfo;
1179 }
1180
1181 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1182
1183 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1184   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1185  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1186   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1187  }
1188
1189 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1190                                     ((CkNodeReductionMgr *)this),
1191                                     reductionInfo,false);
1192
1193 /* this contribute also adds up the count across all messages it receives.
1194   Useful for summing up number of array elements who have contributed ****/ 
1195 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1196         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1197
1198
1199
1200 //#define BINOMIAL_TREE
1201
1202 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1203   : thisProxy(thisgroup)
1204 {
1205 #ifdef BINOMIAL_TREE
1206   init_BinomialTree();
1207 #else
1208   init_BinaryTree();
1209 #endif
1210   storedCallback=NULL;
1211   redNo=0;
1212   inProgress=CmiFalse;
1213   
1214   startRequested=CmiFalse;
1215   gcount=CkNumNodes();
1216   lcount=1;
1217   nContrib=nRemote=0;
1218   lockEverything = CmiCreateLock();
1219
1220
1221   creating=CmiFalse;
1222   interrupt = 0;
1223   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1224         /*
1225                 FAULT_EVAC
1226         */
1227         blocked = false;
1228         maxModificationRedNo = INT_MAX;
1229         killed=0;
1230         additionalGCount = newAdditionalGCount = 0;
1231 }
1232
1233 void CkNodeReductionMgr::flushStates()
1234 {
1235  if(CkMyRank() == 0){
1236   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1237   redNo=0;
1238   inProgress=CmiFalse;
1239
1240   startRequested=CmiFalse;
1241   gcount=CkNumNodes();
1242   lcount=1;
1243   nContrib=nRemote=0;
1244
1245   creating=CmiFalse;
1246   interrupt = 0;
1247   while (!msgs.isEmpty()) { delete msgs.deq(); }
1248   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1249   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1250   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1251   }
1252 }
1253
1254 //////////// Reduction Manager Client API /////////////
1255
1256 //Add the given client function.  Overwrites any previous client.
1257 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1258 {
1259   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1260   if(cb->isInvalid()){
1261         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1262   }else{
1263         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1264   }
1265
1266   if (CkMyNode()!=0)
1267           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1268   delete storedCallback;
1269   storedCallback=cb;
1270 }
1271
1272
1273
1274 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1275 {
1276
1277   m->ci=ci;
1278   m->redNo=ci->redNo++;
1279   m->sourceFlag=-1;//A single contribution
1280   m->gcount=0;
1281 #if DEBUGRED
1282         CkPrintf("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo);
1283 #endif
1284   addContribution(m);
1285
1286 }
1287
1288
1289 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1290 {
1291   m->ci=ci;
1292   m->redNo=ci->redNo++;
1293   m->gcount=count;
1294 #if DEBUGRED
1295  CkPrintf("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1296 #endif
1297   addContribution(m);
1298 #if DEBUGRED
1299   CkPrintf("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1300 #endif
1301
1302 }
1303
1304
1305 //////////// Reduction Manager Remote Entry Points /////////////
1306
1307 //Sent down the reduction tree (used by barren PEs)
1308 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1309 {
1310   CmiLock(lockEverything);
1311   if(blocked){
1312         delete m;
1313         CmiUnlock(lockEverything);
1314         return ;
1315   }
1316   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1317   if (isPresent(m->num) && !inProgress)
1318   {
1319     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1320     startReduction(m->num,srcNode);
1321     finishReduction();
1322   } else if (isFuture(m->num)){
1323         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1324   }
1325   else //is Past
1326     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1327   CmiUnlock(lockEverything);
1328   delete m;
1329
1330 }
1331
1332
1333 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1334 #if DEBUGRED
1335         CkPrintf("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1336 #endif
1337         /*
1338                 FAULT_EVAC
1339         */
1340         if(blocked){
1341                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1342                 bufferedRemoteMsgs.enq(m);
1343                 return;
1344         }
1345         
1346         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1347             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1348             startReduction(m->redNo,CkMyNode());
1349             msgs.enq(m);
1350             nRemote++;
1351             finishReduction();
1352         }
1353         else {
1354             if (isFuture(m->redNo)) {
1355                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1356                     futureRemoteMsgs.enq(m);
1357             }else{
1358                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1359                    CkAbort("Recv'd late remote contribution!\n");
1360             }
1361   }
1362 #if DEBUGRED        
1363        CkPrintf("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1364 #endif       
1365 }
1366
1367 //Sent up the reduction tree with reduced data
1368 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1369 {
1370 #ifndef CMK_CPV_IS_SMP
1371 #if CMK_IMMEDIATE_MSG
1372         if(interrupt == 1){
1373                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1374                 CpvAccess(_qd)->process(-1);
1375                 CmiDelayImmediate();
1376                 return;
1377         }
1378 #endif  
1379 #endif
1380    interrupt = 1;       
1381    CmiLock(lockEverything);   
1382 #if DEBUGRED   
1383    CkPrintf("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1384 #endif   
1385    doRecvMsg(m);
1386    CmiUnlock(lockEverything);    
1387    interrupt = 0;
1388 #if DEBUGRED  
1389    CkPrintf("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1390 #endif 
1391 }
1392
1393 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1394 {
1395         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1396         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1397         if (inProgress){
1398                 DEBR((AA"This Node reduction is already in progress\n"AB));
1399                 return;//This reduction already started
1400         }
1401         if (creating) //Don't start yet-- we're creating elements
1402         {
1403                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1404                 startRequested=CmiTrue;
1405                 return;
1406         }
1407         
1408         //If none of these cases, we need to start the reduction--
1409         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1410         inProgress=CmiTrue;
1411         //Sent start requests to our kids (in case they don't already know)
1412
1413         for (int k=0;k<treeKids();k++)
1414         {
1415 #ifdef BINOMIAL_TREE
1416                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1417                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1418 #else
1419                 if(kids[k] != srcNode){
1420                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1421                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1422                 }       
1423 #endif
1424         }
1425         startLocalGroupReductions(number);
1426 }
1427
1428 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1429         /*
1430                 FAULT_EVAC
1431         */
1432         if(blocked){
1433                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1434                 bufferedMsgs.enq(m);
1435                 return;
1436         }
1437         
1438         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1439                 DEBR((AA"Contributor %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1440                 futureMsgs.enq(m);
1441         } else {// An ordinary contribution
1442                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1443                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1444                 startReduction(m->redNo,CkMyNode());
1445                 msgs.enq(m);
1446                 nContrib++;
1447                 finishReduction();
1448         }
1449 }
1450
1451 //Handle a message from one element for the reduction
1452 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1453 {
1454   interrupt = 1;
1455   CmiLock(lockEverything);
1456   doAddContribution(m);
1457   CmiUnlock(lockEverything);
1458   interrupt = 0;
1459 }
1460
1461 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1462         if(blocked){
1463                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1464                 bufferedMsgs.enq(m);
1465                 return;
1466         }
1467         
1468         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1469                 DEBR((AA"Latemigrant %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1470 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1471                 futureLateMigrantMsgs.enq(m);
1472         } else {// An ordinary contribution
1473                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1474 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1475                 msgs.enq(m);
1476                 finishReduction();
1477         }
1478 };
1479
1480
1481
1482
1483
1484 /** check if the nodegroup reduction is finished at this node. In that case send it
1485 up the reduction tree **/
1486
1487 void CkNodeReductionMgr::finishReduction(void)
1488 {
1489   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1490   /***Check if reduction is finished in the next few ifs***/
1491   if ((!inProgress) | creating){
1492         DEBR((AA"Either not in Progress or creating\n"AB));
1493         return;
1494   }
1495
1496   if (nContrib<(lcount)){
1497         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1498
1499          return;//Need more local messages
1500   }
1501   if (nRemote<treeKids()){
1502         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1503
1504         return;//Need more remote messages
1505   }
1506   if (nRemote>treeKids()){
1507
1508           interrupt = 0;
1509            CkAbort("Nodegrp Excess remote reduction message received!\n");
1510   }
1511
1512   DEBR((AA"Reducing node data...\n"AB));
1513
1514   /**reduce all messages received at this node **/
1515   CkReductionMsg *result=reduceMessages();
1516
1517   if (hasParent())
1518   {//Pass data up tree to parent
1519         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1520         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1521 #if DEBUGRED
1522         CkPrintf("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib);
1523 #endif
1524                 /*
1525                         FAULT_EVAC
1526                 */
1527                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1528             thisProxy[treeParent()].RecvMsg(result);
1529         }
1530
1531   }
1532   else
1533   {
1534                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1535 #if DEBUGRED
1536                         CkPrintf("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor());
1537 #endif                  
1538                         msgs.enq(result);
1539                         return;
1540                 }
1541                 result->gcount += additionalGCount;
1542           /** if the reduction is finished and I am the root of the reduction tree
1543           then call the reductionhandler and other stuff ***/
1544                 
1545
1546 #if DEBUGRED
1547    CkPrintf("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer());
1548 #endif
1549     if (!result->callback.isInvalid()){
1550 #if DEBUGRED
1551             CkPrintf("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe());
1552 #endif      
1553             result->callback.send(result);
1554     }
1555     else if (storedCallback!=NULL){
1556 #if DEBUGRED
1557             CkPrintf("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe());
1558 #endif
1559             storedCallback->send(result);
1560     }
1561     else{
1562                 DEBR((AA"Invalid Callback \n"AB));
1563             CkAbort("No reduction client!\n"
1564                     "You must register a client with either SetReductionClient or during contribute.\n");
1565                 }
1566   }
1567
1568   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
1569   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
1570   redNo++;
1571         updateTree();
1572   int i;
1573   inProgress=CmiFalse;
1574   startRequested=CmiFalse;
1575   nRemote=nContrib=0;
1576
1577   //Look through the future queue for messages we can now handle
1578   int n=futureMsgs.length();
1579
1580   for (i=0;i<n;i++)
1581   {
1582     interrupt = 1;
1583
1584     CkReductionMsg *m=futureMsgs.deq();
1585
1586     interrupt = 0;
1587     if (m!=NULL){ //One of these addContributions may have finished us.
1588 #if DEBUGRED
1589                 CkPrintf("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1590 #endif
1591       doAddContribution(m);//<- if *still* early, puts it back in the queue
1592     }
1593   }
1594
1595   interrupt = 1;
1596
1597   n=futureRemoteMsgs.length();
1598
1599   interrupt = 0;
1600   for (i=0;i<n;i++)
1601   {
1602     interrupt = 1;
1603
1604     CkReductionMsg *m=futureRemoteMsgs.deq();
1605
1606     interrupt = 0;
1607     if (m!=NULL)
1608       doRecvMsg(m);//<- if *still* early, puts it back in the queue
1609   }
1610   
1611   n = futureLateMigrantMsgs.length();
1612   for(i=0;i<n;i++){
1613     CkReductionMsg *m = futureLateMigrantMsgs.deq();
1614     if(m != NULL){
1615       if(m->redNo == redNo){
1616         msgs.enq(m);
1617       }else{
1618         futureLateMigrantMsgs.enq(m);
1619       }
1620     }
1621   }
1622 }
1623
1624 //////////// Reduction Manager Utilities /////////////
1625
1626 void CkNodeReductionMgr::init_BinaryTree(){
1627         parent = (CkMyNode()-1)/TREE_WID;
1628         int firstkid = CkMyNode()*TREE_WID+1;
1629         numKids=CkNumNodes()-firstkid;
1630   if (numKids>TREE_WID) numKids=TREE_WID;
1631   if (numKids<0) numKids=0;
1632
1633         for(int i=0;i<numKids;i++){
1634                 kids.push_back(firstkid+i);
1635                 newKids.push_back(firstkid+i);
1636         }
1637 }
1638
1639 void CkNodeReductionMgr::init_BinomialTree(){
1640         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
1641         /*upperSize = (unsigned )pow((double)2,depth);*/
1642         upperSize = (unsigned) 1 << depth;
1643         label = upperSize-CkMyNode()-1;
1644         int p=label;
1645         int count=0;
1646         while( p > 0){
1647                 if(p % 2 == 0)
1648                         break;
1649                 else{
1650                         p = p/2;
1651                         count++;
1652                 }
1653         }
1654         /*parent = label + rint(pow((double)2,count));*/
1655         parent = label + (1<<count);
1656         parent = upperSize -1 -parent;
1657         int temp;
1658         if(count != 0){
1659                 numKids = 0;
1660                 for(int i=0;i<count;i++){
1661                         /*temp = label - rint(pow((double)2,i));*/
1662                         temp = label - (1<<i);
1663                         temp = upperSize-1-temp;
1664                         if(temp <= CkNumNodes()-1){
1665                 //              kids[numKids] = temp;
1666                                 kids.push_back(temp);
1667                                 numKids++;
1668                         }
1669                 }
1670         }else{
1671                 numKids = 0;
1672         //      kids = NULL;
1673         }
1674 }
1675
1676
1677 int CkNodeReductionMgr::treeRoot(void)
1678 {
1679   return 0;
1680 }
1681 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
1682 {
1683   return (CmiBool)(CkMyNode()!=treeRoot());
1684 }
1685 int CkNodeReductionMgr::treeParent(void) //My parent Node
1686 {
1687 #ifdef BINOMIAL_TREE
1688         return parent;
1689 #else
1690   return parent;
1691 #endif
1692 }
1693
1694 int CkNodeReductionMgr::firstKid(void) //My first child Node
1695 {
1696   return CkMyNode()*TREE_WID+1;
1697 }
1698 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
1699 {
1700 #ifdef BINOMIAL_TREE
1701         return numKids;
1702 #else
1703 /*  int nKids=CkNumNodes()-firstKid();
1704   if (nKids>TREE_WID) nKids=TREE_WID;
1705   if (nKids<0) nKids=0;
1706   return nKids;*/
1707         return numKids;
1708 #endif
1709 }
1710
1711 //Combine (& free) the current message vector msgs.
1712 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
1713 {
1714   CkReductionMsg *ret=NULL;
1715
1716   //Look through the vector for a valid reducer, swapping out placeholder messages
1717   CkReduction::reducerType r=CkReduction::invalid;
1718   int msgs_gcount=0;//Reduced gcount
1719   int msgs_nSources=0;//Reduced nSources
1720   int msgs_userFlag=-1;
1721   CkCallback msgs_callback;
1722   CkCallback msgs_secondaryCallback;
1723   int i;
1724   int nMsgs=0;
1725   CkReductionMsg *m;
1726   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
1727         bool isMigratableContributor;
1728         
1729
1730   while(NULL!=(m=msgs.deq()))
1731   {
1732     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
1733     msgs_gcount+=m->gcount;
1734     if (m->sourceFlag!=0)
1735     { //This is a real message from an element, not just a placeholder
1736       msgArr[nMsgs++]=m;
1737       msgs_nSources+=m->nSources();
1738       r=m->reducer;
1739       if (!m->callback.isInvalid())
1740         msgs_callback=m->callback;
1741       if(!m->secondaryCallback.isInvalid()){
1742         msgs_secondaryCallback = m->secondaryCallback;
1743       }
1744       if (m->userFlag!=-1)
1745         msgs_userFlag=m->userFlag;
1746
1747                         isMigratableContributor= m->isMigratableContributor();
1748                                 
1749     }
1750     else
1751     { //This is just a placeholder message-- replace it
1752       delete m;
1753     }
1754   }
1755
1756   if (nMsgs==0||r==CkReduction::invalid)
1757   //No valid reducer in the whole vector
1758     ret=CkReductionMsg::buildNew(0,NULL);
1759   else
1760   {//Use the reducer to reduce the messages
1761                 if(nMsgs == 1){
1762                         ret = msgArr[0];
1763                 }else{
1764             CkReduction::reducerFn f=CkReduction::reducerTable[r];
1765           ret=(*f)(nMsgs,msgArr);
1766                 }
1767     ret->reducer=r;
1768   }
1769         
1770         //Go back through the vector, deleting old messages
1771   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
1772   delete [] msgArr;
1773   //Set the message counts
1774   ret->redNo=redNo;
1775   ret->gcount=msgs_gcount;
1776   ret->userFlag=msgs_userFlag;
1777   ret->callback=msgs_callback;
1778   ret->secondaryCallback = msgs_secondaryCallback;
1779   ret->sourceFlag=msgs_nSources;
1780         ret->setMigratableContributor(isMigratableContributor);
1781   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
1782
1783   return ret;
1784 }
1785
1786 void CkNodeReductionMgr::pup(PUP::er &p)
1787 {
1788 //We do not store the client function pointer or the client function parameter,
1789 //it is the responsibility of the programmer to correctly restore these
1790   IrrGroup::pup(p);
1791   p(redNo);
1792   p(inProgress); p(creating); p(startRequested);
1793   p(lcount);
1794   p(nContrib); p(nRemote);
1795   p(interrupt);
1796   p|msgs;
1797   p|futureMsgs;
1798   p|futureRemoteMsgs;
1799   p|futureLateMigrantMsgs;
1800   p|parent;
1801   if(p.isUnpacking()) {
1802     gcount=CkNumNodes();
1803     thisProxy = thisgroup;
1804     lockEverything = CmiCreateLock();
1805 #ifdef BINOMIAL_TREE
1806     init_BinomialTree();
1807 #else
1808     init_BinaryTree();
1809 #endif          
1810   }
1811   p | blocked;
1812   p | maxModificationRedNo;
1813
1814   int isnull = (storedCallback == NULL);
1815   p | isnull;
1816   if (!isnull) {
1817     if (p.isUnpacking()) {
1818       storedCallback = new CkCallback;
1819     }
1820     p|*storedCallback;
1821   }
1822 }
1823
1824 /*
1825         FAULT_EVAC
1826         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
1827         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
1828         reduction tree. 
1829 */
1830 void CkNodeReductionMgr::evacuate(){
1831         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
1832         if(treeKids() == 0){
1833         /*
1834                 if the node going down is a leaf
1835         */
1836                 oldleaf=true;
1837                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
1838                 /*
1839                         Need to ask parent for the reduction number that it has seen. 
1840                         Since it is a leaf, the tree does not need to be rewired. 
1841                         We reuse the oldparent type of tree modification message to get 
1842                         the parent to block and tell us about the highest reduction number it has seen.
1843                         
1844                 */
1845                 int data[2];
1846                 data[0]=CkMyNode();
1847                 data[1]=getTotalGCount()+additionalGCount;
1848                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
1849                 newParent = treeParent();
1850         }else{
1851                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
1852                 oldleaf= false;
1853         /*
1854                 It is not a leaf. It needs to rewire the tree around itself.
1855                 It also needs to decide on a reduction No after which the new tree will be used
1856                 Till it decides on the new tree and the redNo at which it becomes valid,
1857                 all received messages will be buffered
1858         */
1859                 newParent = kids[0];
1860                 for(int i=numKids-1;i>=0;i--){
1861                         newKids.remove(i);
1862                 }
1863                 /*
1864                         Ask everybody for the highest reduction number they have seen and
1865                         also tell them about the new tree
1866                 */
1867                 /*
1868                         Tell parent about its new child;
1869                 */
1870                 int oldParentData[2];
1871                 oldParentData[0] = CkMyNode();
1872                 oldParentData[1] = newParent;
1873                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
1874
1875                 /*
1876                         Tell the other children about their new parent
1877                 */
1878                 int childrenData=newParent;
1879                 for(int i=1;i<numKids;i++){
1880                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
1881                 }
1882                 
1883                 /*
1884                         Tell newParent (1st child) about its new children,
1885                         the current node and its children except the newParent
1886                 */
1887                 int *newParentData = new int[numKids+2];
1888                 for(int i=1;i<numKids;i++){
1889                         newParentData[i] = kids[i];
1890                 }
1891                 newParentData[0] = CkMyNode();
1892                 newParentData[numKids] = parent;
1893                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
1894                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
1895         }
1896         readyDeletion = false;
1897         blocked = true;
1898         numModificationReplies = 0;
1899         tempModificationRedNo = findMaxRedNo();
1900 }
1901
1902 /*
1903         Depending on the code, use the data to change the tree
1904         1. OLDPARENT : replace the old child with a new one
1905         2. OLDCHILDREN: replace the parent
1906         3. NEWPARENT:  add the children and change the parent
1907         4. LEAFPARENT: delete the old child
1908 */
1909
1910 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
1911         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
1912         int sender;
1913         newKids = kids;
1914         readyDeletion = false;
1915         newAdditionalGCount = additionalGCount;
1916         switch(code){
1917                 case OLDPARENT: 
1918                         for(int i=0;i<numKids;i++){
1919                                 if(newKids[i] == data[0]){
1920                                         newKids[i] = data[1];
1921                                         break;
1922                                 }
1923                         }
1924                         sender = data[0];
1925                         newParent = parent;
1926                         break;
1927                 case OLDCHILDREN:
1928                         newParent = data[0];
1929                         sender = parent;
1930                         break;
1931                 case NEWPARENT:
1932                         for(int i=0;i<size-2;i++){
1933                                 newKids.push_back(data[i]);
1934                         }
1935                         newParent = data[size-2];
1936                         newAdditionalGCount += data[size-1];
1937                         sender = parent;
1938                         break;
1939                 case LEAFPARENT:
1940                         for(int i=0;i<numKids;i++){
1941                                 if(newKids[i] == data[0]){
1942                                         newKids.remove(i);
1943                                         break;
1944                                 }
1945                         }
1946                         sender = data[0];
1947                         newParent = parent;
1948                         newAdditionalGCount += data[1];
1949                         break;
1950         };
1951         blocked = true;
1952         int maxRedNo = findMaxRedNo();
1953         
1954         thisProxy[sender].collectMaxRedNo(maxRedNo);
1955 }
1956
1957 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
1958         /*
1959                 Find out the maximum redNo that has been seen by 
1960                 the affected nodes
1961         */
1962         numModificationReplies++;
1963         if(maxRedNo > tempModificationRedNo){
1964                 tempModificationRedNo = maxRedNo;
1965         }
1966         if(numModificationReplies == numKids+1){
1967                 maxModificationRedNo = tempModificationRedNo;
1968                 /*
1969                         when all the affected nodes have replied, tell them the maximum.
1970                         Unblock yourself. deal with the buffered messages local and remote
1971                 */
1972                 if(maxModificationRedNo == -1){
1973                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
1974                 }else{
1975                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
1976                 }
1977                 thisProxy[parent].unblockNode(maxModificationRedNo);
1978                 for(int i=0;i<numKids;i++){
1979                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
1980                 }
1981                 blocked = false;
1982                 updateTree();
1983                 clearBlockedMsgs();
1984         }
1985 };
1986
1987 void CkNodeReductionMgr::unblockNode(int maxRedNo){
1988         maxModificationRedNo = maxRedNo;
1989         updateTree();
1990         blocked = false;
1991         clearBlockedMsgs();
1992 }
1993
1994
1995 void CkNodeReductionMgr::clearBlockedMsgs(){
1996         int len = bufferedMsgs.length();
1997         for(int i=0;i<len;i++){
1998                 CkReductionMsg *m = bufferedMsgs.deq();
1999                 doAddContribution(m);
2000         }
2001         len = bufferedRemoteMsgs.length();
2002         for(int i=0;i<len;i++){
2003                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2004                 doRecvMsg(m);
2005         }
2006
2007 }
2008 /*
2009         if the reduction number exceeds the maxModificationRedNo, change the tree
2010         to become the new one
2011 */
2012
2013 void CkNodeReductionMgr::updateTree(){
2014         if(redNo > maxModificationRedNo){
2015                 parent = newParent;
2016                 kids = newKids;
2017                 maxModificationRedNo = INT_MAX;
2018                 numKids = kids.size();
2019                 readyDeletion = true;
2020                 additionalGCount = newAdditionalGCount;
2021                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2022                 for(int i=0;i<newKids.size();i++){
2023                         DEBREVAC(("%d ",newKids[i]));
2024                 }
2025                 DEBREVAC(("\n"));
2026         //      startReduction(redNo,CkMyNode());
2027         }else{
2028                 if(maxModificationRedNo != INT_MAX){
2029                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2030                         startReduction(redNo,CkMyNode());
2031                         finishReduction();
2032                 }       
2033         }
2034 }
2035
2036
2037 void CkNodeReductionMgr::doneEvacuate(){
2038         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2039 /*      if(oldleaf){
2040                 
2041                         It used to be a leaf
2042                         Then as soon as future messages have been emptied you can 
2043                         send the parent a message telling them that they are not going
2044                         to receive anymore messages from this child
2045                 
2046                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2047                 while(futureMsgs.length() != 0){
2048                         int n = futureMsgs.length();
2049                         for(int i=0;i<n;i++){
2050                                 CkReductionMsg *m = futureMsgs.deq();
2051                                 if(isPresent(m->redNo)){
2052                                         msgs.enq(m);
2053                                 }else{
2054                                         futureMsgs.enq(m);
2055                                 }
2056                         }
2057                         CkReductionMsg *result = reduceMessages();
2058                         thisProxy[treeParent()].RecvMsg(result);
2059                         redNo++;
2060                 }
2061                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2062                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2063         }else{*/
2064                 if(readyDeletion){
2065                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2066                 }else{
2067                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2068                 }
2069 //      }
2070 }
2071
2072 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2073         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2074         for(int i=0;i<numKids;i++){
2075                 if(kids[i] == deletedChild){
2076                         kids.remove(i);
2077                         break;
2078                 }
2079         }
2080         numKids = kids.length();
2081         finishReduction();
2082 }
2083
2084 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2085         for(int i=0;i<newKids.length();i++){
2086                 if(newKids[i] == deletedChild){
2087                         newKids.remove(i);
2088                         break;
2089                 }
2090         }
2091         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2092         for(int i=0;i<newKids.size();i++){
2093                 DEBREVAC(("%d ",newKids[i]));
2094         }
2095         DEBREVAC(("\n"));
2096         finishReduction();
2097 }
2098
2099 int CkNodeReductionMgr::findMaxRedNo(){
2100         int max = redNo;
2101         for(int i=0;i<futureRemoteMsgs.length();i++){
2102                 if(futureRemoteMsgs[i]->redNo  > max){
2103                         max = futureRemoteMsgs[i]->redNo;
2104                 }
2105         }
2106         /*
2107                 if redNo is max (that is no future message) and the current reduction has not started
2108                 then tree can be changed before the reduction redNo can be started
2109         */ 
2110         if(redNo == max && msgs.length() == 0){
2111                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2112                 max--;
2113         }
2114         return max;
2115 }
2116
2117 #include "CkReduction.def.h"