9cede818803d3a12ab4d111a56cc4fa554f33323
[charm.git] / src / ck-core / ckreduction.h
1 /*
2 Charm++ File: Reduction Library
3 added 3/27/2000 by Orion Sky Lawlor, olawlor@acm.org
4 modified 02/21/2003 by Sayantan Chakravorty
5
6
7 A reduction takes some sort of inputs (contributions)
8 from some set of objects scattered across all PE's,
9 and combines (reduces) all the contributions onto one
10 PE.  This library provides several different kinds of
11 combination routines (reducers), and various utilities
12 for supporting them.
13
14 The calls needed to use the reduction manager are:
15 -Create with CProxy_CkReduction::ckNew.
16
17 */
18 #ifndef _CKREDUCTION_H
19 #define _CKREDUCTION_H
20
21 #include "CkReduction.decl.h"
22 #include "CkArrayReductionMgr.decl.h"
23
24 #if CMK_BIGSIM_CHARM || CMK_MULTICORE || !CMK_SMP
25 #define GROUP_LEVEL_REDUCTION           1
26 #endif
27
28 #ifdef _PIPELINED_ALLREDUCE_
29 #define FRAG_SIZE 131072
30 #define FRAG_THRESHOLD 131072
31 #endif
32
33 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
34 #define MAX_INT 5000000
35 #define _MLOG_REDUCE_P2P_ 0
36 #endif
37
38 //This message is sent between group objects on a single PE
39 // to let each know the other has been created.
40 class CkGroupCallbackMsg:public CMessage_CkGroupCallbackMsg {
41 public:
42         typedef void (*callbackType)(void *param);
43         CkGroupCallbackMsg(callbackType Ncallback,void *Nparam)
44                 {callback=Ncallback;param=Nparam;}
45         void call(void) {(*callback)(param);}
46 private:
47         callbackType callback;
48         void *param;
49 };
50
51 class CkGroupInitCallback : public IrrGroup {
52 public:
53         CkGroupInitCallback(void);
54         CkGroupInitCallback(CkMigrateMessage *m):IrrGroup(m) {}
55         void callMeBack(CkGroupCallbackMsg *m);
56         void pup(PUP::er& p){ IrrGroup::pup(p); }
57 };
58
59
60 class CkGroupReadyCallback : public IrrGroup {
61 private:
62   int _isReady;
63   CkQ<CkGroupCallbackMsg *> _msgs;
64   void callBuffered(void);
65 public:
66         CkGroupReadyCallback(void);
67         CkGroupReadyCallback(CkMigrateMessage *m):IrrGroup(m) {}
68         void callMeBack(CkGroupCallbackMsg *m);
69         int isReady(void) { return _isReady; }
70 protected:
71         void setReady(void) {_isReady = 1; callBuffered(); }
72         void setNotReady(void) {_isReady = 0; }
73 };
74
75 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
76 public:
77   int num;
78   CkReductionNumberMsg(int n) {num=n;}
79 };
80
81
82 /**some data classes used by both ckreductionmgr and cknodereductionmgr**/
83 class contributorInfo {
84 public:
85         int redNo;//Current reduction number
86         contributorInfo() {redNo=0;}
87         //Migration utilities:
88         void pup(PUP::er &p);
89 };
90
91 class countAdjustment {
92 public:
93         int gcount;//Adjustment to global count (applied at reduction end)
94         int lcount;//Adjustment to local count (applied continually)
95         int mainRecvd;
96         countAdjustment(int ignored=0) {gcount=lcount=0;mainRecvd=0;}
97         void pup(PUP::er& p){ p|gcount; p|lcount; p|mainRecvd; }
98 };
99
100 /** @todo: Fwd decl for a temporary class. Remove after
101  * delegated cross-array reductions are implemented more optimally
102  */
103 namespace ck { namespace impl { class XArraySectionReducer; } }
104
105 //CkReduction is just a "namespace class" for the user-visible
106 // parts of the reduction system.
107 class CkReduction {
108 public:
109         /*These are the reducers you can use,
110           in addition to any user-defined reducers.*/
111
112         /*  !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
113             !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
114
115                  remember to update CkReduction::reducerTable
116
117             !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
118             !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!  */
119
120         typedef enum {
121         //A placeholder invalid reduction type
122                 invalid=0,
123                 nop,
124         //Compute the sum the numbers passed by each element.
125                 sum_int,sum_long,sum_float,sum_double,
126
127         //Compute the product the numbers passed by each element.
128                 product_int,product_long,product_float,product_double,
129
130         //Compute the largest number passed by any element.
131                 max_int,max_long,max_float,max_double,
132
133         //Compute the smallest number passed by any element.
134                 min_int,min_long,min_float,min_double,
135
136         //Compute the logical AND of the integers passed by each element.
137         // The resulting integer will be zero if any source integer is zero.
138                 logical_and,
139
140         //Compute the logical OR of the integers passed by each element.
141         // The resulting integer will be 1 if any source integer is nonzero.
142                 logical_or,
143
144                 // Compute the logical bitvector AND of the integers passed by each element.
145                 bitvec_and,
146
147                 // Compute the logical bitvector OR of the integers passed by each element.
148                 bitvec_or,
149
150         // Select one message at random to pass on
151                 random,
152
153         //Concatenate the (arbitrary) data passed by each element
154                 concat,
155
156         //Combine the data passed by each element into an list of setElements.
157         // Each element may contribute arbitrary data (with arbitrary length).
158                 set,
159
160         //Last system-defined reducer number (user-defined reducers follow)
161                 lastSystemReducer
162         } reducerType;
163
164         //This structure is used with the set reducer above,
165         // and contains the data from one contribution.
166         class setElement {
167         public:
168                 int dataSize;//The length of the data array below
169                 char data[1];//The (dataSize-long) array of data
170                 //Utility routine: get the next setElement,
171                 // or return NULL if there are none.
172                 setElement *next(void);
173         };
174
175 //Support for adding new reducerTypes:
176         //A reducerFunction is used to combine several contributions
177         //into a single summed contribution:
178         //  nMsg gives the number of messages to reduce.
179         //  msgs[i] contains a contribution or summed contribution.
180         typedef CkReductionMsg *(*reducerFn)(int nMsg,CkReductionMsg **msgs);
181
182         //Add the given reducer to the list.  Returns the new reducer's
183         // reducerType.  Must be called in the same order on every node.
184         static reducerType addReducer(reducerFn fn);
185
186 private:
187         friend class CkReductionMgr;
188         friend class CkNodeReductionMgr;
189         friend class CkArrayReductionMgr;
190         friend class CkMulticastMgr;
191     friend class ck::impl::XArraySectionReducer;
192 //System-level interface
193         //This is the maximum number of possible reducers,
194         // including both builtin and user-defined types
195         enum {MAXREDUCERS=256};
196
197         //Reducer table: maps reducerTypes to reducerFns.
198         static reducerFn reducerTable[MAXREDUCERS];
199         static int nReducers;//Number of reducers currently in table above
200
201         //Don't instantiate a CkReduction object-- it's just a namespace.
202         CkReduction();
203 };
204 PUPbytes(CkReduction::reducerType)
205
206 //A CkReductionMsg is sent up the reduction tree-- it
207 // carries a contribution, or several reduced contributions.
208 class CkReductionMsg : public CMessage_CkReductionMsg
209 {
210         friend class CkReduction;
211         friend class CkReductionMgr;
212         friend class CkNodeReductionMgr;
213         friend class CkArrayReductionMgr;
214         friend class CkMulticastMgr;
215 #ifdef _PIPELINED_ALLREDUCE_
216         friend class ArrayElement;
217         friend class AllreduceMgr;
218 #endif
219         friend class ck::impl::XArraySectionReducer;
220 public:
221
222 //Publically-accessible fields:
223         //"Constructor"-- builds and returns a new CkReductionMsg.
224         //  the "srcData" array you specify will be copied into this object (unless NULL).
225         static CkReductionMsg *buildNew(int NdataSize,const void *srcData,
226                 CkReduction::reducerType reducer=CkReduction::invalid,
227                 CkReductionMsg *buf = NULL);
228
229         inline int getLength(void) const {return dataSize;}
230         inline int getSize(void) const {return dataSize;}
231         inline void *getData(void) {return data;}
232         inline const void *getData(void) const {return data;}
233
234         inline int getGcount(void){return gcount;}
235         inline CkReduction::reducerType getReducer(void){return reducer;}
236         inline int getRedNo(void){return redNo;}
237
238         inline CMK_REFNUM_TYPE getUserFlag(void) const {return userFlag;}
239         inline void setUserFlag(CMK_REFNUM_TYPE f) { userFlag=f;}
240
241         inline void setCallback(const CkCallback &cb) { callback=cb; }
242
243         //Return true if this message came straight from a contribute call--
244         // if it didn't come from a previous reduction function.
245         inline int isFromUser(void) const {return sourceFlag==-1;}
246
247         inline bool isMigratableContributor(void) const {return migratableContributor;}
248         inline void setMigratableContributor(bool _mig){ migratableContributor = _mig;}
249
250         ~CkReductionMsg();
251
252 //Implementation-only fields (don't access these directly!)
253         //Msg runtime support
254         static void *alloc(int msgnum, size_t size, int *reqSize, int priobits);
255         static void *pack(CkReductionMsg *);
256         static CkReductionMsg *unpack(void *in);
257
258 private:
259         int dataSize;//Length of array below, in bytes
260         void *data;//Reduction data
261         CMK_REFNUM_TYPE userFlag; //Some sort of identifying flag, for client use
262         CkCallback callback; //What to do when done
263         CkCallback secondaryCallback; // the group callback is piggybacked on the nodegrp reduction
264         bool migratableContributor; // are the contributors migratable
265
266         int sourceFlag;/*Flag:
267                 0 indicates this is a placeholder message (meaning: nothing to report)
268                 -1 indicates this is a single (non-reduced) contribution.
269                 >0 indicates this is a reduced contribution.
270         */
271         int nSources(void) {return sourceFlag<0?-sourceFlag:sourceFlag;}
272 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_ )
273     int sourceProcessorCount;
274     int fromPE;
275 #endif
276 private:
277 #if CMK_BIGSIM_CHARM
278         void *log;
279 #endif
280         CkReduction::reducerType reducer;
281         //contributorInfo *ci;//Source contributor, or NULL if none
282         int redNo;//The serial number of this reduction
283         int gcount;//Contribution to the global contributor count
284         // for section multicast/reduction library
285         CkSectionInfo sid;   // section cookie for multicast
286         char rebuilt;          // indicate if the multicast tree needs rebuilt
287         int nFrags;
288         int fragNo;      // fragment of a reduction msg (when pipelined)
289                          // value = 0 to nFrags-1
290         double dataStorage;//Start of data array (so it's double-aligned)
291
292         int no;
293
294         //Default constructor is private so you must use "buildNew", above
295         CkReductionMsg();
296 };
297
298
299 #define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
300   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
301         CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
302   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
303         const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); \
304   void contribute(CkReductionMsg *msg); \
305   void contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);\
306   void contribute(CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);\
307
308
309 class CkNodeReductionMgr : public IrrGroup {
310 public:
311         CProxy_CkNodeReductionMgr thisProxy;
312 public:
313         CkNodeReductionMgr(void);
314         CkNodeReductionMgr(CkMigrateMessage *m) : IrrGroup(m) {
315           storedCallback = NULL;
316         }
317
318         typedef CkReductionClientFn clientFn;
319
320         /**
321          * Add the given client function.  Overwrites any previous client.
322          * This manager will dispose of the callback when replaced or done.
323          */
324         void ckSetReductionClient(CkCallback *cb);
325
326 //Contribute-- the given msg can contain any data.  The reducerType
327 // field of the message must be valid.
328 // Each contributor must contribute exactly once to each reduction.
329         void contribute(contributorInfo *ci,CkReductionMsg *msg);
330         void contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count);
331 //Communication (library-private)
332         void restartLocalGroupReductions(int number);
333         //Sent down the reduction tree (used by barren PEs)
334         void ReductionStarting(CkReductionNumberMsg *m);
335         //Sent up the reduction tree with reduced data
336         void RecvMsg(CkReductionMsg *m);
337         void doRecvMsg(CkReductionMsg *m);
338         void LateMigrantMsg(CkReductionMsg *m);
339
340         virtual void flushStates();     // flush state varaibles
341         virtual int startLocalGroupReductions(int number){ return 1;} // can be used to start reductions on all the 
342         //CkReductionMgrs on a particular node. It is overwritten by CkArrayReductionMgr to make the actual calls
343         // since it knows the CkReductionMgrs on a node.
344
345         virtual int getTotalGCount(){return 0;};
346
347 private:
348 //Data members
349         //Stored callback function (may be NULL if none has been set)
350         CkCallback *storedCallback;
351
352         int redNo;//Number of current reduction (incremented at end)
353         CmiBool inProgress;//Is a reduction started, but not complete?
354         CmiBool creating;//Are elements still being created?
355         CmiBool startRequested;//Should we start the next reduction when creation finished?
356         int gcount;
357         int lcount;//Number of local contributors
358
359         //Current local and remote contributions
360         int nContrib,nRemote;
361         //Contributions queued for the current reduction
362         CkMsgQ<CkReductionMsg> msgs;
363         //Contributions queued for future reductions (sent to us too early)
364         CkMsgQ<CkReductionMsg> futureMsgs;
365         //Remote messages queued for future reductions (sent to us too early)
366         CkMsgQ<CkReductionMsg> futureRemoteMsgs;
367         //Late migrant messages queued for future reductions
368         CkMsgQ<CkReductionMsg> futureLateMigrantMsgs;
369         
370         //My Big LOCK
371         CmiNodeLock lockEverything;
372
373         int interrupt; /* flag for use in non-smp 0 means interrupt can occur 1 means not (also acts as a lock)*/
374
375         /*vector storing the children of this node*/
376         CkVec<int> kids;
377         
378 //State:
379         void startReduction(int number,int srcPE);
380         void doAddContribution(CkReductionMsg *m);
381         void finishReduction(void);
382 protected:      
383         void addContribution(CkReductionMsg *m);
384
385 private:
386
387 //Reduction tree utilities
388 /* for binomial trees*/
389         unsigned upperSize;
390         unsigned label;
391         int parent;
392         int numKids;
393 //      int *kids;
394         void init_BinomialTree();
395
396         
397         void init_BinaryTree();
398         enum {TREE_WID=2};
399         int treeRoot(void);//Root PE
400         CmiBool hasParent(void);
401         int treeParent(void);//My parent PE
402         int firstKid(void);//My first child PE
403         int treeKids(void);//Number of children in tree
404
405         //Combine (& free) the current message vector.
406         CkReductionMsg *reduceMessages(void);
407
408         //Map reduction number to a time
409         CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
410         CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
411         CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
412
413         /*FAULT_EVAC*/
414         bool oldleaf;
415         bool blocked;
416         int newParent;
417         int additionalGCount,newAdditionalGCount; //gcount that gets passed to u from the node u replace
418         CkVec<int> newKids;
419         CkMsgQ<CkReductionMsg> bufferedMsgs;
420         CkMsgQ<CkReductionMsg> bufferedRemoteMsgs;
421         enum {OLDPARENT,OLDCHILDREN,NEWPARENT,LEAFPARENT};
422         int numModificationReplies;
423         int maxModificationRedNo;
424         int tempModificationRedNo;
425         bool readyDeletion;
426         int killed;     
427         
428 //Checkpointing utilities
429  public:
430         virtual void pup(PUP::er &p);
431         /*FAULT_EVAC*/
432         virtual void evacuate();
433         virtual void doneEvacuate();
434         void DeleteChild(int deletedChild);
435         void DeleteNewChild(int deletedChild);
436         void collectMaxRedNo(int maxRedNo);
437         void unblockNode(int maxRedNo);
438         void modifyTree(int code,int size,int *data);
439 private:        
440         int findMaxRedNo();
441         void updateTree();
442         void clearBlockedMsgs();
443 };
444
445
446
447 //A NodeGroup that contribute to reductions
448 class NodeGroup : public CkNodeReductionMgr {
449   protected:
450     contributorInfo reductionInfo;//My reduction information
451   public:
452     CmiNodeLock __nodelock;
453     NodeGroup();
454     NodeGroup(CkMigrateMessage* m):CkNodeReductionMgr(m) { __nodelock=CmiCreateLock(); }
455     
456     ~NodeGroup();
457     inline const CkGroupID &ckGetGroupID(void) const {return thisgroup;}
458     inline CkGroupID CkGetNodeGroupID(void) const {return thisgroup;}
459     virtual int isNodeGroup() { return 1; }
460
461     virtual void pup(PUP::er &p);
462     virtual void flushStates() {
463         CkNodeReductionMgr::flushStates();
464         reductionInfo.redNo = 0;
465     }
466
467     CK_REDUCTION_CONTRIBUTE_METHODS_DECL
468     void contributeWithCounter(CkReductionMsg *msg,int count);
469 };
470
471
472 /**
473  * One CkReductionMgr runs a non-overlapping set of reductions.
474  * It collects messages from all local contributors, then sends
475  * the reduced message up the reduction tree to node zero, where
476  * they're passed to the user's client function.
477  */
478 class CkNodeReductionMgr;
479
480 class CProxy_CkArrayReductionMgr;
481 class CkReductionMgr : public CkGroupInitCallback {
482 public:
483         CProxy_CkReductionMgr thisProxy;
484
485 public:
486         CProxy_CkArrayReductionMgr nodeProxy; //holds the local branch of the nodegroup tree
487         CkReductionMgr(void);
488         CkReductionMgr(CkMigrateMessage *m);
489
490         typedef CkReductionClientFn clientFn;
491
492         /**
493          * Add the given client function.  Overwrites any previous client.
494          * This manager will dispose of the callback when replaced or done.
495          */
496         void ckSetReductionClient(CkCallback *cb);
497
498 //Contributors keep a copy of this structure:
499
500
501 //Contributor list maintainance:
502         //These just set and clear the "creating" flag to prevent
503         // reductions from finishing early because not all elements
504         // have been created.
505         void creatingContributors(void);
506         void doneCreatingContributors(void);
507         //Initializes a new contributor
508         void contributorStamped(contributorInfo *ci);//Increment global number
509         void contributorCreated(contributorInfo *ci);//Increment local number
510         void contributorDied(contributorInfo *ci);//Don't expect more contributions
511         //Migrating away
512         void contributorLeaving(contributorInfo *ci);
513         //Migrating in
514         void contributorArriving(contributorInfo *ci);
515
516 //Contribute-- the given msg can contain any data.  The reducerType
517 // field of the message must be valid.
518 // Each contributor must contribute exactly once to each reduction.
519         void contribute(contributorInfo *ci,CkReductionMsg *msg);
520
521 //Communication (library-private)
522         //Sent down the reduction tree (used by barren PEs)
523         void ReductionStarting(CkReductionNumberMsg *m);
524         //Sent to root of the reduction tree with late migrant data
525         void LateMigrantMsg(CkReductionMsg *m);
526         //A late migrating contributor will never contribute
527         void MigrantDied(CkReductionNumberMsg *m);
528
529         void RecvMsg(CkReductionMsg *m);
530
531         //Call back for using Node added by Sayantan
532         void ArrayReductionHandler(CkReductionMsg *m);
533         void endArrayReduction();
534
535         virtual CmiBool isReductionMgr(void){ return CmiTrue; }
536         virtual void flushStates(int isgroup);
537         /*FAULT_EVAC: used to get the gcount on a processor when 
538                 it is evacuated.
539                 TODO: It needs to be fixed as it should return the gcount
540                 and the adjustment information for objects that might have
541                 contributed and died.
542                 The current implementation lets us get by in the case
543                 when there are no gcount
544         */
545         int getGCount(){return gcount;};
546         static void sanitycheck();
547 private:
548
549
550 //Data members
551         //Stored callback function (may be NULL if none has been set)
552         CkCallback storedCallback;
553         // calback that came along with the contribute
554         CkCallback *secondaryStoredCallback;
555
556         int redNo;//Number of current reduction (incremented at end) to be deposited with NodeGroups
557         int completedRedNo;//Number of reduction Completed ie recieved callback from NodeGroups
558         CmiBool inProgress;//Is a reduction started, but not complete?
559         CmiBool creating;//Are elements still being created?
560         CmiBool startRequested;//Should we start the next reduction when creation finished?
561         int gcount;//=el't created here - el't deleted here
562         int lcount;//Number of local contributors
563         int maxStartRequest; // the highest future ReductionStarting message received
564
565         //Current local and remote contributions
566         int nContrib,nRemote;
567         //Contributions queued for the current reduction
568         CkMsgQ<CkReductionMsg> msgs;
569
570         //Contributions queued for future reductions (sent to us too early)
571         CkMsgQ<CkReductionMsg> futureMsgs;
572         //Remote messages queued for future reductions (sent to us too early)
573         CkMsgQ<CkReductionMsg> futureRemoteMsgs;
574
575         CkMsgQ<CkReductionMsg> finalMsgs;
576
577 //State:
578         void startReduction(int number,int srcPE);
579         void addContribution(CkReductionMsg *m);
580         void finishReduction(void);
581
582 #if GROUP_LEVEL_REDUCTION
583 //Reduction tree utilities
584         unsigned upperSize;
585         unsigned label;
586         int parent;
587         int numKids;
588         /*vector storing the children of this node*/
589         CkVec<int> newKids;
590         CkVec<int> kids;
591         void init_BinomialTree();
592
593         void init_BinaryTree();
594         enum {TREE_WID=2};
595         int treeRoot(void);//Root PE
596         CmiBool hasParent(void);
597         int treeParent(void);//My parent PE
598         int firstKid(void);//My first child PE
599         int treeKids(void);//Number of children in tree
600 #endif
601
602         //Combine (& free) the current message vector.
603         CkReductionMsg *reduceMessages(void);
604
605         //Map reduction number to a time
606         CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
607         CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
608         CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
609
610
611         //This vector of adjustments is indexed by redNo,
612         // starting from the current redNo.
613         CkVec<countAdjustment> adjVec;
614         //Return the countAdjustment struct for the given redNo:
615         countAdjustment &adj(int number);
616         //Shift the list of countAdjustments down
617         void shiftAdjVec(void);
618
619 protected:
620         //whether to notify children that reduction starts
621         CmiBool disableNotifyChildrenStart;
622
623 //Checkpointing utilities
624 public:
625 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
626     int *perProcessorCounts;
627     int processorCount;
628     int totalCount;
629     int numberReductionMessages(){
630             if(totalCount != 0){
631                 return totalCount;
632             }else{
633                 return MAX_INT;
634             }
635     }
636 #endif
637         virtual void pup(PUP::er &p);
638         static int isIrreducible(){ return 0;}
639         void contributeViaMessage(CkReductionMsg *m);
640 };
641
642 //Define methods used to contribute to the given reduction type.
643 //  Data is copied, not deleted.
644 /*#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
645   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
646         CMK_REFNUM_TYPE userFlag=-1); \
647   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
648         const CkCallback &cb,CMK_REFNUM_TYPE userFlag=-1); \
649   void contribute(CkReductionMsg *msg);\*/
650
651 #define CkReductionTarget(me, method) \
652     CkIndex_##me::redn_wrapper_##method(NULL)
653
654 #define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
655 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
656         CMK_REFNUM_TYPE userFlag)\
657 {\
658         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
659         msg->setUserFlag(userFlag);\
660         msg->setMigratableContributor(migratable);\
661         myRednMgr->contribute(&myRednInfo,msg);\
662 }\
663 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
664         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
665 {\
666         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
667         msg->setUserFlag(userFlag);\
668         msg->setCallback(cb);\
669         msg->setMigratableContributor(migratable);\
670         myRednMgr->contribute(&myRednInfo,msg);\
671 }\
672 void me::contribute(CkReductionMsg *msg) \
673         {\
674         msg->setMigratableContributor(migratable);\
675         myRednMgr->contribute(&myRednInfo,msg);\
676         }\
677 void me::contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
678 {\
679         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
680     msg->setUserFlag(userFlag);\
681     msg->setCallback(cb);\
682     msg->setMigratableContributor(migratable);\
683     myRednMgr->contribute(&myRednInfo,msg);\
684 }\
685 void me::contribute(CMK_REFNUM_TYPE userFlag)\
686 {\
687     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
688     msg->setUserFlag(userFlag);\
689     msg->setMigratableContributor(migratable);\
690     myRednMgr->contribute(&myRednInfo,msg);\
691 }\
692
693
694 //A group that can contribute to reductions
695 class Group : public CkReductionMgr
696 {
697         contributorInfo reductionInfo;//My reduction information
698  public:
699         Group();
700         Group(CkMigrateMessage *msg);
701         virtual int isNodeGroup() { return 0; }
702         virtual void pup(PUP::er &p);
703         virtual void flushStates() {
704                 CkReductionMgr::flushStates(1);
705                 reductionInfo.redNo = 0;
706         }
707         virtual void CkAddThreadListeners(CthThread tid, void *msg);
708
709         CK_REDUCTION_CONTRIBUTE_METHODS_DECL
710 };
711
712 #ifdef _PIPELINED_ALLREDUCE_
713 class AllreduceMgr
714 {
715 public:
716         AllreduceMgr() { fragsRecieved=0; size=0; }
717         friend class ArrayElement;
718         // recieve an allreduce message
719         void allreduce_recieve(CkReductionMsg* msg)
720         {
721                 // allred_msgs.enq(msg);
722                 fragsRecieved++;
723                 if(fragsRecieved==1)
724                 {
725                         data = new char[FRAG_SIZE*msg->nFrags];
726                 }
727                 memcpy(data+msg->fragNo*FRAG_SIZE, msg->data, msg->dataSize);
728                 size += msg->dataSize;
729                 
730                 if(fragsRecieved==msg->nFrags) {
731                         CkReductionMsg* ret = CkReductionMsg::buildNew(size, data);
732                         cb.send(ret);
733                         fragsRecieved=0; size=0;
734                         delete [] data;
735                 }
736                 
737         }
738         // TODO: check for same reduction
739         CkCallback cb;  
740         int size;
741         char* data;
742         int fragsRecieved;
743         // CkMsgQ<CkReductionMsg> allred_msgs;
744 };
745 #endif // _PIPELINED_ALLREDUCE_
746
747 #endif //_CKREDUCTION_H