pipelined allreduce for large messages implemented, use -D_PIPELINED_ALLREDUCE_ to...
[charm.git] / src / ck-core / ckreduction.h
1 /*
2 Charm++ File: Reduction Library
3 added 3/27/2000 by Orion Sky Lawlor, olawlor@acm.org
4 modified 02/21/2003 by Sayantan Chakravorty
5
6
7 A reduction takes some sort of inputs (contributions)
8 from some set of objects scattered across all PE's,
9 and combines (reduces) all the contributions onto one
10 PE.  This library provides several different kinds of
11 combination routines (reducers), and various utilities
12 for supporting them.
13
14 The calls needed to use the reduction manager are:
15 -Create with CProxy_CkReduction::ckNew.
16
17 */
18 #ifndef _CKREDUCTION_H
19 #define _CKREDUCTION_H
20 #include "CkReduction.decl.h"
21 #include "cknodegroupreduction.h"
22
23 #include "CkArrayReductionMgr.decl.h"
24
25 #if CMK_BLUEGENE_CHARM || CMK_MULTICORE || !CMK_SMP
26 #define GROUP_LEVEL_REDUCTION           1
27 #endif
28
29 #ifdef _PIPELINED_ALLREDUCE_
30 #define FRAG_SIZE 32768
31 #define FRAG_THRESHOLD 65536
32 #endif
33
34 class CkReductionMsg; //See definition below
35
36
37 //This message is sent between group objects on a single PE
38 // to let each know the other has been created.
39 class CkGroupCallbackMsg:public CMessage_CkGroupCallbackMsg {
40 public:
41         typedef void (*callbackType)(void *param);
42         CkGroupCallbackMsg(callbackType Ncallback,void *Nparam)
43                 {callback=Ncallback;param=Nparam;}
44         void call(void) {(*callback)(param);}
45 private:
46         callbackType callback;
47         void *param;
48 };
49
50 class CkGroupInitCallback : public IrrGroup {
51 public:
52         CkGroupInitCallback(void);
53         CkGroupInitCallback(CkMigrateMessage *m):IrrGroup(m) {}
54         void callMeBack(CkGroupCallbackMsg *m);
55         void pup(PUP::er& p){ IrrGroup::pup(p); }
56 };
57
58
59 class CkGroupReadyCallback : public IrrGroup {
60 private:
61   int _isReady;
62   CkQ<CkGroupCallbackMsg *> _msgs;
63   void callBuffered(void);
64 public:
65         CkGroupReadyCallback(void);
66         CkGroupReadyCallback(CkMigrateMessage *m):IrrGroup(m) {}
67         void callMeBack(CkGroupCallbackMsg *m);
68         int isReady(void) { return _isReady; }
69 protected:
70         void setReady(void) {_isReady = 1; callBuffered(); }
71         void setNotReady(void) {_isReady = 0; }
72 };
73
74 class CkReductionMsg;
75
76
77 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
78 public:
79   int num;
80   CkReductionNumberMsg(int n) {num=n;}
81 };
82
83 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
84 #define MAX_INT 5000000
85 #endif
86
87
88 /**
89  * One CkReductionMgr runs a non-overlapping set of reductions.
90  * It collects messages from all local contributors, then sends
91  * the reduced message up the reduction tree to node zero, where
92  * they're passed to the user's client function.
93  */
94 class CkNodeReductionMgr;
95
96 class CProxy_CkArrayReductionMgr;
97 class CkReductionMgr : public CkGroupInitCallback {
98 public:
99         CProxy_CkReductionMgr thisProxy;
100
101 public:
102         CProxy_CkArrayReductionMgr nodeProxy; //holds the local branch of the nodegroup tree
103         CkReductionMgr(void);
104         CkReductionMgr(CkMigrateMessage *m);
105
106         typedef CkReductionClientFn clientFn;
107
108         /**
109          * Add the given client function.  Overwrites any previous client.
110          * This manager will dispose of the callback when replaced or done.
111          */
112         void ckSetReductionClient(CkCallback *cb);
113
114 //Contributors keep a copy of this structure:
115
116
117 //Contributor list maintainance:
118         //These just set and clear the "creating" flag to prevent
119         // reductions from finishing early because not all elements
120         // have been created.
121         void creatingContributors(void);
122         void doneCreatingContributors(void);
123         //Initializes a new contributor
124         void contributorStamped(contributorInfo *ci);//Increment global number
125         void contributorCreated(contributorInfo *ci);//Increment local number
126         void contributorDied(contributorInfo *ci);//Don't expect more contributions
127         //Migrating away
128         void contributorLeaving(contributorInfo *ci);
129         //Migrating in
130         void contributorArriving(contributorInfo *ci);
131
132 //Contribute-- the given msg can contain any data.  The reducerType
133 // field of the message must be valid.
134 // Each contributor must contribute exactly once to each reduction.
135         void contribute(contributorInfo *ci,CkReductionMsg *msg);
136
137 //Communication (library-private)
138         //Sent down the reduction tree (used by barren PEs)
139         void ReductionStarting(CkReductionNumberMsg *m);
140         //Sent to root of the reduction tree with late migrant data
141         void LateMigrantMsg(CkReductionMsg *m);
142         //A late migrating contributor will never contribute
143         void MigrantDied(CkReductionNumberMsg *m);
144
145         void RecvMsg(CkReductionMsg *m);
146
147         //Call back for using Node added by Sayantan
148         void ArrayReductionHandler(CkReductionMsg *m);
149         void endArrayReduction();
150
151         virtual CmiBool isReductionMgr(void){ return CmiTrue; }
152         virtual void flushStates();
153         /*FAULT_EVAC: used to get the gcount on a processor when 
154                 it is evacuated.
155                 TODO: It needs to be fixed as it should return the gcount
156                 and the adjustment information for objects that might have
157                 contributed and died.
158                 The current implementation lets us get by in the case
159                 when there are no gcount
160         */
161         int getGCount(){return gcount;};
162 private:
163
164
165 //Data members
166         //Stored callback function (may be NULL if none has been set)
167         CkCallback storedCallback;
168         // calback that came along with the contribute
169         CkCallback *secondaryStoredCallback;
170
171         int redNo;//Number of current reduction (incremented at end) to be deposited with NodeGroups
172         int completedRedNo;//Number of reduction Completed ie recieved callback from NodeGroups
173         CmiBool inProgress;//Is a reduction started, but not complete?
174         CmiBool creating;//Are elements still being created?
175         CmiBool startRequested;//Should we start the next reduction when creation finished?
176         int gcount;//=el't created here - el't deleted here
177         int lcount;//Number of local contributors
178         int maxStartRequest; // the highest future ReductionStarting message received
179
180         //Current local and remote contributions
181         int nContrib,nRemote;
182         //Contributions queued for the current reduction
183         CkMsgQ<CkReductionMsg> msgs;
184
185         //Contributions queued for future reductions (sent to us too early)
186         CkMsgQ<CkReductionMsg> futureMsgs;
187         //Remote messages queued for future reductions (sent to us too early)
188         CkMsgQ<CkReductionMsg> futureRemoteMsgs;
189
190         CkMsgQ<CkReductionMsg> finalMsgs;
191
192
193
194
195 //State:
196         void startReduction(int number,int srcPE);
197         void addContribution(CkReductionMsg *m);
198         void finishReduction(void);
199
200 #if GROUP_LEVEL_REDUCTION
201 //Reduction tree utilities
202         unsigned upperSize;
203         unsigned label;
204         int parent;
205         int numKids;
206         /*vector storing the children of this node*/
207         CkVec<int> newKids;
208         CkVec<int> kids;
209         void init_BinomialTree();
210
211         void init_BinaryTree();
212         enum {TREE_WID=2};
213         int treeRoot(void);//Root PE
214         CmiBool hasParent(void);
215         int treeParent(void);//My parent PE
216         int firstKid(void);//My first child PE
217         int treeKids(void);//Number of children in tree
218 #endif
219
220         //Combine (& free) the current message vector.
221         CkReductionMsg *reduceMessages(void);
222
223         //Map reduction number to a time
224         CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
225         CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
226         CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
227
228
229         //This vector of adjustments is indexed by redNo,
230         // starting from the current redNo.
231         CkVec<countAdjustment> adjVec;
232         //Return the countAdjustment struct for the given redNo:
233         countAdjustment &adj(int number);
234         //Shift the list of countAdjustments down
235         void shiftAdjVec(void);
236
237 protected:
238         //whether to notify children that reduction starts
239         CmiBool disableNotifyChildrenStart;
240
241 //Checkpointing utilities
242 public:
243 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
244     int *perProcessorCounts;
245     int processorCount;
246     int totalCount;
247     int numberReductionMessages(){
248             if(totalCount != 0){
249                 return totalCount;
250             }else{
251                 return MAX_INT;
252             }
253     }
254 #endif
255         virtual void pup(PUP::er &p);
256         static int isIrreducible(){ return 0;}
257         void contributeViaMessage(CkReductionMsg *m);
258 };
259
260
261 //A CkReductionMsg is sent up the reduction tree-- it
262 // carries a contribution, or several reduced contributions.
263 class CkReductionMsg : public CMessage_CkReductionMsg
264 {
265         friend class CkReduction;
266         friend class CkReductionMgr;
267         friend class CkNodeReductionMgr;
268         friend class CkArrayReductionMgr;
269         friend class CkMulticastMgr;
270 #ifdef _PIPELINED_ALLREDUCE_
271         friend class ArrayElement;
272         friend class AllreduceMgr;
273 #endif
274     friend class ck::impl::XArraySectionReducer;
275 public:
276
277 //Publically-accessible fields:
278         //"Constructor"-- builds and returns a new CkReductionMsg.
279         //  the "srcData" array you specify will be copied into this object (unless NULL).
280         static CkReductionMsg *buildNew(int NdataSize,const void *srcData,
281                 CkReduction::reducerType reducer=CkReduction::invalid,
282                 CkReductionMsg *buf = NULL);
283
284         inline int getLength(void) const {return dataSize;}
285         inline int getSize(void) const {return dataSize;}
286         inline void *getData(void) {return data;}
287         inline const void *getData(void) const {return data;}
288
289         inline int getGcount(void){return gcount;}
290         inline CkReduction::reducerType getReducer(void){return reducer;}
291         inline int getRedNo(void){return redNo;}
292
293         inline CMK_REFNUM_TYPE getUserFlag(void) const {return userFlag;}
294         inline void setUserFlag(CMK_REFNUM_TYPE f) { userFlag=f;}
295
296         inline void setCallback(const CkCallback &cb) { callback=cb; }
297
298         //Return true if this message came straight from a contribute call--
299         // if it didn't come from a previous reduction function.
300         inline int isFromUser(void) const {return sourceFlag==-1;}
301
302         inline bool isMigratableContributor(void) const {return migratableContributor;}
303         inline void setMigratableContributor(bool _mig){ migratableContributor = _mig;}
304
305         ~CkReductionMsg();
306
307 //Implementation-only fields (don't access these directly!)
308         //Msg runtime support
309         static void *alloc(int msgnum, size_t size, int *reqSize, int priobits);
310         static void *pack(CkReductionMsg *);
311         static CkReductionMsg *unpack(void *in);
312
313 private:
314         int dataSize;//Length of array below, in bytes
315         void *data;//Reduction data
316         CMK_REFNUM_TYPE userFlag; //Some sort of identifying flag, for client use
317         CkCallback callback; //What to do when done
318         CkCallback secondaryCallback; // the group callback is piggybacked on the nodegrp reduction
319         bool migratableContributor; // are the contributors migratable
320
321         int sourceFlag;/*Flag:
322                 0 indicates this is a placeholder message (meaning: nothing to report)
323                 -1 indicates this is a single (non-reduced) contribution.
324                 >0 indicates this is a reduced contribution.
325         */
326         int nSources(void) {return sourceFlag<0?-sourceFlag:sourceFlag;}
327 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
328     int sourceProcessorCount;
329     int fromPE;
330 #endif
331 private:
332 #if CMK_BLUEGENE_CHARM
333         void *log;
334 #endif
335         CkReduction::reducerType reducer;
336         //contributorInfo *ci;//Source contributor, or NULL if none
337         int redNo;//The serial number of this reduction
338         int gcount;//Contribution to the global contributor count
339         // for section multicast/reduction library
340         CkSectionInfo sid;   // section cookie for multicast
341         char rebuilt;          // indicate if the multicast tree needs rebuilt
342         int nFrags;
343         int fragNo;      // fragment of a reduction msg (when pipelined)
344                          // value = 0 to nFrags-1
345         double dataStorage;//Start of data array (so it's double-aligned)
346
347         int no;
348
349         //Default constructor is private so you must use "buildNew", above
350         CkReductionMsg();
351 };
352
353
354 //Define methods used to contribute to the given reduction type.
355 //  Data is copied, not deleted.
356 /*#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
357   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
358         CMK_REFNUM_TYPE userFlag=-1); \
359   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
360         const CkCallback &cb,CMK_REFNUM_TYPE userFlag=-1); \
361   void contribute(CkReductionMsg *msg);\*/
362
363 #define CkReductionTarget(me, method) \
364     CkIndex_##me::method##_redn_wrapper(NULL)
365
366 #define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
367 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
368         CMK_REFNUM_TYPE userFlag)\
369 {\
370         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
371         msg->setUserFlag(userFlag);\
372         msg->setMigratableContributor(migratable);\
373         myRednMgr->contribute(&myRednInfo,msg);\
374 }\
375 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
376         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
377 {\
378         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
379         msg->setUserFlag(userFlag);\
380         msg->setCallback(cb);\
381         msg->setMigratableContributor(migratable);\
382         myRednMgr->contribute(&myRednInfo,msg);\
383 }\
384 void me::contribute(CkReductionMsg *msg) \
385         {\
386         msg->setMigratableContributor(migratable);\
387         myRednMgr->contribute(&myRednInfo,msg);\
388         }\
389 void me::contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
390 {\
391         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
392     msg->setUserFlag(userFlag);\
393     msg->setCallback(cb);\
394     msg->setMigratableContributor(migratable);\
395     myRednMgr->contribute(&myRednInfo,msg);\
396 }\
397 void me::contribute(CMK_REFNUM_TYPE userFlag)\
398 {\
399     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
400     msg->setUserFlag(userFlag);\
401     msg->setMigratableContributor(migratable);\
402     myRednMgr->contribute(&myRednInfo,msg);\
403 }\
404
405
406 //A group that can contribute to reductions
407 class Group : public CkReductionMgr
408 {
409         contributorInfo reductionInfo;//My reduction information
410  public:
411         Group();
412         Group(CkMigrateMessage *msg);
413         virtual int isNodeGroup() { return 0; }
414         virtual void pup(PUP::er &p);
415         virtual void flushStates() {
416                 CkReductionMgr::flushStates();
417                 reductionInfo.redNo = 0;
418         }
419         virtual void CkAddThreadListeners(CthThread tid, void *msg);
420
421         CK_REDUCTION_CONTRIBUTE_METHODS_DECL
422 };
423
424 #ifdef _PIPELINED_ALLREDUCE_
425 class AllreduceMgr
426 {
427 public:
428         AllreduceMgr() { fragsRecieved=0; size=0; }
429         friend class ArrayElement;
430         // recieve an allreduce message
431         void allreduce_recieve(CkReductionMsg* msg)
432         {
433                 allred_msgs.enq(msg);
434                 fragsRecieved++;
435                 if(fragsRecieved==1)
436                 {
437                         data = new char[FRAG_SIZE*msg->nFrags];
438                 }
439                 memcpy(data+msg->fragNo*FRAG_SIZE, msg->data, msg->dataSize);
440                 size += msg->dataSize;
441                 
442                 if(fragsRecieved==msg->nFrags)
443                         cb.send(size, (void*)data);
444                 
445         }
446         // TODO: check for same reduction
447         CkCallback cb;  
448         int size;
449         char* data;
450         int fragsRecieved;
451         CkMsgQ<CkReductionMsg> allred_msgs;
452 };
453 #endif // _PIPELINED_ALLREDUCE_
454
455 #endif //_CKREDUCTION_H