more debug statement.
[charm.git] / src / ck-core / qd.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #define  DEBUGP(x)    // CmiPrintf x;
9
10 #include "ck.h"
11
12
13 // a fake QD which just wait for several seconds to triger QD callback
14 #define CMK_DUMMY_QD             0      /* seconds to wait for */
15
16 #if CMK_BLUEGENE_CHARM
17 // this is a hack for bgcharm++, I need to figure out a better
18 // way to do this
19 #undef CmiSyncSendAndFree
20 #define CmiSyncSendAndFree    CmiFreeSendFn
21 #endif
22
23 CpvDeclare(QdState*, _qd);
24
25 // called when a node asks children for their counters
26 // send broadcast msg (phase 0) to children, and report to itself (phase 1)
27 // stage 1 means the node is waiting for reports from children
28 static inline void _bcastQD1(QdState* state, QdMsg *msg)
29 {
30   msg->setPhase(0);
31   state->propagate(msg);
32   msg->setPhase(1);
33   DEBUGP(("[%d] _bcastQD1: State: getCreated:%d getProcessed:%d\n", CmiMyPe(), state->getCreated(), state->getProcessed()));
34 #if ! CMK_SHARED_VARS_UNIPROCESSOR && !CMK_MULTICORE
35   QdState *comm_state;
36   static int comm_create=0, comm_process=0;
37   if (CmiMyRank()==0) {
38     comm_state = CpvAccessOther(_qd, CmiMyNodeSize());
39     int new_create = comm_state->getCreated();
40     int new_process = comm_state->getProcessed();
41     // combine counters with comm thread
42     state->create(new_create-comm_create);
43     state->process(new_process-comm_process);
44     comm_create = new_create;
45     comm_process = new_process;
46   }
47 #endif
48   msg->setCreated(state->getCreated());
49   msg->setProcessed(state->getProcessed());
50   envelope *env = UsrToEnv((void*)msg);
51   CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
52   state->markProcessed();
53   state->reset();
54   state->setStage(1);
55   DEBUGP(("[%d] _bcastQD1 stage changed to: %d\n", CmiMyPe(), state->getStage()));
56 }
57
58 // final phase to check if the counters become dirty or not
59 // stage 2 means the node is waiting for children to report their dirty state
60 static inline void _bcastQD2(QdState* state, QdMsg *msg)
61 {
62   DEBUGP(("[%d] _bcastQD2: \n", CmiMyPe()));
63   msg->setPhase(1);
64   state->propagate(msg);
65   msg->setPhase(2);
66   msg->setDirty(state->isDirty());
67   envelope *env = UsrToEnv((void*)msg);
68   CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
69   state->reset();
70   state->setStage(2);
71   DEBUGP(("[%d] _bcastQD2: stage changed to: %d\n", CmiMyPe(), state->getStage()));
72 }
73
74 static inline void _handlePhase0(QdState *state, QdMsg *msg)
75 {
76   DEBUGP(("[%d] _handlePhase0: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
77   CkAssert(CmiMyPe()==0 || state->getStage()==0);
78   if(CmiMyPe()==0) {
79     QdCallback *qdcb = new QdCallback(msg->getCb());
80     _MEMCHECK(qdcb);
81     state->enq(qdcb);           // stores qd callback
82   }
83   if(state->getStage()==0)
84     _bcastQD1(state, msg);        // start asking children for the counters
85   else
86     CkFreeMsg(msg);               // already in the middle of processing
87 }
88
89 // collecting counters from children
90 static inline void _handlePhase1(QdState *state, QdMsg *msg)
91 {
92   DEBUGP(("[%d] _handlePhase1: stage: %d, msg phase: %d\n", CmiMyPe(), state->getStage(), msg->getPhase()));
93   switch(state->getStage()) {
94     case 0 :
95       CkAssert(CmiMyPe()!=0);
96       _bcastQD2(state, msg);
97       break;
98     case 1 :
99       DEBUGP(("[%d] msg: getCreated:%d getProcessed:%d\n", CmiMyPe(), msg->getCreated(), msg->getProcessed()));
100         // add children's counters
101       state->subtreeCreate(msg->getCreated());
102       state->subtreeProcess(msg->getProcessed());
103       state->reported();
104       if(state->allReported()) {
105         if(CmiMyPe()==0) {
106           DEBUGP(("ALL: %p getCCreated:%d getCProcessed:%d\n", state, state->getCCreated(), state->getCProcessed()));
107           if(state->getCCreated()==state->getCProcessed()) {
108             _bcastQD2(state, msg);    // almost reached, one pass to make sure
109           } else {
110             _bcastQD1(state, msg);    // not reached, go over again
111           }
112         } else {
113             // report counters to parent
114           msg->setCreated(state->getCCreated());
115           msg->setProcessed(state->getCProcessed());
116           envelope *env = UsrToEnv((void*)msg);
117           CmiSyncSendAndFree(state->getParent(), 
118                              env->getTotalsize(), (char *)env);
119           state->reset();
120           state->setStage(0);
121         }
122       } else
123           CkFreeMsg(msg);
124       break;
125     default: CmiAbort("Internal QD Error. Contact Developers.!\n");
126   }
127 }
128
129 // check if counters became dirty and notify parents
130 static inline void _handlePhase2(QdState *state, QdMsg *msg)
131 {
132 //  This assertion seems too strong for smp and uth version.
133   DEBUGP(("[%d] _handlePhase2: stage: %d, msg phase: %d \n", CmiMyPe(), state->getStage(), msg->getPhase()));
134   CkAssert(state->getStage()==2);
135   state->subtreeSetDirty(msg->getDirty());
136   state->reported();
137   if(state->allReported()) {
138     if(CmiMyPe()==0) {
139       if(state->isDirty()) {
140         _bcastQD1(state, msg);   // dirty, restart again
141       } else {             
142           // quiescence detected, send callbacks
143         DEBUGP(("[%d] quiescence detected,\n", CmiMyPe()));
144         QdCallback* cb;
145         while(NULL!=(cb=state->deq())) {
146           cb->send();
147           delete cb;
148         }
149         state->reset();
150         state->setStage(0);
151         CkFreeMsg(msg);
152       }
153     } else {
154         // tell parent if the counters on the node is dirty or not
155       DEBUGP(("[%d] _handlePhase2 dirty:%d\n", CmiMyPe(), state->isDirty()));
156       msg->setDirty(state->isDirty());
157       envelope *env = UsrToEnv((void*)msg);
158       CmiSyncSendAndFree(state->getParent(), env->getTotalsize(), (char *)env);
159       state->reset();
160       state->setStage(0);
161     }
162   } else
163     CkFreeMsg(msg);
164 }
165
166 static void _callWhenIdle(QdMsg *msg)
167 {
168   DEBUGP(("[%d] callWhenIdle msg:%p \n", CmiMyPe(), msg));
169   QdState *state = CpvAccess(_qd);
170   switch(msg->getPhase()) {
171     case 0 : _handlePhase0(state, msg); break;
172     case 1 : _handlePhase1(state, msg); break;
173     case 2 : _handlePhase2(state, msg); break;
174     default: CmiAbort("Internal QD Error. Contact Developers.!\n");
175   }
176 }
177
178 #if CMK_DUMMY_QD
179 static void _invokeQD(QdMsg *msg)
180 {
181   QdCallback *cb = new QdCallback(msg->getCb());
182   cb->send();
183   delete cb;
184 }
185 #endif
186
187 void _qdHandler(envelope *env)
188 {
189   register QdMsg *msg = (QdMsg*) EnvToUsr(env);
190   DEBUGP(("[%d] _qdHandler msg:%p \n", CmiMyPe(), msg));
191 #if CMK_DUMMY_QD
192   CcdCallFnAfter((CcdVoidFn)_invokeQD,(void *)msg, CMK_DUMMY_QD*1000); // in ms
193 #else
194   CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)_callWhenIdle, (void*) msg);
195 #endif
196 }
197
198
199 void CkStartQD(const CkCallback& cb)
200 {
201   register QdMsg *msg = (QdMsg*) CkAllocMsg(0,sizeof(QdMsg),0);
202   msg->setPhase(0);
203   msg->setCb(cb);
204   register envelope *env = UsrToEnv((void *)msg);
205   CmiSetHandler(env, _qdHandlerIdx);
206 #if CMK_BLUEGENE_CHARM
207   CmiFreeSendFn(0, env->getTotalsize(), (char *)env);
208 #else
209   CldEnqueue(0, env, _infoIdx);
210 #endif
211 }
212
213 extern "C"
214 void CkStartQD(int eIdx, const CkChareID *cid)
215 {
216   CkStartQD(CkCallback(eIdx, *cid));
217 }