Removed call to pupCkVec, since it didn't actually
[charm.git] / src / ck-core / ckreduction.C
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
4
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
11
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
27
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
41
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
48
49 */
50 #include "charm++.h"
51 #include "ck.h"
52
53 #if 0
54 //Debugging messages:
55 // Reduction mananger internal information:
56 #define DEBUGRED 1
57 #define DEBR(x) CkPrintf x
58 #define AA "Red PE%d Node%d #%d (%d,%d)> "
59 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib
60
61 #define DEBN(x) CkPrintf x
62 #define AAN "Red Node%d "
63 #define ABN ,CkMyNode()
64
65 // For status and data messages from the builtin reducer functions.
66 #define RED_DEB(x) //CkPrintf x
67
68 #else
69 //No debugging info-- empty defines
70 #define DEBUGRED 0
71 #define DEBR(x) //CkPrintf x
72 #define DEBN(x) //CkPrintf x
73 #define RED_DEB(x) //CkPrintf x
74 #endif
75
76 Group::Group()
77 {
78         creatingContributors();
79         contributorStamped(&reductionInfo);
80         contributorCreated(&reductionInfo);
81         doneCreatingContributors();
82 #if DEBUGRED
83         CkPrintf("[%d,%d]Creating nodeProxy with gid %d\n",CkMyNode(),CkMyPe(),CkpvAccess(_currentGroupRednMgr));
84 #endif                  
85         CProxy_CkArrayReductionMgr nodetemp(CkpvAccess(_currentGroupRednMgr));
86         nodeProxy = nodetemp;
87
88 }
89
90 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
91                                     ((CkReductionMgr *)this),
92                                     reductionInfo);
93 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid));
94
95
96
97 CkGroupInitCallback::CkGroupInitCallback(void) {}
98 /*
99 The callback is just used to tell the caller that this group
100 has been constructed.  (Now they can safely call CkLocalBranch)
101 */
102 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
103 {
104   m->call();
105   delete m;
106 }
107
108 /*
109 The callback is just used to tell the caller that this group
110 is constructed and ready to process other calls.
111 */
112 CkGroupReadyCallback::CkGroupReadyCallback(void)
113 {
114   _isReady = 0;
115 }
116 void
117 CkGroupReadyCallback::callBuffered(void)
118 {
119   int n = _msgs.length();
120   for(int i=0;i<n;i++)
121   {
122     CkGroupCallbackMsg *msg = _msgs.deq();
123     msg->call();
124     delete msg;
125   }
126 }
127 void
128 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
129 {
130   if(_isReady) {
131     msg->call();
132     delete msg;
133   } else {
134     _msgs.enq(msg);
135   }
136 }
137
138 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
139         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
140 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
141 {
142         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
143         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
144         b->fn(b->param,m->getSize(),m->getData());
145         delete m;
146 }
147
148 ///////////////// Reduction Manager //////////////////
149 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
150 public:
151   int num;
152   CkReductionNumberMsg(int n) {num=n;}
153 };
154
155 /*
156 One CkReductionMgr runs a non-overlapping set of reductions.
157 It collects messages from all local contributors, then sends
158 the reduced message up the reduction tree to node zero, where
159 they're passed to the user's client function.
160 */
161
162 CkReductionMgr::CkReductionMgr()//Constructor
163   : thisProxy(thisgroup)
164
165   redNo=0;
166   completedRedNo = -1;
167   inProgress=CmiFalse;
168   creating=CmiFalse;
169   startRequested=CmiFalse;
170   gcount=lcount=0;
171   nContrib=nRemote=0;
172   maxStartRequest=0;
173   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
174 }
175
176 //////////// Reduction Manager Client API /////////////
177
178 //Add the given client function.  Overwrites any previous client.
179 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
180 {
181   DEBR((AA"Setting reductionClient in ReductionMgr groupid %d in nodeProxy %p\n"AB,thisgroup.idx,&nodeProxy));
182
183   if (CkMyPe()!=0)
184           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
185   storedCallback=*cb;
186   CkCallback *callback =new CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(0),thishandle);
187   nodeProxy.ckSetReductionClient(callback);
188 }
189
190 ///////////////////////////// Contributor ////////////////////////
191 //Contributors keep a copy of this structure:
192
193 /*Contributor migration support:
194 */
195 void contributorInfo::pup(PUP::er &p)
196 {
197   p(redNo);
198 }
199
200 ////////////////////// Contributor list maintainance: /////////////////
201 //These just set and clear the "creating" flag to prevent
202 // reductions from finishing early because not all elements
203 // have been created.
204 void CkReductionMgr::creatingContributors(void)
205 {
206   DEBR((AA"Creating contributors...\n"AB));
207   creating=CmiTrue;
208 }
209 void CkReductionMgr::doneCreatingContributors(void)
210 {
211   DEBR((AA"Done creating contributors...\n"AB));
212   creating=CmiFalse;
213   if (startRequested) startReduction(redNo);
214   finishReduction();
215 }
216
217 //A new contributor will be created
218 void CkReductionMgr::contributorStamped(contributorInfo *ci)
219 {
220   DEBR((AA"Contributor %p stamped\n"AB,ci));
221   //There is another contributor
222   gcount++;
223   if (inProgress)
224   {
225     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
226     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
227   } else
228     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
229 }
230
231 //A new contributor was actually created
232 void CkReductionMgr::contributorCreated(contributorInfo *ci)
233 {
234   DEBR((AA"Contributor %p created in grp %d\n"AB,ci,thisgroup.idx));
235   //We've got another contributor
236   lcount++;
237   //He may not need to contribute to some of our reductions:
238   for (int r=redNo;r<ci->redNo;r++)
239     adj(r).lcount--;//He won't be contributing to r here
240 }
241
242 /*Don't expect any more contributions from this one.
243 This is rather horrifying because we now have to make
244 sure the global element count accurately reflects all the
245 contributions the element made before it died-- these may stretch
246 far into the future.  The adj() vector is what saves us here.
247 */
248 void CkReductionMgr::contributorDied(contributorInfo *ci)
249 {
250   DEBR((AA"Contributor %p(%d) died\n"AB,ci,ci->redNo));
251   //We lost a contributor
252   gcount--;
253
254   if (ci->redNo<redNo)
255   {//Must have been migrating during reductions-- root is waiting for his
256   // contribution, which will never come.
257     DEBR((AA"Dying guy %p must have been migrating-- he's at #%d!\n"AB,ci,ci->redNo));
258     for (int r=ci->redNo;r<redNo;r++)
259       thisProxy[treeRoot()].MigrantDied(new CkReductionNumberMsg(r));
260   }
261
262   //Add to the global count for all his future messages (wherever they are)
263   int r;
264   for (r=redNo;r<ci->redNo;r++)
265   {//He already contributed to this reduction, but won't show up in global count.
266     DEBR((AA"Dead guy %p left contribution for #%d\n"AB,ci,r));
267     adj(r).gcount++;
268   }
269
270   lcount--;
271   //He's already contributed to several reductions here
272   for (r=redNo;r<ci->redNo;r++)
273     adj(r).lcount++;//He'll be contributing to r here
274
275   finishReduction();
276 }
277 //Migrating away (note that global count doesn't change)
278 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
279 {
280   DEBR((AA"Contributor %p(%d) migrating away\n"AB,ci,ci->redNo));
281   lcount--;//We lost a local
282   //He's already contributed to several reductions here
283   for (int r=redNo;r<ci->redNo;r++)
284     adj(r).lcount++;//He'll be contributing to r here
285
286   finishReduction();
287 }
288 //Migrating in (note that global count doesn't change)
289 void CkReductionMgr::contributorArriving(contributorInfo *ci)
290 {
291   DEBR((AA"Contributor %p(%d) migrating in\n"AB,ci,ci->redNo));
292   lcount++;//We gained a local
293   //He has already contributed (elsewhere) to several reductions:
294   for (int r=redNo;r<ci->redNo;r++)
295     adj(r).lcount--;//He won't be contributing to r here
296
297 }
298
299 //Contribute-- the given msg can contain any data.  The reducerType
300 // field of the message must be valid.
301 // Each contributor must contribute exactly once to the each reduction.
302 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
303 {
304   DEBR((AA"Contributor %p contributed for %d in grp %d\n"AB,ci,ci->redNo,thisgroup.idx));
305   m->ci=ci;
306   m->redNo=ci->redNo++;
307   m->sourceFlag=-1;//A single contribution
308   m->gcount=0;
309   addContribution(m);
310 }
311
312 //////////// Reduction Manager Remote Entry Points /////////////
313 //Sent down the reduction tree (used by barren PEs)
314 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
315 {
316  if(CkMyPe()==0){
317         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
318         //delete m;
319         //return;
320  }
321   if (isPresent(m->num) && !inProgress)
322   {
323     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
324     startReduction(m->num);
325     finishReduction();
326   } else if (isFuture(m->num)){
327 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
328           DEBR((AA"Asked to startfuture Reduction %d \n"AB,m->num));
329           if(maxStartRequest < m->num){
330                   maxStartRequest = m->num;
331           }
332  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
333           
334     }
335   else //is Past
336     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
337   delete m;
338 }
339 //Sent to root of reduction tree with reduction contribution
340 // of migrants that missed the main reduction.
341 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
342 {
343         int len = finalMsgs.length();
344         finalMsgs.enq(m);
345 //      CkPrintf("[%d]Late Migrant Detected for %d ,  (%d %d )\n",CkMyPe(),m->redNo,len,finalMsgs.length());
346         endArrayReduction();
347 }
348
349 //A late migrating contributor will never contribute to this reduction
350 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
351 {
352   if (hasParent() || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
353   DEBR((AA"Migrant died before contributing to #%d\n"AB,m->num));
354  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
355   adj(m->num).gcount--;//He won't be contributing to this one.
356   finishReduction();
357 }
358 //Sent up the reduction tree with reduced data
359 void CkReductionMgr::RecvMsg(CkReductionMsg *m)
360 {
361   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
362     DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
363     startReduction(m->redNo);
364     msgs.enq(m);
365     nRemote++;
366     finishReduction();
367   }
368   else if (isFuture(m->redNo)) {
369     DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
370     futureRemoteMsgs.enq(m);
371   }
372   else CkAbort("Recv'd late remote contribution!\n");
373 }
374 //////////// Reduction Manager State /////////////
375 void CkReductionMgr::startReduction(int number)
376 {
377   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
378   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
379   if (inProgress){
380         DEBR((AA"This reduction is already in progress\n"AB));
381         return;//This reduction already started
382   }
383   if (creating) //Don't start yet-- we're creating elements
384   {
385     DEBR((AA"Postponing start request #%d until we're done creating\n"AB,redNo));
386     startRequested=CmiTrue;
387     return;
388   }
389
390 //If none of these cases, we need to start the reduction--
391   DEBR((AA"Starting reduction #%d  %d %d \n"AB,redNo,completedRedNo,number));
392   inProgress=CmiTrue;
393   //Sent start requests to our kids (in case they don't already know)
394  
395
396   //making it a broadcast done only by PE 0
397
398   if(CkMyPe()==0){
399         int temp = completedRedNo;
400         if(temp < 0)
401                 temp = 0;  
402         for(int i=temp;i<=number;i++){
403                 DEBR((AA"Asking all child PEs to start #%d \n"AB,i));
404                 thisProxy.ReductionStarting(new CkReductionNumberMsg(i));
405         }
406   }
407   else{
408           // kick-start your parent too ...
409           thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(number));
410           for (int k=0;k<treeKids();k++)
411           {
412             DEBR((AA"Asking child PE %d to start #%d\n"AB,firstKid()+k,redNo));
413             thisProxy[firstKid()+k].ReductionStarting(new CkReductionNumberMsg(number));
414           }
415   }
416 }
417 /*Handle a message from one element for the reduction*/
418 void CkReductionMgr::addContribution(CkReductionMsg *m)
419 {
420   if (isPast(m->redNo))
421   {//We've moved on-- forward late contribution straight to root
422     DEBR((AA"Migrant %p gives late contribution for #%d!\n"AB,m->ci,m->redNo));
423    // if (!hasParent()) //Root moved on too soon-- should never happen
424    //   CkAbort("Late reduction contribution received at root!\n");
425     thisProxy[treeRoot()].LateMigrantMsg(m);
426   }
427   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
428     DEBR((AA"Contributor %p gives early contribution-- for #%d\n"AB,m->ci,m->redNo));
429     futureMsgs.enq(m);
430   } else {// An ordinary contribution
431     DEBR((AA"Recv'd local contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
432    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
433     startReduction(m->redNo);
434     msgs.enq(m);
435     nContrib++;
436     finishReduction();
437   }
438 }
439
440 /**function checks if it has got all contributions that it is supposed to
441 get at this processor. If it is done it sends the reduced result to the local
442 nodegroup */
443 void CkReductionMgr::finishReduction(void)
444 {
445   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
446   DEBR((AA"in finishReduction (inProgress=%d) in grp %d\n"AB,inProgress,thisgroup.idx));
447   if ((!inProgress) | creating){
448         DEBR((AA"Either not in Progress or creating\n"AB));
449         return;
450   }
451   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
452   if (nContrib<(lcount+adj(redNo).lcount)){
453         DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
454          return;//Need more local messages
455   }
456   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
457   CkReductionMsg *result=reduceMessages();
458
459   //CkPrintf("[%d] Got all local Messages in finishReduction %d in redNo %d\n",CkMyPe(),nContrib,redNo);
460
461   /* reduce the messages and then store the callback specified in the message ***/
462   CkReductionMsg *ret =CkReductionMsg::buildNew(result->getSize(),result->getData(),result->getReducer());
463   ret->redNo = redNo;
464   ret->userFlag= result->userFlag;
465   ret->sourceFlag = result->sourceFlag;
466   ret->gcount=result->gcount+gcount+adj(redNo).gcount;
467   ret->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
468   ret->secondaryCallback = result->callback;
469
470 #if DEBUGRED 
471   CkPrintf("[%d,%d]Callback for redNo %d in group %d  mesggcount=%d localgcount=%d\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,ret->gcount,gcount);
472 #endif
473     
474   nodeProxy[CkMyNode()].ckLocalBranch()->contributeArrayReduction(ret);
475
476   //House Keeping Operations will have to check later what needs to be changed
477   redNo++;
478   //Shift the count adjustment vector down one slot (to match new redNo)
479   int i;
480
481   if(hasParent()){
482         int i;
483         completedRedNo++;
484         for (i=1;i<adjVec.length();i++)
485            adjVec[i-1]=adjVec[i];
486         adjVec.length()--;  
487   }
488   inProgress=CmiFalse;
489   startRequested=CmiFalse;
490   nRemote=nContrib=0;
491
492   //Look through the future queue for messages we can now handle
493   int n=futureMsgs.length();
494   for (i=0;i<n;i++)
495   {
496     CkReductionMsg *m=futureMsgs.deq();
497     if (m!=NULL) //One of these addContributions may have finished us.
498       addContribution(m);//<- if *still* early, puts it back in the queue
499   }
500   if(maxStartRequest >= redNo){
501           startReduction(redNo);
502           finishReduction();
503   }
504 }
505
506 //////////// Reduction Manager Utilities /////////////
507 int CkReductionMgr::treeRoot(void)
508 {
509   return 0;
510 }
511 CmiBool CkReductionMgr::hasParent(void) //Root PE
512 {
513   return (CmiBool)(CkMyPe()!=treeRoot());
514 }
515 int CkReductionMgr::treeParent(void) //My parent PE
516 {
517   return (CkMyPe()-1)/TREE_WID;
518 }
519 int CkReductionMgr::firstKid(void) //My first child PE
520 {
521   return CkMyPe()*TREE_WID+1;
522 }
523 int CkReductionMgr::treeKids(void)//Number of children in tree
524 {
525   int nKids=CkNumPes()-firstKid();
526   if (nKids>TREE_WID) nKids=TREE_WID;
527   if (nKids<0) nKids=0;
528   return nKids;
529 }
530
531 //Return the countAdjustment struct for the given redNo:
532 countAdjustment &CkReductionMgr::adj(int number)
533 {
534   number-=completedRedNo;
535   number--;
536   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
537   //Pad the adjustment vector with zeros until it's at least number long
538   while (adjVec.length()<=number)
539     adjVec.push_back(countAdjustment());
540   return adjVec[number];
541 }
542
543 //Combine (& free) the current message vector msgs.
544 CkReductionMsg *CkReductionMgr::reduceMessages(void)
545 {
546   CkReductionMsg *ret=NULL;
547
548   //Look through the vector for a valid reducer, swapping out placeholder messages
549   CkReduction::reducerType r=CkReduction::invalid;
550   int msgs_gcount=0;//Reduced gcount
551   int msgs_nSources=0;//Reduced nSources
552   int msgs_userFlag=-1;
553   CkCallback msgs_callback;
554   int i;
555   int nMsgs=0;
556   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
557   CkReductionMsg *m;
558
559   // Copy message queue into msgArr, skipping placeholders:
560   while (NULL!=(m=msgs.deq()))
561   {
562     msgs_gcount+=m->gcount;
563     if (m->sourceFlag!=0)
564     { //This is a real message from an element, not just a placeholder
565       msgArr[nMsgs++]=m;
566       msgs_nSources+=m->nSources();
567       r=m->reducer;
568       if (!m->callback.isInvalid())
569         msgs_callback=m->callback;
570       if (m->userFlag!=-1)
571         msgs_userFlag=m->userFlag;
572     }
573     else
574     { //This is just a placeholder message-- forget it
575       delete m;
576     }
577   }
578
579   if (nMsgs==0||r==CkReduction::invalid)
580   //No valid reducer in the whole vector
581     ret=CkReductionMsg::buildNew(0,NULL);
582   else
583   {//Use the reducer to reduce the messages
584     CkReduction::reducerFn f=CkReduction::reducerTable[r];
585     ret=(*f)(nMsgs,msgArr);
586     ret->reducer=r;
587   }
588
589   //Go back through the vector, deleting old messages
590   for (i=0;i<nMsgs;i++) delete msgArr[i];
591
592   //Set the message counts
593   ret->redNo=redNo;
594   ret->gcount=msgs_gcount;
595   ret->userFlag=msgs_userFlag;
596   ret->callback=msgs_callback;
597   ret->sourceFlag=msgs_nSources;
598   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
599
600   return ret;
601 }
602
603
604 //Checkpointing utilities
605 //pack-unpack method for CkReductionMsg
606 //if packing pack the message and then unpack and return it
607 //if unpacking allocate memory for it read it off disk and then unapck
608 //and return it
609 void CkReductionMgr::pup(PUP::er &p)
610 {
611 //We do not store the client function pointer or the client function parameter,
612 //it is the responsibility of the programmer to correctly restore these
613   CkGroupInitCallback::pup(p);
614   p(redNo);
615   p(completedRedNo);
616   p(inProgress); p(creating); p(startRequested);
617   p(nContrib); p(nRemote);
618   p|msgs;
619   p|futureMsgs;
620   p|futureRemoteMsgs;
621   p|finalMsgs;
622   p|adjVec;
623   p|nodeProxy;
624   p | storedCallback;
625   if(p.isUnpacking()){
626     thisProxy = thisgroup;
627     lcount=0;
628     gcount=0;   
629     
630   }
631
632 #ifdef DEBUGRED
633     CkPrintf("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount);
634 #endif
635 }
636
637
638 //Callback for doing Reduction through NodeGroups added by Sayantan
639
640 void CkReductionMgr::ArrayReductionHandler(CkReductionMsg *m){
641
642         int total = m->gcount+adj(m->redNo).gcount;
643         finalMsgs.enq(m);
644         //CkPrintf("ArrayReduction Handler Invoked for %d \n",m->redNo);
645         adj(m->redNo).mainRecvd = 1;
646 #if DEBUGRED    
647         CkPrintf("~~~~~~~~~~~~~ ArrayReductionHandler Callback called for redNo %d with mesgredNo %d at %.6f %d\n",completedRedNo,m->redNo,CmiWallTimer());
648 #endif  
649         endArrayReduction();
650 }
651
652 void CkReductionMgr :: endArrayReduction(){
653         CkReductionMsg *ret=NULL;
654         int nMsgs=finalMsgs.length();
655         //CkPrintf("endArrayReduction Invoked for %d \n",completedRedNo+1);
656         //Look through the vector for a valid reducer, swapping out placeholder messages
657         //CkPrintf("Length of Final Message %d \n",nMsgs);
658         CkReduction::reducerType r=CkReduction::invalid;
659         int msgs_gcount=0;//Reduced gcount
660         int msgs_nSources=0;//Reduced nSources
661         int msgs_userFlag=-1;
662         CkCallback msgs_callback;
663         CkCallback msgs_secondaryCallback;
664         CkVec<CkReductionMsg *> tempMsgs;
665         int i;
666         int numMsgs = 0;
667         for (i=0;i<nMsgs;i++)
668         {
669                 CkReductionMsg *m=finalMsgs.deq();
670                 if(m->redNo == completedRedNo +1){
671                         msgs_gcount+=m->gcount;
672                         if (m->sourceFlag!=0)
673                         { //This is a real message from an element, not just a placeholder
674                                 msgs_nSources+=m->nSources();
675                                 r=m->reducer;
676                                 if (!m->callback.isInvalid())
677                                 msgs_callback=m->callback;
678                                 if(!m->secondaryCallback.isInvalid())
679                                         msgs_secondaryCallback = m->secondaryCallback;
680                                 if (m->userFlag!=-1)
681                                         msgs_userFlag=m->userFlag;
682                                 tempMsgs.push_back(m);
683                         }
684                 }else{
685                         finalMsgs.enq(m);
686                 }
687
688         }
689         numMsgs = tempMsgs.length();
690 #if DEBUGRED
691         CkPrintf("[%d]Total = %d %d Sources = %d Number of Messages %d Adj(Completed redno).mainRecvd %d\n",CkMyPe(),msgs_gcount,  adj(completedRedNo+1).gcount,msgs_nSources,numMsgs,adj(completedRedNo+1).mainRecvd);
692 #endif  
693         if(numMsgs == 0){
694                 return;
695         }
696         if(adj(completedRedNo+1).mainRecvd == 0){
697                 for(i=0;i<numMsgs;i++){
698                         finalMsgs.enq(tempMsgs[i]);
699                 }
700                 return;
701         }
702         if(msgs_gcount  > msgs_nSources){
703                 for(i=0;i<numMsgs;i++){
704                         finalMsgs.enq(tempMsgs[i]);
705                 }
706                 return;
707         }
708
709         if (nMsgs==0||r==CkReduction::invalid)
710                 //No valid reducer in the whole vector
711                 ret=CkReductionMsg::buildNew(0,NULL);
712         else{//Use the reducer to reduce the messages
713                 CkReduction::reducerFn f=CkReduction::reducerTable[r];
714                 // has to be corrected elements from above need to be put into a temporary vector
715                 CkReductionMsg **msgArr=&tempMsgs[0];//<-- HACK!
716                 ret=(*f)(numMsgs,msgArr);
717                 ret->reducer=r;
718
719         }
720
721         for(i = 0;i<numMsgs;i++){
722                 delete tempMsgs[i];
723         }
724
725         //CkPrintf("Length of finalMsgs after endReduction %d \n",finalMsgs.length());
726         //CkPrintf("Data size of result = %d Length of finalMsg %d \n",ret->getLength(),finalMsgs.length());
727
728         ret->redNo=completedRedNo+1;
729         ret->gcount=msgs_gcount;
730         ret->userFlag=msgs_userFlag;
731         ret->callback=msgs_callback;
732         ret->secondaryCallback = msgs_secondaryCallback;
733         ret->sourceFlag=msgs_nSources;
734
735 #if DEBUGRED
736         CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of GROUP REDUCTION %d at %.6f\n",completedRedNo,CmiWallTimer());
737 #endif
738         if (!ret->secondaryCallback.isInvalid())
739             ret->secondaryCallback.send(ret);
740     else if (!storedCallback.isInvalid())
741             storedCallback.send(ret);
742     else{
743 #if DEBUGRED
744             CkPrintf("No reduction client for group %d \n",thisgroup.idx);
745 #endif
746             CkAbort("No reduction client!\n"
747                     "You must register a client with either SetReductionClient or during contribute.\n");
748     }
749         completedRedNo++;
750 #if DEBUGRED
751        CkPrintf("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer());
752 #endif
753         for (i=1;i<adjVec.length();i++)
754                 adjVec[i-1]=adjVec[i];
755         adjVec.length()--;
756         endArrayReduction();
757 }
758
759
760 /////////////////////////////////////////////////////////////////////////
761
762 ////////////////////////////////
763 //CkReductionMsg support
764
765 //ReductionMessage default private constructor-- does nothing
766 CkReductionMsg::CkReductionMsg(){}
767
768 //This define gives the distance from the start of the CkReductionMsg
769 // object to the start of the user data area (just below last object field)
770 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
771
772 //"Constructor"-- builds and returns a new CkReductionMsg.
773 //  the "data" array you specify will be copied into this object.
774 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
775     CkReduction::reducerType reducer)
776 {
777   int len[1];
778   len[0]=NdataSize;
779   CkReductionMsg *ret = new(len,0)CkReductionMsg();
780
781   ret->dataSize=NdataSize;
782   if (srcData!=NULL)
783     memcpy(ret->data,srcData,NdataSize);
784   ret->userFlag=-1;
785   ret->reducer=reducer;
786   ret->ci=NULL;
787   ret->sourceFlag=-1000;
788   ret->gcount=0;
789   return ret;
790 }
791
792 // Charm kernel message runtime support:
793 void *
794 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
795 {
796   int totalsize=ARM_DATASTART+(*sz);
797   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
798   CkReductionMsg *ret = (CkReductionMsg *)
799     CkAllocMsg(msgnum,totalsize,priobits);
800   ret->data=(void *)(&ret->dataStorage);
801   return (void *) ret;
802 }
803
804 void *
805 CkReductionMsg::pack(CkReductionMsg* in)
806 {
807   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
808   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
809   in->data = NULL;
810   return (void*) in;
811 }
812
813 CkReductionMsg* CkReductionMsg::unpack(void *in)
814 {
815   CkReductionMsg *ret = (CkReductionMsg *)in;
816   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
817   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
818   ret->data=(void *)(&ret->dataStorage);
819   return ret;
820 }
821
822
823 /////////////////////////////////////////////////////////////////////////////////////
824 ///////////////// Builtin Reducer Functions //////////////
825 /* A simple reducer, like sum_int, looks like this:
826 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
827 {
828   int i,ret=0;
829   for (i=0;i<nMsg;i++)
830     ret+=*(int *)(msg[i]->getData());
831   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
832 }
833
834 To keep the code small and easy to change, the implementations below
835 are built with preprocessor macros.
836 */
837
838 //////////////// simple reducers ///////////////////
839 /*A define used to quickly and tersely construct simple reductions.
840 The basic idea is to use the first message's data array as
841 (pre-initialized!) scratch space for folding in the other messages.
842  */
843
844 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
845 {
846         CkAbort("Called the invalid reducer type 0.  This probably\n"
847                 "means you forgot to initialize your custom reducer index.\n");
848         return NULL;
849 }
850
851 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
852 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
853 {\
854   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
855   int m,i;\
856   int nElem=msg[0]->getLength()/sizeof(dataType);\
857   dataType *ret=(dataType *)(msg[0]->getData());\
858   for (m=1;m<nMsg;m++)\
859   {\
860     dataType *value=(dataType *)(msg[m]->getData());\
861     for (i=0;i<nElem;i++)\
862     {\
863       RED_DEB(("|\tmsg%d (from %d) [%d]="typeStr"\n",m,msg[m]->sourceFlag,i,value[i]));\
864       loop\
865     }\
866   }\
867   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
868   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret);\
869 }
870
871 //Use this macro for reductions that have the same type for all inputs
872 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
873   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
874   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
875   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
876
877
878 //Compute the sum the numbers passed by each element.
879 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
880
881 //Compute the product of the numbers passed by each element.
882 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
883
884 //Compute the largest number passed by any element.
885 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
886
887 //Compute the smallest integer passed by any element.
888 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
889
890
891 //Compute the logical AND of the integers passed by each element.
892 // The resulting integer will be zero if any source integer is zero; else 1.
893 SIMPLE_REDUCTION(logical_and,int,"%d",
894         if (value[i]==0)
895      ret[i]=0;
896   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
897 )
898
899 //Compute the logical OR of the integers passed by each element.
900 // The resulting integer will be 1 if any source integer is nonzero; else 0.
901 SIMPLE_REDUCTION(logical_or,int,"%d",
902   if (value[i]!=0)
903            ret[i]=1;
904   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
905 )
906
907 /////////////// concat ////////////////
908 /*
909 This reducer simply appends the data it recieves from each element,
910 without any housekeeping data to separate them.
911 */
912 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
913 {
914   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
915   //Figure out how big a message we'll need
916   int i,retSize=0;
917   for (i=0;i<nMsg;i++)
918       retSize+=msg[i]->getSize();
919
920   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
921
922   //Allocate a new message
923   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
924
925   //Copy the source message data into the return message
926   char *cur=(char *)(ret->getData());
927   for (i=0;i<nMsg;i++) {
928     int messageBytes=msg[i]->getSize();
929     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
930     cur+=messageBytes;
931   }
932   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
933   return ret;
934 }
935
936 /////////////// set ////////////////
937 /*
938 This reducer appends the data it recieves from each element
939 along with some housekeeping data indicating contribution boundaries.
940 The message data is thus a list of reduction_set_element structures
941 terminated by a dummy reduction_set_element with a sourceElement of -1.
942 */
943
944 //This rounds an integer up to the nearest multiple of sizeof(double)
945 static const int alignSize=sizeof(double);
946 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
947
948 //This gives the size (in bytes) of a reduction_set_element
949 static int SET_SIZE(int dataSize)
950 {return SET_ALIGN(sizeof(int)+dataSize);}
951
952 //This returns a pointer to the next reduction_set_element in the list
953 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
954 {
955   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
956   return (CkReduction::setElement *)next;
957 }
958
959 //Combine the data passed by each element into an list of reduction_set_elements.
960 // Each element may contribute arbitrary data (with arbitrary length).
961 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
962 {
963   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
964   //Figure out how big a message we'll need
965   int i,retSize=0;
966   for (i=0;i<nMsg;i++) {
967     if (!msg[i]->isFromUser())
968     //This message is composite-- it will just be copied over (less terminating -1)
969       retSize+=(msg[i]->getSize()-sizeof(int));
970     else //This is a message from an element-- it will be wrapped in a reduction_set_element
971       retSize+=SET_SIZE(msg[i]->getSize());
972   }
973   retSize+=sizeof(int);//Leave room for terminating -1.
974
975   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
976
977   //Allocate a new message
978   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
979
980   //Copy the source message data into the return message
981   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
982   for (i=0;i<nMsg;i++)
983     if (!msg[i]->isFromUser())
984     {//This message is composite-- just copy it over (less terminating -1)
985                         int messageBytes=msg[i]->getSize()-sizeof(int);
986                         RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
987                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
988                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
989     }
990     else //This is a message from an element-- wrap it in a reduction_set_element
991     {
992       RED_DEB(("|\tmsg[%d] is %d bytes\n",i,msg[i]->getSize()));
993       cur->dataSize=msg[i]->getSize();
994       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
995       cur=SET_NEXT(cur);
996     }
997   cur->dataSize=-1;//Add a terminating -1.
998   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
999   return ret;
1000 }
1001
1002 //Utility routine: get the next reduction_set_element in the list
1003 // if there is one, or return NULL if there are none.
1004 //To get all the elements, just keep feeding this procedure's output back to
1005 // its input until it returns NULL.
1006 CkReduction::setElement *CkReduction::setElement::next(void)
1007 {
1008   CkReduction::setElement *n=SET_NEXT(this);
1009   if (n->dataSize==-1)
1010     return NULL;//This is the end of the list
1011   else
1012     return n;//This is just another element
1013 }
1014
1015 /////////////////// Reduction Function Table /////////////////////
1016 CkReduction::CkReduction() {} //Dummy private constructor
1017
1018 //Add the given reducer to the list.  Returns the new reducer's
1019 // reducerType.  Must be called in the same order on every node.
1020 CkReduction::reducerType CkReduction::addReducer(reducerFn fn)
1021 {
1022   reducerTable[nReducers]=fn;
1023   return (reducerType)nReducers++;
1024 }
1025
1026 /*Reducer table: maps reducerTypes to reducerFns.
1027 It's indexed by reducerType, so the order in this table
1028 must *exactly* match the reducerType enum declaration.
1029 The names don't have to match, but it helps.
1030 */
1031 int CkReduction::nReducers=CkReduction::lastSystemReducer;
1032
1033 CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
1034     ::invalid_reducer,
1035   //Compute the sum the numbers passed by each element.
1036     ::sum_int,::sum_float,::sum_double,
1037
1038   //Compute the product the numbers passed by each element.
1039     ::product_int,::product_float,::product_double,
1040
1041   //Compute the largest number passed by any element.
1042     ::max_int,::max_float,::max_double,
1043
1044   //Compute the smallest number passed by any element.
1045     ::min_int,::min_float,::min_double,
1046
1047   //Compute the logical AND of the integers passed by each element.
1048   // The resulting integer will be zero if any source integer is zero.
1049     ::logical_and,
1050
1051   //Compute the logical OR of the integers passed by each element.
1052   // The resulting integer will be 1 if any source integer is nonzero.
1053     ::logical_or,
1054
1055   //Concatenate the (arbitrary) data passed by each element
1056     ::concat,
1057
1058   //Combine the data passed by each element into an list of setElements.
1059   // Each element may contribute arbitrary data (with arbitrary length).
1060     ::set
1061 };
1062
1063
1064
1065
1066
1067
1068
1069
1070 /********** Code added by Sayantan *********************/
1071 /** Locking is a big problem in the nodegroup code for smp.
1072  So a few assumptions have had to be made. There is one lock
1073  called lockEverything. It protects all the data structures 
1074  of the nodegroup reduction mgr. I tried grabbing it separately 
1075  for each datastructure, modifying it and then releasing it and
1076  then grabbing it again, for the next change.
1077  That doesn't really help because the interleaved execution of 
1078  different threads makes the state of the reduction manager 
1079  inconsistent. 
1080  
1081  1. Grab lockEverything before calling finishreduction or startReduction
1082     or doRecvMsg
1083  2. lockEverything is grabbed only in entry methods reductionStarting
1084     or RecvMesg or  addcontribution.
1085  ****/
1086  
1087 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1088 NodeGroup::NodeGroup(void) {
1089   __nodelock=CmiCreateLock();
1090
1091
1092 }
1093 NodeGroup::~NodeGroup() {
1094   CmiDestroyLock(__nodelock);
1095 }
1096 void NodeGroup::pup(PUP::er &p)
1097 {
1098   CkNodeReductionMgr::pup(p);
1099   p|reductionInfo;
1100 }
1101
1102 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1103
1104 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1105   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1106  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1107   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1108  }
1109
1110 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1111                                     ((CkNodeReductionMgr *)this),
1112                                     reductionInfo);
1113
1114 /* this contribute also adds up the count across all messages it receives.
1115   Useful for summing up number of array elements who have contributed ****/ 
1116 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1117         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1118
1119
1120
1121 //#define BINOMIAL_TREE
1122
1123 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1124   : thisProxy(thisgroup)
1125 {
1126  if(CkMyRank() == 0){
1127 #ifdef BINOMIAL_TREE
1128   init_BinomialTree();
1129 #endif
1130   storedCallback=NULL;
1131   redNo=0;
1132   inProgress=CmiFalse;
1133   
1134   startRequested=CmiFalse;
1135   gcount=CkNumNodes();
1136   lcount=1;
1137   nContrib=nRemote=0;
1138   lockEverything = CmiCreateLock();
1139
1140
1141   creating=CmiFalse;
1142   interrupt = 0;
1143   DEBR((AA"In NodereductionMgr constructor at %d \n"AB,this));
1144   }
1145 }
1146
1147 //////////// Reduction Manager Client API /////////////
1148
1149 //Add the given client function.  Overwrites any previous client.
1150 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
1151 {
1152   DEBR((AA"Setting reductionClient in NodeReductionMgr %d at %d\n"AB,cb,this));
1153   if(cb->isInvalid()){
1154         DEBR((AA"Invalid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1155   }else{
1156         DEBR((AA"Valid Callback passed to setReductionClient in nodeReductionMgr\n"AB));
1157   }
1158
1159   if (CkMyNode()!=0)
1160           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
1161   delete storedCallback;
1162   storedCallback=cb;
1163 }
1164
1165
1166
1167 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
1168 {
1169
1170   m->ci=ci;
1171   m->redNo=ci->redNo++;
1172   m->sourceFlag=-1;//A single contribution
1173   m->gcount=0;
1174
1175   addContribution(m);
1176
1177 }
1178
1179
1180 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
1181 {
1182   m->ci=ci;
1183   m->redNo=ci->redNo++;
1184   m->gcount=count;
1185 #if DEBUGRED
1186  CkPrintf("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1187 #endif
1188   addContribution(m);
1189 #if DEBUGRED
1190   CkPrintf("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
1191 #endif
1192
1193 }
1194
1195
1196 //////////// Reduction Manager Remote Entry Points /////////////
1197
1198 //Sent down the reduction tree (used by barren PEs)
1199 void CkNodeReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
1200 {
1201   CmiLock(lockEverything);
1202   if (isPresent(m->num) && !inProgress)
1203   {
1204     DEBR((AA"Starting Node reduction #%d at parent's request\n"AB,m->num));
1205     startReduction(m->num);
1206     finishReduction();
1207   } else if (isFuture(m->num)){
1208         //CkPrintf("[%d][%d] Message num %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->num,redNo);
1209         CkAbort("My reduction tree parent somehow got ahead of me! in nodegroups\n");
1210     }
1211   else //is Past
1212     DEBR((AA"Ignoring node parent's late request to start #%d\n"AB,m->num));
1213   CmiUnlock(lockEverything);
1214   delete m;
1215
1216 }
1217
1218
1219 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
1220 #if DEBUGRED
1221         CkPrintf("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1222 #endif
1223         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
1224             //DEBR((AA"Recv'd remote contribution %d for #%d at %d\n"AB,nRemote,m->redNo,this));
1225             startReduction(m->redNo);
1226             msgs.enq(m);
1227             nRemote++;
1228             finishReduction();
1229         }
1230         else {
1231             if (isFuture(m->redNo)) {
1232                    // DEBR((AA"Recv'd early remote contribution %d for #%d\n"AB,nRemote,m->redNo));
1233                     futureRemoteMsgs.enq(m);
1234             }
1235             else{
1236                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
1237                    CkAbort("Recv'd late remote contribution!\n");
1238             }
1239        }
1240 #if DEBUGRED        
1241        CkPrintf("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1242 #endif       
1243 }
1244
1245 //Sent up the reduction tree with reduced data
1246 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
1247 {
1248 #ifndef CMK_CPV_IS_SMP
1249 #if CMK_IMMEDIATE_MSG
1250         if(interrupt == 1){
1251                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
1252                 CpvAccess(_qd)->process(-1);
1253                 CmiDelayImmediate();
1254                 return;
1255         }
1256 #endif  
1257 #endif
1258    interrupt = 1;       
1259    CmiLock(lockEverything);   
1260 #if DEBUGRED   
1261    CkPrintf("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1262 #endif   
1263    doRecvMsg(m);
1264    CmiUnlock(lockEverything);    
1265    interrupt = 0;
1266 #if DEBUGRED  
1267    CkPrintf("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer());
1268 #endif 
1269 }
1270
1271 void CkNodeReductionMgr::startReduction(int number)
1272 {
1273         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
1274         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
1275         if (inProgress){
1276                 DEBR((AA"This Node reduction is already in progress\n"AB));
1277                 return;//This reduction already started
1278         }
1279         if (creating) //Don't start yet-- we're creating elements
1280         {
1281                 DEBR((AA" Node Postponing start request #%d until we're done creating\n"AB,redNo));
1282                 startRequested=CmiTrue;
1283                 return;
1284         }
1285
1286         //If none of these cases, we need to start the reduction--
1287         DEBR((AA"Starting Node reduction #%d\n"AB,redNo));
1288         inProgress=CmiTrue;
1289         //Sent start requests to our kids (in case they don't already know)
1290
1291         for (int k=0;k<treeKids();k++)
1292         {
1293 #ifdef BINOMIAL_TREE
1294                 DEBR((AA"Asking child Node %d to start #%d\n"AB,kids[k],redNo));
1295                 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
1296 #else
1297                 DEBR((AA"Asking child Node %d to start #%d\n"AB,firstKid()+k,redNo));
1298                 thisProxy[firstKid()+k].ReductionStarting(new CkReductionNumberMsg(redNo));
1299 #endif
1300         }
1301 }
1302
1303 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
1304         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
1305                 DEBR((AA"Contributor %p gives early node contribution-- for #%d\n"AB,m->ci,m->redNo));
1306                 futureMsgs.enq(m);
1307         } else {// An ordinary contribution
1308                 DEBR((AA"Recv'd local node contribution %d for #%d at %d\n"AB,nContrib,m->redNo,this));
1309                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
1310                 startReduction(m->redNo);
1311                 msgs.enq(m);
1312                 nContrib++;
1313                 finishReduction();
1314         }
1315 }
1316
1317 //Handle a message from one element for the reduction
1318 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
1319 {
1320    interrupt = 1;
1321    CmiLock(lockEverything);
1322    doAddContribution(m);
1323   CmiUnlock(lockEverything);
1324   interrupt = 0;
1325 }
1326 /** check if the nodegroup reduction is finished at this node. In that case send it
1327 up the reduction tree **/
1328
1329 void CkNodeReductionMgr::finishReduction(void)
1330 {
1331   DEBR((AA"in Nodegrp finishReduction %d \n"AB,inProgress));
1332   /***Check if reduction is finished in the next few ifs***/
1333   if ((!inProgress) | creating){
1334         DEBR((AA"Either not in Progress or creating\n"AB));
1335         return;
1336   }
1337
1338   if (nContrib<(lcount)){
1339         DEBR((AA"Nodegrp Need more local messages %d %d\n"AB,nContrib,(lcount)));
1340
1341          return;//Need more local messages
1342   }
1343   if (nRemote<treeKids()){
1344         DEBR((AA"Nodegrp Need more Remote messages %d %d\n"AB,nRemote,treeKids()));
1345
1346         return;//Need more remote messages
1347   }
1348   if (nRemote>treeKids()){
1349
1350           interrupt = 0;
1351            CkAbort("Nodegrp Excess remote reduction message received!\n");
1352   }
1353
1354   DEBR((AA"Reducing node data...\n"AB));
1355
1356   /**reduce all messages received at this node **/
1357   CkReductionMsg *result=reduceMessages();
1358
1359   if (hasParent())
1360   {//Pass data up tree to parent
1361     DEBR((AA"Passing reduced data up to parent node %d. \n"AB,treeParent()));
1362 #if DEBUGRED
1363     CkPrintf("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib);
1364 #endif
1365     thisProxy[treeParent()].RecvMsg(result);
1366
1367   }
1368   else
1369   {
1370           /** if the reduction is finished and I am the root of the reduction tree
1371           then call the reductionhandler and other stuff ***/
1372
1373 #if DEBUGRED
1374    CkPrintf("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer());
1375 #endif
1376     if (!result->callback.isInvalid()){
1377 #if DEBUGRED
1378             CkPrintf("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe());
1379 #endif      
1380             result->callback.send(result);
1381     }
1382     else if (storedCallback!=NULL){
1383 #if DEBUGRED
1384             CkPrintf("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe());
1385 #endif
1386             storedCallback->send(result);
1387     }
1388     else{
1389                 DEBR((AA"Invalid Callback at %d %d\n"AB,result->callback,storedCallback));
1390             CkAbort("No reduction client!\n"
1391                     "You must register a client with either SetReductionClient or during contribute.\n");
1392         }
1393   }
1394
1395   // DEBR((AA"Reduction %d finished in group!\n"AB,redNo));
1396   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
1397   redNo++;
1398   int i;
1399   inProgress=CmiFalse;
1400   startRequested=CmiFalse;
1401   nRemote=nContrib=0;
1402
1403   //Look through the future queue for messages we can now handle
1404   int n=futureMsgs.length();
1405
1406   for (i=0;i<n;i++)
1407   {
1408     interrupt = 1;
1409
1410     CkReductionMsg *m=futureMsgs.deq();
1411
1412     interrupt = 0;
1413     if (m!=NULL){ //One of these addContributions may have finished us.
1414       doAddContribution(m);//<- if *still* early, puts it back in the queue
1415     }
1416   }
1417
1418   interrupt = 1;
1419
1420   n=futureRemoteMsgs.length();
1421
1422   interrupt = 0;
1423   for (i=0;i<n;i++)
1424   {
1425     interrupt = 1;
1426
1427     CkReductionMsg *m=futureRemoteMsgs.deq();
1428
1429     interrupt = 0;
1430     if (m!=NULL)
1431       doRecvMsg(m);//<- if *still* early, puts it back in the queue
1432   }
1433 }
1434
1435 //////////// Reduction Manager Utilities /////////////
1436 void CkNodeReductionMgr::init_BinomialTree(){
1437         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
1438         /*upperSize = (unsigned )pow((double)2,depth);*/
1439         upperSize = (unsigned) 1 << depth;
1440         label = upperSize-CkMyNode()-1;
1441         int p=label;
1442         int count=0;
1443         while( p > 0){
1444                 if(p % 2 == 0)
1445                         break;
1446                 else{
1447                         p = p/2;
1448                         count++;
1449                 }
1450         }
1451         /*parent = label + rint(pow((double)2,count));*/
1452         parent = label + (1<<count);
1453         parent = upperSize -1 -parent;
1454         int temp;
1455         if(count != 0){
1456                 kids = new int[count];
1457                 numKids = 0;
1458                 for(int i=0;i<count;i++){
1459                         /*temp = label - rint(pow((double)2,i));*/
1460                         temp = label - (1<<i);
1461                         temp = upperSize-1-temp;
1462                         if(temp <= CkNumNodes()-1){
1463                                 kids[numKids] = temp;
1464                                 numKids++;
1465                         }
1466                 }
1467         }else{
1468                 numKids = 0;
1469                 kids = NULL;
1470         }
1471 }
1472
1473
1474 int CkNodeReductionMgr::treeRoot(void)
1475 {
1476   return 0;
1477 }
1478 CmiBool CkNodeReductionMgr::hasParent(void) //Root Node
1479 {
1480   return (CmiBool)(CkMyNode()!=treeRoot());
1481 }
1482 int CkNodeReductionMgr::treeParent(void) //My parent Node
1483 {
1484 #ifdef BINOMIAL_TREE
1485         return parent;
1486 #else
1487   return (CkMyNode()-1)/TREE_WID;
1488 #endif
1489 }
1490
1491 int CkNodeReductionMgr::firstKid(void) //My first child Node
1492 {
1493   return CkMyNode()*TREE_WID+1;
1494 }
1495 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
1496 {
1497 #ifdef BINOMIAL_TREE
1498         return numKids;
1499 #else
1500   int nKids=CkNumNodes()-firstKid();
1501   if (nKids>TREE_WID) nKids=TREE_WID;
1502   if (nKids<0) nKids=0;
1503   return nKids;
1504 #endif
1505 }
1506
1507 //Combine (& free) the current message vector msgs.
1508 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
1509 {
1510   CkReductionMsg *ret=NULL;
1511
1512   //Look through the vector for a valid reducer, swapping out placeholder messages
1513   CkReduction::reducerType r=CkReduction::invalid;
1514   int msgs_gcount=0;//Reduced gcount
1515   int msgs_nSources=0;//Reduced nSources
1516   int msgs_userFlag=-1;
1517   CkCallback msgs_callback;
1518   CkCallback msgs_secondaryCallback;
1519   int i;
1520   int nMsgs=0;
1521   CkReductionMsg *m;
1522   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
1523
1524   while(NULL!=(m=msgs.deq()))
1525   {
1526     DEBR((AA"***** gcount=%d; sourceFlag=%d\n"AB,m->gcount,m->nSources()));       
1527     msgs_gcount+=m->gcount;
1528     if (m->sourceFlag!=0)
1529     { //This is a real message from an element, not just a placeholder
1530       msgArr[nMsgs++]=m;
1531       msgs_nSources+=m->nSources();
1532       r=m->reducer;
1533       if (!m->callback.isInvalid())
1534         msgs_callback=m->callback;
1535       if(!m->secondaryCallback.isInvalid()){
1536         msgs_secondaryCallback = m->secondaryCallback;
1537       }
1538       if (m->userFlag!=-1)
1539         msgs_userFlag=m->userFlag;
1540     }
1541     else
1542     { //This is just a placeholder message-- replace it
1543       delete m;
1544     }
1545   }
1546
1547   if (nMsgs==0||r==CkReduction::invalid)
1548   //No valid reducer in the whole vector
1549     ret=CkReductionMsg::buildNew(0,NULL);
1550   else
1551   {//Use the reducer to reduce the messages
1552     CkReduction::reducerFn f=CkReduction::reducerTable[r];
1553     ret=(*f)(nMsgs,msgArr);
1554     ret->reducer=r;
1555   }
1556
1557   //Go back through the vector, deleting old messages
1558   for (i=0;i<nMsgs;i++) delete msgArr[i];
1559
1560   //Set the message counts
1561   ret->redNo=redNo;
1562   ret->gcount=msgs_gcount;
1563   ret->userFlag=msgs_userFlag;
1564   ret->callback=msgs_callback;
1565   ret->secondaryCallback = msgs_secondaryCallback;
1566   ret->sourceFlag=msgs_nSources;
1567   DEBR((AA"Reduced gcount=%d; sourceFlag=%d\n"AB,ret->gcount,ret->sourceFlag));
1568
1569   return ret;
1570 }
1571
1572 void CkNodeReductionMgr::pup(PUP::er &p)
1573 {
1574 //We do not store the client function pointer or the client function parameter,
1575 //it is the responsibility of the programmer to correctly restore these
1576   IrrGroup::pup(p);
1577   p(redNo);
1578   p(inProgress); p(creating); p(startRequested);
1579   p(gcount); p(lcount);
1580   p(nContrib); p(nRemote);
1581   p(interrupt);
1582   p|msgs;
1583   p|futureMsgs;
1584   p|futureRemoteMsgs;
1585   if(p.isUnpacking()) thisProxy = thisgroup;
1586 }
1587
1588
1589 #include "CkReduction.def.h"