If fault tolerance flag is set, the assignment of array elements is round robin.
[charm.git] / src / libs / ck-libs / tcharm / tcharm.C
1 /*
2 Threaded Charm++ "Framework Framework"
3
4 Orion Sky Lawlor, olawlor@acm.org, 11/19/2001
5  */
6 #include "tcharm_impl.h"
7 #include "tcharm.h"
8 #include "ckevacuation.h"
9 #include <ctype.h>
10
11 #if 0 
12     /*Many debugging statements:*/
13 #    define DBG(x) ckout<<"["<<thisIndex<<","<<CkMyPe()<<"] TCHARM> "<<x<<endl;
14 #    define DBGX(x) ckout<<"PE("<<CkMyPe()<<") TCHARM> "<<x<<endl;
15 #else
16     /*No debugging statements*/
17 #    define DBG(x) /*empty*/
18 #    define DBGX(x) /*empty*/
19 #endif
20
21 CtvDeclare(TCharm *,_curTCharm);
22
23 static int lastNumChunks=0;
24
25 class TCharmTraceLibList {
26         enum {maxLibs=20,maxLibNameLen=15};
27         //List of libraries we want to trace:
28         int curLibs;
29         char libNames[maxLibs][maxLibNameLen];
30         int checkIfTracing(const char *lib) const
31         {
32                 for (int i=0;i<curLibs;i++) 
33                         if (0==strcmp(lib,libNames[i]))
34                                 return 1;
35                 return 0;
36         }
37 public:
38         TCharmTraceLibList() {curLibs=0;}
39         void addTracing(const char *lib) 
40         { //We want to trace this library-- add its name to the list.
41                 CkPrintf("TCHARM> Will trace calls to library %s\n",lib);
42                 int i;
43                 for (i=0;0!=*lib;i++,lib++)
44                         libNames[curLibs][i]=tolower(*lib);
45                 libNames[curLibs][i]=0;
46                 // if already tracing, skip
47                 if (checkIfTracing(libNames[curLibs])) return;
48                 curLibs++;
49         }
50         inline int isTracing(const char *lib) const {
51                 if (curLibs==0) return 0; //Common case
52                 else return checkIfTracing(lib);
53         }
54 };
55 static TCharmTraceLibList tcharm_tracelibs;
56 static int tcharm_nomig=0, tcharm_nothreads=0;
57 static int tcharm_stacksize=1*1024*1024; /*Default stack size is 1MB*/
58 static int tcharm_initted=0;
59 CkpvDeclare(int, mapCreated);
60 static CkGroupID mapID;
61 static char* mapping = NULL;
62
63 void TCharm::nodeInit(void)
64 {
65 }
66
67 void TCharm::procInit(void)
68 {
69   CtvInitialize(TCharm *,_curTCharm);
70   CtvAccess(_curTCharm)=NULL;
71   tcharm_initted=1;
72   CtgInit();
73
74   CkpvInitialize(int, mapCreated);
75   CkpvAccess(mapCreated) = 0;
76
77   // called on every pe to eat these arguments
78   char **argv=CkGetArgv();
79   tcharm_nomig=CmiGetArgFlagDesc(argv,"+tcharm_nomig","Disable migration support (debugging)");
80   tcharm_nothreads=CmiGetArgFlagDesc(argv,"+tcharm_nothread","Disable thread support (debugging)");
81   tcharm_nothreads|=CmiGetArgFlagDesc(argv,"+tcharm_nothreads",NULL);
82   char *traceLibName=NULL;
83   while (CmiGetArgStringDesc(argv,"+tcharm_trace",&traceLibName,"Print each call to this library"))
84       tcharm_tracelibs.addTracing(traceLibName);
85   CmiGetArgIntDesc(argv,"+tcharm_stacksize",&tcharm_stacksize,"Set the thread stack size (default 1MB)");
86   if (CkMyPe()!=0) { //Processor 0 eats "+vp<N>" and "-vp<N>" later:
87         int ignored;
88         while (CmiGetArgIntDesc(argv,"-vp",&ignored,NULL)) {}
89         while (CmiGetArgIntDesc(argv,"+vp",&ignored,NULL)) {}
90   }
91   if (CkMyPe()==0) { // Echo various debugging options:
92     if (tcharm_nomig) CmiPrintf("TCHARM> Disabling migration support, for debugging\n");
93     if (tcharm_nothreads) CmiPrintf("TCHARM> Disabling thread support, for debugging\n");
94   }
95   if (CkpvAccess(mapCreated)==0) {
96     if (0!=CmiGetArgString(argv, "+mapping", &mapping)){
97     }
98     CkpvAccess(mapCreated)=1;
99   }
100 }
101
102 void TCHARM_Api_trace(const char *routineName,const char *libraryName)
103 {
104         if (!tcharm_tracelibs.isTracing(libraryName)) return;
105         TCharm *tc=CtvAccess(_curTCharm);
106         char where[100];
107         if (tc==NULL) sprintf(where,"[serial context on %d]",CkMyPe());
108         else sprintf(where,"[%p> vp %d, p %d]",(void *)tc,tc->getElement(),CkMyPe());
109         CmiPrintf("%s Called routine %s\n",where,routineName);
110         CmiPrintStackTrace(1);
111         CmiPrintf("\n");
112 }
113
114 // register thread start functions to get a function handler
115 // this is portable across heterogeneous platforms, or on machines with
116 // random stack/function pointer
117
118 static CkVec<TCHARM_Thread_data_start_fn> threadFnTable;
119
120 int TCHARM_Register_thread_function(TCHARM_Thread_data_start_fn fn)
121 {
122   int idx = threadFnTable.size();
123   threadFnTable.push_back(fn);
124   return idx+1;                     // make 0 invalid number
125 }
126
127 TCHARM_Thread_data_start_fn getTCharmThreadFunction(int idx)
128 {
129   CmiAssert(idx > 0);
130   return threadFnTable[idx-1];
131 }
132
133 static void startTCharmThread(TCharmInitMsg *msg)
134 {
135         DBGX("thread started");
136         TCharm::activateThread();
137        TCHARM_Thread_data_start_fn threadFn = getTCharmThreadFunction(msg->threadFn);
138         threadFn(msg->data);
139         TCharm::deactivateThread();
140         CtvAccess(_curTCharm)->done();
141 }
142
143 TCharm::TCharm(TCharmInitMsg *initMsg_)
144 {
145   initMsg=initMsg_;
146   initMsg->opts.sanityCheck();
147   timeOffset=0.0;
148   threadGlobals=CtgCreate();
149   if (tcharm_nothreads)
150   { //Don't even make a new thread-- just use main thread
151     tid=CthSelf();
152   }
153   else /*Create a thread normally*/
154   {
155     if (tcharm_nomig) { /*Nonmigratable version, for debugging*/
156       tid=CthCreate((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
157     } else {
158       tid=CthCreateMigratable((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
159     }
160 #if CMK_BLUEGENE_CHARM
161     BgAttach(tid);
162     BgUnsetStartOutOfCore();
163 #endif
164   }
165   CtvAccessOther(tid,_curTCharm)=this;
166   isStopped=true;
167   resumeAfterMigration=false;
168         /* FAULT_EVAC*/
169         AsyncEvacuate(CmiTrue);
170   skipResume=false;
171   exitWhenDone=initMsg->opts.exitWhenDone;
172   isSelfDone = false;
173   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
174   threadInfo.thisElement=thisIndex;
175   threadInfo.numElements=initMsg->numElements;
176   if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
177         heapBlocks=CmiIsomallocBlockListNew();
178   else
179         heapBlocks=0;
180   nUd=0;
181   usesAtSync=CmiTrue;
182   run();
183 }
184
185 TCharm::TCharm(CkMigrateMessage *msg)
186         :CBase_TCharm(msg)
187 {
188   initMsg=NULL;
189   tid=NULL;
190   threadGlobals=NULL;
191   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
192         AsyncEvacuate(CmiTrue);
193   heapBlocks=0;
194 }
195
196 void checkPupMismatch(PUP::er &p,int expected,const char *where)
197 {
198         int v=expected;
199         p|v;
200         if (v!=expected) {
201                 CkError("FATAL ERROR> Mismatch %s pup routine\n",where);
202                 CkAbort("FATAL ERROR: Pup direction mismatch");
203         }
204 }
205
206 void TCharm::pup(PUP::er &p) {
207 //Pup superclass
208   ArrayElement1D::pup(p);
209
210   //BIGSIM_OOC DEBUGGING
211   //if(!p.isUnpacking()){
212   //  CmiPrintf("TCharm[%d] packing: ", thisIndex);
213   //  CthPrintThdStack(tid);
214   //}
215
216   checkPupMismatch(p,5134,"before TCHARM");
217 #ifdef _FAULT_MLOG_
218     if(!isStopped){
219 //      resumeAfterMigration = true;
220     }
221     isStopped = true;
222 #endif
223   p(isStopped); p(resumeAfterMigration); p(exitWhenDone); p(isSelfDone); p(skipResume);
224   p(threadInfo.thisElement);
225   p(threadInfo.numElements);
226   
227   if (sema.size()>0){
228         CkAbort("TCharm::pup> Cannot migrate with unconsumed semaphores!\n");
229   }
230
231 #ifndef CMK_OPTIMIZE
232   DBG("Packing thread");
233   if (!isStopped && !CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
234     if(BgOutOfCoreFlag==0) //not doing out-of-core scheduling
235         CkAbort("Cannot pup a running thread.  You must suspend before migrating.\n");
236   }     
237   if (tcharm_nomig) CkAbort("Cannot migrate with the +tcharm_nomig option!\n");
238 #endif
239
240   //This seekBlock allows us to reorder the packing/unpacking--
241   // This is needed because the userData depends on the thread's stack
242   // and heap data both at pack and unpack time.
243   PUP::seekBlock s(p,2);
244   
245   if (p.isUnpacking())
246   {//In this case, unpack the thread & heap before the user data
247     s.seek(1);
248     pupThread(p);
249     //Restart our clock: set it up so packTime==CkWallTimer+timeOffset
250     double packTime;
251     p(packTime);
252     timeOffset=packTime-CkWallTimer();
253   }
254   
255 //Pack all user data
256   // Set up TCHARM context for use during user's pup routines:
257   CtvAccess(_curTCharm)=this;
258   activateThread();
259
260   s.seek(0);
261   checkPupMismatch(p,5135,"before TCHARM user data");
262   p(nUd);
263   for(int i=0;i<nUd;i++) {
264     if (p.isUnpacking()) ud[i].update(tid);
265     ud[i].pup(p);
266   }
267   checkPupMismatch(p,5137,"after TCHARM_Register user data");
268
269   if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
270     deactivateThread();
271   p|sud;           //  sud vector block can not be in isomalloc
272   checkPupMismatch(p,5138,"after TCHARM_Global user data");
273   
274   // Tear down TCHARM context after calling user pup routines
275   if (!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
276     deactivateThread();
277   CtvAccess(_curTCharm)=NULL;
278   
279   if (!p.isUnpacking())
280   {//In this case, pack the thread & heap after the user data
281     s.seek(1);
282     pupThread(p);
283     //Stop our clock:
284     double packTime=CkWallTimer()+timeOffset;
285     p(packTime);
286   }
287   
288   s.endBlock(); //End of seeking block
289   checkPupMismatch(p,5140,"after TCHARM");
290   
291   //BIGSIM_OOC DEBUGGING
292   //if(p.isUnpacking()){
293   //  CmiPrintf("TCharm[%d] unpacking: ", thisIndex);
294   //  CthPrintThdStack(tid);
295   //}
296
297 }
298
299 // Pup our thread and related data
300 void TCharm::pupThread(PUP::er &pc) {
301     pup_er p=(pup_er)&pc;
302     checkPupMismatch(pc,5138,"before TCHARM thread");
303     tid = CthPup(p, tid);
304     if (pc.isUnpacking()) {
305       CtvAccessOther(tid,_curTCharm)=this;
306 #if CMK_BLUEGENE_CHARM
307       BgAttach(tid);
308 #endif
309     }
310     if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
311       CmiIsomallocBlockListPup(p,&heapBlocks);
312     threadGlobals=CtgPup(p,threadGlobals);
313     checkPupMismatch(pc,5139,"after TCHARM thread");
314 }
315
316 //Pup one group of user data
317 void TCharm::UserData::pup(PUP::er &p)
318 {
319   pup_er pext=(pup_er)(&p);
320   p(mode);
321   switch(mode) {
322   case 'c': { /* C mode: userdata is on the stack, so keep address */
323 //     p((char*)&data,sizeof(data));
324      p(pos);
325      //FIXME: function pointers may not be valid across processors
326      p((char*)&cfn, sizeof(TCHARM_Pup_fn));
327      char *data = CthPointer(t, pos);
328      if (cfn) cfn(pext,data);
329      } break;
330   case 'g': { /* Global mode: zero out userdata on arrival */
331      if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
332      {
333         // keep the pointer value if using isomalloc, no need to use pup
334        p(pos);
335      }
336      else if (p.isUnpacking())      //  zero out userdata on arrival
337        pos=0;
338
339        //FIXME: function pointers may not be valid across processors
340      p((char*)&gfn, sizeof(TCHARM_Pup_global_fn));
341      if (gfn) gfn(pext);
342      } break;
343   default:
344      break;
345   };
346 }
347
348 TCharm::~TCharm()
349 {
350   //BIGSIM_OOC DEBUGGING
351   //CmiPrintf("TCharm destructor called with heapBlocks=%p!\n", heapBlocks);
352   
353   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
354   CthFree(tid);
355   CtgFree(threadGlobals);
356   delete initMsg;
357 }
358
359 void TCharm::migrateTo(int destPE) {
360         if (destPE==CkMyPe()) return;
361         // Make sure migrateMe gets called *after* we suspend:
362         thisProxy[thisIndex].migrateDelayed(destPE);
363 //      resumeAfterMigration=true;
364         suspend();
365 }
366 void TCharm::migrateDelayed(int destPE) {
367         migrateMe(destPE);
368 }
369 void TCharm::ckJustMigrated(void) {
370         ArrayElement::ckJustMigrated();
371 #ifdef _FAULT_MLOG_
372 //  resumeAfterMigration = true;
373 #endif
374         if (resumeAfterMigration) {
375                 resumeAfterMigration=false;
376                 resume(); //Start the thread running
377         }
378 }
379
380 void TCharm::ckJustRestored(void) {
381         //CkPrintf("call just restored from TCharm[%d]\n", thisIndex);
382         ArrayElement::ckJustRestored();
383         //if (resumeAfterMigration) {
384         //      resumeAfterMigration=false;
385         //      resume(); //Start the thread running
386         //}
387 }
388
389 /*
390         FAULT_EVAC
391
392         If a Tcharm object is about to migrate it should be suspended first
393 */
394 void TCharm::ckAboutToMigrate(void){
395         ArrayElement::ckAboutToMigrate();
396         resumeAfterMigration = true;
397         isStopped = true;
398 //      suspend();
399 }
400
401 // clear the data before restarting from disk
402 void TCharm::clear()
403 {
404   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
405   CthFree(tid);
406   delete initMsg;
407 }
408
409 //Register user data to be packed with the thread
410 int TCharm::add(const TCharm::UserData &d)
411 {
412   if (nUd>=maxUserData)
413     CkAbort("TCharm: Registered too many user data fields!\n");
414   int nu=nUd++;
415   ud[nu]=d;
416   return nu;
417 }
418 void *TCharm::lookupUserData(int i) {
419         if (i<0 || i>=nUd)
420                 CkAbort("Bad user data index passed to TCharmGetUserdata!\n");
421         return ud[i].getData();
422 }
423
424 //Start the thread running
425 void TCharm::run(void)
426 {
427   DBG("TCharm::run()");
428   if (tcharm_nothreads) {/*Call user routine directly*/
429           startTCharmThread(initMsg);
430   } 
431   else /* start the thread as usual */
432           start();
433 }
434
435 //Block the thread until start()ed again.
436 void TCharm::stop(void)
437 {
438 #ifndef CMK_OPTIMIZE
439   if (tid != CthSelf())
440     CkAbort("Called TCharm::stop from outside TCharm thread!\n");
441   if (tcharm_nothreads)
442     CkAbort("Cannot make blocking calls using +tcharm_nothreads!\n");
443 #endif
444   stopTiming();
445   isStopped=true;
446   DBG("thread suspended");
447
448   CthSuspend();
449 //      DBG("thread resumed");
450   /*SUBTLE: We have to do the get() because "this" may have changed
451     during a migration-suspend.  If you access *any* members
452     from this point onward, you'll cause heap corruption if
453     we're resuming from migration!  (OSL 2003/9/23)
454    */
455   TCharm *dis=TCharm::get();
456 #ifdef _FAULT_MLOG_ 
457 /*  CpvAccess(_currentObj) = dis;
458  *      printf("[%d] _currentObject set to TCharm index %d %p\n",CkMyPe(),dis->thisIndex,dis);*/
459 #endif
460   dis->isStopped=false;
461   dis->startTiming();
462   //CkPrintf("[%d] Thread resumed  for tid %p\n",dis->thisIndex,dis->tid);
463 }
464
465 //Resume the waiting thread
466 void TCharm::start(void)
467 {
468   //  since this thread is scheduled, it is not a good idea to migrate 
469   isStopped=false;
470   DBG("thread resuming soon");
471   //CkPrintf("TCharm[%d]::start()\n", thisIndex);
472   //CmiPrintStackTrace(0);
473 #ifdef _FAULT_MLOG_
474 //CthAwakenPrio(tid, CQS_QUEUEING_BFIFO, 1, &prio);
475   CthAwaken(tid);
476 #else
477   CthAwaken(tid);
478 #endif
479 }
480
481 //Block our thread, schedule, and come back:
482 void TCharm::schedule(void) {
483   DBG("thread schedule");
484   start(); // Calls CthAwaken
485   stop(); // Calls CthSuspend
486 }
487
488 //Go to sync, block, possibly migrate, and then resume
489 void TCharm::migrate(void)
490 {
491 #if CMK_LBDB_ON
492   DBG("going to sync");
493   AtSync();
494   stop();
495 #else
496   DBG("skipping sync, because there is no load balancer");
497 #endif
498 }
499
500
501
502 void TCharm::evacuate(){
503         /*
504                 FAULT_EVAC
505         */
506         //CkClearAllArrayElementsCPP();
507         if(CpvAccess(startedEvac)){
508                 int nextPE = getNextPE(CkArrayIndex1D(thisIndex));
509 //              resumeAfterMigration=true;
510                 CcdCallFnAfter((CcdVoidFn)CkEmmigrateElement, (void *)myRec, 1);
511                 suspend();
512                 return;
513         }
514         return;
515
516 }
517
518 //calls atsync with async mode
519 void TCharm::async_migrate(void)
520 {
521 #if CMK_LBDB_ON
522   DBG("going to sync at async mode");
523   skipResume = true;            // we resume immediately
524   ReadyMigrate(false);
525   AtSync(0);
526   schedule();
527 //  allow_migrate();
528 #else
529   DBG("skipping sync, because there is no load balancer");
530 #endif
531 }
532
533 /*
534 Note:
535  thread can only migrate at the point when this is called
536 */
537 void TCharm::allow_migrate(void)
538 {
539 #if CMK_LBDB_ON
540 //  ReadyMigrate(true);
541   int nextPe = MigrateToPe();
542   if (nextPe != -1) {
543     migrateTo(nextPe);
544   }
545 #else
546   DBG("skipping sync, because there is no load balancer");
547 #endif
548 }
549
550 //Resume from sync: start the thread again
551 void TCharm::ResumeFromSync(void)
552 {
553   //if(isSelfDone) return;
554   //if (exitWhenDone) return; //for bigsim ooc execution
555   if (!skipResume) start();
556 }
557
558
559 /****** TcharmClient ******/
560 void TCharmClient1D::ckJustMigrated(void) {
561   ArrayElement1D::ckJustMigrated();
562   findThread();
563   tcharmClientInit();
564 }
565
566 void TCharmClient1D::pup(PUP::er &p) {
567   ArrayElement1D::pup(p);
568   p|threadProxy;
569 }
570
571 CkArrayID TCHARM_Get_threads(void) {
572         TCHARMAPI("TCHARM_Get_threads");
573         return TCharm::get()->getProxy();
574 }
575
576 /************* Startup/Shutdown Coordination Support ************/
577
578 // Useless values to reduce over:
579 int vals[2]={0,1};
580
581 //Called when we want to go to a barrier
582 void TCharm::barrier(void) {
583         //Contribute to a synchronizing reduction
584         CkCallback cb(index_t::atBarrier(0), thisProxy[0]);
585         contribute(sizeof(vals),&vals,CkReduction::sum_int,cb);
586 #if CMK_BLUEGENE_CHARM
587         void *curLog;           // store current log in timeline
588         _TRACE_BG_TLINE_END(&curLog);
589         TRACE_BG_AMPI_BREAK(NULL, "TCharm_Barrier_START", NULL, 0, 1);
590 #endif
591         stop();
592 #if CMK_BLUEGENE_CHARM
593          _TRACE_BG_SET_INFO(NULL, "TCHARM_Barrier_END",  &curLog, 1);
594 #endif
595 }
596
597 //Called when we've reached the barrier
598 void TCharm::atBarrier(CkReductionMsg *m) {
599         DBGX("clients all at barrier");
600         delete m;
601         thisProxy.start(); //Just restart everybody
602 }
603
604 //Called when the thread is done running
605 void TCharm::done(void) {
606         //CmiPrintStackTrace(0);
607         DBG("TCharm thread "<<thisIndex<<" done")
608         if (exitWhenDone) {
609                 //Contribute to a synchronizing reduction
610                 CkCallback cb(index_t::atExit(0), thisProxy[0]);
611                 contribute(sizeof(vals),&vals,CkReduction::sum_int,cb);
612         }
613         isSelfDone = true;
614         stop();
615 }
616 //Called when all threads are done running
617 void TCharm::atExit(CkReductionMsg *m) {
618         DBGX("TCharm::atExit1> exiting");
619         delete m;
620         //thisProxy.unsetFlags();
621         CkExit();
622         //CkPrintf("After CkExit()!!!!!!!\n");
623 }
624
625 /************* Setup **************/
626
627 //Globals used to control setup process
628 static TCHARM_Fallback_setup_fn g_fallbackSetup=NULL;
629 void TCHARM_Set_fallback_setup(TCHARM_Fallback_setup_fn f)
630 {
631         g_fallbackSetup=f;
632 }
633 void TCHARM_Call_fallback_setup(void) {
634         if (g_fallbackSetup) 
635                 (g_fallbackSetup)();
636         else
637                 CkAbort("TCHARM: Unexpected fallback setup--missing TCHARM_User_setup routine?");
638 }
639
640 /************** User API ***************/
641 /**********************************
642 Callable from UserSetup:
643 */
644
645 // Read the command line to figure out how many threads to create:
646 CDECL int TCHARM_Get_num_chunks(void)
647 {
648         TCHARMAPI("TCHARM_Get_num_chunks");
649         if (CkMyPe()!=0) CkAbort("TCHARM_Get_num_chunks should only be called on PE 0 during setup!");
650         int nChunks=CkNumPes();
651         char **argv=CkGetArgv();
652         CmiGetArgIntDesc(argv,"-vp",&nChunks,"Set the total number of virtual processors");
653         CmiGetArgIntDesc(argv,"+vp",&nChunks,NULL);
654         lastNumChunks=nChunks;
655         return nChunks;
656 }
657 FDECL int FTN_NAME(TCHARM_GET_NUM_CHUNKS,tcharm_get_num_chunks)(void)
658 {
659         return TCHARM_Get_num_chunks();
660 }
661
662 // Fill out the default thread options:
663 TCHARM_Thread_options::TCHARM_Thread_options(int doDefault)
664 {
665         stackSize=0; /* default stacksize */
666         exitWhenDone=0; /* don't exit when done by default. */
667 }
668 void TCHARM_Thread_options::sanityCheck(void) {
669         if (stackSize<=0) stackSize=tcharm_stacksize;
670 }
671
672
673 TCHARM_Thread_options g_tcharmOptions(1);
674
675 /*Set the size of the thread stack*/
676 CDECL void TCHARM_Set_stack_size(int newStackSize)
677 {
678         TCHARMAPI("TCHARM_Set_stack_size");
679         g_tcharmOptions.stackSize=newStackSize;
680 }
681 FDECL void FTN_NAME(TCHARM_SET_STACK_SIZE,tcharm_set_stack_size)
682         (int *newSize)
683 { TCHARM_Set_stack_size(*newSize); }
684
685 CDECL void TCHARM_Set_exit(void) { g_tcharmOptions.exitWhenDone=1; }
686
687 /*Create a new array of threads, which will be bound to by subsequent libraries*/
688 CDECL void TCHARM_Create(int nThreads,
689                         int threadFn)
690 {
691         TCHARMAPI("TCHARM_Create");
692         TCHARM_Create_data(nThreads,
693                          threadFn,NULL,0);
694 }
695 FDECL void FTN_NAME(TCHARM_CREATE,tcharm_create)
696         (int *nThreads,int threadFn)
697 { TCHARM_Create(*nThreads,threadFn); }
698
699 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg);
700
701 /*As above, but pass along (arbitrary) data to threads*/
702 CDECL void TCHARM_Create_data(int nThreads,
703                   int threadFn,
704                   void *threadData,int threadDataLen)
705 {
706         TCHARMAPI("TCHARM_Create_data");
707         TCharmInitMsg *msg=new (threadDataLen,0) TCharmInitMsg(
708                 threadFn,g_tcharmOptions);
709         msg->numElements=nThreads;
710         memcpy(msg->data,threadData,threadDataLen);
711         TCHARM_Build_threads(msg);
712         
713         // Reset the thread options:
714         g_tcharmOptions=TCHARM_Thread_options(1);
715 }
716
717 FDECL void FTN_NAME(TCHARM_CREATE_DATA,tcharm_create_data)
718         (int *nThreads,
719                   int threadFn,
720                   void *threadData,int *threadDataLen)
721 { TCHARM_Create_data(*nThreads,threadFn,threadData,*threadDataLen); }
722
723 CkGroupID CkCreatePropMap(void);
724
725 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg)
726 {
727   char *tmp;
728   char **argv=CkGetArgv();
729   CkArrayOptions opts(msg->numElements);
730   CkAssert(CkpvAccess(mapCreated)==1);
731
732   if(haveConfigurableRRMap()){
733     CkPrintf("USING ConfigurableRRMap\n");
734     mapID=CProxy_ConfigurableRRMap::ckNew();
735   } else if(mapping==NULL){
736 #if CMK_BLUEGENE_CHARM
737     mapID=CProxy_BlockMap::ckNew();
738 #else
739 #if __FAULT__
740         mapID=CProxy_RRMap::ckNew();
741 #else
742     mapID=CkCreatePropMap();
743 #endif
744 #endif
745   }else if(0==strcmp(mapping,"BLOCK_MAP")){
746     CkPrintf("USING BLOCK_MAP\n");
747     mapID=CProxy_BlockMap::ckNew();
748   }else if(0==strcmp(mapping,"RR_MAP")){
749     CkPrintf("USING RR_MAP\n");
750     mapID=CProxy_RRMap::ckNew();
751   }else{  // "PROP_MAP" or anything else
752     mapID=CkCreatePropMap();
753   }
754   opts.setMap(mapID);
755   int nElem=msg->numElements; //<- save it because msg will be deleted.
756   return CProxy_TCharm::ckNew(msg,opts);
757 }
758
759 // Helper used when creating a new array bound to the TCHARM threads:
760 CkArrayOptions TCHARM_Attach_start(CkArrayID *retTCharmArray,int *retNumElts)
761 {
762         TCharm *tc=TCharm::get();
763         if (!tc)
764                 CkAbort("You must call TCHARM initialization routines from a TCHARM thread!");
765         int nElts=tc->getNumElements();
766       
767         //CmiPrintf("TCHARM Elements = %d\n", nElts);  
768       
769         if (retNumElts!=NULL) *retNumElts=nElts;
770         *retTCharmArray=tc->getProxy();
771         CkArrayOptions opts(nElts);
772         opts.bindTo(tc->getProxy());
773         return opts;
774 }
775
776 void TCHARM_Suspend(void) {
777         TCharm *tc=TCharm::get();
778         tc->suspend();
779 }
780
781 /***********************************
782 Callable from worker thread
783 */
784 CDECL int TCHARM_Element(void)
785
786         TCHARMAPI("TCHARM_Element");
787         return TCharm::get()->getElement();
788 }
789 CDECL int TCHARM_Num_elements(void)
790
791         TCHARMAPI("TCHARM_Num_elements");
792         return TCharm::get()->getNumElements();
793 }
794
795 FDECL int FTN_NAME(TCHARM_ELEMENT,tcharm_element)(void) 
796 { return TCHARM_Element();}
797 FDECL int FTN_NAME(TCHARM_NUM_ELEMENTS,tcharm_num_elements)(void) 
798 { return TCHARM_Num_elements();}
799
800 //Make sure this address will migrate with us when we move:
801 static void checkAddress(void *data)
802 {
803         if (tcharm_nomig||tcharm_nothreads) return; //Stack is not isomalloc'd
804         if (CmiThreadIs(CMI_THREAD_IS_ALIAS)||CmiThreadIs(CMI_THREAD_IS_STACKCOPY)) return; // memory alias thread
805         if (CmiIsomallocEnabled()) {
806           if (!CmiIsomallocInRange(data))
807             CkAbort("The UserData you register must be allocated on the stack!\n");
808         }
809         else 
810           CkPrintf("Warning> checkAddress failed because isomalloc not supported.\n");
811 }
812
813 /* Old "register"-based userdata: */
814 CDECL int TCHARM_Register(void *data,TCHARM_Pup_fn pfn)
815
816         TCHARMAPI("TCHARM_Register");
817         checkAddress(data);
818         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
819 }
820 FDECL int FTN_NAME(TCHARM_REGISTER,tcharm_register)
821         (void *data,TCHARM_Pup_fn pfn)
822
823         TCHARMAPI("TCHARM_Register");
824         checkAddress(data);
825         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
826 }
827
828 CDECL void *TCHARM_Get_userdata(int id)
829 {
830         TCHARMAPI("TCHARM_Get_userdata");
831         return TCharm::get()->lookupUserData(id);
832 }
833 FDECL void *FTN_NAME(TCHARM_GET_USERDATA,tcharm_get_userdata)(int *id)
834 { return TCHARM_Get_userdata(*id); }
835
836 /* New hardcoded-ID userdata: */
837 CDECL void TCHARM_Set_global(int globalID,void *new_value,TCHARM_Pup_global_fn pup_or_NULL)
838 {
839         TCHARMAPI("TCHARM_Set_global");
840         TCharm *tc=TCharm::get();
841         if (tc->sud.length()<=globalID)
842         { //We don't have room for this ID yet: make room
843                 int newLen=2*globalID;
844                 tc->sud.resize(newLen);
845         }
846         tc->sud[globalID]=TCharm::UserData(pup_or_NULL,tc->getThread(),new_value);
847 }
848 CDECL void *TCHARM_Get_global(int globalID)
849 {
850         //Skip TCHARMAPI("TCHARM_Get_global") because there's no dynamic allocation here,
851         // and this routine should be as fast as possible.
852         CkVec<TCharm::UserData> &v=TCharm::get()->sud;
853         if (v.length()<=globalID) return NULL; //Uninitialized global
854         return v[globalID].getData();
855 }
856
857 CDECL void TCHARM_Migrate(void)
858 {
859         TCHARMAPI("TCHARM_Migrate");
860         if (CthMigratable() == 0) {
861           CkPrintf("Warning> thread migration is not supported!\n");
862           return;
863         }
864         TCharm::get()->migrate();
865 }
866 FORTRAN_AS_C(TCHARM_MIGRATE,TCHARM_Migrate,tcharm_migrate,(void),())
867
868 CDECL void TCHARM_Async_Migrate(void)
869 {
870         TCHARMAPI("TCHARM_Async_Migrate");
871         TCharm::get()->async_migrate();
872 }
873 FORTRAN_AS_C(TCHARM_ASYNC_MIGRATE,TCHARM_Async_Migrate,tcharm_async_migrate,(void),())
874
875 CDECL void TCHARM_Allow_Migrate(void)
876 {
877         TCHARMAPI("TCHARM_Allow_Migrate");
878         TCharm::get()->allow_migrate();
879 }
880 FORTRAN_AS_C(TCHARM_ALLOW_MIGRATE,TCHARM_Allow_Migrate,tcharm_allow_migrate,(void),())
881
882 CDECL void TCHARM_Migrate_to(int destPE)
883 {
884         TCHARMAPI("TCHARM_Migrate_to");
885         TCharm::get()->migrateTo(destPE);
886 }
887
888 CDECL void TCHARM_Evacuate()
889 {
890         TCHARMAPI("TCHARM_Migrate_to");
891         TCharm::get()->evacuate();
892 }
893
894 FORTRAN_AS_C(TCHARM_MIGRATE_TO,TCHARM_Migrate_to,tcharm_migrate_to,
895         (int *destPE),(*destPE))
896
897 CDECL void TCHARM_Yield(void)
898 {
899         TCHARMAPI("TCHARM_Yield");
900         TCharm::get()->schedule();
901 }
902 FORTRAN_AS_C(TCHARM_YIELD,TCHARM_Yield,tcharm_yield,(void),())
903
904 CDECL void TCHARM_Barrier(void)
905 {
906         TCHARMAPI("TCHARM_Barrier");
907         TCharm::get()->barrier();
908 }
909 FORTRAN_AS_C(TCHARM_BARRIER,TCHARM_Barrier,tcharm_barrier,(void),())
910
911 CDECL void TCHARM_Done(void)
912 {
913         TCHARMAPI("TCHARM_Done");
914         TCharm *c=TCharm::getNULL();
915         if (!c) CkExit();
916         else c->done();
917 }
918 FORTRAN_AS_C(TCHARM_DONE,TCHARM_Done,tcharm_done,(void),())
919
920
921 CDECL double TCHARM_Wall_timer(void)
922 {
923   TCHARMAPI("TCHARM_Wall_timer");
924   TCharm *c=TCharm::getNULL();
925   if(!c) return CkWallTimer();
926   else { //Have to apply current thread's time offset
927     return CkWallTimer()+c->getTimeOffset();
928   }
929 }
930
931 #if 1
932 /*Include Fortran-style "iargc" and "getarg" routines.
933 These are needed to get access to the command-line arguments from Fortran.
934 */
935 FDECL int FTN_NAME(TCHARM_IARGC,tcharm_iargc)(void) {
936   TCHARMAPI("tcharm_iargc");
937   return CkGetArgc()-1;
938 }
939
940 FDECL void FTN_NAME(TCHARM_GETARG,tcharm_getarg)
941         (int *i_p,char *dest,int destLen)
942 {
943   TCHARMAPI("tcharm_getarg");
944   int i=*i_p;
945   if (i<0) CkAbort("tcharm_getarg called with negative argument!");
946   if (i>=CkGetArgc()) CkAbort("tcharm_getarg called with argument > iargc!");
947   const char *src=CkGetArgv()[i];
948   strcpy(dest,src);
949   for (i=strlen(dest);i<destLen;i++) dest[i]=' ';
950 }
951
952 #endif
953
954 //These silly routines are used for serial startup:
955 extern void _initCharm(int argc, char **argv);
956 CDECL void TCHARM_Init(int *argc,char ***argv) {
957         if (!tcharm_initted) {
958           ConverseInit(*argc, *argv, (CmiStartFn) _initCharm,1,1);
959           _initCharm(*argc,*argv);
960         }
961 }
962
963 FDECL void FTN_NAME(TCHARM_INIT,tcharm_init)(void)
964 {
965         int argc=1;
966         const char *argv_sto[2]={"foo",NULL};
967         char **argv=(char **)argv_sto;
968         TCHARM_Init(&argc,&argv);
969 }
970
971 /***********************************
972 * TCHARM Semaphores:
973 * The idea is one side "puts", the other side "gets"; 
974 * but the calls can come in any order--
975 * if the "get" comes first, it blocks until the put.
976 * This makes a convenient, race-condition-free way to do
977 * onetime initializations.  
978 */
979 /// Find this semaphore, or insert if there isn't one:
980 TCharm::TCharmSemaphore *TCharm::findSema(int id) {
981         for (int s=0;s<sema.size();s++)
982                 if (sema[s].id==id) 
983                         return &sema[s];
984         sema.push_back(TCharmSemaphore(id));
985         return &sema[sema.size()-1];
986 }
987 /// Remove this semaphore from the list
988 void TCharm::freeSema(TCharmSemaphore *doomed) {
989         int id=doomed->id;
990         for (int s=0;s<sema.size();s++)
991                 if (sema[s].id==id) {
992                         sema[s]=sema[sema.length()-1];
993                         sema.length()--;
994                         return;
995                 }
996         CkAbort("Tried to free nonexistent TCharm semaphore");
997 }
998
999 /// Block until this semaphore has data:
1000 TCharm::TCharmSemaphore *TCharm::getSema(int id) {
1001         TCharmSemaphore *s=findSema(id);
1002         if (s->data==NULL) 
1003         { //Semaphore isn't filled yet: wait until it is
1004                 s->thread=CthSelf();
1005                 suspend(); //Will be woken by semaPut
1006                 // Semaphore may have moved-- find it again
1007                 s=findSema(id);
1008                 if (s->data==NULL) CkAbort("TCharm::semaGet awoken too early!");
1009         }
1010         return s;
1011 }
1012
1013 /// Store data at the semaphore "id".
1014 ///  The put can come before or after the get.
1015 void TCharm::semaPut(int id,void *data) {
1016         TCharmSemaphore *s=findSema(id);
1017         if (s->data!=NULL) CkAbort("Duplicate calls to TCharm::semaPut!");
1018         s->data=data;
1019         DBG("semaPut "<<id<<" "<<data);
1020         if (s->thread!=NULL) {//Awaken the thread
1021                 s->thread=NULL;
1022                 resume();
1023         }
1024 }
1025
1026 /// Retreive data from the semaphore "id".
1027 ///  Blocks if the data is not immediately available.
1028 ///  Consumes the data, so another put will be required for the next get.
1029 void *TCharm::semaGet(int id) {
1030         TCharmSemaphore *s=getSema(id);
1031         void *ret=s->data;
1032         DBG("semaGet "<<id<<" "<<ret);
1033         // Now remove the semaphore from the list:
1034         freeSema(s);
1035         return ret;
1036 }
1037
1038 /// Retreive data from the semaphore "id".
1039 ///  Blocks if the data is not immediately available.
1040 void *TCharm::semaGets(int id) {
1041         TCharmSemaphore *s=getSema(id);
1042         return s->data;
1043 }
1044
1045 /// Retreive data from the semaphore "id", or returns NULL.
1046 void *TCharm::semaPeek(int id) {
1047         TCharmSemaphore *s=findSema(id);
1048         return s->data;
1049 }
1050
1051 /****** System Call support ******/
1052 /*
1053 TCHARM_System exists to work around a bug where Linux ia32
1054 glibc2.2.x with pthreads crashes at the fork() site when 
1055 called from a user-levelthread. 
1056
1057 The fix is to call system from the main thread, by 
1058 passing the request out of the thread to our array element 
1059 before calling system().
1060 */
1061
1062 CDECL int 
1063 TCHARM_System(const char *shell_command)
1064 {
1065         return TCharm::get()->system(shell_command);
1066 }
1067 int TCharm::system(const char *cmd)
1068 {
1069         int ret=-1778;
1070         callSystemStruct s;
1071         s.cmd=cmd;
1072         s.ret=&ret;
1073         thisProxy[thisIndex].callSystem(s);
1074         suspend();
1075         return ret;
1076 }
1077
1078 void TCharm::callSystem(const callSystemStruct &s)
1079 {
1080         *s.ret = ::system(s.cmd);
1081         resume();
1082 }
1083
1084
1085
1086 #include "tcharm.def.h"