1282b7c4e42ea1943324d153796cdd5814e98dcf
[charm.git] / src / ck-core / ckreduction.h
1 /*
2 Charm++ File: Reduction Library
3 added 3/27/2000 by Orion Sky Lawlor, olawlor@acm.org
4 modified 02/21/2003 by Sayantan Chakravorty
5
6
7 A reduction takes some sort of inputs (contributions)
8 from some set of objects scattered across all PE's,
9 and combines (reduces) all the contributions onto one
10 PE.  This library provides several different kinds of
11 combination routines (reducers), and various utilities
12 for supporting them.
13
14 The calls needed to use the reduction manager are:
15 -Create with CProxy_CkReduction::ckNew.
16
17 */
18 #ifndef _CKREDUCTION_H
19 #define _CKREDUCTION_H
20
21 #include "CkReduction.decl.h"
22 #include "CkArrayReductionMgr.decl.h"
23
24 #if CMK_BIGSIM_CHARM || CMK_MULTICORE || !CMK_SMP
25 #define GROUP_LEVEL_REDUCTION           1
26 #endif
27
28 #ifdef _PIPELINED_ALLREDUCE_
29 #define FRAG_SIZE 131072
30 #define FRAG_THRESHOLD 131072
31 #endif
32
33 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
34 #define MAX_INT 5000000
35 #endif
36
37 //This message is sent between group objects on a single PE
38 // to let each know the other has been created.
39 class CkGroupCallbackMsg:public CMessage_CkGroupCallbackMsg {
40 public:
41         typedef void (*callbackType)(void *param);
42         CkGroupCallbackMsg(callbackType Ncallback,void *Nparam)
43                 {callback=Ncallback;param=Nparam;}
44         void call(void) {(*callback)(param);}
45 private:
46         callbackType callback;
47         void *param;
48 };
49
50 class CkGroupInitCallback : public IrrGroup {
51 public:
52         CkGroupInitCallback(void);
53         CkGroupInitCallback(CkMigrateMessage *m):IrrGroup(m) {}
54         void callMeBack(CkGroupCallbackMsg *m);
55         void pup(PUP::er& p){ IrrGroup::pup(p); }
56 };
57
58
59 class CkGroupReadyCallback : public IrrGroup {
60 private:
61   int _isReady;
62   CkQ<CkGroupCallbackMsg *> _msgs;
63   void callBuffered(void);
64 public:
65         CkGroupReadyCallback(void);
66         CkGroupReadyCallback(CkMigrateMessage *m):IrrGroup(m) {}
67         void callMeBack(CkGroupCallbackMsg *m);
68         int isReady(void) { return _isReady; }
69 protected:
70         void setReady(void) {_isReady = 1; callBuffered(); }
71         void setNotReady(void) {_isReady = 0; }
72 };
73
74 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
75 public:
76   int num;
77   CkReductionNumberMsg(int n) {num=n;}
78 };
79
80
81 /**some data classes used by both ckreductionmgr and cknodereductionmgr**/
82 class contributorInfo {
83 public:
84         int redNo;//Current reduction number
85         contributorInfo() {redNo=0;}
86         //Migration utilities:
87         void pup(PUP::er &p);
88 };
89
90 class countAdjustment {
91 public:
92         int gcount;//Adjustment to global count (applied at reduction end)
93         int lcount;//Adjustment to local count (applied continually)
94         int mainRecvd;
95         countAdjustment(int ignored=0) {gcount=lcount=0;mainRecvd=0;}
96         void pup(PUP::er& p){ p|gcount; p|lcount; p|mainRecvd; }
97 };
98
99 /** @todo: Fwd decl for a temporary class. Remove after
100  * delegated cross-array reductions are implemented more optimally
101  */
102 namespace ck { namespace impl { class XArraySectionReducer; } }
103
104 //CkReduction is just a "namespace class" for the user-visible
105 // parts of the reduction system.
106 class CkReduction {
107 public:
108         /*These are the reducers you can use,
109           in addition to any user-defined reducers.*/
110
111         /*  !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
112             !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
113
114                  remember to update CkReduction::reducerTable
115
116             !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
117             !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!  */
118
119         typedef enum {
120         //A placeholder invalid reduction type
121                 invalid=0,
122                 nop,
123         //Compute the sum the numbers passed by each element.
124                 sum_int,sum_long,sum_float,sum_double,
125
126         //Compute the product the numbers passed by each element.
127                 product_int,product_long,product_float,product_double,
128
129         //Compute the largest number passed by any element.
130                 max_int,max_long,max_float,max_double,
131
132         //Compute the smallest number passed by any element.
133                 min_int,min_long,min_float,min_double,
134
135         //Compute the logical AND of the integers passed by each element.
136         // The resulting integer will be zero if any source integer is zero.
137                 logical_and,
138
139         //Compute the logical OR of the integers passed by each element.
140         // The resulting integer will be 1 if any source integer is nonzero.
141                 logical_or,
142
143                 // Compute the logical bitvector AND of the integers passed by each element.
144                 bitvec_and,
145
146                 // Compute the logical bitvector OR of the integers passed by each element.
147                 bitvec_or,
148
149         // Select one message at random to pass on
150                 random,
151
152         //Concatenate the (arbitrary) data passed by each element
153                 concat,
154
155         //Combine the data passed by each element into an list of setElements.
156         // Each element may contribute arbitrary data (with arbitrary length).
157                 set,
158
159         //Last system-defined reducer number (user-defined reducers follow)
160                 lastSystemReducer
161         } reducerType;
162
163         //This structure is used with the set reducer above,
164         // and contains the data from one contribution.
165         class setElement {
166         public:
167                 int dataSize;//The length of the data array below
168                 char data[1];//The (dataSize-long) array of data
169                 //Utility routine: get the next setElement,
170                 // or return NULL if there are none.
171                 setElement *next(void);
172         };
173
174 //Support for adding new reducerTypes:
175         //A reducerFunction is used to combine several contributions
176         //into a single summed contribution:
177         //  nMsg gives the number of messages to reduce.
178         //  msgs[i] contains a contribution or summed contribution.
179         typedef CkReductionMsg *(*reducerFn)(int nMsg,CkReductionMsg **msgs);
180
181         //Add the given reducer to the list.  Returns the new reducer's
182         // reducerType.  Must be called in the same order on every node.
183         static reducerType addReducer(reducerFn fn);
184
185 private:
186         friend class CkReductionMgr;
187         friend class CkNodeReductionMgr;
188         friend class CkArrayReductionMgr;
189         friend class CkMulticastMgr;
190     friend class ck::impl::XArraySectionReducer;
191 //System-level interface
192         //This is the maximum number of possible reducers,
193         // including both builtin and user-defined types
194         enum {MAXREDUCERS=256};
195
196         //Reducer table: maps reducerTypes to reducerFns.
197         static reducerFn reducerTable[MAXREDUCERS];
198         static int nReducers;//Number of reducers currently in table above
199
200         //Don't instantiate a CkReduction object-- it's just a namespace.
201         CkReduction();
202 };
203
204
205 //A CkReductionMsg is sent up the reduction tree-- it
206 // carries a contribution, or several reduced contributions.
207 class CkReductionMsg : public CMessage_CkReductionMsg
208 {
209         friend class CkReduction;
210         friend class CkReductionMgr;
211         friend class CkNodeReductionMgr;
212         friend class CkArrayReductionMgr;
213         friend class CkMulticastMgr;
214 #ifdef _PIPELINED_ALLREDUCE_
215         friend class ArrayElement;
216         friend class AllreduceMgr;
217 #endif
218         friend class ck::impl::XArraySectionReducer;
219 public:
220
221 //Publically-accessible fields:
222         //"Constructor"-- builds and returns a new CkReductionMsg.
223         //  the "srcData" array you specify will be copied into this object (unless NULL).
224         static CkReductionMsg *buildNew(int NdataSize,const void *srcData,
225                 CkReduction::reducerType reducer=CkReduction::invalid,
226                 CkReductionMsg *buf = NULL);
227
228         inline int getLength(void) const {return dataSize;}
229         inline int getSize(void) const {return dataSize;}
230         inline void *getData(void) {return data;}
231         inline const void *getData(void) const {return data;}
232
233         inline int getGcount(void){return gcount;}
234         inline CkReduction::reducerType getReducer(void){return reducer;}
235         inline int getRedNo(void){return redNo;}
236
237         inline CMK_REFNUM_TYPE getUserFlag(void) const {return userFlag;}
238         inline void setUserFlag(CMK_REFNUM_TYPE f) { userFlag=f;}
239
240         inline void setCallback(const CkCallback &cb) { callback=cb; }
241
242         //Return true if this message came straight from a contribute call--
243         // if it didn't come from a previous reduction function.
244         inline int isFromUser(void) const {return sourceFlag==-1;}
245
246         inline bool isMigratableContributor(void) const {return migratableContributor;}
247         inline void setMigratableContributor(bool _mig){ migratableContributor = _mig;}
248
249         ~CkReductionMsg();
250
251 //Implementation-only fields (don't access these directly!)
252         //Msg runtime support
253         static void *alloc(int msgnum, size_t size, int *reqSize, int priobits);
254         static void *pack(CkReductionMsg *);
255         static CkReductionMsg *unpack(void *in);
256
257 private:
258         int dataSize;//Length of array below, in bytes
259         void *data;//Reduction data
260         CMK_REFNUM_TYPE userFlag; //Some sort of identifying flag, for client use
261         CkCallback callback; //What to do when done
262         CkCallback secondaryCallback; // the group callback is piggybacked on the nodegrp reduction
263         bool migratableContributor; // are the contributors migratable
264
265         int sourceFlag;/*Flag:
266                 0 indicates this is a placeholder message (meaning: nothing to report)
267                 -1 indicates this is a single (non-reduced) contribution.
268                 >0 indicates this is a reduced contribution.
269         */
270         int nSources(void) {return sourceFlag<0?-sourceFlag:sourceFlag;}
271 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
272     int sourceProcessorCount;
273     int fromPE;
274 #endif
275 private:
276 #if CMK_BIGSIM_CHARM
277         void *log;
278 #endif
279         CkReduction::reducerType reducer;
280         //contributorInfo *ci;//Source contributor, or NULL if none
281         int redNo;//The serial number of this reduction
282         int gcount;//Contribution to the global contributor count
283         // for section multicast/reduction library
284         CkSectionInfo sid;   // section cookie for multicast
285         char rebuilt;          // indicate if the multicast tree needs rebuilt
286         int nFrags;
287         int fragNo;      // fragment of a reduction msg (when pipelined)
288                          // value = 0 to nFrags-1
289         double dataStorage;//Start of data array (so it's double-aligned)
290
291         int no;
292
293         //Default constructor is private so you must use "buildNew", above
294         CkReductionMsg();
295 };
296
297
298 #define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
299   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
300         CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
301   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
302         const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
303   void contribute(CkReductionMsg *msg); \
304   void contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);\
305   void contribute(CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);\
306
307
308 class CkNodeReductionMgr : public IrrGroup {
309 public:
310         CProxy_CkNodeReductionMgr thisProxy;
311 public:
312         CkNodeReductionMgr(void);
313         CkNodeReductionMgr(CkMigrateMessage *m) : IrrGroup(m) {
314           storedCallback = NULL;
315         }
316
317         typedef CkReductionClientFn clientFn;
318
319         /**
320          * Add the given client function.  Overwrites any previous client.
321          * This manager will dispose of the callback when replaced or done.
322          */
323         void ckSetReductionClient(CkCallback *cb);
324
325 //Contribute-- the given msg can contain any data.  The reducerType
326 // field of the message must be valid.
327 // Each contributor must contribute exactly once to each reduction.
328         void contribute(contributorInfo *ci,CkReductionMsg *msg);
329         void contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count);
330 //Communication (library-private)
331         void restartLocalGroupReductions(int number);
332         //Sent down the reduction tree (used by barren PEs)
333         void ReductionStarting(CkReductionNumberMsg *m);
334         //Sent up the reduction tree with reduced data
335         void RecvMsg(CkReductionMsg *m);
336         void doRecvMsg(CkReductionMsg *m);
337         void LateMigrantMsg(CkReductionMsg *m);
338
339         virtual void flushStates();     // flush state varaibles
340         virtual int startLocalGroupReductions(int number){ return 1;} // can be used to start reductions on all the 
341         //CkReductionMgrs on a particular node. It is overwritten by CkArrayReductionMgr to make the actual calls
342         // since it knows the CkReductionMgrs on a node.
343
344         virtual int getTotalGCount(){return 0;};
345
346 private:
347 //Data members
348         //Stored callback function (may be NULL if none has been set)
349         CkCallback *storedCallback;
350
351         int redNo;//Number of current reduction (incremented at end)
352         CmiBool inProgress;//Is a reduction started, but not complete?
353         CmiBool creating;//Are elements still being created?
354         CmiBool startRequested;//Should we start the next reduction when creation finished?
355         int gcount;
356         int lcount;//Number of local contributors
357
358         //Current local and remote contributions
359         int nContrib,nRemote;
360         //Contributions queued for the current reduction
361         CkMsgQ<CkReductionMsg> msgs;
362         //Contributions queued for future reductions (sent to us too early)
363         CkMsgQ<CkReductionMsg> futureMsgs;
364         //Remote messages queued for future reductions (sent to us too early)
365         CkMsgQ<CkReductionMsg> futureRemoteMsgs;
366         //Late migrant messages queued for future reductions
367         CkMsgQ<CkReductionMsg> futureLateMigrantMsgs;
368         
369         //My Big LOCK
370         CmiNodeLock lockEverything;
371
372         int interrupt; /* flag for use in non-smp 0 means interrupt can occur 1 means not (also acts as a lock)*/
373
374         /*vector storing the children of this node*/
375         CkVec<int> kids;
376         
377 //State:
378         void startReduction(int number,int srcPE);
379         void doAddContribution(CkReductionMsg *m);
380         void finishReduction(void);
381 protected:      
382         void addContribution(CkReductionMsg *m);
383
384 private:
385
386 //Reduction tree utilities
387 /* for binomial trees*/
388         unsigned upperSize;
389         unsigned label;
390         int parent;
391         int numKids;
392 //      int *kids;
393         void init_BinomialTree();
394
395         
396         void init_BinaryTree();
397         enum {TREE_WID=2};
398         int treeRoot(void);//Root PE
399         CmiBool hasParent(void);
400         int treeParent(void);//My parent PE
401         int firstKid(void);//My first child PE
402         int treeKids(void);//Number of children in tree
403
404         //Combine (& free) the current message vector.
405         CkReductionMsg *reduceMessages(void);
406
407         //Map reduction number to a time
408         CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
409         CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
410         CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
411
412         /*FAULT_EVAC*/
413         bool oldleaf;
414         bool blocked;
415         int newParent;
416         int additionalGCount,newAdditionalGCount; //gcount that gets passed to u from the node u replace
417         CkVec<int> newKids;
418         CkMsgQ<CkReductionMsg> bufferedMsgs;
419         CkMsgQ<CkReductionMsg> bufferedRemoteMsgs;
420         enum {OLDPARENT,OLDCHILDREN,NEWPARENT,LEAFPARENT};
421         int numModificationReplies;
422         int maxModificationRedNo;
423         int tempModificationRedNo;
424         bool readyDeletion;
425         int killed;     
426         
427 //Checkpointing utilities
428  public:
429         virtual void pup(PUP::er &p);
430         /*FAULT_EVAC*/
431         virtual void evacuate();
432         virtual void doneEvacuate();
433         void DeleteChild(int deletedChild);
434         void DeleteNewChild(int deletedChild);
435         void collectMaxRedNo(int maxRedNo);
436         void unblockNode(int maxRedNo);
437         void modifyTree(int code,int size,int *data);
438 private:        
439         int findMaxRedNo();
440         void updateTree();
441         void clearBlockedMsgs();
442 };
443
444
445
446 //A NodeGroup that contribute to reductions
447 class NodeGroup : public CkNodeReductionMgr {
448   protected:
449     contributorInfo reductionInfo;//My reduction information
450   public:
451     CmiNodeLock __nodelock;
452     NodeGroup();
453     NodeGroup(CkMigrateMessage* m):CkNodeReductionMgr(m) { __nodelock=CmiCreateLock(); }
454     
455     ~NodeGroup();
456     inline const CkGroupID &ckGetGroupID(void) const {return thisgroup;}
457     inline CkGroupID CkGetNodeGroupID(void) const {return thisgroup;}
458     virtual int isNodeGroup() { return 1; }
459
460     virtual void pup(PUP::er &p);
461     virtual void flushStates() {
462         CkNodeReductionMgr::flushStates();
463         reductionInfo.redNo = 0;
464     }
465
466     CK_REDUCTION_CONTRIBUTE_METHODS_DECL
467     void contributeWithCounter(CkReductionMsg *msg,int count);
468 };
469
470
471 /**
472  * One CkReductionMgr runs a non-overlapping set of reductions.
473  * It collects messages from all local contributors, then sends
474  * the reduced message up the reduction tree to node zero, where
475  * they're passed to the user's client function.
476  */
477 class CkNodeReductionMgr;
478
479 class CProxy_CkArrayReductionMgr;
480 class CkReductionMgr : public CkGroupInitCallback {
481 public:
482         CProxy_CkReductionMgr thisProxy;
483
484 public:
485         CProxy_CkArrayReductionMgr nodeProxy; //holds the local branch of the nodegroup tree
486         CkReductionMgr(void);
487         CkReductionMgr(CkMigrateMessage *m);
488
489         typedef CkReductionClientFn clientFn;
490
491         /**
492          * Add the given client function.  Overwrites any previous client.
493          * This manager will dispose of the callback when replaced or done.
494          */
495         void ckSetReductionClient(CkCallback *cb);
496
497 //Contributors keep a copy of this structure:
498
499
500 //Contributor list maintainance:
501         //These just set and clear the "creating" flag to prevent
502         // reductions from finishing early because not all elements
503         // have been created.
504         void creatingContributors(void);
505         void doneCreatingContributors(void);
506         //Initializes a new contributor
507         void contributorStamped(contributorInfo *ci);//Increment global number
508         void contributorCreated(contributorInfo *ci);//Increment local number
509         void contributorDied(contributorInfo *ci);//Don't expect more contributions
510         //Migrating away
511         void contributorLeaving(contributorInfo *ci);
512         //Migrating in
513         void contributorArriving(contributorInfo *ci);
514
515 //Contribute-- the given msg can contain any data.  The reducerType
516 // field of the message must be valid.
517 // Each contributor must contribute exactly once to each reduction.
518         void contribute(contributorInfo *ci,CkReductionMsg *msg);
519
520 //Communication (library-private)
521         //Sent down the reduction tree (used by barren PEs)
522         void ReductionStarting(CkReductionNumberMsg *m);
523         //Sent to root of the reduction tree with late migrant data
524         void LateMigrantMsg(CkReductionMsg *m);
525         //A late migrating contributor will never contribute
526         void MigrantDied(CkReductionNumberMsg *m);
527
528         void RecvMsg(CkReductionMsg *m);
529
530         //Call back for using Node added by Sayantan
531         void ArrayReductionHandler(CkReductionMsg *m);
532         void endArrayReduction();
533
534         virtual CmiBool isReductionMgr(void){ return CmiTrue; }
535         virtual void flushStates(int isgroup);
536         /*FAULT_EVAC: used to get the gcount on a processor when 
537                 it is evacuated.
538                 TODO: It needs to be fixed as it should return the gcount
539                 and the adjustment information for objects that might have
540                 contributed and died.
541                 The current implementation lets us get by in the case
542                 when there are no gcount
543         */
544         int getGCount(){return gcount;};
545         static void sanitycheck();
546 private:
547
548
549 //Data members
550         //Stored callback function (may be NULL if none has been set)
551         CkCallback storedCallback;
552         // calback that came along with the contribute
553         CkCallback *secondaryStoredCallback;
554
555         int redNo;//Number of current reduction (incremented at end) to be deposited with NodeGroups
556         int completedRedNo;//Number of reduction Completed ie recieved callback from NodeGroups
557         CmiBool inProgress;//Is a reduction started, but not complete?
558         CmiBool creating;//Are elements still being created?
559         CmiBool startRequested;//Should we start the next reduction when creation finished?
560         int gcount;//=el't created here - el't deleted here
561         int lcount;//Number of local contributors
562         int maxStartRequest; // the highest future ReductionStarting message received
563
564         //Current local and remote contributions
565         int nContrib,nRemote;
566         //Contributions queued for the current reduction
567         CkMsgQ<CkReductionMsg> msgs;
568
569         //Contributions queued for future reductions (sent to us too early)
570         CkMsgQ<CkReductionMsg> futureMsgs;
571         //Remote messages queued for future reductions (sent to us too early)
572         CkMsgQ<CkReductionMsg> futureRemoteMsgs;
573
574         CkMsgQ<CkReductionMsg> finalMsgs;
575
576 //State:
577         void startReduction(int number,int srcPE);
578         void addContribution(CkReductionMsg *m);
579         void finishReduction(void);
580
581 #if GROUP_LEVEL_REDUCTION
582 //Reduction tree utilities
583         unsigned upperSize;
584         unsigned label;
585         int parent;
586         int numKids;
587         /*vector storing the children of this node*/
588         CkVec<int> newKids;
589         CkVec<int> kids;
590         void init_BinomialTree();
591
592         void init_BinaryTree();
593         enum {TREE_WID=2};
594         int treeRoot(void);//Root PE
595         CmiBool hasParent(void);
596         int treeParent(void);//My parent PE
597         int firstKid(void);//My first child PE
598         int treeKids(void);//Number of children in tree
599 #endif
600
601         //Combine (& free) the current message vector.
602         CkReductionMsg *reduceMessages(void);
603
604         //Map reduction number to a time
605         CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
606         CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
607         CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
608
609
610         //This vector of adjustments is indexed by redNo,
611         // starting from the current redNo.
612         CkVec<countAdjustment> adjVec;
613         //Return the countAdjustment struct for the given redNo:
614         countAdjustment &adj(int number);
615         //Shift the list of countAdjustments down
616         void shiftAdjVec(void);
617
618 protected:
619         //whether to notify children that reduction starts
620         CmiBool disableNotifyChildrenStart;
621
622 //Checkpointing utilities
623 public:
624 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
625     int *perProcessorCounts;
626     int processorCount;
627     int totalCount;
628     int numberReductionMessages(){
629             if(totalCount != 0){
630                 return totalCount;
631             }else{
632                 return MAX_INT;
633             }
634     }
635 #endif
636         virtual void pup(PUP::er &p);
637         static int isIrreducible(){ return 0;}
638         void contributeViaMessage(CkReductionMsg *m);
639 };
640
641 //Define methods used to contribute to the given reduction type.
642 //  Data is copied, not deleted.
643 /*#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
644   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
645         CMK_REFNUM_TYPE userFlag=-1); \
646   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
647         const CkCallback &cb,CMK_REFNUM_TYPE userFlag=-1); \
648   void contribute(CkReductionMsg *msg);\*/
649
650 #define CkReductionTarget(me, method) \
651     CkIndex_##me::method##_redn_wrapper(NULL)
652
653 #define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
654 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
655         CMK_REFNUM_TYPE userFlag)\
656 {\
657         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
658         msg->setUserFlag(userFlag);\
659         msg->setMigratableContributor(migratable);\
660         myRednMgr->contribute(&myRednInfo,msg);\
661 }\
662 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
663         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
664 {\
665         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
666         msg->setUserFlag(userFlag);\
667         msg->setCallback(cb);\
668         msg->setMigratableContributor(migratable);\
669         myRednMgr->contribute(&myRednInfo,msg);\
670 }\
671 void me::contribute(CkReductionMsg *msg) \
672         {\
673         msg->setMigratableContributor(migratable);\
674         myRednMgr->contribute(&myRednInfo,msg);\
675         }\
676 void me::contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
677 {\
678         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
679     msg->setUserFlag(userFlag);\
680     msg->setCallback(cb);\
681     msg->setMigratableContributor(migratable);\
682     myRednMgr->contribute(&myRednInfo,msg);\
683 }\
684 void me::contribute(CMK_REFNUM_TYPE userFlag)\
685 {\
686     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
687     msg->setUserFlag(userFlag);\
688     msg->setMigratableContributor(migratable);\
689     myRednMgr->contribute(&myRednInfo,msg);\
690 }\
691
692
693 //A group that can contribute to reductions
694 class Group : public CkReductionMgr
695 {
696         contributorInfo reductionInfo;//My reduction information
697  public:
698         Group();
699         Group(CkMigrateMessage *msg);
700         virtual int isNodeGroup() { return 0; }
701         virtual void pup(PUP::er &p);
702         virtual void flushStates() {
703                 CkReductionMgr::flushStates(1);
704                 reductionInfo.redNo = 0;
705         }
706         virtual void CkAddThreadListeners(CthThread tid, void *msg);
707
708         CK_REDUCTION_CONTRIBUTE_METHODS_DECL
709 };
710
711 #ifdef _PIPELINED_ALLREDUCE_
712 class AllreduceMgr
713 {
714 public:
715         AllreduceMgr() { fragsRecieved=0; size=0; }
716         friend class ArrayElement;
717         // recieve an allreduce message
718         void allreduce_recieve(CkReductionMsg* msg)
719         {
720                 // allred_msgs.enq(msg);
721                 fragsRecieved++;
722                 if(fragsRecieved==1)
723                 {
724                         data = new char[FRAG_SIZE*msg->nFrags];
725                 }
726                 memcpy(data+msg->fragNo*FRAG_SIZE, msg->data, msg->dataSize);
727                 size += msg->dataSize;
728                 
729                 if(fragsRecieved==msg->nFrags) {
730                         CkReductionMsg* ret = CkReductionMsg::buildNew(size, data);
731                         cb.send(ret);
732                         fragsRecieved=0; size=0;
733                         delete [] data;
734                 }
735                 
736         }
737         // TODO: check for same reduction
738         CkCallback cb;  
739         int size;
740         char* data;
741         int fragsRecieved;
742         // CkMsgQ<CkReductionMsg> allred_msgs;
743 };
744 #endif // _PIPELINED_ALLREDUCE_
745
746 #endif //_CKREDUCTION_H