Fixing the way to handle immigrant recovering objects.
[charm.git] / src / ck-core / ckreduction.h
1 /*
2 Charm++ File: Reduction Library
3 added 3/27/2000 by Orion Sky Lawlor, olawlor@acm.org
4 modified 02/21/2003 by Sayantan Chakravorty
5
6
7 A reduction takes some sort of inputs (contributions)
8 from some set of objects scattered across all PE's,
9 and combines (reduces) all the contributions onto one
10 PE.  This library provides several different kinds of
11 combination routines (reducers), and various utilities
12 for supporting them.
13
14 The calls needed to use the reduction manager are:
15 -Create with CProxy_CkReduction::ckNew.
16
17 */
18 #ifndef _CKREDUCTION_H
19 #define _CKREDUCTION_H
20
21 #include "CkReduction.decl.h"
22 #include "CkArrayReductionMgr.decl.h"
23
24 #if CMK_BIGSIM_CHARM || CMK_MULTICORE || !CMK_SMP
25 #define GROUP_LEVEL_REDUCTION           1
26 #endif
27
28 #ifdef _PIPELINED_ALLREDUCE_
29 #define FRAG_SIZE 131072
30 #define FRAG_THRESHOLD 131072
31 #endif
32
33 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
34 #define MAX_INT 5000000
35 #define _MLOG_REDUCE_P2P_ 0
36 #endif
37
38 //This message is sent between group objects on a single PE
39 // to let each know the other has been created.
40 class CkGroupCallbackMsg:public CMessage_CkGroupCallbackMsg {
41 public:
42         typedef void (*callbackType)(void *param);
43         CkGroupCallbackMsg(callbackType Ncallback,void *Nparam)
44                 {callback=Ncallback;param=Nparam;}
45         void call(void) {(*callback)(param);}
46 private:
47         callbackType callback;
48         void *param;
49 };
50
51 class CkGroupInitCallback : public IrrGroup {
52 public:
53         CkGroupInitCallback(void);
54         CkGroupInitCallback(CkMigrateMessage *m):IrrGroup(m) {}
55         void callMeBack(CkGroupCallbackMsg *m);
56         void pup(PUP::er& p){ IrrGroup::pup(p); }
57 };
58
59
60 class CkGroupReadyCallback : public IrrGroup {
61 private:
62   int _isReady;
63   CkQ<CkGroupCallbackMsg *> _msgs;
64   void callBuffered(void);
65 public:
66         CkGroupReadyCallback(void);
67         CkGroupReadyCallback(CkMigrateMessage *m):IrrGroup(m) {}
68         void callMeBack(CkGroupCallbackMsg *m);
69         int isReady(void) { return _isReady; }
70 protected:
71         void setReady(void) {_isReady = 1; callBuffered(); }
72         void setNotReady(void) {_isReady = 0; }
73 };
74
75 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
76 public:
77   int num;
78   CkReductionNumberMsg(int n) {num=n;}
79 };
80
81
82 /**some data classes used by both ckreductionmgr and cknodereductionmgr**/
83 class contributorInfo {
84 public:
85         int redNo;//Current reduction number
86         contributorInfo() {redNo=0;}
87         //Migration utilities:
88         void pup(PUP::er &p);
89 };
90
91 class countAdjustment {
92 public:
93         int gcount;//Adjustment to global count (applied at reduction end)
94         int lcount;//Adjustment to local count (applied continually)
95         int mainRecvd;
96         countAdjustment(int ignored=0) {gcount=lcount=0;mainRecvd=0;}
97         void pup(PUP::er& p){ p|gcount; p|lcount; p|mainRecvd; }
98 };
99
100 /** @todo: Fwd decl for a temporary class. Remove after
101  * delegated cross-array reductions are implemented more optimally
102  */
103 namespace ck { namespace impl { class XArraySectionReducer; } }
104
105 //CkReduction is just a "namespace class" for the user-visible
106 // parts of the reduction system.
107 class CkReduction {
108 public:
109         /*These are the reducers you can use,
110           in addition to any user-defined reducers.*/
111
112         /*  !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
113             !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
114
115                  remember to update CkReduction::reducerTable
116
117             !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
118             !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!  */
119
120         typedef enum {
121         //A placeholder invalid reduction type
122                 invalid=0,
123                 nop,
124         //Compute the sum the numbers passed by each element.
125                 sum_int,sum_long,sum_float,sum_double,
126
127         //Compute the product the numbers passed by each element.
128                 product_int,product_long,product_float,product_double,
129
130         //Compute the largest number passed by any element.
131                 max_int,max_long,max_float,max_double,
132
133         //Compute the smallest number passed by any element.
134                 min_int,min_long,min_float,min_double,
135
136         //Compute the logical AND of the integers passed by each element.
137         // The resulting integer will be zero if any source integer is zero.
138                 logical_and,
139
140         //Compute the logical OR of the integers passed by each element.
141         // The resulting integer will be 1 if any source integer is nonzero.
142                 logical_or,
143
144                 // Compute the logical bitvector AND of the integers passed by each element.
145                 bitvec_and,
146
147                 // Compute the logical bitvector OR of the integers passed by each element.
148                 bitvec_or,
149
150         // Select one message at random to pass on
151                 random,
152
153         //Concatenate the (arbitrary) data passed by each element
154                 concat,
155
156         //Combine the data passed by each element into an list of setElements.
157         // Each element may contribute arbitrary data (with arbitrary length).
158                 set,
159
160         //Last system-defined reducer number (user-defined reducers follow)
161                 lastSystemReducer
162         } reducerType;
163
164         //This structure is used with the set reducer above,
165         // and contains the data from one contribution.
166         class setElement {
167         public:
168                 int dataSize;//The length of the data array below
169                 char data[1];//The (dataSize-long) array of data
170                 //Utility routine: get the next setElement,
171                 // or return NULL if there are none.
172                 setElement *next(void);
173         };
174
175 //Support for adding new reducerTypes:
176         //A reducerFunction is used to combine several contributions
177         //into a single summed contribution:
178         //  nMsg gives the number of messages to reduce.
179         //  msgs[i] contains a contribution or summed contribution.
180         typedef CkReductionMsg *(*reducerFn)(int nMsg,CkReductionMsg **msgs);
181
182         //Add the given reducer to the list.  Returns the new reducer's
183         // reducerType.  Must be called in the same order on every node.
184         static reducerType addReducer(reducerFn fn);
185
186 private:
187         friend class CkReductionMgr;
188         friend class CkNodeReductionMgr;
189         friend class CkArrayReductionMgr;
190         friend class CkMulticastMgr;
191     friend class ck::impl::XArraySectionReducer;
192 //System-level interface
193         //This is the maximum number of possible reducers,
194         // including both builtin and user-defined types
195         enum {MAXREDUCERS=256};
196
197         //Reducer table: maps reducerTypes to reducerFns.
198         static reducerFn reducerTable[MAXREDUCERS];
199         static int nReducers;//Number of reducers currently in table above
200
201         //Don't instantiate a CkReduction object-- it's just a namespace.
202         CkReduction();
203 };
204 PUPbytes(CkReduction::reducerType)
205
206 //A CkReductionMsg is sent up the reduction tree-- it
207 // carries a contribution, or several reduced contributions.
208 class CkReductionMsg : public CMessage_CkReductionMsg
209 {
210         friend class CkReduction;
211         friend class CkReductionMgr;
212         friend class CkNodeReductionMgr;
213         friend class CkArrayReductionMgr;
214         friend class CkMulticastMgr;
215 #ifdef _PIPELINED_ALLREDUCE_
216         friend class ArrayElement;
217         friend class AllreduceMgr;
218 #endif
219         friend class ck::impl::XArraySectionReducer;
220 public:
221
222 //Publically-accessible fields:
223         //"Constructor"-- builds and returns a new CkReductionMsg.
224         //  the "srcData" array you specify will be copied into this object (unless NULL).
225         static CkReductionMsg *buildNew(int NdataSize,const void *srcData,
226                 CkReduction::reducerType reducer=CkReduction::invalid,
227                 CkReductionMsg *buf = NULL);
228
229         inline int getLength(void) const {return dataSize;}
230         inline int getSize(void) const {return dataSize;}
231         inline void *getData(void) {return data;}
232         inline const void *getData(void) const {return data;}
233
234         inline int getGcount(void){return gcount;}
235         inline CkReduction::reducerType getReducer(void){return reducer;}
236         inline int getRedNo(void){return redNo;}
237
238         inline CMK_REFNUM_TYPE getUserFlag(void) const {return userFlag;}
239         inline void setUserFlag(CMK_REFNUM_TYPE f) { userFlag=f;}
240
241         inline void setCallback(const CkCallback &cb) { callback=cb; }
242
243         //Return true if this message came straight from a contribute call--
244         // if it didn't come from a previous reduction function.
245         inline int isFromUser(void) const {return sourceFlag==-1;}
246
247         inline bool isMigratableContributor(void) const {return migratableContributor;}
248         inline void setMigratableContributor(bool _mig){ migratableContributor = _mig;}
249
250         ~CkReductionMsg();
251
252 //Implementation-only fields (don't access these directly!)
253         //Msg runtime support
254         static void *alloc(int msgnum, size_t size, int *reqSize, int priobits);
255         static void *pack(CkReductionMsg *);
256         static CkReductionMsg *unpack(void *in);
257
258 private:
259         int dataSize;//Length of array below, in bytes
260         void *data;//Reduction data
261         CMK_REFNUM_TYPE userFlag; //Some sort of identifying flag, for client use
262         CkCallback callback; //What to do when done
263         CkCallback secondaryCallback; // the group callback is piggybacked on the nodegrp reduction
264         bool migratableContributor; // are the contributors migratable
265
266         int sourceFlag;/*Flag:
267                 0 indicates this is a placeholder message (meaning: nothing to report)
268                 -1 indicates this is a single (non-reduced) contribution.
269                 >0 indicates this is a reduced contribution.
270         */
271         int nSources(void) {return sourceFlag<0?-sourceFlag:sourceFlag;}
272 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_ )
273     int sourceProcessorCount;
274     int fromPE;
275 #endif
276 private:
277 #if CMK_BIGSIM_CHARM
278         void *log;
279 #endif
280         CkReduction::reducerType reducer;
281         //contributorInfo *ci;//Source contributor, or NULL if none
282         int redNo;//The serial number of this reduction
283         int gcount;//Contribution to the global contributor count
284         // for section multicast/reduction library
285         CkSectionInfo sid;   // section cookie for multicast
286         char rebuilt;          // indicate if the multicast tree needs rebuilt
287         int nFrags;
288         int fragNo;      // fragment of a reduction msg (when pipelined)
289                          // value = 0 to nFrags-1
290         double dataStorage;//Start of data array (so it's double-aligned)
291
292         int no;
293
294         //Default constructor is private so you must use "buildNew", above
295         CkReductionMsg();
296 };
297
298
299 #define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
300   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
301         CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
302   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
303         const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
304   void contribute(CkReductionMsg *msg); \
305   void contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);\
306   void contribute(CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);\
307
308
309 class CkNodeReductionMgr : public IrrGroup {
310 public:
311         CProxy_CkNodeReductionMgr thisProxy;
312 public:
313         CkNodeReductionMgr(void);
314         CkNodeReductionMgr(CkMigrateMessage *m) : IrrGroup(m) {
315           storedCallback = NULL;
316         }
317
318         typedef CkReductionClientFn clientFn;
319
320         /**
321          * Add the given client function.  Overwrites any previous client.
322          * This manager will dispose of the callback when replaced or done.
323          */
324         void ckSetReductionClient(CkCallback *cb);
325
326 //Contribute-- the given msg can contain any data.  The reducerType
327 // field of the message must be valid.
328 // Each contributor must contribute exactly once to each reduction.
329         void contribute(contributorInfo *ci,CkReductionMsg *msg);
330         void contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count);
331 //Communication (library-private)
332         void restartLocalGroupReductions(int number);
333         //Sent down the reduction tree (used by barren PEs)
334         void ReductionStarting(CkReductionNumberMsg *m);
335         //Sent up the reduction tree with reduced data
336         void RecvMsg(CkReductionMsg *m);
337         void doRecvMsg(CkReductionMsg *m);
338         void LateMigrantMsg(CkReductionMsg *m);
339
340         virtual void flushStates();     // flush state varaibles
341         virtual int startLocalGroupReductions(int number){ return 1;} // can be used to start reductions on all the 
342         //CkReductionMgrs on a particular node. It is overwritten by CkArrayReductionMgr to make the actual calls
343         // since it knows the CkReductionMgrs on a node.
344
345         virtual int getTotalGCount(){return 0;};
346
347 private:
348 //Data members
349         //Stored callback function (may be NULL if none has been set)
350         CkCallback *storedCallback;
351
352         int redNo;//Number of current reduction (incremented at end)
353         CmiBool inProgress;//Is a reduction started, but not complete?
354         CmiBool creating;//Are elements still being created?
355         CmiBool startRequested;//Should we start the next reduction when creation finished?
356         int gcount;
357         int lcount;//Number of local contributors
358
359         //Current local and remote contributions
360         int nContrib,nRemote;
361         //Contributions queued for the current reduction
362         CkMsgQ<CkReductionMsg> msgs;
363         //Contributions queued for future reductions (sent to us too early)
364         CkMsgQ<CkReductionMsg> futureMsgs;
365         //Remote messages queued for future reductions (sent to us too early)
366         CkMsgQ<CkReductionMsg> futureRemoteMsgs;
367         //Late migrant messages queued for future reductions
368         CkMsgQ<CkReductionMsg> futureLateMigrantMsgs;
369         
370         //My Big LOCK
371         CmiNodeLock lockEverything;
372
373         int interrupt; /* flag for use in non-smp 0 means interrupt can occur 1 means not (also acts as a lock)*/
374
375         /*vector storing the children of this node*/
376         CkVec<int> kids;
377         
378 //State:
379         void startReduction(int number,int srcPE);
380         void doAddContribution(CkReductionMsg *m);
381         void finishReduction(void);
382 protected:      
383         void addContribution(CkReductionMsg *m);
384
385 private:
386
387 //Reduction tree utilities
388 /* for binomial trees*/
389         unsigned upperSize;
390         unsigned label;
391         int parent;
392         int numKids;
393 //      int *kids;
394         void init_BinomialTree();
395
396         
397         void init_BinaryTree();
398         enum {TREE_WID=2};
399         int treeRoot(void);//Root PE
400         CmiBool hasParent(void);
401         int treeParent(void);//My parent PE
402         int firstKid(void);//My first child PE
403         int treeKids(void);//Number of children in tree
404
405         //Combine (& free) the current message vector.
406         CkReductionMsg *reduceMessages(void);
407
408         //Map reduction number to a time
409         CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
410         CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
411         CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
412
413         /*FAULT_EVAC*/
414         bool oldleaf;
415         bool blocked;
416         int newParent;
417         int additionalGCount,newAdditionalGCount; //gcount that gets passed to u from the node u replace
418         CkVec<int> newKids;
419         CkMsgQ<CkReductionMsg> bufferedMsgs;
420         CkMsgQ<CkReductionMsg> bufferedRemoteMsgs;
421         enum {OLDPARENT,OLDCHILDREN,NEWPARENT,LEAFPARENT};
422         int numModificationReplies;
423         int maxModificationRedNo;
424         int tempModificationRedNo;
425         bool readyDeletion;
426         int killed;     
427         
428 //Checkpointing utilities
429  public:
430         virtual void pup(PUP::er &p);
431         /*FAULT_EVAC*/
432         virtual void evacuate();
433         virtual void doneEvacuate();
434         void DeleteChild(int deletedChild);
435         void DeleteNewChild(int deletedChild);
436         void collectMaxRedNo(int maxRedNo);
437         void unblockNode(int maxRedNo);
438         void modifyTree(int code,int size,int *data);
439 private:        
440         int findMaxRedNo();
441         void updateTree();
442         void clearBlockedMsgs();
443 };
444
445
446
447 //A NodeGroup that contribute to reductions
448 class NodeGroup : public CkNodeReductionMgr {
449   protected:
450     contributorInfo reductionInfo;//My reduction information
451   public:
452     CmiNodeLock __nodelock;
453     NodeGroup();
454     NodeGroup(CkMigrateMessage* m):CkNodeReductionMgr(m) { __nodelock=CmiCreateLock(); }
455     
456     ~NodeGroup();
457     inline const CkGroupID &ckGetGroupID(void) const {return thisgroup;}
458     inline CkGroupID CkGetNodeGroupID(void) const {return thisgroup;}
459     virtual int isNodeGroup() { return 1; }
460
461     virtual void pup(PUP::er &p);
462     virtual void flushStates() {
463         CkNodeReductionMgr::flushStates();
464         reductionInfo.redNo = 0;
465     }
466
467     CK_REDUCTION_CONTRIBUTE_METHODS_DECL
468     void contributeWithCounter(CkReductionMsg *msg,int count);
469 };
470
471
472 /**
473  * One CkReductionMgr runs a non-overlapping set of reductions.
474  * It collects messages from all local contributors, then sends
475  * the reduced message up the reduction tree to node zero, where
476  * they're passed to the user's client function.
477  */
478 class CkNodeReductionMgr;
479
480 class CProxy_CkArrayReductionMgr;
481 class CkReductionMgr : public CkGroupInitCallback {
482 public:
483         CProxy_CkReductionMgr thisProxy;
484
485 public:
486         CProxy_CkArrayReductionMgr nodeProxy; //holds the local branch of the nodegroup tree
487         CkReductionMgr(void);
488         CkReductionMgr(CkMigrateMessage *m);
489
490         typedef CkReductionClientFn clientFn;
491
492         /**
493          * Add the given client function.  Overwrites any previous client.
494          * This manager will dispose of the callback when replaced or done.
495          */
496         void ckSetReductionClient(CkCallback *cb);
497
498 //Contributors keep a copy of this structure:
499
500
501 //Contributor list maintainance:
502         //These just set and clear the "creating" flag to prevent
503         // reductions from finishing early because not all elements
504         // have been created.
505         void creatingContributors(void);
506         void doneCreatingContributors(void);
507         //Initializes a new contributor
508         void contributorStamped(contributorInfo *ci);//Increment global number
509         void contributorCreated(contributorInfo *ci);//Increment local number
510         void contributorDied(contributorInfo *ci);//Don't expect more contributions
511         //Migrating away
512         void contributorLeaving(contributorInfo *ci);
513         //Migrating in
514         void contributorArriving(contributorInfo *ci);
515
516 //Contribute-- the given msg can contain any data.  The reducerType
517 // field of the message must be valid.
518 // Each contributor must contribute exactly once to each reduction.
519         void contribute(contributorInfo *ci,CkReductionMsg *msg);
520
521 //Communication (library-private)
522         //Sent down the reduction tree (used by barren PEs)
523         void ReductionStarting(CkReductionNumberMsg *m);
524         //Sent to root of the reduction tree with late migrant data
525         void LateMigrantMsg(CkReductionMsg *m);
526         //A late migrating contributor will never contribute
527         void MigrantDied(CkReductionNumberMsg *m);
528
529         void RecvMsg(CkReductionMsg *m);
530
531         //Call back for using Node added by Sayantan
532         void ArrayReductionHandler(CkReductionMsg *m);
533         void endArrayReduction();
534
535         virtual CmiBool isReductionMgr(void){ return CmiTrue; }
536         virtual void flushStates(int isgroup);
537         /*FAULT_EVAC: used to get the gcount on a processor when 
538                 it is evacuated.
539                 TODO: It needs to be fixed as it should return the gcount
540                 and the adjustment information for objects that might have
541                 contributed and died.
542                 The current implementation lets us get by in the case
543                 when there are no gcount
544         */
545         int getGCount(){return gcount;};
546         static void sanitycheck();
547 #if defined(_FAULT_CAUSAL_)
548         void incNumImmigrantRecObjs(){
549                 numImmigrantRecObjs++;
550         }
551         void decNumImmigrantRecObjs(){
552                 numImmigrantRecObjs--;
553         }
554 #endif
555
556 private:
557
558 #if defined(_FAULT_CAUSAL_)
559         int numImmigrantRecObjs;
560 #endif
561
562 //Data members
563         //Stored callback function (may be NULL if none has been set)
564         CkCallback storedCallback;
565         // calback that came along with the contribute
566         CkCallback *secondaryStoredCallback;
567
568         int redNo;//Number of current reduction (incremented at end) to be deposited with NodeGroups
569         int completedRedNo;//Number of reduction Completed ie recieved callback from NodeGroups
570         CmiBool inProgress;//Is a reduction started, but not complete?
571         CmiBool creating;//Are elements still being created?
572         CmiBool startRequested;//Should we start the next reduction when creation finished?
573         int gcount;//=el't created here - el't deleted here
574         int lcount;//Number of local contributors
575         int maxStartRequest; // the highest future ReductionStarting message received
576
577         //Current local and remote contributions
578         int nContrib,nRemote;
579         //Contributions queued for the current reduction
580         CkMsgQ<CkReductionMsg> msgs;
581
582         //Contributions queued for future reductions (sent to us too early)
583         CkMsgQ<CkReductionMsg> futureMsgs;
584         //Remote messages queued for future reductions (sent to us too early)
585         CkMsgQ<CkReductionMsg> futureRemoteMsgs;
586
587         CkMsgQ<CkReductionMsg> finalMsgs;
588
589 //State:
590         void startReduction(int number,int srcPE);
591         void addContribution(CkReductionMsg *m);
592         void finishReduction(void);
593
594 #if GROUP_LEVEL_REDUCTION
595 //Reduction tree utilities
596         unsigned upperSize;
597         unsigned label;
598         int parent;
599         int numKids;
600         /*vector storing the children of this node*/
601         CkVec<int> newKids;
602         CkVec<int> kids;
603         void init_BinomialTree();
604
605         void init_BinaryTree();
606         enum {TREE_WID=2};
607         int treeRoot(void);//Root PE
608         CmiBool hasParent(void);
609         int treeParent(void);//My parent PE
610         int firstKid(void);//My first child PE
611         int treeKids(void);//Number of children in tree
612 #endif
613
614         //Combine (& free) the current message vector.
615         CkReductionMsg *reduceMessages(void);
616
617         //Map reduction number to a time
618         CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
619         CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
620         CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
621
622
623         //This vector of adjustments is indexed by redNo,
624         // starting from the current redNo.
625         CkVec<countAdjustment> adjVec;
626         //Return the countAdjustment struct for the given redNo:
627         countAdjustment &adj(int number);
628         //Shift the list of countAdjustments down
629         void shiftAdjVec(void);
630
631 protected:
632         //whether to notify children that reduction starts
633         CmiBool disableNotifyChildrenStart;
634
635 //Checkpointing utilities
636 public:
637 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
638     int *perProcessorCounts;
639     int processorCount;
640     int totalCount;
641     int numberReductionMessages(){
642             if(totalCount != 0){
643                 return totalCount;
644             }else{
645                 return MAX_INT;
646             }
647     }
648 #endif
649         virtual void pup(PUP::er &p);
650         static int isIrreducible(){ return 0;}
651         void contributeViaMessage(CkReductionMsg *m);
652 };
653
654 //Define methods used to contribute to the given reduction type.
655 //  Data is copied, not deleted.
656 /*#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
657   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
658         CMK_REFNUM_TYPE userFlag=-1); \
659   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
660         const CkCallback &cb,CMK_REFNUM_TYPE userFlag=-1); \
661   void contribute(CkReductionMsg *msg);\*/
662
663 #define CkReductionTarget(me, method) \
664     CkIndex_##me::redn_wrapper_##method(NULL)
665
666 #define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
667 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
668         CMK_REFNUM_TYPE userFlag)\
669 {\
670         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
671         msg->setUserFlag(userFlag);\
672         msg->setMigratableContributor(migratable);\
673         myRednMgr->contribute(&myRednInfo,msg);\
674 }\
675 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
676         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
677 {\
678         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
679         msg->setUserFlag(userFlag);\
680         msg->setCallback(cb);\
681         msg->setMigratableContributor(migratable);\
682         myRednMgr->contribute(&myRednInfo,msg);\
683 }\
684 void me::contribute(CkReductionMsg *msg) \
685         {\
686         msg->setMigratableContributor(migratable);\
687         myRednMgr->contribute(&myRednInfo,msg);\
688         }\
689 void me::contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
690 {\
691         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
692     msg->setUserFlag(userFlag);\
693     msg->setCallback(cb);\
694     msg->setMigratableContributor(migratable);\
695     myRednMgr->contribute(&myRednInfo,msg);\
696 }\
697 void me::contribute(CMK_REFNUM_TYPE userFlag)\
698 {\
699     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
700     msg->setUserFlag(userFlag);\
701     msg->setMigratableContributor(migratable);\
702     myRednMgr->contribute(&myRednInfo,msg);\
703 }\
704
705
706 //A group that can contribute to reductions
707 class Group : public CkReductionMgr
708 {
709         contributorInfo reductionInfo;//My reduction information
710  public:
711         Group();
712         Group(CkMigrateMessage *msg);
713         virtual int isNodeGroup() { return 0; }
714         virtual void pup(PUP::er &p);
715         virtual void flushStates() {
716                 CkReductionMgr::flushStates(1);
717                 reductionInfo.redNo = 0;
718         }
719         virtual void CkAddThreadListeners(CthThread tid, void *msg);
720
721         CK_REDUCTION_CONTRIBUTE_METHODS_DECL
722 };
723
724 #ifdef _PIPELINED_ALLREDUCE_
725 class AllreduceMgr
726 {
727 public:
728         AllreduceMgr() { fragsRecieved=0; size=0; }
729         friend class ArrayElement;
730         // recieve an allreduce message
731         void allreduce_recieve(CkReductionMsg* msg)
732         {
733                 // allred_msgs.enq(msg);
734                 fragsRecieved++;
735                 if(fragsRecieved==1)
736                 {
737                         data = new char[FRAG_SIZE*msg->nFrags];
738                 }
739                 memcpy(data+msg->fragNo*FRAG_SIZE, msg->data, msg->dataSize);
740                 size += msg->dataSize;
741                 
742                 if(fragsRecieved==msg->nFrags) {
743                         CkReductionMsg* ret = CkReductionMsg::buildNew(size, data);
744                         cb.send(ret);
745                         fragsRecieved=0; size=0;
746                         delete [] data;
747                 }
748                 
749         }
750         // TODO: check for same reduction
751         CkCallback cb;  
752         int size;
753         char* data;
754         int fragsRecieved;
755         // CkMsgQ<CkReductionMsg> allred_msgs;
756 };
757 #endif // _PIPELINED_ALLREDUCE_
758
759 #endif //_CKREDUCTION_H