fix work stealing hanging problem on Jagua
[charm.git] / src / ck-core / qd.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #define  DEBUGP(x)   //  CmiPrintf x;
9
10 #include "ck.h"
11
12 extern int _qdCommHandlerIdx;
13
14 // a fake QD which just wait for several seconds to triger QD callback
15 int _dummy_dq = 0;                      /* seconds to wait for */
16
17 #if CMK_BLUEGENE_CHARM
18 // this is a hack for bgcharm++, I need to figure out a better
19 // way to do this
20 #undef CmiSyncSendAndFree
21 #define CmiSyncSendAndFree    CmiFreeSendFn
22 #endif
23
24 CpvDeclare(QdState*, _qd);
25
26 // called when a node asks children for their counters
27 // send broadcast msg (phase 0) to children, and report to itself (phase 1)
28 // stage 1 means the node is waiting for reports from children
29 static inline void _bcastQD1(QdState* state, QdMsg *msg)
30 {
31   msg->setPhase(0);
32   state->propagate(msg);
33   msg->setPhase(1);
34   DEBUGP(("[%d] _bcastQD1: State: getCreated:%d getProcessed:%d\n", CmiMyPe(), state->getCreated(), state->getProcessed()));
35 #if ! CMK_SHARED_VARS_UNIPROCESSOR && !CMK_MULTICORE
36 /* immediate message does not count in QD now */
37 /*
38   QdState *comm_state;
39   static int comm_create=0, comm_process=0;
40   if (CmiMyRank()==0) {
41     comm_state = CpvAccessOther(_qd, CmiMyNodeSize());
42     int new_create = comm_state->getCreated();
43     int new_process = comm_state->getProcessed();
44     // combine counters with comm thread
45     CmiAssert(new_create==0);
46     CmiAssert(new_create == 0&& new_process==0);
47     state->create(new_create-comm_create);
48     state->process(new_process-comm_process);
49     comm_create = new_create;
50     comm_process = new_process;
51   }
52 */
53 #endif
54   msg->setCreated(state->getCreated());
55   msg->setProcessed(state->getProcessed());
56   envelope *env = UsrToEnv((void*)msg);
57   CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
58   state->markProcessed();
59   state->reset();
60   state->setStage(1);
61   DEBUGP(("[%d] _bcastQD1 stage changed to: %d\n", CmiMyPe(), state->getStage()));
62 }
63
64 // final phase to check if the counters become dirty or not
65 // stage 2 means the node is waiting for children to report their dirty state
66 static inline void _bcastQD2(QdState* state, QdMsg *msg)
67 {
68   DEBUGP(("[%d] _bcastQD2: \n", CmiMyPe()));
69   msg->setPhase(1);
70   state->propagate(msg);
71   msg->setPhase(2);
72   msg->setDirty(state->isDirty());
73   envelope *env = UsrToEnv((void*)msg);
74   CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
75   state->reset();
76   state->setStage(2);
77   DEBUGP(("[%d] _bcastQD2: stage changed to: %d\n", CmiMyPe(), state->getStage()));
78 }
79
80 static inline void _handlePhase0(QdState *state, QdMsg *msg)
81 {
82   DEBUGP(("[%d] _handlePhase0: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
83   CkAssert(CmiMyPe()==0 || state->getStage()==0);
84   if(CmiMyPe()==0) {
85     QdCallback *qdcb = new QdCallback(msg->getCb());
86     _MEMCHECK(qdcb);
87     state->enq(qdcb);           // stores qd callback
88   }
89   if(state->getStage()==0)
90     _bcastQD1(state, msg);        // start asking children for the counters
91   else
92     CkFreeMsg(msg);               // already in the middle of processing
93 }
94
95 // collecting counters from children
96 static inline void _handlePhase1(QdState *state, QdMsg *msg)
97 {
98   DEBUGP(("[%d] _handlePhase1: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
99   switch(state->getStage()) {
100     case 0 :
101       CkAssert(CmiMyPe()!=0);
102       _bcastQD2(state, msg);
103       break;
104     case 1 :
105       DEBUGP(("[%d] msg: getCreated:%d getProcessed:%d\n", CmiMyPe(), msg->getCreated(), msg->getProcessed()));
106         // add children's counters
107       state->subtreeCreate(msg->getCreated());
108       state->subtreeProcess(msg->getProcessed());
109       state->reported();
110       if(state->allReported()) {
111         if(CmiMyPe()==0) {
112           DEBUGP(("ALL: %p getCCreated:%d getCProcessed:%d\n", state, state->getCCreated(), state->getCProcessed()));
113           if(state->getCCreated()==state->getCProcessed()) {
114             _bcastQD2(state, msg);    // almost reached, one pass to make sure
115           } else {
116             _bcastQD1(state, msg);    // not reached, go over again
117           }
118         } else {
119             // report counters to parent
120           msg->setCreated(state->getCCreated());
121           msg->setProcessed(state->getCProcessed());
122           envelope *env = UsrToEnv((void*)msg);
123           CmiSyncSendAndFree(state->getParent(), 
124                              env->getTotalsize(), (char *)env);
125           state->reset();
126           state->setStage(0);
127         }
128       } else
129           CkFreeMsg(msg);
130       break;
131     default: CmiAbort("Internal QD Error. Contact Developers.!\n");
132   }
133 }
134
135 // check if counters became dirty and notify parents
136 static inline void _handlePhase2(QdState *state, QdMsg *msg)
137 {
138 //  This assertion seems too strong for smp and uth version.
139   DEBUGP(("[%d] _handlePhase2: stage: %d, msg phase: %d \n", CmiMyPe(), state->getStage(), msg->getPhase()));
140   CkAssert(state->getStage()==2);
141   state->subtreeSetDirty(msg->getDirty());
142   state->reported();
143   if(state->allReported()) {
144     if(CmiMyPe()==0) {
145       if(state->isDirty()) {
146         _bcastQD1(state, msg);   // dirty, restart again
147       } else {             
148           // quiescence detected, send callbacks
149         DEBUGP(("[%d] quiescence detected at %f.\n", CmiMyPe(), CmiWallTimer()));
150         QdCallback* cb;
151         while(NULL!=(cb=state->deq())) {
152           cb->send();
153           delete cb;
154         }
155         state->reset();
156         state->setStage(0);
157         CkFreeMsg(msg);
158       }
159     } else {
160         // tell parent if the counters on the node is dirty or not
161       DEBUGP(("[%d] _handlePhase2 dirty:%d\n", CmiMyPe(), state->isDirty()));
162       msg->setDirty(state->isDirty());
163       envelope *env = UsrToEnv((void*)msg);
164       CmiSyncSendAndFree(state->getParent(), env->getTotalsize(), (char *)env);
165       state->reset();
166       state->setStage(0);
167     }
168   } else
169     CkFreeMsg(msg);
170 }
171
172 static void _callWhenIdle(QdMsg *msg)
173 {
174   DEBUGP(("[%d] callWhenIdle msg:%p \n", CmiMyPe(), msg));
175   CcdCancelCallOnCondition(CcdUSER, msg->cond2);
176   QdState *state = CpvAccess(_qd);
177   switch(msg->getPhase()) {
178     case 0 : _handlePhase0(state, msg); break;
179     case 1 : _handlePhase1(state, msg); break;
180     case 2 : _handlePhase2(state, msg); break;
181     default: CmiAbort("Internal QD Error. Contact Developers.!\n");
182   }
183 }
184
185 // allow qd callback to be called by a user condition (CcdUSER)
186 static void _callWhenIdle2(QdMsg *msg)
187 {
188   DEBUGP(("[%d] callWhenIdle2 msg:%p \n", CmiMyPe(), msg));
189   CcdCancelCallOnCondition(CcdPROCESSOR_STILL_IDLE, msg->cond1);
190   QdState *state = CpvAccess(_qd);
191   switch(msg->getPhase()) {
192     case 0 : _handlePhase0(state, msg); break;
193     case 1 : _handlePhase1(state, msg); break;
194     case 2 : _handlePhase2(state, msg); break;
195     default: CmiAbort("Internal QD Error. Contact Developers.!\n");
196   }
197 }
198
199 static void _invokeQD(QdMsg *msg)
200 {
201   QdCallback *cb = new QdCallback(msg->getCb());
202   cb->send();
203   delete cb;
204 }
205
206 void _qdHandler(envelope *env)
207 {
208   register QdMsg *msg = (QdMsg*) EnvToUsr(env);
209   DEBUGP(("[%d] _qdHandler msg:%p \n", CmiMyPe(), msg));
210   if (_dummy_dq > 0)
211     CcdCallFnAfter((CcdVoidFn)_invokeQD,(void *)msg, _dummy_dq*1000); // in ms
212   else {
213     int cond1 = CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)_callWhenIdle, (void*) msg);
214     int cond2 = CcdCallOnCondition(CcdUSER, (CcdVoidFn)_callWhenIdle2, (void*) msg);
215     msg->cond1 = cond1;
216     msg->cond2 = cond2;
217   }
218 }
219
220 // when a message is sent from an immediate handler from comm thread or 
221 // interrupt handler, the counter is sent to rank 0 of the same node
222 void _qdCommHandler(envelope *env)
223 {
224   register QdCommMsg *msg = (QdCommMsg*) EnvToUsr(env);
225   DEBUGP(("[%d] _qdCommHandler msg:%p \n", CmiMyPe(), msg));
226   if (msg->flag == 0)
227     CpvAccess(_qd)->create(msg->count);
228   else
229     CpvAccess(_qd)->process(msg->count);
230   CmiFree(env);
231 }
232
233 void QdState::sendCount(int flag, int count)
234 {
235   if (_dummy_dq == 0) {
236 #if CMK_NET_VERSION && ! CMK_SMP && ! defined(CMK_CPV_IS_SMP)
237         if (CmiImmIsRunning())
238 #else
239         if (CmiMyRank() == CmiMyNodeSize())
240 #endif
241         {
242           register QdCommMsg *msg = (QdCommMsg*) CkAllocMsg(0,sizeof(QdCommMsg),0);
243           msg->flag = flag;
244           msg->count = count;
245           register envelope *env = UsrToEnv((void *)msg);
246           CmiSetHandler(env, _qdCommHandlerIdx);
247           CmiFreeSendFn(CmiNodeFirst(CmiMyNode()), env->getTotalsize(), (char *)env);
248         }
249   }
250 }
251
252 void CkStartQD(const CkCallback& cb)
253 {
254   register QdMsg *msg = (QdMsg*) CkAllocMsg(0,sizeof(QdMsg),0);
255   msg->setPhase(0);
256   msg->setCb(cb);
257   register envelope *env = UsrToEnv((void *)msg);
258   CmiSetHandler(env, _qdHandlerIdx);
259 #if CMK_MEM_CHECKPOINT
260   CmiGetRestartPhase(env) = 9999;        // make sure it is always executed
261 #endif
262 #if CMK_BLUEGENE_CHARM
263   CmiFreeSendFn(0, env->getTotalsize(), (char *)env);
264 #else
265   _CldEnqueue(0, env, _infoIdx);
266 #endif
267 }
268
269 extern "C"
270 void CkStartQD(int eIdx, const CkChareID *cid)
271 {
272   CkStartQD(CkCallback(eIdx, *cid));
273 }