TCharm: manages a CthThread's creation, load balacing,
[charm.git] / src / libs / ck-libs / tcharm / tcharm.C
1 /*
2 Threaded Charm++ "Framework Framework"
3
4 Orion Sky Lawlor, olawlor@acm.org, 11/19/2001
5  */
6 #include "tcharm.h"
7
8 #if 0
9     /*Many debugging statements:*/
10 #    define DBG(x) ckout<<"["<<thisIndex<<"] TCHARM> "<<x<<endl;
11 #    define DBGX(x) ckout<<"PE("<<CkMyPe()<<") TCHARM> "<<x<<endl;
12 #else
13     /*No debugging statements*/
14 #    define DBG(x) /*empty*/
15 #    define DBGX(x) /*empty*/
16 #endif
17
18 CtvDeclare(TCharm *,_curTCharm);
19 CpvDeclare(inState,_stateTCharm);
20
21 void TCharm::nodeInit(void)
22 {
23   CtvInitialize(TCharm *,_curTCharm);
24   CpvInitialize(inState,_stateTCharm);
25   TCharmUserNodeSetup();
26   FTN_NAME(TCHARM_USER_NODE_SETUP,tcharm_user_node_setup)();
27 }
28
29 static void startTCharmThread(TCharmInitMsg *msg)
30 {
31         TCharm::setState(inDriver);
32         typedef void (*threadFn_t)(void *);
33         ((threadFn_t)msg->threadFn)(msg->data);
34         CtvAccess(_curTCharm)->done();
35 }
36
37 TCharm::TCharm(TCharmInitMsg *initMsg_)
38 {
39   initMsg=initMsg_;
40   
41 tid=CthCreateMigratable((CthVoidFn)startTCharmThread,initMsg,initMsg->stackSize);
42   CtvAccessOther(tid,_curTCharm)=this;
43   TCharm::setState(inInit);
44   isStopped=true;
45   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
46   threadInfo.thisElement=thisIndex;
47   threadInfo.numElements=initMsg->numElements;
48   nUd=0;
49   usesAtSync=CmiTrue;
50   ready();
51 }
52
53 TCharm::TCharm(CkMigrateMessage *msg)
54         :ArrayElement1D(msg)
55 {
56   initMsg=NULL;
57   tid=NULL;
58   threadInfo.tProxy=CProxy_TCharm(thisArrayID);  
59 }
60
61 void TCharm::pup(PUP::er &p) {
62 //Pup superclass
63   ArrayElement1D::pup(p);  
64
65   p(isStopped);
66   p(threadInfo.thisElement);
67   p(threadInfo.numElements);
68
69 #ifndef CMK_OPTIMIZE
70   DBG("Packing thread");
71   if (!isStopped)
72     CkAbort("Cannot pup a running thread.  You must suspend before migrating.\n");
73 #endif
74
75 //Pup thread (EVIL & UGLY):
76   //This seekBlock allows us to reorder the packing/unpacking--
77   // This is needed because the userData depends on the thread's stack
78   // both at pack and unpack time.
79   PUP::seekBlock s(p,2);
80   if (p.isUnpacking()) 
81   {//In this case, unpack the thread before the user data
82     s.seek(1);
83     tid = CthPup((pup_er) &p, tid);
84     CtvAccessOther(tid,_curTCharm)=this;
85   }
86   
87   //Pack all user data
88   TCharm::setState(inPup);
89   s.seek(0);
90   p(nUd);
91   for(int i=0;i<nUd;i++) 
92     ud[i].pup(p);
93   TCharm::setState(inFramework);
94
95   if (!p.isUnpacking()) 
96   {//In this case, pack the thread after the user data
97     s.seek(1);
98     tid = CthPup((pup_er) &p, tid);
99   }
100   s.endBlock(); //End of seeking block
101 }
102
103 //Pup one group of user data
104 void TCharm::UserData::pup(PUP::er &p)
105 {
106   pup_er pext=(pup_er)(&p);
107   p(isC);
108   //Save address of userdata-- assumes user data is on the stack
109   p((void*)&data,sizeof(data));
110   if (isC) { //C version
111     //FIXME: function pointers may not be valid across processors
112     p((void*)&cfn, sizeof(TCpupUserDataC));
113     cfn(pext,data);
114   } 
115   else { //Fortran version
116     //FIXME: function pointers may not be valid across processors
117     p((void*)&ffn, sizeof(TCpupUserDataF));        
118     ffn(pext,data);
119   }
120 }
121
122 TCharm::~TCharm() 
123 {
124   CthFree(tid);
125   delete initMsg;
126 }
127
128 //Register user data to be packed with the thread
129 int TCharm::add(const TCharm::UserData &d)
130 {
131   if (nUd>=maxUserData)
132     CkAbort("TCharm: Registered too many user data fields!\n");
133   int nu=nUd++;
134   ud[nu]=d;
135   return nu;
136 }
137 void *TCharm::lookupUserData(int i) {
138         if (i<0 || i>=nUd)
139                 CkAbort("Bad user data index passed to TCharmGetUserdata!\n");
140         return ud[i].getData();
141 }
142
143 //Start the thread running
144 void TCharm::run(void)
145 {
146   DBG("TCharm::run()");
147   start();
148 }
149
150 //Block the thread until start()ed again.
151 void TCharm::stop(void)
152 {
153   if (isStopped) return; //Nothing to do
154 #ifndef CMK_OPTIMIZE
155   DBG("suspending thread");
156   if (tid != CthSelf())
157     CkAbort("Called TCharm::stop from outside TCharm thread!\n");
158 #endif
159   isStopped=true;
160   stopTiming();
161   TCharm::setState(inFramework);
162   CthSuspend();
163   TCharm::setState(inDriver);
164   /*We have to do the get() because "this" may have changed
165     during a migration-suspend.*/
166   TCharm::get()->startTiming();
167 }
168
169 //Resume the waiting thread
170 void TCharm::start(void)
171 {
172   if (!isStopped) return; //Already started
173   isStopped=false;
174   TCharm::setState(inDriver);
175   DBG("awakening thread");
176   CthAwaken(tid);
177 }
178
179 //Go to sync, block, possibly migrate, and then resume
180 void TCharm::migrate(void)
181 {
182 #if CMK_LBDB_ON
183   DBG("going to sync");  
184   AtSync();
185   stop();
186 #else
187   DBG("skipping sync, because there is no load balancer");
188 #endif
189 }
190
191 //Resume from sync: start the thread again
192 void TCharm::ResumeFromSync(void)
193 {
194   start();
195 }
196
197 #ifndef CMK_OPTIMIZE
198 //Make sure we're actually in driver
199 void TCharm::check(void)
200 {
201         if (getState()!=inDriver)
202                 CkAbort("TCharm> Can only use that routine from within driver!\n");
203 }
204 #endif
205
206 static int propMapCreated=0;
207 static CkGroupID propMapID;
208 CkGroupID CkCreatePropMap(void);
209
210 static void TCharmBuildThreads(TCharmInitMsg *msg,TCharmSetupCookie &cook)
211 {
212         CkArrayOptions opts(msg->numElements);
213         if (!propMapCreated) {
214                 propMapCreated=1;
215                 propMapID=CkCreatePropMap();
216         }
217         opts.setMap(propMapID);
218         CkArrayID id=CProxy_TCharm::ckNew(msg,opts);
219         cook.setThreads(id,msg->numElements);
220 }
221
222 /************* Startup/Shutdown Coordination Support ************/
223
224 enum {TC_READY=23, TC_DONE=42};
225
226 //Called when a client is ready to run
227 void TCharm::ready(void) {
228         DBG("TCharm thread "<<thisIndex<<" ready")
229         int vals[2]={0,1};
230         if (thisIndex==0) vals[0]=TC_READY;
231         //Contribute to a synchronizing reduction
232         contribute(sizeof(vals),&vals,CkReduction::sum_int);
233 }
234
235 //Called when the thread is done running
236 void TCharm::done(void) {
237         DBG("TCharm thread "<<thisIndex<<" done")
238         int vals[2]={0,1};
239         if (thisIndex==0) vals[0]=TC_DONE;
240         //Contribute to a synchronizing reduction
241         contribute(sizeof(vals),&vals,CkReduction::sum_int);
242         stop();
243 }
244
245 //Called when an array reduction is complete
246 static void coordinatorReduction(void *coord_,int dataLen,void *reductionData)
247 {
248         TCharmCoordinator *coord=(TCharmCoordinator *)coord_;
249         int *vals=(int *)reductionData;
250         if (dataLen!=2*sizeof(int))
251                 CkAbort("Unexpected length in TCharm array reduction!\n");
252         DBGX("Finished coordinator reduction: "<<vals[0]<<", "<<vals[1]);
253         switch (vals[0]) {
254         case TC_READY: coord->clientReady(); break;
255         case TC_DONE: coord->clientDone(); break;
256         default:
257                 CkAbort("Unexpected value from TCharm array reduction!\n");
258         };
259 }
260
261 int TCharmCoordinator::nArrays=0; //Total number of running thread arrays
262 TCharmCoordinator *TCharmCoordinator::head=NULL; //List of coordinators
263
264
265 TCharmCoordinator::TCharmCoordinator(CkArrayID threads_,int nThreads_)
266         :threads(threads_), nThreads(nThreads_), nClients(0), nReady(0)
267 {
268         nArrays++;
269         //Link into the coordinator list
270         next=head;
271         head=this;
272
273         threads.setReductionClient(coordinatorReduction,this);
274         nClients=1; //Thread array itself is a client
275 }
276 TCharmCoordinator::~TCharmCoordinator()
277 {
278         //Coordinators never get deleted
279 }
280 void TCharmCoordinator::addClient(const CkArrayID &client)
281 {
282         nClients++;
283 }
284 void TCharmCoordinator::clientReady(void)
285 {
286         DBGX("client "<<nReady+1<<" of "<<nClients<<" ready");
287         nReady++;
288         if (nReady>=nClients) { //All client arrays are ready-- start threads
289                 DBGX("starting threads");
290                 threads.run();
291         }
292 }
293 void TCharmCoordinator::clientDone(void)
294 {
295         DBGX("clientDone");     
296         nArrays--;
297         if (nArrays<=0) { //All arrays have exited
298                 DBGX("done with computation");
299                 CkExit();
300         }
301 }
302
303 /************* Setup **************/
304
305 //Cookie used during setup
306 TCharmSetupCookie *TCharmSetupCookie::theCookie;
307
308 //Globals used to control setup process
309 static int g_numDefaultSetups=0;
310 static TCharmFallbackSetupFn g_fallbackSetup=NULL;
311 void TCharmSetFallbackSetup(TCharmFallbackSetupFn f)
312 {
313         g_fallbackSetup=f;
314 }
315 CDECL void TCharmInDefaultSetup(void) {
316         g_numDefaultSetups++;
317 }
318
319 //Tiny simple main chare
320 class TCharmMain : public Chare {
321 public:
322   TCharmMain(CkArgMsg *msg) {
323     TCharmSetupCookie cookie(msg->argv);
324     TCharmSetupCookie::theCookie=&cookie;
325     g_numDefaultSetups=0;
326     
327     /*Call user-overridable C setup*/
328     TCharmUserSetup();
329     /*Call user-overridable Fortran setup*/
330     FTN_NAME(TCHARM_USER_SETUP,tcharm_user_setup)();
331     
332     if (g_numDefaultSetups==2) 
333     { //User didn't override either setup routine
334             if (g_fallbackSetup)
335                     (g_fallbackSetup)();
336             else
337                     CmiAbort("You need to override TCharmUserSetup to start your computation, or else link in a framework module\n");
338     }       
339     
340     delete msg;
341     
342     if (0==TCharmCoordinator::getTotal())
343             CkAbort("You didn't create any TCharm arrays in TCharmUserSetup!\n");
344   }
345 };
346
347 #ifndef CMK_OPTIMIZE
348 /*The setup cookie, used to store global initialization state*/
349 TCharmSetupCookie &TCharmSetupCookie::check(void)
350 {
351         if (magic!=correctMagic)
352                 CkAbort("TCharm setup cookie is damaged!\n");
353         return *this;
354 }
355 #endif
356
357 void TCharmSetupCookie::setThreads(const CkArrayID &aid,int nel)
358 {
359         coord=new TCharmCoordinator(aid,nel);
360         tc=aid; numElements=nel;
361 }
362
363 TCharmSetupCookie::TCharmSetupCookie(char **argv_)
364 {
365         magic=correctMagic;
366         stackSize=0;
367         argv=argv_;
368         coord=NULL;
369 }
370
371
372 /************** User API ***************/
373
374 #define cookie (*TCharmSetupCookie::get())
375
376 /**********************************
377 Callable from UserSetup: 
378 */
379
380 /*Set the size of the thread stack*/
381 CDECL void TCharmSetStackSize(int newStackSize)
382 {
383         if (TCharm::getState()!=inInit)
384                 CkAbort("TCharm> Can only set stack size from in init!\n");
385         cookie.setStackSize(newStackSize);
386 }
387 FDECL void FTN_NAME(TCHARM_SET_STACK_SIZE,tcharm_set_stack_size)
388         (int *newSize)
389 { TCharmSetStackSize(*newSize); }
390
391
392 /*Create a new array of threads, which will be bound to by subsequent libraries*/
393 CDECL void TCharmCreate(int nThreads,
394                         TCharmThreadStartFn threadFn)
395 {
396         TCharmCreateData(nThreads,
397                          (TCharmThreadDataStartFn)threadFn,NULL,0);
398 }
399 FDECL void FTN_NAME(TCHARM_CREATE,tcharm_create)
400         (int *nThreads,TCharmThreadStartFn threadFn)
401 { TCharmCreate(*nThreads,threadFn); }
402
403
404 /*As above, but pass along (arbitrary) data to threads*/
405 CDECL void TCharmCreateData(int nThreads,
406                   TCharmThreadDataStartFn threadFn,
407                   void *threadData,int threadDataLen)
408 {
409         if (TCharm::getState()!=inInit)
410                 CkAbort("TCharm> Can only create threads from in init!\n");
411         TCharmSetupCookie &cook=cookie;
412         TCharmInitMsg *msg=new (threadDataLen,0) TCharmInitMsg(
413                 (CthVoidFn)threadFn,cook.getStackSize());
414         msg->numElements=nThreads;
415         memcpy(msg->data,threadData,threadDataLen);
416         TCharmBuildThreads(msg,cook);
417 }
418
419 FDECL void FTN_NAME(TCHARM_CREATE_DATA,tcharm_create_data)
420         (int *nThreads,
421                   TCharmThreadDataStartFn threadFn,
422                   void *threadData,int *threadDataLen)
423 { TCharmCreateData(*nThreads,threadFn,threadData,*threadDataLen); }
424
425
426 /*Get the unconsumed command-line arguments*/
427 CDECL char **TCharmArgv(void)
428 {
429         if (TCharm::getState()!=inInit)
430                 CkAbort("TCharm> Can only get arguments from in init!\n");
431         return cookie.getArgv();
432 }
433 CDECL int TCharmArgc(void)
434 {
435         if (TCharm::getState()!=inInit)
436                 CkAbort("TCharm> Can only get arguments from in init!\n");
437         return CmiGetArgc(cookie.getArgv());
438 }
439
440 CDECL int TCharmGetNumChunks(void)
441 {
442         int nChunks=CkNumPes();
443         char **argv=TCharmArgv();
444         CmiGetArgInt(argv,"-vp",&nChunks);
445         CmiGetArgInt(argv,"+vp",&nChunks);
446         return nChunks;
447 }
448 FDECL int FTN_NAME(TCHARM_GET_NUM_CHUNKS,tcharm_get_num_chunks)(void)
449 {
450         return TCharmGetNumChunks();
451 }
452
453
454 /***********************************
455 Callable from worker thread
456 */
457 CDECL int TCharmElement(void)
458 { return TCharm::get()->getElement();}
459 CDECL int TCharmNumElements(void)
460 { return TCharm::get()->getNumElements();}
461
462 FDECL int FTN_NAME(TCHARM_ELEMENT,tcharm_element)(void) 
463 { return TCharmElement();}
464 FDECL int FTN_NAME(TCHARM_NUM_ELEMENTS,tcharm_num_elements)(void) 
465 { return TCharmNumElements();}
466
467 CDECL int TCharmRegister(void *data,TCharmPupFn pfn)
468
469         return TCharm::get()->add(TCharm::UserData(pfn,data));
470 }
471 FDECL int FTN_NAME(TCHARM_REGISTER,tcharm_register)
472         (void *data,TCpupUserDataF pfn)
473
474         return TCharm::get()->add(TCharm::UserData(
475                 pfn,data,TCharm::UserData::isFortran()));
476 }
477
478 CDECL void *TCharmGetUserdata(int id)
479 {
480         return TCharm::get()->lookupUserData(id);
481 }
482 FDECL void *FTN_NAME(TCHARM_GET_USERDATA,tcharm_get_userdata)(int *id)
483 { return TCharmGetUserdata(*id); }
484
485 CDECL void TCharmMigrate(void)
486 {
487         TCharm::get()->migrate();
488 }
489 FDECL void FTN_NAME(TCHARM_MIGRATE,tcharm_migrate)(void)
490 {
491         TCharm::get()->migrate();
492 }
493
494 CDECL void TCharmDone(void)
495 {
496         TCharm::get()->done();
497 }
498 FDECL void FTN_NAME(TCHARM_DONE,tcharm_done)(void)
499 {
500         TCharmDone();
501 }
502
503
504
505 #include "tcharm.def.h"