9f2955817e54e441f6e8978963de6903fcaae305
[charm.git] / src / ck-core / qd.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include "ck.h"
9
10 #define  DEBUGP(x)   // CmiPrintf x;
11
12 // a fake QD which just wait for several seconds to triger QD callback
13 #define CMK_DUMMY_QD             0      /* seconds to wait for */
14
15 #if CMK_BLUEGENE_CHARM
16 // this is a hack for bgcharm++, I need to figure out a better
17 // way to do this
18 #undef CmiSyncSendAndFree
19 #define CmiSyncSendAndFree    CmiFreeSendFn
20 #endif
21
22 CpvDeclare(QdState*, _qd);
23
24 // called when a node asks children for their counters
25 // send broadcast msg (phase 0) to children, and report to itself (phase 1)
26 // stage 1 means the node is waiting for reports from children
27 static inline void _bcastQD1(QdState* state, QdMsg *msg)
28 {
29   msg->setPhase(0);
30   state->propagate(msg);
31   msg->setPhase(1);
32   DEBUGP(("[%d] State: getCreated:%d getProcessed:%d\n", CmiMyPe(), state->getCreated(), state->getProcessed()));
33 #if ! CMK_SHARED_VARS_UNIPROCESSOR && !CMK_MULTICORE
34   QdState *comm_state;
35   static int comm_create=0, comm_process=0;
36   if (CmiMyRank()==0) {
37     comm_state = CpvAccessOther(_qd, CmiMyNodeSize());
38     int new_create = comm_state->getCreated();
39     int new_process = comm_state->getProcessed();
40     // combine counters with comm thread
41     state->create(new_create-comm_create);
42     state->process(new_process-comm_process);
43     comm_create = new_create;
44     comm_process = new_process;
45   }
46 #endif
47   msg->setCreated(state->getCreated());
48   msg->setProcessed(state->getProcessed());
49   envelope *env = UsrToEnv((void*)msg);
50   CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
51   state->markProcessed();
52   state->reset();
53   state->setStage(1);
54 }
55
56 // final phase to check if the counters become dirty or not
57 // stage 2 means the node is waiting for children to report their dirty state
58 static inline void _bcastQD2(QdState* state, QdMsg *msg)
59 {
60   msg->setPhase(1);
61   state->propagate(msg);
62   msg->setPhase(2);
63   msg->setDirty(state->isDirty());
64   envelope *env = UsrToEnv((void*)msg);
65   CmiSyncSendAndFree(CmiMyPe(), env->getTotalsize(), (char *)env);
66   state->reset();
67   state->setStage(2);
68 }
69
70 static inline void _handlePhase0(QdState *state, QdMsg *msg)
71 {
72   CkAssert(CmiMyPe()==0 || state->getStage()==0);
73   if(CmiMyPe()==0) {
74     QdCallback *qdcb = new QdCallback(msg->getCb());
75     _MEMCHECK(qdcb);
76     state->enq(qdcb);           // stores qd callback
77   }
78   if(state->getStage()==0)
79     _bcastQD1(state, msg);        // start asking children for the counters
80   else
81     CkFreeMsg(msg);               // already in the middle of processing
82 }
83
84 // collecting counters from children
85 static inline void _handlePhase1(QdState *state, QdMsg *msg)
86 {
87   switch(state->getStage()) {
88     case 0 :
89       CkAssert(CmiMyPe()!=0);
90       _bcastQD2(state, msg);
91       break;
92     case 1 :
93       DEBUGP(("[%d] msg: getCreated:%d getProcessed:%d\n", CmiMyPe(), msg->getCreated(), msg->getProcessed()));
94         // add children's counters
95       state->subtreeCreate(msg->getCreated());
96       state->subtreeProcess(msg->getProcessed());
97       state->reported();
98       if(state->allReported()) {
99         if(CmiMyPe()==0) {
100           DEBUGP(("ALL: %p getCCreated:%d getCProcessed:%d\n", state, state->getCCreated(), state->getCProcessed()));
101           if(state->getCCreated()==state->getCProcessed()) {
102             _bcastQD2(state, msg);    // almost reached, one pass to make sure
103           } else {
104             _bcastQD1(state, msg);    // not reached, go over again
105           }
106         } else {
107             // report counters to parent
108           msg->setCreated(state->getCCreated());
109           msg->setProcessed(state->getCProcessed());
110           envelope *env = UsrToEnv((void*)msg);
111           CmiSyncSendAndFree(state->getParent(), 
112                              env->getTotalsize(), (char *)env);
113           state->reset();
114           state->setStage(0);
115         }
116       } else
117           CkFreeMsg(msg);
118       break;
119     default: CmiAbort("Internal QD Error. Contact Developers.!\n");
120   }
121 }
122
123 // check if counters became dirty and notify parents
124 static inline void _handlePhase2(QdState *state, QdMsg *msg)
125 {
126 //  This assertion seems too strong for smp and uth version.
127 //  CkAssert(state->getStage()==2);
128   state->subtreeSetDirty(msg->getDirty());
129   state->reported();
130   if(state->allReported()) {
131     if(CmiMyPe()==0) {
132       if(state->isDirty()) {
133         _bcastQD1(state, msg);   // dirty, restart again
134       } else {             
135           // quiescence detected, send callbacks
136         QdCallback* cb;
137         while(NULL!=(cb=state->deq())) {
138           cb->send();
139           delete cb;
140         }
141         state->reset();
142         state->setStage(0);
143         CkFreeMsg(msg);
144       }
145     } else {
146         // tell parent if the counters on the node is dirty or not
147       msg->setDirty(state->isDirty());
148       envelope *env = UsrToEnv((void*)msg);
149       CmiSyncSendAndFree(state->getParent(), env->getTotalsize(), (char *)env);
150       state->reset();
151       state->setStage(0);
152     }
153   } else
154     CkFreeMsg(msg);
155 }
156
157 static void _callWhenIdle(QdMsg *msg)
158 {
159   DEBUGP(("[%d] callWhenIdle msg:%p\n", CmiMyPe(), msg));
160   QdState *state = CpvAccess(_qd);
161   switch(msg->getPhase()) {
162     case 0 : _handlePhase0(state, msg); break;
163     case 1 : _handlePhase1(state, msg); break;
164     case 2 : _handlePhase2(state, msg); break;
165     default: CmiAbort("Internal QD Error. Contact Developers.!\n");
166   }
167 }
168
169 #if CMK_DUMMY_QD
170 static void _invokeQD(QdMsg *msg)
171 {
172   QdCallback *cb = new QdCallback(msg->getCb());
173   cb->send();
174   delete cb;
175 }
176 #endif
177
178 void _qdHandler(envelope *env)
179 {
180   register QdMsg *msg = (QdMsg*) EnvToUsr(env);
181   DEBUGP(("[%d] _qdHandler msg:%p\n", CmiMyPe(), msg));
182 #if CMK_DUMMY_QD
183   CcdCallFnAfter((CcdVoidFn)_invokeQD,(void *)msg, CMK_DUMMY_QD*1000); // in ms
184 #else
185   CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)_callWhenIdle, (void*) msg);
186 #endif
187 }
188
189
190 void CkStartQD(const CkCallback& cb)
191 {
192   register QdMsg *msg = (QdMsg*) CkAllocMsg(0,sizeof(QdMsg),0);
193   msg->setPhase(0);
194   msg->setCb(cb);
195   register envelope *env = UsrToEnv((void *)msg);
196   CmiSetHandler(env, _qdHandlerIdx);
197 #if CMK_BLUEGENE_CHARM
198   CmiFreeSendFn(0, env->getTotalsize(), (char *)env);
199 #else
200   CldEnqueue(0, env, _infoIdx);
201 #endif
202 }
203
204 extern "C"
205 void CkStartQD(int eIdx, const CkChareID *cid)
206 {
207   CkStartQD(CkCallback(eIdx, *cid));
208 }