Added an easy-to-use barrier call, which internally uses a
[charm.git] / src / libs / ck-libs / tcharm / tcharm.C
1 /*
2 Threaded Charm++ "Framework Framework"
3
4 Orion Sky Lawlor, olawlor@acm.org, 11/19/2001
5  */
6 #include "tcharm.h"
7
8 #if 0
9     /*Many debugging statements:*/
10 #    define DBG(x) ckout<<"["<<thisIndex<<"] TCHARM> "<<x<<endl;
11 #    define DBGX(x) ckout<<"PE("<<CkMyPe()<<") TCHARM> "<<x<<endl;
12 #else
13     /*No debugging statements*/
14 #    define DBG(x) /*empty*/
15 #    define DBGX(x) /*empty*/
16 #endif
17
18 CtvDeclare(TCharm *,_curTCharm);
19 CpvDeclare(inState,_stateTCharm);
20
21 void TCharm::nodeInit(void)
22 {
23   CtvInitialize(TCharm *,_curTCharm);
24   CpvInitialize(inState,_stateTCharm);
25   TCharm::setState(inNodeSetup);
26   TCharmUserNodeSetup();
27   FTN_NAME(TCHARM_USER_NODE_SETUP,tcharm_user_node_setup)();
28   TCharm::setState(inInit);
29 }
30
31 static void startTCharmThread(TCharmInitMsg *msg)
32 {
33         TCharm::setState(inDriver);
34         typedef void (*threadFn_t)(void *);
35         ((threadFn_t)msg->threadFn)(msg->data);
36         CtvAccess(_curTCharm)->done();
37 }
38
39 TCharm::TCharm(TCharmInitMsg *initMsg_)
40 {
41   initMsg=initMsg_;
42   tid=CthCreateMigratable((CthVoidFn)startTCharmThread,initMsg,initMsg->stackSize);
43   CtvAccessOther(tid,_curTCharm)=this;
44   TCharm::setState(inInit);
45   isStopped=true;
46   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
47   threadInfo.thisElement=thisIndex;
48   threadInfo.numElements=initMsg->numElements;
49   nUd=0;
50   usesAtSync=CmiTrue;
51   ready();
52 }
53
54 TCharm::TCharm(CkMigrateMessage *msg)
55         :ArrayElement1D(msg)
56 {
57   initMsg=NULL;
58   tid=NULL;
59   threadInfo.tProxy=CProxy_TCharm(thisArrayID);  
60 }
61
62 void TCharm::pup(PUP::er &p) {
63 //Pup superclass
64   ArrayElement1D::pup(p);  
65
66   p(isStopped);
67   p(threadInfo.thisElement);
68   p(threadInfo.numElements);
69
70 #ifndef CMK_OPTIMIZE
71   DBG("Packing thread");
72   if (!isStopped)
73     CkAbort("Cannot pup a running thread.  You must suspend before migrating.\n");
74 #endif
75
76 //Pup thread (EVIL & UGLY):
77   //This seekBlock allows us to reorder the packing/unpacking--
78   // This is needed because the userData depends on the thread's stack
79   // both at pack and unpack time.
80   PUP::seekBlock s(p,2);
81   if (p.isUnpacking()) 
82   {//In this case, unpack the thread before the user data
83     s.seek(1);
84     tid = CthPup((pup_er) &p, tid);
85     CtvAccessOther(tid,_curTCharm)=this;
86   }
87   
88   //Pack all user data
89   TCharm::setState(inPup);
90   s.seek(0);
91   p(nUd);
92   for(int i=0;i<nUd;i++) 
93     ud[i].pup(p);
94   TCharm::setState(inFramework);
95
96   if (!p.isUnpacking()) 
97   {//In this case, pack the thread after the user data
98     s.seek(1);
99     tid = CthPup((pup_er) &p, tid);
100   }
101   s.endBlock(); //End of seeking block
102 }
103
104 //Pup one group of user data
105 void TCharm::UserData::pup(PUP::er &p)
106 {
107   pup_er pext=(pup_er)(&p);
108   p(isC);
109   //Save address of userdata-- assumes user data is on the stack
110   p((void*)&data,sizeof(data));
111   if (isC) { //C version
112     //FIXME: function pointers may not be valid across processors
113     p((void*)&cfn, sizeof(TCpupUserDataC));
114     cfn(pext,data);
115   } 
116   else { //Fortran version
117     //FIXME: function pointers may not be valid across processors
118     p((void*)&ffn, sizeof(TCpupUserDataF));        
119     ffn(pext,data);
120   }
121 }
122
123 TCharm::~TCharm() 
124 {
125   CthFree(tid);
126   delete initMsg;
127 }
128
129 //Register user data to be packed with the thread
130 int TCharm::add(const TCharm::UserData &d)
131 {
132   if (nUd>=maxUserData)
133     CkAbort("TCharm: Registered too many user data fields!\n");
134   int nu=nUd++;
135   ud[nu]=d;
136   return nu;
137 }
138 void *TCharm::lookupUserData(int i) {
139         if (i<0 || i>=nUd)
140                 CkAbort("Bad user data index passed to TCharmGetUserdata!\n");
141         return ud[i].getData();
142 }
143
144 //Start the thread running
145 void TCharm::run(void)
146 {
147   DBG("TCharm::run()");
148   start();
149 }
150
151 //Block the thread until start()ed again.
152 void TCharm::stop(void)
153 {
154   if (isStopped) return; //Nothing to do
155 #ifndef CMK_OPTIMIZE
156   DBG("suspending thread");
157   if (tid != CthSelf())
158     CkAbort("Called TCharm::stop from outside TCharm thread!\n");
159 #endif
160   isStopped=true;
161   stopTiming();
162   TCharm::setState(inFramework);
163   CthSuspend();
164   TCharm::setState(inDriver);
165   /*We have to do the get() because "this" may have changed
166     during a migration-suspend.*/
167   TCharm::get()->startTiming();
168 }
169
170 //Resume the waiting thread
171 void TCharm::start(void)
172 {
173   if (!isStopped) return; //Already started
174   isStopped=false;
175   TCharm::setState(inDriver);
176   DBG("awakening thread");
177   CthAwaken(tid);
178 }
179
180 //Go to sync, block, possibly migrate, and then resume
181 void TCharm::migrate(void)
182 {
183 #if CMK_LBDB_ON
184   DBG("going to sync");  
185   AtSync();
186   stop();
187 #else
188   DBG("skipping sync, because there is no load balancer");
189 #endif
190 }
191
192 //Resume from sync: start the thread again
193 void TCharm::ResumeFromSync(void)
194 {
195   start();
196 }
197
198 #ifndef CMK_OPTIMIZE
199 //Make sure we're actually in driver
200 void TCharm::check(void)
201 {
202         if (getState()!=inDriver)
203                 CkAbort("TCharm> Can only use that routine from within driver!\n");
204 }
205 #endif
206
207 static int propMapCreated=0;
208 static CkGroupID propMapID;
209 CkGroupID CkCreatePropMap(void);
210
211 static void TCharmBuildThreads(TCharmInitMsg *msg,TCharmSetupCookie &cook)
212 {
213         CkArrayOptions opts(msg->numElements);
214         if (!propMapCreated) {
215                 propMapCreated=1;
216                 propMapID=CkCreatePropMap();
217         }
218         opts.setMap(propMapID);
219         int nElem=msg->numElements; //<- save it because msg will be deleted.
220         CkArrayID id=CProxy_TCharm::ckNew(msg,opts);
221         cook.setThreads(id,nElem);
222 }
223
224 /****** Readonlys *****/
225 CkVec<TCpupReadonlyGlobal> TCharmReadonlys::entries;
226 void TCharmReadonlys::add(TCpupReadonlyGlobal fn)
227 {
228         entries.push_back(fn);
229 }
230 //Pups all registered readonlys
231 void TCharmReadonlys::pup(PUP::er &p) {
232         if (p.isUnpacking()) {
233                 //HACK: Rather than sending this message only where its needed,
234                 // we send it everywhere and just ignore it if it's not needed.
235                 if (CkMyPe()==0) return; //Processor 0 is the source-- no unpacking needed
236                 if (CkMyRank()!=0) return; //Some other processor will do the unpacking
237         }
238         //Pup the globals for this node:
239         int i,n=entries.length();
240         p(n);
241         if (n!=entries.length())
242                 CkAbort("TCharmReadonly list length mismatch!\n");
243         for (i=0;i<n;i++)
244                 (entries[i])((pup_er)&p);
245 }
246
247 CDECL void TCharmReadonlyGlobals(TCpupReadonlyGlobal fn)
248 {
249         if (TCharm::getState()!=inNodeSetup)
250                 CkAbort("Can only call TCharmReadonlyGlobals from in TCharmUserNodeSetup!\n");
251         TCharmReadonlys::add(fn);
252 }
253 FDECL void FTN_NAME(TCHARM_READONLY_GLOBALS,tcharm_readonly_globals)
254         (TCpupReadonlyGlobal fn)
255 {
256         TCharmReadonlyGlobals(fn);
257 }
258
259 /************* Startup/Shutdown Coordination Support ************/
260
261 enum {TC_READY=23, TC_BARRIER=87, TC_DONE=42};
262
263 //Called when a client is ready to run
264 void TCharm::ready(void) {
265         DBG("TCharm thread "<<thisIndex<<" ready")
266         int vals[2]={0,1};
267         if (thisIndex==0) vals[0]=TC_READY;
268         //Contribute to a synchronizing reduction
269         contribute(sizeof(vals),&vals,CkReduction::sum_int);
270 }
271
272 //Called when we want to go to a barrier
273 void TCharm::barrier(void) {
274         int vals[2]={0,1};
275         if (thisIndex==0) vals[0]=TC_BARRIER;
276         //Contribute to a synchronizing reduction
277         contribute(sizeof(vals),&vals,CkReduction::sum_int);
278         stop();
279 }
280
281 //Called when the thread is done running
282 void TCharm::done(void) {
283         DBG("TCharm thread "<<thisIndex<<" done")
284         int vals[2]={0,1};
285         if (thisIndex==0) vals[0]=TC_DONE;
286         //Contribute to a synchronizing reduction
287         contribute(sizeof(vals),&vals,CkReduction::sum_int);
288         stop();
289 }
290
291 //Called when an array reduction is complete
292 static void coordinatorReduction(void *coord_,int dataLen,void *reductionData)
293 {
294         TCharmCoordinator *coord=(TCharmCoordinator *)coord_;
295         int *vals=(int *)reductionData;
296         if (dataLen!=2*sizeof(int))
297                 CkAbort("Unexpected length in TCharm array reduction!\n");
298         DBGX("Finished coordinator reduction: "<<vals[0]<<", "<<vals[1]);
299         switch (vals[0]) {
300         case TC_READY: coord->clientReady(); break;
301         case TC_BARRIER: coord->clientBarrier(); break;
302         case TC_DONE: coord->clientDone(); break;
303         default:
304                 CkAbort("Unexpected value from TCharm array reduction!\n");
305         };
306 }
307
308 int TCharmCoordinator::nArrays=0; //Total number of running thread arrays
309 TCharmCoordinator *TCharmCoordinator::head=NULL; //List of coordinators
310
311
312 TCharmCoordinator::TCharmCoordinator(CkArrayID threads_,int nThreads_)
313         :threads(threads_), nThreads(nThreads_), nClients(0), nReady(0)
314 {
315         nArrays++;
316         //Link into the coordinator list
317         next=head;
318         head=this;
319
320         threads.setReductionClient(coordinatorReduction,this);
321         nClients=1; //Thread array itself is a client
322 }
323 TCharmCoordinator::~TCharmCoordinator()
324 {
325         //Coordinators never get deleted
326 }
327 void TCharmCoordinator::addClient(const CkArrayID &client)
328 {
329         nClients++;
330 }
331 void TCharmCoordinator::clientReady(void)
332 {
333         DBGX("client "<<nReady+1<<" of "<<nClients<<" ready");
334         nReady++;
335         if (nReady>=nClients) { //All client arrays are ready-- start threads
336                 DBGX("starting threads");
337                 threads.run();
338         }
339 }
340 void TCharmCoordinator::clientBarrier(void)
341 {
342         DBGX("clients all at barrier");
343         threads.run();
344 }
345 void TCharmCoordinator::clientDone(void)
346 {
347         DBGX("clientDone");     
348         nArrays--;
349         if (nArrays<=0) { //All arrays have exited
350                 DBGX("done with computation");
351                 CkExit();
352         }
353 }
354
355 /************* Setup **************/
356
357 //Cookie used during setup
358 TCharmSetupCookie *TCharmSetupCookie::theCookie;
359
360 //Globals used to control setup process
361 static int g_numDefaultSetups=0;
362 static TCharmFallbackSetupFn g_fallbackSetup=NULL;
363 void TCharmSetFallbackSetup(TCharmFallbackSetupFn f)
364 {
365         g_fallbackSetup=f;
366 }
367 CDECL void TCharmInDefaultSetup(void) {
368         g_numDefaultSetups++;
369 }
370
371 //Tiny simple main chare
372 class TCharmMain : public Chare {
373 public:
374   TCharmMain(CkArgMsg *msg) {
375     TCharmSetupCookie cookie(msg->argv);
376     TCharmSetupCookie::theCookie=&cookie;
377     g_numDefaultSetups=0;
378     
379     /*Call user-overridable C setup*/
380     TCharmUserSetup();
381     /*Call user-overridable Fortran setup*/
382     FTN_NAME(TCHARM_USER_SETUP,tcharm_user_setup)();
383     
384     if (g_numDefaultSetups==2) 
385     { //User didn't override either setup routine
386             if (g_fallbackSetup)
387                     (g_fallbackSetup)();
388             else
389                     CmiAbort("You need to override TCharmUserSetup to start your computation, or else link in a framework module\n");
390     }       
391     
392     delete msg;
393     
394     if (0==TCharmCoordinator::getTotal())
395             CkAbort("You didn't create any TCharm arrays in TCharmUserSetup!\n");
396
397     //Send out the readonly globals:
398     TCharmReadonlys r;
399     CProxy_TCharmReadonlyGroup::ckNew(r);
400   }
401 };
402
403 #ifndef CMK_OPTIMIZE
404 /*The setup cookie, used to store global initialization state*/
405 TCharmSetupCookie &TCharmSetupCookie::check(void)
406 {
407         if (magic!=correctMagic)
408                 CkAbort("TCharm setup cookie is damaged!\n");
409         return *this;
410 }
411 #endif
412
413 void TCharmSetupCookie::setThreads(const CkArrayID &aid,int nel)
414 {
415         coord=new TCharmCoordinator(aid,nel);
416         tc=aid; numElements=nel;
417 }
418
419 TCharmSetupCookie::TCharmSetupCookie(char **argv_)
420 {
421         magic=correctMagic;
422         stackSize=0;
423         argv=argv_;
424         coord=NULL;
425 }
426
427
428 /************** User API ***************/
429
430 #define cookie (*TCharmSetupCookie::get())
431
432 /**********************************
433 Callable from UserSetup: 
434 */
435
436 /*Set the size of the thread stack*/
437 CDECL void TCharmSetStackSize(int newStackSize)
438 {
439         if (TCharm::getState()!=inInit)
440                 CkAbort("TCharm> Can only set stack size from in init!\n");
441         cookie.setStackSize(newStackSize);
442 }
443 FDECL void FTN_NAME(TCHARM_SET_STACK_SIZE,tcharm_set_stack_size)
444         (int *newSize)
445 { TCharmSetStackSize(*newSize); }
446
447
448 /*Create a new array of threads, which will be bound to by subsequent libraries*/
449 CDECL void TCharmCreate(int nThreads,
450                         TCharmThreadStartFn threadFn)
451 {
452         TCharmCreateData(nThreads,
453                          (TCharmThreadDataStartFn)threadFn,NULL,0);
454 }
455 FDECL void FTN_NAME(TCHARM_CREATE,tcharm_create)
456         (int *nThreads,TCharmThreadStartFn threadFn)
457 { TCharmCreate(*nThreads,threadFn); }
458
459
460 /*As above, but pass along (arbitrary) data to threads*/
461 CDECL void TCharmCreateData(int nThreads,
462                   TCharmThreadDataStartFn threadFn,
463                   void *threadData,int threadDataLen)
464 {
465         if (TCharm::getState()!=inInit)
466                 CkAbort("TCharm> Can only create threads from in init!\n");
467         TCharmSetupCookie &cook=cookie;
468         TCharmInitMsg *msg=new (threadDataLen,0) TCharmInitMsg(
469                 (CthVoidFn)threadFn,cook.getStackSize());
470         msg->numElements=nThreads;
471         memcpy(msg->data,threadData,threadDataLen);
472         TCharmBuildThreads(msg,cook);
473 }
474
475 FDECL void FTN_NAME(TCHARM_CREATE_DATA,tcharm_create_data)
476         (int *nThreads,
477                   TCharmThreadDataStartFn threadFn,
478                   void *threadData,int *threadDataLen)
479 { TCharmCreateData(*nThreads,threadFn,threadData,*threadDataLen); }
480
481
482 /*Get the unconsumed command-line arguments*/
483 CDECL char **TCharmArgv(void)
484 {
485         if (TCharm::getState()!=inInit)
486                 CkAbort("TCharm> Can only get arguments from in init!\n");
487         return cookie.getArgv();
488 }
489 CDECL int TCharmArgc(void)
490 {
491         if (TCharm::getState()!=inInit)
492                 CkAbort("TCharm> Can only get arguments from in init!\n");
493         return CmiGetArgc(cookie.getArgv());
494 }
495
496 CDECL int TCharmGetNumChunks(void)
497 {
498         int nChunks=CkNumPes();
499         char **argv=TCharmArgv();
500         CmiGetArgInt(argv,"-vp",&nChunks);
501         CmiGetArgInt(argv,"+vp",&nChunks);
502         return nChunks;
503 }
504 FDECL int FTN_NAME(TCHARM_GET_NUM_CHUNKS,tcharm_get_num_chunks)(void)
505 {
506         return TCharmGetNumChunks();
507 }
508
509
510 /***********************************
511 Callable from worker thread
512 */
513 CDECL int TCharmElement(void)
514 { return TCharm::get()->getElement();}
515 CDECL int TCharmNumElements(void)
516 { return TCharm::get()->getNumElements();}
517
518 FDECL int FTN_NAME(TCHARM_ELEMENT,tcharm_element)(void) 
519 { return TCharmElement();}
520 FDECL int FTN_NAME(TCHARM_NUM_ELEMENTS,tcharm_num_elements)(void) 
521 { return TCharmNumElements();}
522
523 CDECL int TCharmRegister(void *data,TCharmPupFn pfn)
524
525         if (!CmiIsomallocInRange(data))
526                 CkAbort("The UserData you register must be allocated on the stack!\n");
527         return TCharm::get()->add(TCharm::UserData(pfn,data));
528 }
529 FDECL int FTN_NAME(TCHARM_REGISTER,tcharm_register)
530         (void *data,TCpupUserDataF pfn)
531
532         if (!CmiIsomallocInRange(data))
533                 CkAbort("The UserData you register must be allocated on the stack!\n");
534         return TCharm::get()->add(TCharm::UserData(
535                 pfn,data,TCharm::UserData::isFortran()));
536 }
537
538 CDECL void *TCharmGetUserdata(int id)
539 {
540         return TCharm::get()->lookupUserData(id);
541 }
542 FDECL void *FTN_NAME(TCHARM_GET_USERDATA,tcharm_get_userdata)(int *id)
543 { return TCharmGetUserdata(*id); }
544
545 CDECL void TCharmMigrate(void)
546 {
547         TCharm::get()->migrate();
548 }
549 FDECL void FTN_NAME(TCHARM_MIGRATE,tcharm_migrate)(void)
550 {
551         TCharm::get()->migrate();
552 }
553
554 CDECL void TCharmBarrier(void)
555 {
556         TCharm::get()->barrier();
557 }
558 FDECL void FTN_NAME(TCHARM_BARRIER,tcharm_barrier)(void)
559 {
560         TCharmBarrier();
561 }
562
563 CDECL void TCharmDone(void)
564 {
565         TCharm::get()->done();
566 }
567 FDECL void FTN_NAME(TCHARM_DONE,tcharm_done)(void)
568 {
569         TCharmDone();
570 }
571
572
573
574 #include "tcharm.def.h"