75df2911d524e68a22eaaeeff2957d52e856430f
[charm.git] / src / ck-core / ckreduction.h
1 /*
2 Charm++ File: Reduction Library
3 added 3/27/2000 by Orion Sky Lawlor, olawlor@acm.org
4 modified 02/21/2003 by Sayantan Chakravorty
5
6
7 A reduction takes some sort of inputs (contributions)
8 from some set of objects scattered across all PE's,
9 and combines (reduces) all the contributions onto one
10 PE.  This library provides several different kinds of
11 combination routines (reducers), and various utilities
12 for supporting them.
13
14 The calls needed to use the reduction manager are:
15 -Create with CProxy_CkReduction::ckNew.
16
17 */
18 #ifndef _CKREDUCTION_H
19 #define _CKREDUCTION_H
20 #include "CkReduction.decl.h"
21 #include "cknodegroupreduction.h"
22
23 #include "CkArrayReductionMgr.decl.h"
24
25 #if CMK_BLUEGENE_CHARM
26 #define GROUP_LEVEL_REDUCTION           1
27 #endif
28
29 class CkReductionMsg; //See definition below
30
31
32 //This message is sent between group objects on a single PE
33 // to let each know the other has been created.
34 class CkGroupCallbackMsg:public CMessage_CkGroupCallbackMsg {
35 public:
36         typedef void (*callbackType)(void *param);
37         CkGroupCallbackMsg(callbackType Ncallback,void *Nparam)
38                 {callback=Ncallback;param=Nparam;}
39         void call(void) {(*callback)(param);}
40 private:
41         callbackType callback;
42         void *param;
43 };
44
45 class CkGroupInitCallback : public IrrGroup {
46 public:
47         CkGroupInitCallback(void);
48         CkGroupInitCallback(CkMigrateMessage *m):IrrGroup(m) {}
49         void callMeBack(CkGroupCallbackMsg *m);
50         void pup(PUP::er& p){ IrrGroup::pup(p); }
51 };
52
53
54 class CkGroupReadyCallback : public IrrGroup {
55 private:
56   int _isReady;
57   CkQ<CkGroupCallbackMsg *> _msgs;
58   void callBuffered(void);
59 public:
60         CkGroupReadyCallback(void);
61         CkGroupReadyCallback(CkMigrateMessage *m):IrrGroup(m) {}
62         void callMeBack(CkGroupCallbackMsg *m);
63         int isReady(void) { return _isReady; }
64 protected:
65         void setReady(void) {_isReady = 1; callBuffered(); }
66         void setNotReady(void) {_isReady = 0; }
67 };
68
69 class CkReductionMsg;
70
71
72 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
73 public:
74   int num;
75   CkReductionNumberMsg(int n) {num=n;}
76 };
77
78 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
79 #define MAX_INT 5000000
80 #endif
81
82
83 /**
84  * One CkReductionMgr runs a non-overlapping set of reductions.
85  * It collects messages from all local contributors, then sends
86  * the reduced message up the reduction tree to node zero, where
87  * they're passed to the user's client function.
88  */
89 class CkNodeReductionMgr;
90
91 class CProxy_CkArrayReductionMgr;
92 class CkReductionMgr : public CkGroupInitCallback {
93 public:
94         CProxy_CkReductionMgr thisProxy;
95
96 public:
97         CProxy_CkArrayReductionMgr nodeProxy; //holds the local branch of the nodegroup tree
98         CkReductionMgr(void);
99         CkReductionMgr(CkMigrateMessage *m);
100
101         typedef CkReductionClientFn clientFn;
102
103         /**
104          * Add the given client function.  Overwrites any previous client.
105          * This manager will dispose of the callback when replaced or done.
106          */
107         void ckSetReductionClient(CkCallback *cb);
108
109 //Contributors keep a copy of this structure:
110
111
112 //Contributor list maintainance:
113         //These just set and clear the "creating" flag to prevent
114         // reductions from finishing early because not all elements
115         // have been created.
116         void creatingContributors(void);
117         void doneCreatingContributors(void);
118         //Initializes a new contributor
119         void contributorStamped(contributorInfo *ci);//Increment global number
120         void contributorCreated(contributorInfo *ci);//Increment local number
121         void contributorDied(contributorInfo *ci);//Don't expect more contributions
122         //Migrating away
123         void contributorLeaving(contributorInfo *ci);
124         //Migrating in
125         void contributorArriving(contributorInfo *ci);
126
127 //Contribute-- the given msg can contain any data.  The reducerType
128 // field of the message must be valid.
129 // Each contributor must contribute exactly once to each reduction.
130         void contribute(contributorInfo *ci,CkReductionMsg *msg);
131
132 //Communication (library-private)
133         //Sent down the reduction tree (used by barren PEs)
134         void ReductionStarting(CkReductionNumberMsg *m);
135         //Sent to root of the reduction tree with late migrant data
136         void LateMigrantMsg(CkReductionMsg *m);
137         //A late migrating contributor will never contribute
138         void MigrantDied(CkReductionNumberMsg *m);
139
140         void RecvMsg(CkReductionMsg *m);
141
142         //Call back for using Node added by Sayantan
143         void ArrayReductionHandler(CkReductionMsg *m);
144         void endArrayReduction();
145
146         virtual CmiBool isReductionMgr(void){ return CmiTrue; }
147         virtual void flushStates();
148         /*FAULT_EVAC: used to get the gcount on a processor when 
149                 it is evacuated.
150                 TODO: It needs to be fixed as it should return the gcount
151                 and the adjustment information for objects that might have
152                 contributed and died.
153                 The current implementation lets us get by in the case
154                 when there are no gcount
155         */
156         int getGCount(){return gcount;};
157 private:
158
159
160 //Data members
161         //Stored callback function (may be NULL if none has been set)
162         CkCallback storedCallback;
163         // calback that came along with the contribute
164         CkCallback *secondaryStoredCallback;
165
166         int redNo;//Number of current reduction (incremented at end) to be deposited with NodeGroups
167         int completedRedNo;//Number of reduction Completed ie recieved callback from NodeGroups
168         CmiBool inProgress;//Is a reduction started, but not complete?
169         CmiBool creating;//Are elements still being created?
170         CmiBool startRequested;//Should we start the next reduction when creation finished?
171         int gcount;//=el't created here - el't deleted here
172         int lcount;//Number of local contributors
173         int maxStartRequest; // the highest future ReductionStarting message received
174
175         //Current local and remote contributions
176         int nContrib,nRemote;
177         //Contributions queued for the current reduction
178         CkMsgQ<CkReductionMsg> msgs;
179
180         //Contributions queued for future reductions (sent to us too early)
181         CkMsgQ<CkReductionMsg> futureMsgs;
182         //Remote messages queued for future reductions (sent to us too early)
183         CkMsgQ<CkReductionMsg> futureRemoteMsgs;
184
185         CkMsgQ<CkReductionMsg> finalMsgs;
186
187
188
189
190 //State:
191         void startReduction(int number,int srcPE);
192         void addContribution(CkReductionMsg *m);
193         void finishReduction(void);
194
195 #if GROUP_LEVEL_REDUCTION
196 //Reduction tree utilities
197         unsigned upperSize;
198         unsigned label;
199         int parent;
200         int numKids;
201         /*vector storing the children of this node*/
202         CkVec<int> newKids;
203         CkVec<int> kids;
204         void init_BinomialTree();
205
206         void init_BinaryTree();
207         enum {TREE_WID=2};
208         int treeRoot(void);//Root PE
209         CmiBool hasParent(void);
210         int treeParent(void);//My parent PE
211         int firstKid(void);//My first child PE
212         int treeKids(void);//Number of children in tree
213 #endif
214
215         //Combine (& free) the current message vector.
216         CkReductionMsg *reduceMessages(void);
217
218         //Map reduction number to a time
219         CmiBool isPast(int num) const {return (CmiBool)(num<redNo);}
220         CmiBool isPresent(int num) const {return (CmiBool)(num==redNo);}
221         CmiBool isFuture(int num) const {return (CmiBool)(num>redNo);}
222
223
224         //This vector of adjustments is indexed by redNo,
225         // starting from the current redNo.
226         CkVec<countAdjustment> adjVec;
227         //Return the countAdjustment struct for the given redNo:
228         countAdjustment &adj(int number);
229         //Shift the list of countAdjustments down
230         void shiftAdjVec(void);
231
232 //Checkpointing utilities
233 public:
234 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
235     int *perProcessorCounts;
236     int processorCount;
237     int totalCount;
238     int numberReductionMessages(){
239             if(totalCount != 0){
240                 return totalCount;
241             }else{
242                 return MAX_INT;
243             }
244     }
245 #endif
246         virtual void pup(PUP::er &p);
247         static int isIrreducible(){ return 0;}
248         void contributeViaMessage(CkReductionMsg *m);
249 };
250
251
252 //A CkReductionMsg is sent up the reduction tree-- it
253 // carries a contribution, or several reduced contributions.
254 class CkReductionMsg : public CMessage_CkReductionMsg
255 {
256         friend class CkReduction;
257         friend class CkReductionMgr;
258         friend class CkNodeReductionMgr;
259         friend class CkArrayReductionMgr;
260         friend class CkMulticastMgr;
261     friend class ck::impl::XArraySectionReducer;
262 public:
263
264 //Publically-accessible fields:
265         //"Constructor"-- builds and returns a new CkReductionMsg.
266         //  the "srcData" array you specify will be copied into this object (unless NULL).
267         static CkReductionMsg *buildNew(int NdataSize,const void *srcData,
268                 CkReduction::reducerType reducer=CkReduction::invalid,
269                 CkReductionMsg *buf = NULL);
270
271         inline int getLength(void) const {return dataSize;}
272         inline int getSize(void) const {return dataSize;}
273         inline void *getData(void) {return data;}
274         inline const void *getData(void) const {return data;}
275
276         inline int getGcount(void){return gcount;}
277         inline CkReduction::reducerType getReducer(void){return reducer;}
278         inline int getRedNo(void){return redNo;}
279
280         inline int getUserFlag(void) const {return userFlag;}
281         inline void setUserFlag(int f) { userFlag=f;}
282
283         inline void setCallback(const CkCallback &cb) { callback=cb; }
284
285         //Return true if this message came straight from a contribute call--
286         // if it didn't come from a previous reduction function.
287         inline int isFromUser(void) const {return sourceFlag==-1;}
288
289         inline bool isMigratableContributor(void) const {return migratableContributor;}
290         inline void setMigratableContributor(bool _mig){ migratableContributor = _mig;}
291
292         ~CkReductionMsg();
293
294 //Implementation-only fields (don't access these directly!)
295         //Msg runtime support
296         static void *alloc(int msgnum, size_t size, int *reqSize, int priobits);
297         static void *pack(CkReductionMsg *);
298         static CkReductionMsg *unpack(void *in);
299
300 private:
301         int dataSize;//Length of array below, in bytes
302         void *data;//Reduction data
303         int userFlag; //Some sort of identifying flag, for client use
304         CkCallback callback; //What to do when done
305         CkCallback secondaryCallback; // the group callback is piggybacked on the nodegrp reduction
306         bool migratableContributor; // are the contributors migratable
307
308         int sourceFlag;/*Flag:
309                 0 indicates this is a placeholder message (meaning: nothing to report)
310                 -1 indicates this is a single (non-reduced) contribution.
311                 >0 indicates this is a reduced contribution.
312         */
313         int nSources(void) {return sourceFlag<0?-sourceFlag:sourceFlag;}
314 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
315     int sourceProcessorCount;
316     int fromPE;
317 #endif
318 private:
319 #if CMK_BLUEGENE_CHARM
320         void *log;
321 #endif
322         CkReduction::reducerType reducer;
323         //contributorInfo *ci;//Source contributor, or NULL if none
324         int redNo;//The serial number of this reduction
325         int gcount;//Contribution to the global contributor count
326         // for section multicast/reduction library
327         CkSectionInfo sid;   // section cookie for multicast
328         char rebuilt;          // indicate if the multicast tree needs rebuilt
329         int nFrags;
330         int fragNo;      // fragment of a reduction msg (when pipelined)
331                          // value = 0 to nFrags-1
332         double dataStorage;//Start of data array (so it's double-aligned)
333
334         int no;
335
336         //Default constructor is private so you must use "buildNew", above
337         CkReductionMsg();
338 };
339
340
341 //Define methods used to contribute to the given reduction type.
342 //  Data is copied, not deleted.
343 /*#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL \
344   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
345         int userFlag=-1); \
346   void contribute(int dataSize,const void *data,CkReduction::reducerType type, \
347         const CkCallback &cb,int userFlag=-1); \
348   void contribute(CkReductionMsg *msg);\*/
349
350 #define CkReductionTarget(me, method) \
351     CkIndex_##me::method##_redn_wrapper(NULL)
352
353 #define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
354 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
355         int userFlag)\
356 {\
357         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
358         msg->setUserFlag(userFlag);\
359         msg->setMigratableContributor(migratable);\
360         myRednMgr->contribute(&myRednInfo,msg);\
361 }\
362 void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
363         const CkCallback &cb,int userFlag)\
364 {\
365         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
366         msg->setUserFlag(userFlag);\
367         msg->setCallback(cb);\
368         msg->setMigratableContributor(migratable);\
369         myRednMgr->contribute(&myRednInfo,msg);\
370 }\
371 void me::contribute(CkReductionMsg *msg) \
372         {\
373         msg->setMigratableContributor(migratable);\
374         myRednMgr->contribute(&myRednInfo,msg);\
375         }\
376 void me::contribute(const CkCallback &cb,int userFlag)\
377 {\
378         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
379     msg->setUserFlag(userFlag);\
380     msg->setCallback(cb);\
381     msg->setMigratableContributor(migratable);\
382     myRednMgr->contribute(&myRednInfo,msg);\
383 }\
384 void me::contribute(int userFlag)\
385 {\
386     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);\
387     msg->setUserFlag(userFlag);\
388     msg->setMigratableContributor(migratable);\
389     myRednMgr->contribute(&myRednInfo,msg);\
390 }\
391
392
393 //A group that can contribute to reductions
394 class Group : public CkReductionMgr
395 {
396         contributorInfo reductionInfo;//My reduction information
397  public:
398         Group();
399         Group(CkMigrateMessage *msg);
400         virtual int isNodeGroup() { return 0; }
401         virtual void pup(PUP::er &p);
402         virtual void flushStates() {
403                 CkReductionMgr::flushStates();
404                 reductionInfo.redNo = 0;
405         }
406         virtual void CkAddThreadListeners(CthThread tid, void *msg);
407
408         CK_REDUCTION_CONTRIBUTE_METHODS_DECL
409 };
410
411
412 #endif //_CKREDUCTION_H