doc: Add serial to list of ci file reserved words
[charm.git] / src / conv-core / quiescence.c
1 /**
2 FIXME: (OSL, 2/28/2003)
3  Messages sent from the communications thread or SIGIO
4  (i.e., messages sent from immediate messages) add
5  mCreated and mProcessed to the communication thread's
6  CpvAccess(cQdState), not to any real processor's
7  CpvAccess(cQdState).  Thus processor rank 0 should
8  add CpvAccessOther(cQdState,CmiNodeSize())'s counts to its own.
9  
10 Until this is fixed, if you send or receive immediate 
11 messages (e.g., from reductions) you CANNOT use converse 
12 quiescence detection!
13 */
14
15 #include "converse.h"
16 #include "quiescence.h"
17 #include <stdio.h>
18 #include <stdlib.h>
19 #ifndef  DEBUGF
20 #define  DEBUGF(x) /*printf x*/ 
21 #endif
22
23 CpvDeclare(CQdState, cQdState);
24 unsigned int _CQdHandlerIdx;
25 unsigned int _CQdAnnounceHandlerIdx;
26
27
28 int  CQdMsgGetPhase(CQdMsg msg) 
29 { return msg->phase; }
30
31 void CQdMsgSetPhase(CQdMsg msg, int p) 
32 { msg->phase = p; }
33
34 int  CQdMsgGetCreated(CQdMsg msg) 
35 { CmiAssert(msg->phase==1); return msg->u.p1.created; }
36
37 void CQdMsgSetCreated(CQdMsg msg, int c) 
38 { CmiAssert(msg->phase==1); msg->u.p1.created = c; }
39
40 int  CQdMsgGetProcessed(CQdMsg msg) 
41 { CmiAssert(msg->phase==1); return msg->u.p1.processed; }
42
43 void CQdMsgSetProcessed(CQdMsg msg, int p) 
44 { CmiAssert(msg->phase==1); msg->u.p1.processed = p; }
45
46 int  CQdMsgGetDirty(CQdMsg msg) 
47 { CmiAssert(msg->phase==2); return msg->u.p2.dirty; }
48
49 void CQdMsgSetDirty(CQdMsg msg, int d) 
50 { CmiAssert(msg->phase==2); msg->u.p2.dirty = d; }
51
52
53 int CQdGetCreated(CQdState state)
54 { return state->mCreated; }
55
56 void CQdCreate(CQdState state, int n)
57 { state->mCreated += n; }
58
59 int CQdGetProcessed(CQdState state)
60 { return state->mProcessed; }
61
62 void CQdProcess(CQdState state, int n)
63 { state->mProcessed += n; }
64
65 void CQdPropagate(CQdState state, CQdMsg msg) 
66 {   
67   int i;
68   CmiSetHandler(msg, _CQdHandlerIdx);
69   for(i=0; i<state->nChildren; i++) {
70     CQdCreate(state, -1);
71     CmiSyncSend(state->children[i], sizeof(struct ConvQdMsg), (char *)msg);
72   }
73 }
74
75 int  CQdGetParent(CQdState state) 
76 { return state->parent; }
77     
78 int  CQdGetCCreated(CQdState state) 
79 { return state->cCreated; }
80
81 int  CQdGetCProcessed(CQdState state) 
82 { return state->cProcessed; }
83
84 void CQdSubtreeCreate(CQdState state, int c) 
85 { state->cCreated += c; }
86
87 void CQdSubtreeProcess(CQdState state, int p) 
88 { state->cProcessed += p; }
89
90 int  CQdGetStage(CQdState state) 
91 { return state->stage; }
92
93 void CQdSetStage(CQdState state, int p) 
94 { state->stage = p; }
95
96 void CQdReported(CQdState state) 
97 { state->nReported++; }
98
99 int  CQdAllReported(CQdState state) 
100 { return state->nReported==(state->nChildren+1);}
101
102 void CQdReset(CQdState state) 
103
104   state->nReported=0; state->cCreated=0; 
105   state->cProcessed=0; state->cDirty=0; 
106 }
107
108 void CQdMarkProcessed(CQdState state) 
109 { state->oProcessed = state->mProcessed; }
110
111 int  CQdIsDirty(CQdState state) 
112 { return ((state->mProcessed > state->oProcessed) || state->cDirty); }
113
114 void CQdSubtreeSetDirty(CQdState state, int d) 
115 { state->cDirty = state->cDirty || d; }
116
117 CQdState CQdStateCreate(void)
118 {
119   CQdState state = (CQdState) malloc(sizeof(struct ConvQdState));
120   _MEMCHECK(state);
121   state->mCreated = 0;
122   state->mProcessed = 0;
123   state->stage = 0;
124   state->nReported = 0;
125   state->oProcessed = 0;
126   state->cCreated = 0;
127   state->cProcessed = 0;
128   state->cDirty = 0;
129   state->nChildren = CmiNumSpanTreeChildren(CmiMyPe());
130   state->parent = CmiSpanTreeParent(CmiMyPe());
131   /* fixed bug on SP3, when nChildren is 0, NULL will be returned by malloc */
132   if (state->nChildren) {
133     state->children = (int *) malloc(state->nChildren*sizeof(int));
134     _MEMCHECK(state->children);
135   }
136   else 
137     state->children = NULL;
138   CmiSpanTreeChildren(CmiMyPe(), state->children);
139
140   return state;
141 }
142
143
144 static void CQdBcastQD1(CQdState state, CQdMsg msg)
145 {  
146   CQdMsgSetPhase(msg, 0); 
147   CQdPropagate(state, msg); 
148   CQdMsgSetPhase(msg, 1); 
149   CQdMsgSetCreated(msg, CQdGetCreated(state)); 
150   CQdMsgSetProcessed(msg, CQdGetProcessed(state)); 
151   CQdCreate(state, -1);
152   CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg), (char *) msg);
153   CQdMarkProcessed(state); 
154   CQdReset(state); 
155   CQdSetStage(state, 1); 
156 }
157
158
159 static void CQdBcastQD2(CQdState state, CQdMsg msg)
160 {
161   CQdMsgSetPhase(msg, 1); 
162   CQdPropagate(state, msg); 
163   CQdMsgSetPhase(msg, 2); 
164   CQdMsgSetDirty(msg, CQdIsDirty(state)); 
165   CQdCreate(state, -1);
166   CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg), (char *) msg);
167   CQdReset(state); 
168   CQdSetStage(state, 2); 
169 }
170
171
172 static void CQdHandlePhase0(CQdState state, CQdMsg msg)
173 {
174   CmiAssert(CmiMyPe()==0 || CQdGetStage(state)==0);
175   if(CQdGetStage(state)==0)
176     CQdBcastQD1(state, msg);
177   else
178     CmiFree(msg);
179 }
180
181
182 static void CQdHandlePhase1(CQdState state, CQdMsg msg)
183 {
184   switch(CQdGetStage(state)) {          
185   case 0 :
186     CmiAssert(CmiMyPe()!=0);
187     CQdBcastQD2(state, msg);
188     break;
189   case 1 :
190     CQdSubtreeCreate(state, CQdMsgGetCreated(msg)); 
191     CQdSubtreeProcess(state, CQdMsgGetProcessed(msg)); 
192     CQdReported(state); 
193     
194     if(CQdAllReported(state)) {
195       if(CmiMyPe()==0) {
196         if(CQdGetCCreated(state) == CQdGetCProcessed(state)) 
197           CQdBcastQD2(state, msg); 
198         else 
199           CQdBcastQD1(state, msg);
200       } 
201       else {
202         CQdMsgSetCreated(msg, CQdGetCCreated(state)); 
203         CQdMsgSetProcessed(msg, CQdGetCProcessed(state)); 
204         CQdCreate(state, -1);
205         CmiSyncSendAndFree(CQdGetParent(state), 
206                            sizeof(struct ConvQdMsg), (char *) msg);
207         DEBUGF(("PE = %d, My parent = %d\n", CmiMyPe(), CQdGetParent(state)));
208         CQdReset(state); 
209         CQdSetStage(state, 0); 
210       }
211     } 
212     else
213       CmiFree(msg);
214     break;
215   default: 
216     CmiAbort("Internal QD Error. Contact Developers.!\n");
217   }
218 }
219
220
221 static void CQdHandlePhase2(CQdState state, CQdMsg msg)
222 {
223   CmiAssert(CQdGetStage(state)==2);
224   CQdSubtreeSetDirty(state, CQdMsgGetDirty(msg));    
225   CQdReported(state);
226   if(CQdAllReported(state)) { 
227     if(CmiMyPe()==0) {
228       if(CQdIsDirty(state)) 
229         CQdBcastQD1(state, msg);
230       else {
231         CmiSetHandler(msg, _CQdAnnounceHandlerIdx);
232         CQdCreate(state, 0-CmiNumPes());
233         CmiSyncBroadcastAllAndFree(sizeof(struct ConvQdMsg), (char *) msg);
234         CQdReset(state); 
235         CQdSetStage(state, 0); 
236       }
237     } 
238     else {
239       CQdMsgSetDirty(msg, CQdIsDirty(state)); 
240       CQdCreate(state, -1);
241       CmiSyncSendAndFree(CQdGetParent(state), 
242                          sizeof(struct ConvQdMsg), (char *) msg);
243       CQdReset(state); 
244       CQdSetStage(state, 0); 
245     }
246   } 
247   else
248     CmiFree(msg);
249 }
250
251
252 static void CQdCallWhenIdle(CQdMsg msg)
253 {
254   CQdState state = CpvAccess(cQdState);
255   
256   switch(CQdMsgGetPhase(msg)) {
257   case 0 : CQdHandlePhase0(state, msg); break;
258   case 1 : CQdHandlePhase1(state, msg); break;
259   case 2 : CQdHandlePhase2(state, msg); break;
260   default: CmiAbort("Internal QD Error. Contact Developers.!\n");
261   }
262 }
263
264
265 void CQdHandler(CQdMsg msg)
266 {
267   CQdProcess(CpvAccess(cQdState), -1);
268   CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, 
269                      (CcdVoidFn)CQdCallWhenIdle, (void*) msg);  
270 }
271
272
273 void CQdRegisterCallback(CQdVoidFn fn, void *arg)
274 {
275   CcdCallOnCondition(CcdQUIESCENCE, fn, arg);
276 }
277
278 void CQdAnnounceHandler(CQdMsg msg)
279 {
280   CQdProcess(CpvAccess(cQdState), -1);
281   CcdRaiseCondition(CcdQUIESCENCE);
282 }
283
284 void CQdCpvInit(void) {
285   CpvInitialize(CQdState, cQdState);
286   CpvAccess(cQdState) = CQdStateCreate();
287 }
288
289 void CQdInit(void)
290 {
291   CQdCpvInit();
292   _CQdHandlerIdx = CmiRegisterHandler((CmiHandler)CQdHandler);
293   _CQdAnnounceHandlerIdx = 
294     CmiRegisterHandler((CmiHandler)CQdAnnounceHandler);
295 }
296
297 void CmiStartQD(CQdVoidFn fn, void *arg)
298 {
299   register CQdMsg msg = (CQdMsg) CmiAlloc(sizeof(struct ConvQdMsg)); 
300   CQdRegisterCallback(fn, arg);
301   CQdMsgSetPhase(msg, 0);  
302   CmiSetHandler(msg, _CQdHandlerIdx);
303   CQdCreate(CpvAccess(cQdState), -1);
304   CmiSyncSendAndFree(0, sizeof(struct ConvQdMsg), (char *)msg);
305 }
306
307
308