Introducing support for message logging fault tolerance
[charm.git] / src / libs / ck-libs / tcharm / tcharm.C
1 /*
2 Threaded Charm++ "Framework Framework"
3
4 Orion Sky Lawlor, olawlor@acm.org, 11/19/2001
5  */
6 #include "tcharm_impl.h"
7 #include "tcharm.h"
8 #include <ctype.h>
9
10 #if 0 
11     /*Many debugging statements:*/
12 #    define DBG(x) ckout<<"["<<thisIndex<<","<<CkMyPe()<<"] TCHARM> "<<x<<endl;
13 #    define DBGX(x) ckout<<"PE("<<CkMyPe()<<") TCHARM> "<<x<<endl;
14 #else
15     /*No debugging statements*/
16 #    define DBG(x) /*empty*/
17 #    define DBGX(x) /*empty*/
18 #endif
19
20 CtvDeclare(TCharm *,_curTCharm);
21
22 static int lastNumChunks=0;
23
24 class TCharmTraceLibList {
25         enum {maxLibs=20,maxLibNameLen=15};
26         //List of libraries we want to trace:
27         int curLibs;
28         char libNames[maxLibs][maxLibNameLen];
29         int checkIfTracing(const char *lib) const
30         {
31                 for (int i=0;i<curLibs;i++) 
32                         if (0==strcmp(lib,libNames[i]))
33                                 return 1;
34                 return 0;
35         }
36 public:
37         TCharmTraceLibList() {curLibs=0;}
38         void addTracing(const char *lib) 
39         { //We want to trace this library-- add its name to the list.
40                 CkPrintf("TCHARM> Will trace calls to library %s\n",lib);
41                 int i;
42                 for (i=0;0!=*lib;i++,lib++)
43                         libNames[curLibs][i]=tolower(*lib);
44                 libNames[curLibs][i]=0;
45                 // if already tracing, skip
46                 if (checkIfTracing(libNames[curLibs])) return;
47                 curLibs++;
48         }
49         inline int isTracing(const char *lib) const {
50                 if (curLibs==0) return 0; //Common case
51                 else return checkIfTracing(lib);
52         }
53 };
54 static TCharmTraceLibList tcharm_tracelibs;
55 static int tcharm_nomig=0, tcharm_nothreads=0;
56 static int tcharm_stacksize=1*1024*1024; /*Default stack size is 1MB*/
57 static int tcharm_initted=0;
58 CkpvDeclare(int, mapCreated);
59 static CkGroupID mapID;
60 static char* mapping = NULL;
61
62 void TCharm::nodeInit(void)
63 {
64 }
65
66 void TCharm::procInit(void)
67 {
68   CtvInitialize(TCharm *,_curTCharm);
69   CtvAccess(_curTCharm)=NULL;
70   tcharm_initted=1;
71   CtgInit();
72
73   CkpvInitialize(int, mapCreated);
74   CkpvAccess(mapCreated) = 0;
75
76   // called on every pe to eat these arguments
77   char **argv=CkGetArgv();
78   tcharm_nomig=CmiGetArgFlagDesc(argv,"+tcharm_nomig","Disable migration support (debugging)");
79   tcharm_nothreads=CmiGetArgFlagDesc(argv,"+tcharm_nothread","Disable thread support (debugging)");
80   tcharm_nothreads|=CmiGetArgFlagDesc(argv,"+tcharm_nothreads",NULL);
81   char *traceLibName=NULL;
82   while (CmiGetArgStringDesc(argv,"+tcharm_trace",&traceLibName,"Print each call to this library"))
83       tcharm_tracelibs.addTracing(traceLibName);
84   CmiGetArgIntDesc(argv,"+tcharm_stacksize",&tcharm_stacksize,"Set the thread stack size (default 1MB)");
85   if (CkMyPe()!=0) { //Processor 0 eats "+vp<N>" and "-vp<N>" later:
86         int ignored;
87         while (CmiGetArgIntDesc(argv,"-vp",&ignored,NULL)) {}
88         while (CmiGetArgIntDesc(argv,"+vp",&ignored,NULL)) {}
89   }
90   if (CkMyPe()==0) { // Echo various debugging options:
91     if (tcharm_nomig) CmiPrintf("TCHARM> Disabling migration support, for debugging\n");
92     if (tcharm_nothreads) CmiPrintf("TCHARM> Disabling thread support, for debugging\n");
93   }
94   if (CkpvAccess(mapCreated)==0) {
95     if (0!=CmiGetArgString(argv, "+mapping", &mapping)){
96     }
97     CkpvAccess(mapCreated)=1;
98   }
99 }
100
101 void TCHARM_Api_trace(const char *routineName,const char *libraryName)
102 {
103         if (!tcharm_tracelibs.isTracing(libraryName)) return;
104         TCharm *tc=CtvAccess(_curTCharm);
105         char where[100];
106         if (tc==NULL) sprintf(where,"[serial context on %d]",CkMyPe());
107         else sprintf(where,"[%p> vp %d, p %d]",(void *)tc,tc->getElement(),CkMyPe());
108         CmiPrintf("%s Called routine %s\n",where,routineName);
109         CmiPrintStackTrace(1);
110         CmiPrintf("\n");
111 }
112
113 // register thread start functions to get a function handler
114 // this is portable across heterogeneous platforms, or on machines with
115 // random stack/function pointer
116
117 static CkVec<TCHARM_Thread_data_start_fn> threadFnTable;
118
119 int TCHARM_Register_thread_function(TCHARM_Thread_data_start_fn fn)
120 {
121   int idx = threadFnTable.size();
122   threadFnTable.push_back(fn);
123   return idx+1;                     // make 0 invalid number
124 }
125
126 TCHARM_Thread_data_start_fn getTCharmThreadFunction(int idx)
127 {
128   CmiAssert(idx > 0);
129   return threadFnTable[idx-1];
130 }
131
132 static void startTCharmThread(TCharmInitMsg *msg)
133 {
134         DBGX("thread started");
135         TCharm::activateThread();
136        TCHARM_Thread_data_start_fn threadFn = getTCharmThreadFunction(msg->threadFn);
137         threadFn(msg->data);
138         TCharm::deactivateThread();
139         CtvAccess(_curTCharm)->done();
140 }
141
142 TCharm::TCharm(TCharmInitMsg *initMsg_)
143 {
144   initMsg=initMsg_;
145   initMsg->opts.sanityCheck();
146   timeOffset=0.0;
147   threadGlobals=CtgCreate();
148   if (tcharm_nothreads)
149   { //Don't even make a new thread-- just use main thread
150     tid=CthSelf();
151   }
152   else /*Create a thread normally*/
153   {
154     if (tcharm_nomig) { /*Nonmigratable version, for debugging*/
155       tid=CthCreate((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
156     } else {
157       tid=CthCreateMigratable((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
158     }
159 #if CMK_BLUEGENE_CHARM
160     BgAttach(tid);
161     BgUnsetStartOutOfCore();
162 #endif
163   }
164   CtvAccessOther(tid,_curTCharm)=this;
165   isStopped=true;
166   resumeAfterMigration=false;
167         /* FAULT_EVAC*/
168         AsyncEvacuate(CmiTrue);
169   skipResume=false;
170   exitWhenDone=initMsg->opts.exitWhenDone;
171   isSelfDone = false;
172   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
173   threadInfo.thisElement=thisIndex;
174   threadInfo.numElements=initMsg->numElements;
175   if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
176         heapBlocks=CmiIsomallocBlockListNew();
177   else
178         heapBlocks=0;
179   nUd=0;
180   usesAtSync=CmiTrue;
181   run();
182 }
183
184 TCharm::TCharm(CkMigrateMessage *msg)
185         :CBase_TCharm(msg)
186 {
187   initMsg=NULL;
188   tid=NULL;
189   threadGlobals=NULL;
190   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
191         AsyncEvacuate(CmiTrue);
192   heapBlocks=0;
193 }
194
195 void checkPupMismatch(PUP::er &p,int expected,const char *where)
196 {
197         int v=expected;
198         p|v;
199         if (v!=expected) {
200                 CkError("FATAL ERROR> Mismatch %s pup routine\n",where);
201                 CkAbort("FATAL ERROR: Pup direction mismatch");
202         }
203 }
204
205 void TCharm::pup(PUP::er &p) {
206 //Pup superclass
207   ArrayElement1D::pup(p);
208
209   //BIGSIM_OOC DEBUGGING
210   //if(!p.isUnpacking()){
211   //  CmiPrintf("TCharm[%d] packing: ", thisIndex);
212   //  CthPrintThdStack(tid);
213   //}
214
215   checkPupMismatch(p,5134,"before TCHARM");
216 #ifdef _FAULT_MLOG_
217     if(!isStopped){
218 //      resumeAfterMigration = true;
219     }
220     isStopped = true;
221 #endif
222   p(isStopped); p(resumeAfterMigration); p(exitWhenDone); p(isSelfDone); p(skipResume);
223   p(threadInfo.thisElement);
224   p(threadInfo.numElements);
225   
226   if (sema.size()>0){
227         CkAbort("TCharm::pup> Cannot migrate with unconsumed semaphores!\n");
228   }
229
230 #ifndef CMK_OPTIMIZE
231   DBG("Packing thread");
232   if (!isStopped && !CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
233     if(BgOutOfCoreFlag==0) //not doing out-of-core scheduling
234         CkAbort("Cannot pup a running thread.  You must suspend before migrating.\n");
235   }     
236   if (tcharm_nomig) CkAbort("Cannot migrate with the +tcharm_nomig option!\n");
237 #endif
238
239   //This seekBlock allows us to reorder the packing/unpacking--
240   // This is needed because the userData depends on the thread's stack
241   // and heap data both at pack and unpack time.
242   PUP::seekBlock s(p,2);
243   
244   if (p.isUnpacking())
245   {//In this case, unpack the thread & heap before the user data
246     s.seek(1);
247     pupThread(p);
248     //Restart our clock: set it up so packTime==CkWallTimer+timeOffset
249     double packTime;
250     p(packTime);
251     timeOffset=packTime-CkWallTimer();
252   }
253   
254 //Pack all user data
255   // Set up TCHARM context for use during user's pup routines:
256   CtvAccess(_curTCharm)=this;
257   activateThread();
258
259   s.seek(0);
260   checkPupMismatch(p,5135,"before TCHARM user data");
261   p(nUd);
262   for(int i=0;i<nUd;i++) {
263     if (p.isUnpacking()) ud[i].update(tid);
264     ud[i].pup(p);
265   }
266   checkPupMismatch(p,5137,"after TCHARM_Register user data");
267
268   if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
269     deactivateThread();
270   p|sud;           //  sud vector block can not be in isomalloc
271   checkPupMismatch(p,5138,"after TCHARM_Global user data");
272   
273   // Tear down TCHARM context after calling user pup routines
274   if (!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
275     deactivateThread();
276   CtvAccess(_curTCharm)=NULL;
277   
278   if (!p.isUnpacking())
279   {//In this case, pack the thread & heap after the user data
280     s.seek(1);
281     pupThread(p);
282     //Stop our clock:
283     double packTime=CkWallTimer()+timeOffset;
284     p(packTime);
285   }
286   
287   s.endBlock(); //End of seeking block
288   checkPupMismatch(p,5140,"after TCHARM");
289   
290   //BIGSIM_OOC DEBUGGING
291   //if(p.isUnpacking()){
292   //  CmiPrintf("TCharm[%d] unpacking: ", thisIndex);
293   //  CthPrintThdStack(tid);
294   //}
295
296 }
297
298 // Pup our thread and related data
299 void TCharm::pupThread(PUP::er &pc) {
300     pup_er p=(pup_er)&pc;
301     checkPupMismatch(pc,5138,"before TCHARM thread");
302     tid = CthPup(p, tid);
303     if (pc.isUnpacking()) {
304       CtvAccessOther(tid,_curTCharm)=this;
305 #if CMK_BLUEGENE_CHARM
306       BgAttach(tid);
307 #endif
308     }
309     if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
310       CmiIsomallocBlockListPup(p,&heapBlocks);
311     threadGlobals=CtgPup(p,threadGlobals);
312     checkPupMismatch(pc,5139,"after TCHARM thread");
313 }
314
315 //Pup one group of user data
316 void TCharm::UserData::pup(PUP::er &p)
317 {
318   pup_er pext=(pup_er)(&p);
319   p(mode);
320   switch(mode) {
321   case 'c': { /* C mode: userdata is on the stack, so keep address */
322 //     p((char*)&data,sizeof(data));
323      p(pos);
324      //FIXME: function pointers may not be valid across processors
325      p((char*)&cfn, sizeof(TCHARM_Pup_fn));
326      char *data = CthPointer(t, pos);
327      if (cfn) cfn(pext,data);
328      } break;
329   case 'g': { /* Global mode: zero out userdata on arrival */
330      if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
331      {
332         // keep the pointer value if using isomalloc, no need to use pup
333        p(pos);
334      }
335      else if (p.isUnpacking())      //  zero out userdata on arrival
336        pos=0;
337
338        //FIXME: function pointers may not be valid across processors
339      p((char*)&gfn, sizeof(TCHARM_Pup_global_fn));
340      if (gfn) gfn(pext);
341      } break;
342   default:
343      break;
344   };
345 }
346
347 TCharm::~TCharm()
348 {
349   //BIGSIM_OOC DEBUGGING
350   //CmiPrintf("TCharm destructor called with heapBlocks=%p!\n", heapBlocks);
351   
352   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
353   CthFree(tid);
354   CtgFree(threadGlobals);
355   delete initMsg;
356 }
357
358 void TCharm::migrateTo(int destPE) {
359         if (destPE==CkMyPe()) return;
360         // Make sure migrateMe gets called *after* we suspend:
361         thisProxy[thisIndex].migrateDelayed(destPE);
362 //      resumeAfterMigration=true;
363         suspend();
364 }
365 void TCharm::migrateDelayed(int destPE) {
366         migrateMe(destPE);
367 }
368 void TCharm::ckJustMigrated(void) {
369         ArrayElement::ckJustMigrated();
370 #ifdef _FAULT_MLOG_
371 //  resumeAfterMigration = true;
372 #endif
373         if (resumeAfterMigration) {
374                 resumeAfterMigration=false;
375                 resume(); //Start the thread running
376         }
377 }
378
379 void TCharm::ckJustRestored(void) {
380         //CkPrintf("call just restored from TCharm[%d]\n", thisIndex);
381         ArrayElement::ckJustRestored();
382         //if (resumeAfterMigration) {
383         //      resumeAfterMigration=false;
384         //      resume(); //Start the thread running
385         //}
386 }
387
388 /*
389         FAULT_EVAC
390
391         If a Tcharm object is about to migrate it should be suspended first
392 */
393 void TCharm::ckAboutToMigrate(void){
394         ArrayElement::ckAboutToMigrate();
395         resumeAfterMigration = true;
396         isStopped = true;
397 //      suspend();
398 }
399
400 // clear the data before restarting from disk
401 void TCharm::clear()
402 {
403   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
404   CthFree(tid);
405   delete initMsg;
406 }
407
408 //Register user data to be packed with the thread
409 int TCharm::add(const TCharm::UserData &d)
410 {
411   if (nUd>=maxUserData)
412     CkAbort("TCharm: Registered too many user data fields!\n");
413   int nu=nUd++;
414   ud[nu]=d;
415   return nu;
416 }
417 void *TCharm::lookupUserData(int i) {
418         if (i<0 || i>=nUd)
419                 CkAbort("Bad user data index passed to TCharmGetUserdata!\n");
420         return ud[i].getData();
421 }
422
423 //Start the thread running
424 void TCharm::run(void)
425 {
426   DBG("TCharm::run()");
427   if (tcharm_nothreads) {/*Call user routine directly*/
428           startTCharmThread(initMsg);
429   } 
430   else /* start the thread as usual */
431           start();
432 }
433
434 //Block the thread until start()ed again.
435 void TCharm::stop(void)
436 {
437 #ifndef CMK_OPTIMIZE
438   if (tid != CthSelf())
439     CkAbort("Called TCharm::stop from outside TCharm thread!\n");
440   if (tcharm_nothreads)
441     CkAbort("Cannot make blocking calls using +tcharm_nothreads!\n");
442 #endif
443   stopTiming();
444   isStopped=true;
445   DBG("thread suspended");
446
447   CthSuspend();
448 //      DBG("thread resumed");
449   /*SUBTLE: We have to do the get() because "this" may have changed
450     during a migration-suspend.  If you access *any* members
451     from this point onward, you'll cause heap corruption if
452     we're resuming from migration!  (OSL 2003/9/23)
453    */
454   TCharm *dis=TCharm::get();
455 #ifdef _FAULT_MLOG_ 
456 /*  CpvAccess(_currentObj) = dis;
457  *      printf("[%d] _currentObject set to TCharm index %d %p\n",CkMyPe(),dis->thisIndex,dis);*/
458 #endif
459   dis->isStopped=false;
460   dis->startTiming();
461   //CkPrintf("[%d] Thread resumed  for tid %p\n",dis->thisIndex,dis->tid);
462 }
463
464 //Resume the waiting thread
465 void TCharm::start(void)
466 {
467   //  since this thread is scheduled, it is not a good idea to migrate 
468   isStopped=false;
469   DBG("thread resuming soon");
470   //CkPrintf("TCharm[%d]::start()\n", thisIndex);
471   //CmiPrintStackTrace(0);
472 #ifdef _FAULT_MLOG_
473 //CthAwakenPrio(tid, CQS_QUEUEING_BFIFO, 1, &prio);
474   CthAwaken(tid);
475 #else
476   CthAwaken(tid);
477 #endif
478 }
479
480 //Block our thread, schedule, and come back:
481 void TCharm::schedule(void) {
482   DBG("thread schedule");
483   start(); // Calls CthAwaken
484   stop(); // Calls CthSuspend
485 }
486
487 //Go to sync, block, possibly migrate, and then resume
488 void TCharm::migrate(void)
489 {
490 #if CMK_LBDB_ON
491   DBG("going to sync");
492   AtSync();
493   stop();
494 #else
495   DBG("skipping sync, because there is no load balancer");
496 #endif
497 }
498
499
500 void TCharm::evacuate(){
501         /*
502                 FAULT_EVAC
503         */
504         //CkClearAllArrayElementsCPP();
505         if(CpvAccess(startedEvac)){
506                 int nextPE = getNextPE(CkArrayIndex1D(thisIndex));
507 //              resumeAfterMigration=true;
508                 CcdCallFnAfter((CcdVoidFn)CkEmmigrateElement, (void *)myRec, 1);
509                 suspend();
510                 return;
511         }
512         return;
513
514 }
515
516 //calls atsync with async mode
517 void TCharm::async_migrate(void)
518 {
519 #if CMK_LBDB_ON
520   DBG("going to sync at async mode");
521   skipResume = true;            // we resume immediately
522   ReadyMigrate(false);
523   AtSync(0);
524   schedule();
525 //  allow_migrate();
526 #else
527   DBG("skipping sync, because there is no load balancer");
528 #endif
529 }
530
531 /*
532 Note:
533  thread can only migrate at the point when this is called
534 */
535 void TCharm::allow_migrate(void)
536 {
537 #if CMK_LBDB_ON
538 //  ReadyMigrate(true);
539   int nextPe = MigrateToPe();
540   if (nextPe != -1) {
541     migrateTo(nextPe);
542   }
543 #else
544   DBG("skipping sync, because there is no load balancer");
545 #endif
546 }
547
548 //Resume from sync: start the thread again
549 void TCharm::ResumeFromSync(void)
550 {
551   //if(isSelfDone) return;
552   //if (exitWhenDone) return; //for bigsim ooc execution
553   if (!skipResume) start();
554 }
555
556
557 /****** TcharmClient ******/
558 void TCharmClient1D::ckJustMigrated(void) {
559   ArrayElement1D::ckJustMigrated();
560   findThread();
561   tcharmClientInit();
562 }
563
564 void TCharmClient1D::pup(PUP::er &p) {
565   ArrayElement1D::pup(p);
566   p|threadProxy;
567 }
568
569 CkArrayID TCHARM_Get_threads(void) {
570         TCHARMAPI("TCHARM_Get_threads");
571         return TCharm::get()->getProxy();
572 }
573
574 /************* Startup/Shutdown Coordination Support ************/
575
576 // Useless values to reduce over:
577 int vals[2]={0,1};
578
579 //Called when we want to go to a barrier
580 void TCharm::barrier(void) {
581         //Contribute to a synchronizing reduction
582         CkCallback cb(index_t::atBarrier(0), thisProxy[0]);
583         contribute(sizeof(vals),&vals,CkReduction::sum_int,cb);
584 #if CMK_BLUEGENE_CHARM
585         void *curLog;           // store current log in timeline
586         _TRACE_BG_TLINE_END(&curLog);
587         TRACE_BG_AMPI_BREAK(NULL, "TCharm_Barrier_START", NULL, 0);
588 #endif
589         stop();
590 #if CMK_BLUEGENE_CHARM
591          _TRACE_BG_SET_INFO(NULL, "TCHARM_Barrier_END",  &curLog, 1);
592 #endif
593 }
594
595 //Called when we've reached the barrier
596 void TCharm::atBarrier(CkReductionMsg *m) {
597         DBGX("clients all at barrier");
598         delete m;
599         thisProxy.start(); //Just restart everybody
600 }
601
602 //Called when the thread is done running
603 void TCharm::done(void) {
604         //CmiPrintStackTrace(0);
605         DBG("TCharm thread "<<thisIndex<<" done")
606         if (exitWhenDone) {
607                 //Contribute to a synchronizing reduction
608                 CkCallback cb(index_t::atExit(0), thisProxy[0]);
609                 contribute(sizeof(vals),&vals,CkReduction::sum_int,cb);
610         }
611         isSelfDone = true;
612         stop();
613 }
614 //Called when all threads are done running
615 void TCharm::atExit(CkReductionMsg *m) {
616         DBGX("TCharm::atExit1> exiting");
617         delete m;
618         //thisProxy.unsetFlags();
619         CkExit();
620         //CkPrintf("After CkExit()!!!!!!!\n");
621 }
622
623 /************* Setup **************/
624
625 //Globals used to control setup process
626 static TCHARM_Fallback_setup_fn g_fallbackSetup=NULL;
627 void TCHARM_Set_fallback_setup(TCHARM_Fallback_setup_fn f)
628 {
629         g_fallbackSetup=f;
630 }
631 void TCHARM_Call_fallback_setup(void) {
632         if (g_fallbackSetup) 
633                 (g_fallbackSetup)();
634         else
635                 CkAbort("TCHARM: Unexpected fallback setup--missing TCHARM_User_setup routine?");
636 }
637
638 /************** User API ***************/
639 /**********************************
640 Callable from UserSetup:
641 */
642
643 // Read the command line to figure out how many threads to create:
644 CDECL int TCHARM_Get_num_chunks(void)
645 {
646         TCHARMAPI("TCHARM_Get_num_chunks");
647         if (CkMyPe()!=0) CkAbort("TCHARM_Get_num_chunks should only be called on PE 0 during setup!");
648         int nChunks=CkNumPes();
649         char **argv=CkGetArgv();
650         CmiGetArgIntDesc(argv,"-vp",&nChunks,"Set the total number of virtual processors");
651         CmiGetArgIntDesc(argv,"+vp",&nChunks,NULL);
652         lastNumChunks=nChunks;
653         return nChunks;
654 }
655 FDECL int FTN_NAME(TCHARM_GET_NUM_CHUNKS,tcharm_get_num_chunks)(void)
656 {
657         return TCHARM_Get_num_chunks();
658 }
659
660 // Fill out the default thread options:
661 TCHARM_Thread_options::TCHARM_Thread_options(int doDefault)
662 {
663         stackSize=0; /* default stacksize */
664         exitWhenDone=0; /* don't exit when done by default. */
665 }
666 void TCHARM_Thread_options::sanityCheck(void) {
667         if (stackSize<=0) stackSize=tcharm_stacksize;
668 }
669
670
671 TCHARM_Thread_options g_tcharmOptions(1);
672
673 /*Set the size of the thread stack*/
674 CDECL void TCHARM_Set_stack_size(int newStackSize)
675 {
676         TCHARMAPI("TCHARM_Set_stack_size");
677         g_tcharmOptions.stackSize=newStackSize;
678 }
679 FDECL void FTN_NAME(TCHARM_SET_STACK_SIZE,tcharm_set_stack_size)
680         (int *newSize)
681 { TCHARM_Set_stack_size(*newSize); }
682
683 CDECL void TCHARM_Set_exit(void) { g_tcharmOptions.exitWhenDone=1; }
684
685 /*Create a new array of threads, which will be bound to by subsequent libraries*/
686 CDECL void TCHARM_Create(int nThreads,
687                         int threadFn)
688 {
689         TCHARMAPI("TCHARM_Create");
690         TCHARM_Create_data(nThreads,
691                          threadFn,NULL,0);
692 }
693 FDECL void FTN_NAME(TCHARM_CREATE,tcharm_create)
694         (int *nThreads,int threadFn)
695 { TCHARM_Create(*nThreads,threadFn); }
696
697 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg);
698
699 /*As above, but pass along (arbitrary) data to threads*/
700 CDECL void TCHARM_Create_data(int nThreads,
701                   int threadFn,
702                   void *threadData,int threadDataLen)
703 {
704         TCHARMAPI("TCHARM_Create_data");
705         TCharmInitMsg *msg=new (threadDataLen,0) TCharmInitMsg(
706                 threadFn,g_tcharmOptions);
707         msg->numElements=nThreads;
708         memcpy(msg->data,threadData,threadDataLen);
709         TCHARM_Build_threads(msg);
710         
711         // Reset the thread options:
712         g_tcharmOptions=TCHARM_Thread_options(1);
713 }
714
715 FDECL void FTN_NAME(TCHARM_CREATE_DATA,tcharm_create_data)
716         (int *nThreads,
717                   int threadFn,
718                   void *threadData,int *threadDataLen)
719 { TCHARM_Create_data(*nThreads,threadFn,threadData,*threadDataLen); }
720
721 CkGroupID CkCreatePropMap(void);
722
723 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg)
724 {
725   char *tmp;
726   char **argv=CkGetArgv();
727   CkArrayOptions opts(msg->numElements);
728   CkAssert(CkpvAccess(mapCreated)==1);
729
730   if(haveConfigurableRRMap()){
731     CkPrintf("USING ConfigurableRRMap\n");
732     mapID=CProxy_ConfigurableRRMap::ckNew();
733   } else if(mapping==NULL){
734 #if CMK_BLUEGENE_CHARM
735     mapID=CProxy_BlockMap::ckNew();
736 #else
737     mapID=CkCreatePropMap();
738 #endif
739   }else if(0==strcmp(mapping,"BLOCK_MAP")){
740     CkPrintf("USING BLOCK_MAP\n");
741     mapID=CProxy_BlockMap::ckNew();
742   }else if(0==strcmp(mapping,"RR_MAP")){
743     CkPrintf("USING RR_MAP\n");
744     mapID=CProxy_RRMap::ckNew();
745   }else{  // "PROP_MAP" or anything else
746     mapID=CkCreatePropMap();
747   }
748   opts.setMap(mapID);
749   int nElem=msg->numElements; //<- save it because msg will be deleted.
750   return CProxy_TCharm::ckNew(msg,opts);
751 }
752
753 // Helper used when creating a new array bound to the TCHARM threads:
754 CkArrayOptions TCHARM_Attach_start(CkArrayID *retTCharmArray,int *retNumElts)
755 {
756         TCharm *tc=TCharm::get();
757         if (!tc)
758                 CkAbort("You must call TCHARM initialization routines from a TCHARM thread!");
759         int nElts=tc->getNumElements();
760       
761         //CmiPrintf("TCHARM Elements = %d\n", nElts);  
762       
763         if (retNumElts!=NULL) *retNumElts=nElts;
764         *retTCharmArray=tc->getProxy();
765         CkArrayOptions opts(nElts);
766         opts.bindTo(tc->getProxy());
767         return opts;
768 }
769
770 void TCHARM_Suspend(void) {
771         TCharm *tc=TCharm::get();
772         tc->suspend();
773 }
774
775 /***********************************
776 Callable from worker thread
777 */
778 CDECL int TCHARM_Element(void)
779
780         TCHARMAPI("TCHARM_Element");
781         return TCharm::get()->getElement();
782 }
783 CDECL int TCHARM_Num_elements(void)
784
785         TCHARMAPI("TCHARM_Num_elements");
786         return TCharm::get()->getNumElements();
787 }
788
789 FDECL int FTN_NAME(TCHARM_ELEMENT,tcharm_element)(void) 
790 { return TCHARM_Element();}
791 FDECL int FTN_NAME(TCHARM_NUM_ELEMENTS,tcharm_num_elements)(void) 
792 { return TCHARM_Num_elements();}
793
794 //Make sure this address will migrate with us when we move:
795 static void checkAddress(void *data)
796 {
797         if (tcharm_nomig||tcharm_nothreads) return; //Stack is not isomalloc'd
798         if (CmiThreadIs(CMI_THREAD_IS_ALIAS)||CmiThreadIs(CMI_THREAD_IS_STACKCOPY)) return; // memory alias thread
799         if (CmiIsomallocEnabled()) {
800           if (!CmiIsomallocInRange(data))
801             CkAbort("The UserData you register must be allocated on the stack!\n");
802         }
803         else 
804           CkPrintf("Warning> checkAddress failed because isomalloc not supported.\n");
805 }
806
807 /* Old "register"-based userdata: */
808 CDECL int TCHARM_Register(void *data,TCHARM_Pup_fn pfn)
809
810         TCHARMAPI("TCHARM_Register");
811         checkAddress(data);
812         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
813 }
814 FDECL int FTN_NAME(TCHARM_REGISTER,tcharm_register)
815         (void *data,TCHARM_Pup_fn pfn)
816
817         TCHARMAPI("TCHARM_Register");
818         checkAddress(data);
819         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
820 }
821
822 CDECL void *TCHARM_Get_userdata(int id)
823 {
824         TCHARMAPI("TCHARM_Get_userdata");
825         return TCharm::get()->lookupUserData(id);
826 }
827 FDECL void *FTN_NAME(TCHARM_GET_USERDATA,tcharm_get_userdata)(int *id)
828 { return TCHARM_Get_userdata(*id); }
829
830 /* New hardcoded-ID userdata: */
831 CDECL void TCHARM_Set_global(int globalID,void *new_value,TCHARM_Pup_global_fn pup_or_NULL)
832 {
833         TCHARMAPI("TCHARM_Set_global");
834         TCharm *tc=TCharm::get();
835         if (tc->sud.length()<=globalID)
836         { //We don't have room for this ID yet: make room
837                 int newLen=2*globalID;
838                 tc->sud.resize(newLen);
839         }
840         tc->sud[globalID]=TCharm::UserData(pup_or_NULL,tc->getThread(),new_value);
841 }
842 CDECL void *TCHARM_Get_global(int globalID)
843 {
844         //Skip TCHARMAPI("TCHARM_Get_global") because there's no dynamic allocation here,
845         // and this routine should be as fast as possible.
846         CkVec<TCharm::UserData> &v=TCharm::get()->sud;
847         if (v.length()<=globalID) return NULL; //Uninitialized global
848         return v[globalID].getData();
849 }
850
851 CDECL void TCHARM_Migrate(void)
852 {
853         TCHARMAPI("TCHARM_Migrate");
854         if (CthMigratable() == 0) {
855           CkPrintf("Warning> thread migration is not supported!\n");
856           return;
857         }
858         TCharm::get()->migrate();
859 }
860 FORTRAN_AS_C(TCHARM_MIGRATE,TCHARM_Migrate,tcharm_migrate,(void),())
861
862 CDECL void TCHARM_Async_Migrate(void)
863 {
864         TCHARMAPI("TCHARM_Async_Migrate");
865         TCharm::get()->async_migrate();
866 }
867 FORTRAN_AS_C(TCHARM_ASYNC_MIGRATE,TCHARM_Async_Migrate,tcharm_async_migrate,(void),())
868
869 CDECL void TCHARM_Allow_Migrate(void)
870 {
871         TCHARMAPI("TCHARM_Allow_Migrate");
872         TCharm::get()->allow_migrate();
873 }
874 FORTRAN_AS_C(TCHARM_ALLOW_MIGRATE,TCHARM_Allow_Migrate,tcharm_allow_migrate,(void),())
875
876 CDECL void TCHARM_Migrate_to(int destPE)
877 {
878         TCHARMAPI("TCHARM_Migrate_to");
879         TCharm::get()->migrateTo(destPE);
880 }
881
882 CDECL void TCHARM_Evacuate()
883 {
884         TCHARMAPI("TCHARM_Migrate_to");
885         TCharm::get()->evacuate();
886 }
887
888 FORTRAN_AS_C(TCHARM_MIGRATE_TO,TCHARM_Migrate_to,tcharm_migrate_to,
889         (int *destPE),(*destPE))
890
891 CDECL void TCHARM_Yield(void)
892 {
893         TCHARMAPI("TCHARM_Yield");
894         TCharm::get()->schedule();
895 }
896 FORTRAN_AS_C(TCHARM_YIELD,TCHARM_Yield,tcharm_yield,(void),())
897
898 CDECL void TCHARM_Barrier(void)
899 {
900         TCHARMAPI("TCHARM_Barrier");
901         TCharm::get()->barrier();
902 }
903 FORTRAN_AS_C(TCHARM_BARRIER,TCHARM_Barrier,tcharm_barrier,(void),())
904
905 CDECL void TCHARM_Done(void)
906 {
907         TCHARMAPI("TCHARM_Done");
908         TCharm *c=TCharm::getNULL();
909         if (!c) CkExit();
910         else c->done();
911 }
912 FORTRAN_AS_C(TCHARM_DONE,TCHARM_Done,tcharm_done,(void),())
913
914
915 CDECL double TCHARM_Wall_timer(void)
916 {
917   TCHARMAPI("TCHARM_Wall_timer");
918   TCharm *c=TCharm::getNULL();
919   if(!c) return CkWallTimer();
920   else { //Have to apply current thread's time offset
921     return CkWallTimer()+c->getTimeOffset();
922   }
923 }
924
925 #if 1
926 /*Include Fortran-style "iargc" and "getarg" routines.
927 These are needed to get access to the command-line arguments from Fortran.
928 */
929 FDECL int FTN_NAME(TCHARM_IARGC,tcharm_iargc)(void) {
930   TCHARMAPI("tcharm_iargc");
931   return CkGetArgc()-1;
932 }
933
934 FDECL void FTN_NAME(TCHARM_GETARG,tcharm_getarg)
935         (int *i_p,char *dest,int destLen)
936 {
937   TCHARMAPI("tcharm_getarg");
938   int i=*i_p;
939   if (i<0) CkAbort("tcharm_getarg called with negative argument!");
940   if (i>=CkGetArgc()) CkAbort("tcharm_getarg called with argument > iargc!");
941   const char *src=CkGetArgv()[i];
942   strcpy(dest,src);
943   for (i=strlen(dest);i<destLen;i++) dest[i]=' ';
944 }
945
946 #endif
947
948 //These silly routines are used for serial startup:
949 extern void _initCharm(int argc, char **argv);
950 CDECL void TCHARM_Init(int *argc,char ***argv) {
951         if (!tcharm_initted) {
952           ConverseInit(*argc, *argv, (CmiStartFn) _initCharm,1,1);
953           _initCharm(*argc,*argv);
954         }
955 }
956
957 FDECL void FTN_NAME(TCHARM_INIT,tcharm_init)(void)
958 {
959         int argc=1;
960         const char *argv_sto[2]={"foo",NULL};
961         char **argv=(char **)argv_sto;
962         TCHARM_Init(&argc,&argv);
963 }
964
965 /***********************************
966 * TCHARM Semaphores:
967 * The idea is one side "puts", the other side "gets"; 
968 * but the calls can come in any order--
969 * if the "get" comes first, it blocks until the put.
970 * This makes a convenient, race-condition-free way to do
971 * onetime initializations.  
972 */
973 /// Find this semaphore, or insert if there isn't one:
974 TCharm::TCharmSemaphore *TCharm::findSema(int id) {
975         for (int s=0;s<sema.size();s++)
976                 if (sema[s].id==id) 
977                         return &sema[s];
978         sema.push_back(TCharmSemaphore(id));
979         return &sema[sema.size()-1];
980 }
981 /// Remove this semaphore from the list
982 void TCharm::freeSema(TCharmSemaphore *doomed) {
983         int id=doomed->id;
984         for (int s=0;s<sema.size();s++)
985                 if (sema[s].id==id) {
986                         sema[s]=sema[sema.length()-1];
987                         sema.length()--;
988                         return;
989                 }
990         CkAbort("Tried to free nonexistent TCharm semaphore");
991 }
992
993 /// Block until this semaphore has data:
994 TCharm::TCharmSemaphore *TCharm::getSema(int id) {
995         TCharmSemaphore *s=findSema(id);
996         if (s->data==NULL) 
997         { //Semaphore isn't filled yet: wait until it is
998                 s->thread=CthSelf();
999                 suspend(); //Will be woken by semaPut
1000                 // Semaphore may have moved-- find it again
1001                 s=findSema(id);
1002                 if (s->data==NULL) CkAbort("TCharm::semaGet awoken too early!");
1003         }
1004         return s;
1005 }
1006
1007 /// Store data at the semaphore "id".
1008 ///  The put can come before or after the get.
1009 void TCharm::semaPut(int id,void *data) {
1010         TCharmSemaphore *s=findSema(id);
1011         if (s->data!=NULL) CkAbort("Duplicate calls to TCharm::semaPut!");
1012         s->data=data;
1013         DBG("semaPut "<<id<<" "<<data);
1014         if (s->thread!=NULL) {//Awaken the thread
1015                 s->thread=NULL;
1016                 resume();
1017         }
1018 }
1019
1020 /// Retreive data from the semaphore "id".
1021 ///  Blocks if the data is not immediately available.
1022 ///  Consumes the data, so another put will be required for the next get.
1023 void *TCharm::semaGet(int id) {
1024         TCharmSemaphore *s=getSema(id);
1025         void *ret=s->data;
1026         DBG("semaGet "<<id<<" "<<ret);
1027         // Now remove the semaphore from the list:
1028         freeSema(s);
1029         return ret;
1030 }
1031
1032 /// Retreive data from the semaphore "id".
1033 ///  Blocks if the data is not immediately available.
1034 void *TCharm::semaGets(int id) {
1035         TCharmSemaphore *s=getSema(id);
1036         return s->data;
1037 }
1038
1039 /// Retreive data from the semaphore "id", or returns NULL.
1040 void *TCharm::semaPeek(int id) {
1041         TCharmSemaphore *s=findSema(id);
1042         return s->data;
1043 }
1044
1045 /****** System Call support ******/
1046 /*
1047 TCHARM_System exists to work around a bug where Linux ia32
1048 glibc2.2.x with pthreads crashes at the fork() site when 
1049 called from a user-levelthread. 
1050
1051 The fix is to call system from the main thread, by 
1052 passing the request out of the thread to our array element 
1053 before calling system().
1054 */
1055
1056 CDECL int 
1057 TCHARM_System(const char *shell_command)
1058 {
1059         return TCharm::get()->system(shell_command);
1060 }
1061 int TCharm::system(const char *cmd)
1062 {
1063         int ret=-1778;
1064         callSystemStruct s;
1065         s.cmd=cmd;
1066         s.ret=&ret;
1067         thisProxy[thisIndex].callSystem(s);
1068         suspend();
1069         return ret;
1070 }
1071
1072 void TCharm::callSystem(const callSystemStruct &s)
1073 {
1074         *s.ret = ::system(s.cmd);
1075         resume();
1076 }
1077
1078
1079
1080 #include "tcharm.def.h"