1. a complete pup for CkReductionMgr (lcount/gcount), which fixed a bug in restart...
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #if 0
54 //Debugging messages:
55 // Reduction mananger internal information:
56 #define DEBUGRED 1
57 #define DEBR(x) CkPrintf x
58 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
59 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
60
61 #define DEBN(x) CkPrintf x
62 #define AAN "Red Node%d "
63 #define ABN ,CkMyNode()
64
65 // For status and data messages from the builtin reducer functions.
66 #define RED_DEB(x) //CkPrintf x
67 #define DEBREVAC(x) CkPrintf x
68 #else
69 //No debugging info-- empty defines
70 #define DEBUGRED 0
71 #define DEBR(x) //CkPrintf x
72 #define DEBN(x) //CkPrintf x
73 #define RED_DEB(x) //CkPrintf x
74 #define DEBREVAC(x) //CkPrintf x
75 #endif
76
77 #ifndef INT_MAX
78 #define INT_MAX 2147483647
79 #endif
80
81 extern int _inrestart;
82
83 Group::Group()
84 {
85         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
86
87         creatingContributors();
88         contributorStamped(&reductionInfo);
89         contributorCreated(&reductionInfo);
90         doneCreatingContributors();
91 #if DEBUGRED
92         CkPrintf("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr));
93 #endif                  
94         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
95         nodeProxy = nodetemp;
96 }
97
98 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
99                                     ((CkReductionMgr *)this),
100                                     reductionInfo,false);
101 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid));
102
103
104
105 CkGroupInitCallback::CkGroupInitCallback(void) {}
106 /*
107 The callback is just used to tell the caller that this group
108 has been constructed.  (Now they can safely call CkLocalBranch)
109 */
110 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
111 {
112   m->call();
113   delete m;
114 }
115
116 /*
117 The callback is just used to tell the caller that this group
118 is constructed and ready to process other calls.
119 */
120 CkGroupReadyCallback::CkGroupReadyCallback(void)
121 {
122   _isReady = 0;
123 }
124 void
125 CkGroupReadyCallback::callBuffered(void)
126 {
127   int n = _msgs.length();
128   for(int i=0;i<n;i++)
129   {
130     CkGroupCallbackMsg *msg = _msgs.deq();
131     msg->call();
132     delete msg;
133   }
134 }
135 void
136 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
137 {
138   if(_isReady) {
139     msg->call();
140     delete msg;
141   } else {
142     _msgs.enq(msg);
143   }
144 }
145
146 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
147         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
148 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
149 {
150         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
151         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
152         b->fn(b->param,m->getSize(),m->getData());
153         delete m;
154 }
155
156 ///////////////// Reduction Manager //////////////////
157 /*
158 One CkReductionMgr runs a non-overlapping set of reductions.
159 It collects messages from all local contributors, then sends
160 the reduced message up the reduction tree to node zero, where
161 they're passed to the user's client function.
162 */
163
164 CkReductionMgr::CkReductionMgr()//Constructor
165   : thisProxy(thisgroup)
166
167   redNo=0;
168   completedRedNo = -1;
169   inProgress=CmiFalse;
170   creating=CmiFalse;
171   startRequested=CmiFalse;
172   gcount=lcount=0;
173   nContrib=nRemote=0;
174   maxStartRequest=0;
175   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
176 }
177
178 void CkReductionMgr::flushStates()
179 {
180   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
181   redNo=0;
182   completedRedNo = -1;
183   inProgress=CmiFalse;
184   creating=CmiFalse;
185   gcount=lcount=0;
186   startRequested=CmiFalse;
187   nContrib=nRemote=0;
188   maxStartRequest=0;
189
190   while (!msgs.isEmpty()) { delete msgs.deq(); }
191   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
192   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
193   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
194
195   adjVec.length()=0;
196
197   nodeProxy[CkMyNode()].ckLocalBranch()->flushStates();
198 }
199
200 //////////// Reduction Manager Client API /////////////
201
202 //Add the given client function.  Overwrites any previous client.
203 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
204 {
205   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
206
207   if (CkMyPe()!=0)
208           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
209   storedCallback=*cb;
210   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
211   nodeProxy.ckSetReductionClient(callback);
212 }
213
214 ///////////////////////////// Contributor ////////////////////////
215 //Contributors keep a copy of this structure:
216
217 /*Contributor migration support:
218 */
219 void contributorInfo::pup(PUP::er &p)
220 {
221   p(redNo);
222 }
223
224 ////////////////////// Contributor list maintainance: /////////////////
225 //These just set and clear the "creating" flag to prevent
226 // reductions from finishing early because not all elements
227 // have been created.
228 void CkReductionMgr::creatingContributors(void)
229 {
230   DEBR((AA"Creating contributors...\n"AB));
231   creating=CmiTrue;
232 }
233 void CkReductionMgr::doneCreatingContributors(void)
234 {
235   DEBR((AA"Done creating contributors...\n"AB));
236   creating=CmiFalse;
237   if (startRequested) startReduction(redNo,CkMyPe());
238   finishReduction();
239 }
240
241 //A new contributor will be created
242 void CkReductionMgr::contributorStamped(contributorInfo *ci)
243 {
244   DEBR((AA"Contributor %p stamped\n"AB,ci));
245   //There is another contributor
246   gcount++;
247   if (inProgress)
248   {
249     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
250     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
251   } else
252     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
253 }
254
255 //A new contributor was actually created
256 void CkReductionMgr::contributorCreated(contributorInfo *ci)
257 {
258   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
259   //We've got another contributor
260   lcount++;
261   //He may not need to contribute to some of our reductions:
262   for (int r=redNo;r<ci->redNo;r++)
263     adj(r).lcount--;//He won't be contributing to r here
264 }
265
266 /*Don't expect any more contributions from this one.
267 This is rather horrifying because we now have to make
268 sure the global element count accurately reflects all the
269 contributions the element made before it died-- these may stretch
270 far into the future.  The adj() vector is what saves us here.
271 */
272 void CkReductionMgr::contributorDied(contributorInfo *ci)
273 {
274 #if CMK_MEM_CHECKPOINT
275   // ignore from listener if it is during restart from crash
276   if (CkInRestarting()) return;
277 #endif
278   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
279   //We lost a contributor
280   gcount--;
281
282   if (ci->redNo<redNo)
283   {//Must have been migrating during reductions-- root is waiting for his
284   // contribution, which will never come.
285     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
286     for (int r=ci->redNo;r<redNo;r++)
287       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
288   }
289
290   //Add to the global count for all his future messages (wherever they are)
291   int r;
292   for (r=redNo;r<ci->redNo;r++)
293   {//He already contributed to this reduction, but won't show up in global count.
294     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
295     adj(r).gcount++;
296   }
297
298   lcount--;
299   //He's already contributed to several reductions here
300   for (r=redNo;r<ci->redNo;r++)
301     adj(r).lcount++;//He'll be contributing to r here
302
303   finishReduction();
304 }
305
306 //Migrating away (note that global count doesn't change)
307 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
308 {
309   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
310   lcount--;//We lost a local
311   //He's already contributed to several reductions here
312   for (int r=redNo;r<ci->redNo;r++)
313     adj(r).lcount++;//He'll be contributing to r here
314
315   finishReduction();
316 }
317
318 //Migrating in (note that global count doesn't change)
319 void CkReductionMgr::contributorArriving(contributorInfo *ci)
320 {
321   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
322   lcount++;//We gained a local
323 #if CMK_MEM_CHECKPOINT
324   // ignore from listener if it is during restart from crash
325   // because the ci may be old.
326   if (CkInRestarting()) return;
327 #endif
328   //He has already contributed (elsewhere) to several reductions:
329   for (int r=redNo;r<ci->redNo;r++)
330     adj(r).lcount--;//He won't be contributing to r here
331
332 }
333
334 //Contribute-- the given msg can contain any data.  The reducerType
335 // field of the message must be valid.
336 // Each contributor must contribute exactly once to the each reduction.
337 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
338 {
339   DEBR((AA"Contributor %p contributed for %d in grp %d ismigratable %d \n"AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
340   m->ci=ci;
341   m->redNo=ci->redNo++;
342   m->sourceFlag=-1;//A single contribution
343   m->gcount=0;
344   addContribution(m);
345 }
346
347 //////////// Reduction Manager Remote Entry Points /////////////
348 //Sent down the reduction tree (used by barren PEs)
349 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
350 {
351  if(CkMyPe()==0){
352         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
353         //delete m;
354         //return;
355  }
356  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
357  int srcPE = (UsrToEnv(m))->getSrcPe();
358   if (isPresent(m->num) && !inProgress)
359   {
360     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
361     startReduction(m->num,srcPE);
362     finishReduction();
363   } else if (isFuture(m->num)){
364 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
365           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
366           if(maxStartRequest < m->num){
367                   maxStartRequest = m->num;
368           }
369  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
370           
371     }
372   else //is Past
373     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
374   delete m;
375 }
376
377 //Sent to root of reduction tree with reduction contribution
378 // of migrants that missed the main reduction.
379 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
380 {
381         m->secondaryCallback = m->callback;
382   m->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
383   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
384   nodeMgr->LateMigrantMsg(m);
385 /*      int len = finalMsgs.length();
386         finalMsgs.enq(m);
387 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
388         endArrayReduction();*/
389 }
390
391 //A late migrating contributor will never contribute to this reduction
392 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
393 {
394   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
395   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
396  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
397   adj(m->num).gcount--;//He won't be contributing to this one.
398   finishReduction();
399   delete m;
400 }
401
402 //////////// Reduction Manager State /////////////
403 void CkReductionMgr::startReduction(int number,int srcPE)
404 {
405   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
406   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
407   if (inProgress){
408         DEBR((AA"This reduction is already in progress\n"AB));
409         return;//This reduction already started
410   }
411   if (creating) //Don't start yet-- we're creating elements
412   {
413     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
414     startRequested=CmiTrue;
415     return;
416   }
417
418 //If none of these cases, we need to start the reduction--
419   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
420   inProgress=CmiTrue;
421   //Sent start requests to our kids (in case they don't already know)
422  
423
424         /*
425                 FAULT_EVAC
426         */
427   if(!CmiNodeAlive(CkMyPe())){
428         return;
429   }
430         
431   nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
432   int temp;
433         /*
434   //making it a broadcast done only by PE 0
435   if(!hasParent()){
436                 temp = completedRedNo+1;
437                 for(int i=temp;i<=number;i++){
438                         for(int j=0;j<CkNumPes();j++){
439                                 if(j != CkMyPe() && j != srcPE){
440                                         if((CmiNodeAlive(j)||allowMessagesOnly !=-1){
441                                                 thisProxy[j].ReductionStarting(new CkReductionNumberMsg(i));
442                                         }
443                                 }
444                         }
445                 }       
446         }       else{
447                 temp = number;
448         }*/
449 /*  if(!hasParent()){
450                 temp = completedRedNo+1;
451         }       else{
452                 temp = number;
453         }
454         for(int i=temp;i<=number;i++){
455         //      DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
456                 if(hasParent()){
457           // kick-start your parent too ...
458                         if(treeParent() != srcPE){
459                                 if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
460                                 thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
461                                 }       
462                         }       
463                 }
464           for (int k=0;k<treeKids();k++)
465           {
466                         if(firstKid()+k != srcPE){
467                                 if(CmiNodeAlive(kids[k])||allowMessagesOnly !=-1){
468                             DEBR((AA"Asking child PE %d to start #%d\n"AB,kids[k],redNo));
469                             thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(i));
470                                 }       
471                         }       
472         }
473         }
474         */
475 }       
476
477 /*Handle a message from one element for the reduction*/
478 void CkReductionMgr::addContribution(CkReductionMsg *m)
479 {
480   if (isPast(m->redNo))
481   {//We've moved on-- forward late contribution straight to root
482     DEBR((AA"Migrant %p gives late contribution for #%d!\n"AB,m->ci,m->redNo));
483    // if (!hasParent()) //Root moved on too soon-- should never happen
484    //   CkAbort("Late reduction contribution received at root!\n");
485     thisProxy[0].LateMigrantMsg(m);
486   }
487   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
488     DEBR((AA"Contributor %p gives early contribution-- for #%d\n"AB,m->ci,m->redNo));
489     futureMsgs.enq(m);
490   } else {// An ordinary contribution
491     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
492    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
493     startReduction(m->redNo,CkMyPe());
494     msgs.enq(m);
495     nContrib++;
496     finishReduction();
497   }
498 }
499
500 /**function checks if it has got all contributions that it is supposed to
501 get at this processor. If it is done it sends the reduced result to the local
502 nodegroup */
503 void CkReductionMgr::finishReduction(void)
504 {
505   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
506   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
507   if ((!inProgress) | creating){
508         DEBR((AA"Either not in Progress or creating\n"AB));
509         return;
510   }
511   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
512   if (nContrib<(lcount+adj(redNo).lcount)){
513         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
514          return;//Need more local messages
515   }
516   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
517   CkReductionMsg *result=reduceMessages();
518   result->redNo=redNo;
519   result->gcount+=gcount+adj(redNo).gcount;
520   result->secondaryCallback = result->callback;
521   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
522         DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
523
524   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
525
526 #if DEBUGRED
527  // CkPrintf("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount);
528 #endif
529   
530   // Find our node reduction manager, and pass reduction to him:
531   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
532   nodeMgr->contributeArrayReduction(result);
533
534   //House Keeping Operations will have to check later what needs to be changed
535   redNo++;
536   //Shift the count adjustment vector down one slot (to match new redNo)
537   int i;
538
539   if(CkMyPe()!=0){
540         int i;
541         completedRedNo++;
542         for (i=1;i<adjVec.length();i++)
543            adjVec[i-1]=adjVec[i];
544         adjVec.length()--;  
545   }
546   inProgress=CmiFalse;
547   startRequested=CmiFalse;
548   nRemote=nContrib=0;
549
550   //Look through the future queue for messages we can now handle
551   int n=futureMsgs.length();
552   for (i=0;i<n;i++)
553   {
554     CkReductionMsg *m=futureMsgs.deq();
555     if (m!=NULL) //One of these addContributions may have finished us.
556       addContribution(m);//<- if *still* early, puts it back in the queue
557   }
558   if(maxStartRequest >= redNo){
559           startReduction(redNo,CkMyPe());
560           finishReduction();
561   }
562 }
563
564 //////////// Reduction Manager Utilities /////////////
565
566 //Return the countAdjustment struct for the given redNo:
567 countAdjustment &CkReductionMgr::adj(int number)
568 {
569   number-=completedRedNo;
570   number--;
571   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
572   //Pad the adjustment vector with zeros until it's at least number long
573   while (adjVec.length()<=number)
574     adjVec.push_back(countAdjustment());
575   return adjVec[number];
576 }
577
578 //Combine (& free) the current message vector msgs.
579 CkReductionMsg *CkReductionMgr::reduceMessages(void)
580 {
581   CkReductionMsg *ret=NULL;
582
583   //Look through the vector for a valid reducer, swapping out placeholder messages
584   CkReduction::reducerType r=CkReduction::invalid;
585   int msgs_gcount=0;//Reduced gcount
586   int msgs_nSources=0;//Reduced nSources
587   int msgs_userFlag=-1;
588   CkCallback msgs_callback;
589   int i;
590   int nMsgs=0;
591   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
592   CkReductionMsg *m;
593         bool isMigratableContributor;
594
595   // Copy message queue into msgArr, skipping placeholders:
596   while (NULL!=(m=msgs.deq()))
597   {
598     msgs_gcount+=m->gcount;
599     if (m->sourceFlag!=0)
600     { //This is a real message from an element, not just a placeholder
601       msgArr[nMsgs++]=m;
602       msgs_nSources+=m->nSources();
603       r=m->reducer;
604       if (!m->callback.isInvalid())
605         msgs_callback=m->callback;
606       if (m->userFlag!=-1)
607         msgs_userFlag=m->userFlag;
608                         
609                         isMigratableContributor=m->isMigratableContributor();
610     }
611     else
612     { //This is just a placeholder message-- forget it
613       delete m;
614     }
615   }
616
617   if (nMsgs==0||r==CkReduction::invalid)
618   //No valid reducer in the whole vector
619     ret=CkReductionMsg::buildNew(0,NULL);
620   else
621   {//Use the reducer to reduce the messages
622                 //if there is only one msg to be reduced just return that message
623                 if(nMsgs == 1){
624                         ret = msgArr[0];        
625                 }else{
626             CkReduction::reducerFn f=CkReduction::reducerTable[r];
627           ret=(*f)(nMsgs,msgArr);
628                 }
629     ret->reducer=r;
630   }
631
632         //Go back through the vector, deleting old messages
633   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
634         delete [] msgArr;
635
636   //Set the message counts
637   ret->redNo=redNo;
638   ret->gcount=msgs_gcount;
639   ret->userFlag=msgs_userFlag;
640   ret->callback=msgs_callback;
641   ret->sourceFlag=msgs_nSources;
642         ret->setMigratableContributor(isMigratableContributor);
643   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
644
645   return ret;
646 }
647
648
649 //Checkpointing utilities
650 //pack-unpack method for CkReductionMsg
651 //if packing pack the message and then unpack and return it
652 //if unpacking allocate memory for it read it off disk and then unapck
653 //and return it
654 void CkReductionMgr::pup(PUP::er &p)
655 {
656 //We do not store the client function pointer or the client function parameter,
657 //it is the responsibility of the programmer to correctly restore these
658   CkGroupInitCallback::pup(p);
659   p(redNo);
660   p(completedRedNo);
661   p(inProgress); p(creating); p(startRequested);
662   p(nContrib); p(nRemote);
663   p|msgs;
664   p|futureMsgs;
665   p|futureRemoteMsgs;
666   p|finalMsgs;
667   p|adjVec;
668   p|nodeProxy;
669   p|storedCallback;
670   p|lcount;
671   p|gcount;
672   if(p.isUnpacking()){
673     thisProxy = thisgroup;
674 //    lcount=0;
675 //    gcount=0;
676     maxStartRequest=0;
677   }
678 #if DEBUGRED
679   CkPrintf("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount);
680 #endif
681 }
682
683
684 //Callback for doing Reduction through NodeGroups added by Sayantan
685
686 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
687
688         int total = m->gcount+adj(m->redNo).gcount;
689         finalMsgs.enq(m);
690         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
691         adj(m->redNo).mainRecvd = 1;
692 #if DEBUGRED
693         CkPrintf("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer());
694 #endif  
695         endArrayReduction();
696 }
697
698 void CkReductionMgr :: endArrayReduction(){
699         CkReductionMsg *ret=NULL;
700         int nMsgs=finalMsgs.length();
701         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
702         //Look through the vector for a valid reducer, swapping out placeholder messages
703         //CkPrintf("Length of Final Message %d \n",nMsgs);
704         CkReduction::reducerType r=CkReduction::invalid;
705         int msgs_gcount=0;//Reduced gcount
706         int msgs_nSources=0;//Reduced nSources
707         int msgs_userFlag=-1;
708         CkCallback msgs_callback;
709         CkCallback msgs_secondaryCallback;
710         CkVec<CkReductionMsg *> tempMsgs;
711         int i;
712         int numMsgs = 0;
713         for (i=0;i<nMsgs;i++)
714         {
715                 CkReductionMsg *m=finalMsgs.deq();
716                 if(m->redNo == completedRedNo +1){
717                         msgs_gcount+=m->gcount;
718                         if (m->sourceFlag!=0)
719                         { //This is a real message from an element, not just a placeholder
720                                 msgs_nSources+=m->nSources();
721                                 r=m->reducer;
722                                 if (!m->callback.isInvalid())
723                                 msgs_callback=m->callback;
724                                 if(!m->secondaryCallback.isInvalid())
725                                         msgs_secondaryCallback = m->secondaryCallback;
726                                 if (m->userFlag!=-1)
727                                         msgs_userFlag=m->userFlag;
728                                 tempMsgs.push_back(m);
729                         }
730                 }else{
731                         finalMsgs.enq(m);
732                 }
733
734         }
735         numMsgs = tempMsgs.length();
736 #if DEBUGRED
737         CkPrintf("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd);
738 #endif  
739         if(numMsgs == 0){
740                 return;
741         }
742         if(adj(completedRedNo+1).mainRecvd == 0){
743                 for(i=0;i<numMsgs;i++){
744                         finalMsgs.enq(tempMsgs[i]);
745                 }
746                 return;
747         }
748
749 /*
750         NOT NEEDED ANYMORE DONE at nodegroup level
751         if(msgs_gcount  > msgs_nSources){
752                 for(i=0;i<numMsgs;i++){
753                         finalMsgs.enq(tempMsgs[i]);
754                 }
755                 return;
756         }*/
757
758         if (nMsgs==0||r==CkReduction::invalid)
759                 //No valid reducer in the whole vector
760                 ret=CkReductionMsg::buildNew(0,NULL);
761         else{//Use the reducer to reduce the messages
762                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
763                 // has to be corrected elements from above need to be put into a temporary vector
764                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
765                 ret=(*f)(numMsgs,msgArr);
766                 ret->reducer=r;
767
768         }
769
770         for(i = 0;i<numMsgs;i++){
771                 if (tempMsgs[i] != ret) delete tempMsgs[i];
772         }
773
774         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
775         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
776
777         ret->redNo=completedRedNo+1;
778         ret->gcount=msgs_gcount;
779         ret->userFlag=msgs_userFlag;
780         ret->callback=msgs_callback;
781         ret->secondaryCallback = msgs_secondaryCallback;
782         ret->sourceFlag=msgs_nSources;
783
784 #if DEBUGRED
785         CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer());
786 #endif
787         if (!ret->secondaryCallback.isInvalid())
788             ret->secondaryCallback.send(ret);
789     else if (!storedCallback.isInvalid())
790             storedCallback.send(ret);
791     else{
792 #if DEBUGRED
793             CkPrintf("No reduction client for group %d \n",thisgroup.idx);
794 #endif
795             CkAbort("No reduction client!\n"
796                     "You must register a client with either SetReductionClient or during contribute.\n");
797     }
798         completedRedNo++;
799 #if DEBUGRED
800        CkPrintf("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer());
801 #endif
802         for (i=1;i<adjVec.length();i++)
803                 adjVec[i-1]=adjVec[i];
804         adjVec.length()--;
805         endArrayReduction();
806 }
807
808
809 /////////////////////////////////////////////////////////////////////////
810
811 ////////////////////////////////
812 //CkReductionMsg support
813
814 //ReductionMessage default private constructor-- does nothing
815 CkReductionMsg::CkReductionMsg(){}
816 CkReductionMsg::~CkReductionMsg(){}
817
818 //This define gives the distance from the start of the CkReductionMsg
819 // object to the start of the user data area (just below last object field)
820 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
821
822 //"Constructor"-- builds and returns a new CkReductionMsg.
823 //  the "data" array you specify will be copied into this object.
824 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
825     CkReduction::reducerType reducer, CkReductionMsg *buf)
826 {
827   int len[1];
828   len[0]=NdataSize;
829   CkReductionMsg *ret = buf? buf:new(len,0)CkReductionMsg();
830   
831   ret->dataSize=NdataSize;
832   if (srcData!=NULL && !buf)
833     memcpy(ret->data,srcData,NdataSize);
834   ret->userFlag=-1;
835   ret->reducer=reducer;
836   ret->ci=NULL;
837   ret->sourceFlag=-1000;
838   ret->gcount=0;
839   ret->migratableContributor = true;
840   return ret;
841 }
842
843 // Charm kernel message runtime support:
844 void *
845 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
846 {
847   int totalsize=ARM_DATASTART+(*sz);
848   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
849   CkReductionMsg *ret = (CkReductionMsg *)
850     CkAllocMsg(msgnum,totalsize,priobits);
851   ret->data=(void *)(&ret->dataStorage);
852   return (void *) ret;
853 }
854
855 void *
856 CkReductionMsg::pack(CkReductionMsg* in)
857 {
858   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
859   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
860   in->data = NULL;
861   return (void*) in;
862 }
863
864 CkReductionMsg* CkReductionMsg::unpack(void *in)
865 {
866   CkReductionMsg *ret = (CkReductionMsg *)in;
867   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
868   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
869   ret->data=(void *)(&ret->dataStorage);
870   return ret;
871 }
872
873
874 /////////////////////////////////////////////////////////////////////////////////////
875 ///////////////// Builtin Reducer Functions //////////////
876 /* A simple reducer, like sum_int, looks like this:
877 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
878 {
879   int i,ret=0;
880   for (i=0;i<nMsg;i++)
881     ret+=*(int *)(msg[i]->getData());
882   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
883 }
884
885 To keep the code small and easy to change, the implementations below
886 are built with preprocessor macros.
887 */
888
889 //////////////// simple reducers ///////////////////
890 /*A define used to quickly and tersely construct simple reductions.
891 The basic idea is to use the first message's data array as
892 (pre-initialized!) scratch space for folding in the other messages.
893  */
894
895 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
896 {
897         CkAbort("Called the invalid reducer type 0.  This probably\n"
898                 "means you forgot to initialize your custom reducer index.\n");
899         return NULL;
900 }
901
902 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
903 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
904 {\
905   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
906   int m,i;\
907   int nElem=msg[0]->getLength()/sizeof(dataType);\
908   dataType *ret=(dataType *)(msg[0]->getData());\
909   for (m=1;m<nMsg;m++)\
910   {\
911     dataType *value=(dataType *)(msg[m]->getData());\
912     for (i=0;i<nElem;i++)\
913     {\
914       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
915       loop\
916     }\
917   }\
918   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
919   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
920 }
921
922 //Use this macro for reductions that have the same type for all inputs
923 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
924   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
925   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
926   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
927
928
929 //Compute the sum the numbers passed by each element.
930 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
931
932 //Compute the product of the numbers passed by each element.
933 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
934
935 //Compute the largest number passed by any element.
936 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
937
938 //Compute the smallest integer passed by any element.
939 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
940
941
942 //Compute the logical AND of the integers passed by each element.
943 // The resulting integer will be zero if any source integer is zero; else 1.
944 SIMPLE_REDUCTION(logical_and,int,"%d",
945         if (value[i]==0)
946      ret[i]=0;
947   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
948 )
949
950 //Compute the logical OR of the integers passed by each element.
951 // The resulting integer will be 1 if any source integer is nonzero; else 0.
952 SIMPLE_REDUCTION(logical_or,int,"%d",
953   if (value[i]!=0)
954            ret[i]=1;
955   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
956 )
957
958 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
959 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
960
961 /////////////// concat ////////////////
962 /*
963 This reducer simply appends the data it recieves from each element,
964 without any housekeeping data to separate them.
965 */
966 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
967 {
968   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
969   //Figure out how big a message we'll need
970   int i,retSize=0;
971   for (i=0;i<nMsg;i++)
972       retSize+=msg[i]->getSize();
973
974   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
975
976   //Allocate a new message
977   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
978
979   //Copy the source message data into the return message
980   char *cur=(char *)(ret->getData());
981   for (i=0;i<nMsg;i++) {
982     int messageBytes=msg[i]->getSize();
983     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
984     cur+=messageBytes;
985   }
986   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
987   return ret;
988 }
989
990 /////////////// set ////////////////
991 /*
992 This reducer appends the data it recieves from each element
993 along with some housekeeping data indicating contribution boundaries.
994 The message data is thus a list of reduction_set_element structures
995 terminated by a dummy reduction_set_element with a sourceElement of -1.
996 */
997
998 //This rounds an integer up to the nearest multiple of sizeof(double)
999 static const int alignSize=sizeof(double);
1000 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1001
1002 //This gives the size (in bytes) of a reduction_set_element
1003 static int SET_SIZE(int dataSize)
1004 {return SET_ALIGN(sizeof(int)+dataSize);}
1005
1006 //This returns a pointer to the next reduction_set_element in the list
1007 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1008 {
1009   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1010   return (CkReduction::setElement *)next;
1011 }
1012
1013 //Combine the data passed by each element into an list of reduction_set_elements.
1014 // Each element may contribute arbitrary data (with arbitrary length).
1015 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1016 {
1017   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1018   //Figure out how big a message we'll need
1019   int i,retSize=0;
1020   for (i=0;i<nMsg;i++) {
1021     if (!msg[i]->isFromUser())
1022     //This message is composite-- it will just be copied over (less terminating -1)
1023       retSize+=(msg[i]->getSize()-sizeof(int));
1024     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1025       retSize+=SET_SIZE(msg[i]->getSize());
1026   }
1027   retSize+=sizeof(int);//Leave room for terminating -1.
1028
1029   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1030
1031   //Allocate a new message
1032   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1033
1034   //Copy the source message data into the return message
1035   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1036   for (i=0;i<nMsg;i++)
1037     if (!msg[i]->isFromUser())
1038     {//This message is composite-- just copy it over (less terminating -1)
1039                         int messageBytes=msg[i]->getSize()-sizeof(int);
1040                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1041                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1042                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1043     }
1044     else //This is a message from an element-- wrap it in a reduction_set_element
1045     {
1046       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
1047       cur->dataSize=msg[i]->getSize();
1048       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1049       cur=SET_NEXT(cur);
1050     }
1051   cur->dataSize=-1;//Add a terminating -1.
1052   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1053   return ret;
1054 }
1055
1056 //Utility routine: get the next reduction_set_element in the list
1057 // if there is one, or return NULL if there are none.
1058 //To get all the elements, just keep feeding this procedure's output back to
1059 // its input until it returns NULL.
1060 CkReduction::setElement *CkReduction::setElement::next(void)
1061 {
1062   CkReduction::setElement *n=SET_NEXT(this);
1063   if (n->dataSize==-1)
1064     return NULL;//This is the end of the list
1065   else
1066     return n;//This is just another element
1067 }
1068
1069 /////////////////// Reduction Function Table /////////////////////
1070 CkReduction::CkReduction() {} //Dummy private constructor
1071
1072 //Add the given reducer to the list.  Returns the new reducer's
1073 // reducerType.  Must be called in the same order on every node.
1074 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1075 {
1076   reducerTable[nReducers]=fn;
1077   return (reducerType)nReducers++;
1078 }
1079
1080 /*Reducer table: maps reducerTypes to reducerFns.
1081 It's indexed by reducerType, so the order in this table
1082 must *exactly* match the reducerType enum declaration.
1083 The names don't have to match, but it helps.
1084 */
1085 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1086
1087 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1088     ::invalid_reducer,
1089   //Compute the sum the numbers passed by each element.
1090     ::sum_int,::sum_float,::sum_double,
1091
1092   //Compute the product the numbers passed by each element.
1093     ::product_int,::product_float,::product_double,
1094
1095   //Compute the largest number passed by any element.
1096     ::max_int,::max_float,::max_double,
1097
1098   //Compute the smallest number passed by any element.
1099     ::min_int,::min_float,::min_double,
1100
1101   //Compute the logical AND of the integers passed by each element.
1102   // The resulting integer will be zero if any source integer is zero.
1103     ::logical_and,
1104
1105   //Compute the logical OR of the integers passed by each element.
1106   // The resulting integer will be 1 if any source integer is nonzero.
1107     ::logical_or,
1108
1109     // Compute the logical bitvector AND of the integers passed by each element.
1110     ::bitvec_and,
1111
1112     // Compute the logical bitvector OR of the integers passed by each element.
1113     ::bitvec_or,
1114
1115   //Concatenate the (arbitrary) data passed by each element
1116     ::concat,
1117
1118   //Combine the data passed by each element into an list of setElements.
1119   // Each element may contribute arbitrary data (with arbitrary length).
1120     ::set
1121 };
1122
1123
1124
1125
1126
1127
1128
1129
1130 /********** Code added by Sayantan *********************/
1131 /** Locking is a big problem in the nodegroup code for smp.
1132  So a few assumptions have had to be made. There is one lock
1133  called lockEverything. It protects all the data structures 
1134  of the nodegroup reduction mgr. I tried grabbing it separately 
1135  for each datastructure, modifying it and then releasing it and
1136  then grabbing it again, for the next change.
1137  That doesn't really help because the interleaved execution of 
1138  different threads makes the state of the reduction manager 
1139  inconsistent. 
1140  
1141  1. Grab lockEverything before calling finishreduction or startReduction
1142     or doRecvMsg
1143  2. lockEverything is grabbed only in entry methods reductionStarting
1144     or RecvMesg or  addcontribution.
1145  ****/
1146  
1147 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1148 NodeGroup::NodeGroup(void) {
1149   __nodelock=CmiCreateLock();
1150
1151
1152 }
1153 NodeGroup::~NodeGroup() {
1154   CmiDestroyLock(__nodelock);
1155 }
1156 void NodeGroup::pup(PUP::er &p)
1157 {
1158   CkNodeReductionMgr::pup(p);
1159   p|reductionInfo;
1160 }
1161
1162 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1163
1164 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1165   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1166  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1167   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1168  }
1169
1170 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1171                                     ((CkNodeReductionMgr *)this),
1172                                     reductionInfo,false);
1173
1174 /* this contribute also adds up the count across all messages it receives.
1175   Useful for summing up number of array elements who have contributed ****/ 
1176 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1177         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1178
1179
1180
1181 //#define BINOMIAL_TREE
1182
1183 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1184   : thisProxy(thisgroup)
1185 {
1186 #ifdef BINOMIAL_TREE
1187   init_BinomialTree();
1188 #else
1189   init_BinaryTree();
1190 #endif
1191   storedCallback=NULL;
1192   redNo=0;
1193   inProgress=CmiFalse;
1194   
1195   startRequested=CmiFalse;
1196   gcount=CkNumNodes();
1197   lcount=1;
1198   nContrib=nRemote=0;
1199   lockEverything = CmiCreateLock();
1200
1201
1202   creating=CmiFalse;
1203   interrupt = 0;
1204   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1205         /*
1206                 FAULT_EVAC
1207         */
1208         blocked = false;
1209         maxModificationRedNo = INT_MAX;
1210         killed=0;
1211         additionalGCount = newAdditionalGCount = 0;
1212 }
1213
1214 void CkNodeReductionMgr::flushStates()
1215 {
1216  if(CkMyRank() == 0){
1217   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
1218   redNo=0;
1219   inProgress=CmiFalse;
1220
1221   startRequested=CmiFalse;
1222   gcount=CkNumNodes();
1223   lcount=1;
1224   nContrib=nRemote=0;
1225
1226   creating=CmiFalse;
1227   interrupt = 0;
1228   while (!msgs.isEmpty()) { delete msgs.deq(); }
1229   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
1230   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
1231   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
1232   }
1233 }
1234
1235 //////////// Reduction Manager Client API /////////////
1236
1237 //Add the given client function.  Overwrites any previous client.
1238 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1239 {
1240   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1241   if(cb->isInvalid()){
1242         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1243   }else{
1244         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1245   }
1246
1247   if (CkMyNode()!=0)
1248           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1249   delete storedCallback;
1250   storedCallback=cb;
1251 }
1252
1253
1254
1255 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1256 {
1257
1258   m->ci=ci;
1259   m->redNo=ci->redNo++;
1260   m->sourceFlag=-1;//A single contribution
1261   m->gcount=0;
1262 #if DEBUGRED
1263         CkPrintf("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo);
1264 #endif
1265   addContribution(m);
1266
1267 }
1268
1269
1270 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1271 {
1272   m->ci=ci;
1273   m->redNo=ci->redNo++;
1274   m->gcount=count;
1275 #if DEBUGRED
1276  CkPrintf("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1277 #endif
1278   addContribution(m);
1279 #if DEBUGRED
1280   CkPrintf("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1281 #endif
1282
1283 }
1284
1285
1286 //////////// Reduction Manager Remote Entry Points /////////////
1287
1288 //Sent down the reduction tree (used by barren PEs)
1289 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1290 {
1291   CmiLock(lockEverything);
1292   if(blocked){
1293         delete m;
1294         CmiUnlock(lockEverything);
1295         return ;
1296   }
1297   int srcNode = CmiNodeOf((UsrToEnv(m))->getSrcPe());
1298   if (isPresent(m->num) && !inProgress)
1299   {
1300     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1301     startReduction(m->num,srcNode);
1302     finishReduction();
1303   } else if (isFuture(m->num)){
1304         DEBR(("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo));
1305   }
1306   else //is Past
1307     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1308   CmiUnlock(lockEverything);
1309   delete m;
1310
1311 }
1312
1313
1314 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1315 #if DEBUGRED
1316         CkPrintf("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1317 #endif
1318         /*
1319                 FAULT_EVAC
1320         */
1321         if(blocked){
1322                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
1323                 bufferedRemoteMsgs.enq(m);
1324                 return;
1325         }
1326         
1327         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1328             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1329             startReduction(m->redNo,CkMyNode());
1330             msgs.enq(m);
1331             nRemote++;
1332             finishReduction();
1333         }
1334         else {
1335             if (isFuture(m->redNo)) {
1336                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1337                     futureRemoteMsgs.enq(m);
1338             }else{
1339                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1340                    CkAbort("Recv'd late remote contribution!\n");
1341             }
1342   }
1343 #if DEBUGRED        
1344        CkPrintf("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1345 #endif       
1346 }
1347
1348 //Sent up the reduction tree with reduced data
1349 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1350 {
1351 #ifndef CMK_CPV_IS_SMP
1352 #if CMK_IMMEDIATE_MSG
1353         if(interrupt == 1){
1354                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1355                 CpvAccess(_qd)->process(-1);
1356                 CmiDelayImmediate();
1357                 return;
1358         }
1359 #endif  
1360 #endif
1361    interrupt = 1;       
1362    CmiLock(lockEverything);   
1363 #if DEBUGRED   
1364    CkPrintf("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1365 #endif   
1366    doRecvMsg(m);
1367    CmiUnlock(lockEverything);    
1368    interrupt = 0;
1369 #if DEBUGRED  
1370    CkPrintf("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1371 #endif 
1372 }
1373
1374 void CkNodeReductionMgr::startReduction(int number,int srcNode)
1375 {
1376         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1377         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1378         if (inProgress){
1379                 DEBR((AA"This Node reduction is already in progress\n"AB));
1380                 return;//This reduction already started
1381         }
1382         if (creating) //Don't start yet-- we're creating elements
1383         {
1384                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1385                 startRequested=CmiTrue;
1386                 return;
1387         }
1388         
1389         //If none of these cases, we need to start the reduction--
1390         DEBR((AA"Starting Node reduction #%d on %p srcNode %d\n"AB,redNo,this,srcNode));
1391         inProgress=CmiTrue;
1392         //Sent start requests to our kids (in case they don't already know)
1393
1394         for (int k=0;k<treeKids();k++)
1395         {
1396 #ifdef BINOMIAL_TREE
1397                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1398                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1399 #else
1400                 if(kids[k] != srcNode){
1401                         DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1402                         thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1403                 }       
1404 #endif
1405         }
1406         startLocalGroupReductions(number);
1407 }
1408
1409 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1410         /*
1411                 FAULT_EVAC
1412         */
1413         if(blocked){
1414                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1415                 bufferedMsgs.enq(m);
1416                 return;
1417         }
1418         
1419         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1420                 DEBR((AA"Contributor %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1421                 futureMsgs.enq(m);
1422         } else {// An ordinary contribution
1423                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1424                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1425                 startReduction(m->redNo,CkMyNode());
1426                 msgs.enq(m);
1427                 nContrib++;
1428                 finishReduction();
1429         }
1430 }
1431
1432 //Handle a message from one element for the reduction
1433 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1434 {
1435   interrupt = 1;
1436   CmiLock(lockEverything);
1437   doAddContribution(m);
1438   CmiUnlock(lockEverything);
1439   interrupt = 0;
1440 }
1441
1442 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
1443         if(blocked){
1444                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
1445                 bufferedMsgs.enq(m);
1446                 return;
1447         }
1448         
1449         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1450                 DEBR((AA"Latemigrant %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1451 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1452                 futureLateMigrantMsgs.enq(m);
1453         } else {// An ordinary contribution
1454                 DEBR((AA"Recv'd late migrant contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1455 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1456                 msgs.enq(m);
1457                 finishReduction();
1458         }
1459 };
1460
1461
1462
1463
1464
1465 /** check if the nodegroup reduction is finished at this node. In that case send it
1466 up the reduction tree **/
1467
1468 void CkNodeReductionMgr::finishReduction(void)
1469 {
1470   DEBR((AA"in Nodegrp finishReduction %d treeKids %d \n"AB,inProgress,treeKids()));
1471   /***Check if reduction is finished in the next few ifs***/
1472   if ((!inProgress) | creating){
1473         DEBR((AA"Either not in Progress or creating\n"AB));
1474         return;
1475   }
1476
1477   if (nContrib<(lcount)){
1478         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1479
1480          return;//Need more local messages
1481   }
1482   if (nRemote<treeKids()){
1483         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1484
1485         return;//Need more remote messages
1486   }
1487   if (nRemote>treeKids()){
1488
1489           interrupt = 0;
1490            CkAbort("Nodegrp Excess remote reduction message received!\n");
1491   }
1492
1493   DEBR((AA"Reducing node data...\n"AB));
1494
1495   /**reduce all messages received at this node **/
1496   CkReductionMsg *result=reduceMessages();
1497
1498   if (hasParent())
1499   {//Pass data up tree to parent
1500         if(CmiNodeAlive(CkMyNode()) || killed == 0){
1501         DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1502 #if DEBUGRED
1503         CkPrintf("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib);
1504 #endif
1505                 /*
1506                         FAULT_EVAC
1507                 */
1508                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
1509             thisProxy[treeParent()].RecvMsg(result);
1510         }
1511
1512   }
1513   else
1514   {
1515                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
1516 #if DEBUGRED
1517                         CkPrintf("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor());
1518 #endif                  
1519                         msgs.enq(result);
1520                         return;
1521                 }
1522                 result->gcount += additionalGCount;
1523           /** if the reduction is finished and I am the root of the reduction tree
1524           then call the reductionhandler and other stuff ***/
1525                 
1526
1527 #if DEBUGRED
1528    CkPrintf("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer());
1529 #endif
1530     if (!result->callback.isInvalid()){
1531 #if DEBUGRED
1532             CkPrintf("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe());
1533 #endif      
1534             result->callback.send(result);
1535     }
1536     else if (storedCallback!=NULL){
1537 #if DEBUGRED
1538             CkPrintf("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe());
1539 #endif
1540             storedCallback->send(result);
1541     }
1542     else{
1543                 DEBR((AA"Invalid Callback \n"AB));
1544             CkAbort("No reduction client!\n"
1545                     "You must register a client with either SetReductionClient or during contribute.\n");
1546                 }
1547   }
1548
1549   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
1550   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
1551   redNo++;
1552         updateTree();
1553   int i;
1554   inProgress=CmiFalse;
1555   startRequested=CmiFalse;
1556   nRemote=nContrib=0;
1557
1558   //Look through the future queue for messages we can now handle
1559   int n=futureMsgs.length();
1560
1561   for (i=0;i<n;i++)
1562   {
1563     interrupt = 1;
1564
1565     CkReductionMsg *m=futureMsgs.deq();
1566
1567     interrupt = 0;
1568     if (m!=NULL){ //One of these addContributions may have finished us.
1569 #if DEBUGRED
1570                 CkPrintf("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
1571 #endif
1572       doAddContribution(m);//<- if *still* early, puts it back in the queue
1573     }
1574   }
1575
1576   interrupt = 1;
1577
1578   n=futureRemoteMsgs.length();
1579
1580   interrupt = 0;
1581   for (i=0;i<n;i++)
1582   {
1583     interrupt = 1;
1584
1585     CkReductionMsg *m=futureRemoteMsgs.deq();
1586
1587     interrupt = 0;
1588     if (m!=NULL)
1589       doRecvMsg(m);//<- if *still* early, puts it back in the queue
1590   }
1591   
1592   n = futureLateMigrantMsgs.length();
1593   for(i=0;i<n;i++){
1594     CkReductionMsg *m = futureLateMigrantMsgs.deq();
1595     if(m != NULL){
1596       if(m->redNo == redNo){
1597         msgs.enq(m);
1598       }else{
1599         futureLateMigrantMsgs.enq(m);
1600       }
1601     }
1602   }
1603 }
1604
1605 //////////// Reduction Manager Utilities /////////////
1606
1607 void CkNodeReductionMgr::init_BinaryTree(){
1608         parent = (CkMyNode()-1)/TREE_WID;
1609         int firstkid = CkMyNode()*TREE_WID+1;
1610         numKids=CkNumNodes()-firstkid;
1611   if (numKids>TREE_WID) numKids=TREE_WID;
1612   if (numKids<0) numKids=0;
1613
1614         for(int i=0;i<numKids;i++){
1615                 kids.push_back(firstkid+i);
1616                 newKids.push_back(firstkid+i);
1617         }
1618 }
1619
1620 void CkNodeReductionMgr::init_BinomialTree(){
1621         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
1622         /*upperSize = (unsigned )pow((double)2,depth);*/
1623         upperSize = (unsigned) 1 << depth;
1624         label = upperSize-CkMyNode()-1;
1625         int p=label;
1626         int count=0;
1627         while( p > 0){
1628                 if(p % 2 == 0)
1629                         break;
1630                 else{
1631                         p = p/2;
1632                         count++;
1633                 }
1634         }
1635         /*parent = label + rint(pow((double)2,count));*/
1636         parent = label + (1<<count);
1637         parent = upperSize -1 -parent;
1638         int temp;
1639         if(count != 0){
1640                 numKids = 0;
1641                 for(int i=0;i<count;i++){
1642                         /*temp = label - rint(pow((double)2,i));*/
1643                         temp = label - (1<<i);
1644                         temp = upperSize-1-temp;
1645                         if(temp <= CkNumNodes()-1){
1646                 //              kids[numKids] = temp;
1647                                 kids.push_back(temp);
1648                                 numKids++;
1649                         }
1650                 }
1651         }else{
1652                 numKids = 0;
1653         //      kids = NULL;
1654         }
1655 }
1656
1657
1658 int CkNodeReductionMgr::treeRoot(void)
1659 {
1660   return 0;
1661 }
1662 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
1663 {
1664   return (CmiBool)(CkMyNode()!=treeRoot());
1665 }
1666 int CkNodeReductionMgr::treeParent(void) //My parent Node
1667 {
1668 #ifdef BINOMIAL_TREE
1669         return parent;
1670 #else
1671   return parent;
1672 #endif
1673 }
1674
1675 int CkNodeReductionMgr::firstKid(void) //My first child Node
1676 {
1677   return CkMyNode()*TREE_WID+1;
1678 }
1679 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
1680 {
1681 #ifdef BINOMIAL_TREE
1682         return numKids;
1683 #else
1684 /*  int nKids=CkNumNodes()-firstKid();
1685   if (nKids>TREE_WID) nKids=TREE_WID;
1686   if (nKids<0) nKids=0;
1687   return nKids;*/
1688         return numKids;
1689 #endif
1690 }
1691
1692 //Combine (& free) the current message vector msgs.
1693 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
1694 {
1695   CkReductionMsg *ret=NULL;
1696
1697   //Look through the vector for a valid reducer, swapping out placeholder messages
1698   CkReduction::reducerType r=CkReduction::invalid;
1699   int msgs_gcount=0;//Reduced gcount
1700   int msgs_nSources=0;//Reduced nSources
1701   int msgs_userFlag=-1;
1702   CkCallback msgs_callback;
1703   CkCallback msgs_secondaryCallback;
1704   int i;
1705   int nMsgs=0;
1706   CkReductionMsg *m;
1707   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
1708         bool isMigratableContributor;
1709         
1710
1711   while(NULL!=(m=msgs.deq()))
1712   {
1713     DEBR((AA"***** gcount=%d; sourceFlag=%d ismigratable %d \n"AB,m->gcount,m->nSources(),m->isMigratableContributor()));         
1714     msgs_gcount+=m->gcount;
1715     if (m->sourceFlag!=0)
1716     { //This is a real message from an element, not just a placeholder
1717       msgArr[nMsgs++]=m;
1718       msgs_nSources+=m->nSources();
1719       r=m->reducer;
1720       if (!m->callback.isInvalid())
1721         msgs_callback=m->callback;
1722       if(!m->secondaryCallback.isInvalid()){
1723         msgs_secondaryCallback = m->secondaryCallback;
1724       }
1725       if (m->userFlag!=-1)
1726         msgs_userFlag=m->userFlag;
1727
1728                         isMigratableContributor= m->isMigratableContributor();
1729                                 
1730     }
1731     else
1732     { //This is just a placeholder message-- replace it
1733       delete m;
1734     }
1735   }
1736
1737   if (nMsgs==0||r==CkReduction::invalid)
1738   //No valid reducer in the whole vector
1739     ret=CkReductionMsg::buildNew(0,NULL);
1740   else
1741   {//Use the reducer to reduce the messages
1742                 if(nMsgs == 1){
1743                         ret = msgArr[0];
1744                 }else{
1745             CkReduction::reducerFn f=CkReduction::reducerTable[r];
1746           ret=(*f)(nMsgs,msgArr);
1747                 }
1748     ret->reducer=r;
1749   }
1750         
1751         //Go back through the vector, deleting old messages
1752   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
1753   delete [] msgArr;
1754   //Set the message counts
1755   ret->redNo=redNo;
1756   ret->gcount=msgs_gcount;
1757   ret->userFlag=msgs_userFlag;
1758   ret->callback=msgs_callback;
1759   ret->secondaryCallback = msgs_secondaryCallback;
1760   ret->sourceFlag=msgs_nSources;
1761         ret->setMigratableContributor(isMigratableContributor);
1762   DEBR((AA"Node Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
1763
1764   return ret;
1765 }
1766
1767 void CkNodeReductionMgr::pup(PUP::er &p)
1768 {
1769 //We do not store the client function pointer or the client function parameter,
1770 //it is the responsibility of the programmer to correctly restore these
1771   IrrGroup::pup(p);
1772   p(redNo);
1773   p(inProgress); p(creating); p(startRequested);
1774   p(lcount);
1775   p(nContrib); p(nRemote);
1776   p(interrupt);
1777   p|msgs;
1778   p|futureMsgs;
1779   p|futureRemoteMsgs;
1780   p|futureLateMigrantMsgs;
1781         p|parent;
1782   if(p.isUnpacking()) {
1783     gcount=CkNumNodes();
1784     thisProxy = thisgroup;
1785     lockEverything = CmiCreateLock();
1786 #ifdef BINOMIAL_TREE
1787     init_BinomialTree();
1788 #else
1789     init_BinaryTree();
1790 #endif          
1791   }
1792   p | blocked;
1793   p | maxModificationRedNo;
1794 }
1795
1796 /*
1797         FAULT_EVAC
1798         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
1799         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
1800         reduction tree. 
1801 */
1802 void CkNodeReductionMgr::evacuate(){
1803         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
1804         if(treeKids() == 0){
1805         /*
1806                 if the node going down is a leaf
1807         */
1808                 oldleaf=true;
1809                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
1810                 /*
1811                         Need to ask parent for the reduction number that it has seen. 
1812                         Since it is a leaf, the tree does not need to be rewired. 
1813                         We reuse the oldparent type of tree modification message to get 
1814                         the parent to block and tell us about the highest reduction number it has seen.
1815                         
1816                 */
1817                 int data[2];
1818                 data[0]=CkMyNode();
1819                 data[1]=getTotalGCount()+additionalGCount;
1820                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
1821                 newParent = treeParent();
1822         }else{
1823                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
1824                 oldleaf= false;
1825         /*
1826                 It is not a leaf. It needs to rewire the tree around itself.
1827                 It also needs to decide on a reduction No after which the new tree will be used
1828                 Till it decides on the new tree and the redNo at which it becomes valid,
1829                 all received messages will be buffered
1830         */
1831                 newParent = kids[0];
1832                 for(int i=numKids-1;i>=0;i--){
1833                         newKids.remove(i);
1834                 }
1835                 /*
1836                         Ask everybody for the highest reduction number they have seen and
1837                         also tell them about the new tree
1838                 */
1839                 /*
1840                         Tell parent about its new child;
1841                 */
1842                 int oldParentData[2];
1843                 oldParentData[0] = CkMyNode();
1844                 oldParentData[1] = newParent;
1845                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
1846
1847                 /*
1848                         Tell the other children about their new parent
1849                 */
1850                 int childrenData=newParent;
1851                 for(int i=1;i<numKids;i++){
1852                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
1853                 }
1854                 
1855                 /*
1856                         Tell newParent (1st child) about its new children,
1857                         the current node and its children except the newParent
1858                 */
1859                 int *newParentData = new int[numKids+2];
1860                 for(int i=1;i<numKids;i++){
1861                         newParentData[i] = kids[i];
1862                 }
1863                 newParentData[0] = CkMyNode();
1864                 newParentData[numKids] = parent;
1865                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
1866                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
1867         }
1868         readyDeletion = false;
1869         blocked = true;
1870         numModificationReplies = 0;
1871         tempModificationRedNo = findMaxRedNo();
1872 }
1873
1874 /*
1875         Depending on the code, use the data to change the tree
1876         1. OLDPARENT : replace the old child with a new one
1877         2. OLDCHILDREN: replace the parent
1878         3. NEWPARENT:  add the children and change the parent
1879         4. LEAFPARENT: delete the old child
1880 */
1881
1882 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
1883         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
1884         int sender;
1885         newKids = kids;
1886         readyDeletion = false;
1887         newAdditionalGCount = additionalGCount;
1888         switch(code){
1889                 case OLDPARENT: 
1890                         for(int i=0;i<numKids;i++){
1891                                 if(newKids[i] == data[0]){
1892                                         newKids[i] = data[1];
1893                                         break;
1894                                 }
1895                         }
1896                         sender = data[0];
1897                         newParent = parent;
1898                         break;
1899                 case OLDCHILDREN:
1900                         newParent = data[0];
1901                         sender = parent;
1902                         break;
1903                 case NEWPARENT:
1904                         for(int i=0;i<size-2;i++){
1905                                 newKids.push_back(data[i]);
1906                         }
1907                         newParent = data[size-2];
1908                         newAdditionalGCount += data[size-1];
1909                         sender = parent;
1910                         break;
1911                 case LEAFPARENT:
1912                         for(int i=0;i<numKids;i++){
1913                                 if(newKids[i] == data[0]){
1914                                         newKids.remove(i);
1915                                         break;
1916                                 }
1917                         }
1918                         sender = data[0];
1919                         newParent = parent;
1920                         newAdditionalGCount += data[1];
1921                         break;
1922         };
1923         blocked = true;
1924         int maxRedNo = findMaxRedNo();
1925         
1926         thisProxy[sender].collectMaxRedNo(maxRedNo);
1927 }
1928
1929 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
1930         /*
1931                 Find out the maximum redNo that has been seen by 
1932                 the affected nodes
1933         */
1934         numModificationReplies++;
1935         if(maxRedNo > tempModificationRedNo){
1936                 tempModificationRedNo = maxRedNo;
1937         }
1938         if(numModificationReplies == numKids+1){
1939                 maxModificationRedNo = tempModificationRedNo;
1940                 /*
1941                         when all the affected nodes have replied, tell them the maximum.
1942                         Unblock yourself. deal with the buffered messages local and remote
1943                 */
1944                 if(maxModificationRedNo == -1){
1945                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
1946                 }else{
1947                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
1948                 }
1949                 thisProxy[parent].unblockNode(maxModificationRedNo);
1950                 for(int i=0;i<numKids;i++){
1951                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
1952                 }
1953                 blocked = false;
1954                 updateTree();
1955                 clearBlockedMsgs();
1956         }
1957 };
1958
1959 void CkNodeReductionMgr::unblockNode(int maxRedNo){
1960         maxModificationRedNo = maxRedNo;
1961         updateTree();
1962         blocked = false;
1963         clearBlockedMsgs();
1964 }
1965
1966
1967 void CkNodeReductionMgr::clearBlockedMsgs(){
1968         int len = bufferedMsgs.length();
1969         for(int i=0;i<len;i++){
1970                 CkReductionMsg *m = bufferedMsgs.deq();
1971                 doAddContribution(m);
1972         }
1973         len = bufferedRemoteMsgs.length();
1974         for(int i=0;i<len;i++){
1975                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
1976                 doRecvMsg(m);
1977         }
1978
1979 }
1980 /*
1981         if the reduction number exceeds the maxModificationRedNo, change the tree
1982         to become the new one
1983 */
1984
1985 void CkNodeReductionMgr::updateTree(){
1986         if(redNo > maxModificationRedNo){
1987                 parent = newParent;
1988                 kids = newKids;
1989                 maxModificationRedNo = INT_MAX;
1990                 numKids = kids.size();
1991                 readyDeletion = true;
1992                 additionalGCount = newAdditionalGCount;
1993                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
1994                 for(int i=0;i<newKids.size();i++){
1995                         DEBREVAC(("%d ",newKids[i]));
1996                 }
1997                 DEBREVAC(("\n"));
1998         //      startReduction(redNo,CkMyNode());
1999         }else{
2000                 if(maxModificationRedNo != INT_MAX){
2001                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2002                         startReduction(redNo,CkMyNode());
2003                         finishReduction();
2004                 }       
2005         }
2006 }
2007
2008
2009 void CkNodeReductionMgr::doneEvacuate(){
2010         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2011 /*      if(oldleaf){
2012                 
2013                         It used to be a leaf
2014                         Then as soon as future messages have been emptied you can 
2015                         send the parent a message telling them that they are not going
2016                         to receive anymore messages from this child
2017                 
2018                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2019                 while(futureMsgs.length() != 0){
2020                         int n = futureMsgs.length();
2021                         for(int i=0;i<n;i++){
2022                                 CkReductionMsg *m = futureMsgs.deq();
2023                                 if(isPresent(m->redNo)){
2024                                         msgs.enq(m);
2025                                 }else{
2026                                         futureMsgs.enq(m);
2027                                 }
2028                         }
2029                         CkReductionMsg *result = reduceMessages();
2030                         thisProxy[treeParent()].RecvMsg(result);
2031                         redNo++;
2032                 }
2033                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2034                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2035         }else{*/
2036                 if(readyDeletion){
2037                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2038                 }else{
2039                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2040                 }
2041 //      }
2042 }
2043
2044 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2045         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2046         for(int i=0;i<numKids;i++){
2047                 if(kids[i] == deletedChild){
2048                         kids.remove(i);
2049                         break;
2050                 }
2051         }
2052         numKids = kids.length();
2053         finishReduction();
2054 }
2055
2056 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2057         for(int i=0;i<newKids.length();i++){
2058                 if(newKids[i] == deletedChild){
2059                         newKids.remove(i);
2060                         break;
2061                 }
2062         }
2063         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2064         for(int i=0;i<newKids.size();i++){
2065                 DEBREVAC(("%d ",newKids[i]));
2066         }
2067         DEBREVAC(("\n"));
2068         finishReduction();
2069 }
2070
2071 int CkNodeReductionMgr::findMaxRedNo(){
2072         int max = redNo;
2073         for(int i=0;i<futureRemoteMsgs.length();i++){
2074                 if(futureRemoteMsgs[i]->redNo  > max){
2075                         max = futureRemoteMsgs[i]->redNo;
2076                 }
2077         }
2078         /*
2079                 if redNo is max (that is no future message) and the current reduction has not started
2080                 then tree can be changed before the reduction redNo can be started
2081         */ 
2082         if(redNo == max && msgs.length() == 0){
2083                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2084                 max--;
2085         }
2086         return max;
2087 }
2088
2089 #include "CkReduction.def.h"