Adding a new map class that can be used to specify VP to PE mappings statically throu...
[charm.git] / src / libs / ck-libs / tcharm / tcharm.C
1 /*
2 Threaded Charm++ "Framework Framework"
3
4 Orion Sky Lawlor, olawlor@acm.org, 11/19/2001
5  */
6 #include "tcharm_impl.h"
7 #include "tcharm.h"
8 #include <ctype.h>
9
10 #if 0 
11     /*Many debugging statements:*/
12 #    define DBG(x) ckout<<"["<<thisIndex<<","<<CkMyPe()<<"] TCHARM> "<<x<<endl;
13 #    define DBGX(x) ckout<<"PE("<<CkMyPe()<<") TCHARM> "<<x<<endl;
14 #else
15     /*No debugging statements*/
16 #    define DBG(x) /*empty*/
17 #    define DBGX(x) /*empty*/
18 #endif
19
20 CtvDeclare(TCharm *,_curTCharm);
21
22 static int lastNumChunks=0;
23
24 class TCharmTraceLibList {
25         enum {maxLibs=20,maxLibNameLen=15};
26         //List of libraries we want to trace:
27         int curLibs;
28         char libNames[maxLibs][maxLibNameLen];
29         int checkIfTracing(const char *lib) const
30         {
31                 for (int i=0;i<curLibs;i++) 
32                         if (0==strcmp(lib,libNames[i]))
33                                 return 1;
34                 return 0;
35         }
36 public:
37         TCharmTraceLibList() {curLibs=0;}
38         void addTracing(const char *lib) 
39         { //We want to trace this library-- add its name to the list.
40                 CkPrintf("TCHARM> Will trace calls to library %s\n",lib);
41                 int i;
42                 for (i=0;0!=*lib;i++,lib++)
43                         libNames[curLibs][i]=tolower(*lib);
44                 libNames[curLibs][i]=0;
45                 // if already tracing, skip
46                 if (checkIfTracing(libNames[curLibs])) return;
47                 curLibs++;
48         }
49         inline int isTracing(const char *lib) const {
50                 if (curLibs==0) return 0; //Common case
51                 else return checkIfTracing(lib);
52         }
53 };
54 static TCharmTraceLibList tcharm_tracelibs;
55 static int tcharm_nomig=0, tcharm_nothreads=0;
56 static int tcharm_stacksize=1*1024*1024; /*Default stack size is 1MB*/
57 static int tcharm_initted=0;
58 CkpvDeclare(int, mapCreated);
59 static CkGroupID mapID;
60 static char* mapping = NULL;
61
62 void TCharm::nodeInit(void)
63 {
64 }
65
66 void TCharm::procInit(void)
67 {
68   CtvInitialize(TCharm *,_curTCharm);
69   CtvAccess(_curTCharm)=NULL;
70   tcharm_initted=1;
71   CtgInit();
72
73   CkpvInitialize(int, mapCreated);
74   CkpvAccess(mapCreated) = 0;
75
76   // called on every pe to eat these arguments
77   char **argv=CkGetArgv();
78   tcharm_nomig=CmiGetArgFlagDesc(argv,"+tcharm_nomig","Disable migration support (debugging)");
79   tcharm_nothreads=CmiGetArgFlagDesc(argv,"+tcharm_nothread","Disable thread support (debugging)");
80   tcharm_nothreads|=CmiGetArgFlagDesc(argv,"+tcharm_nothreads",NULL);
81   char *traceLibName=NULL;
82   while (CmiGetArgStringDesc(argv,"+tcharm_trace",&traceLibName,"Print each call to this library"))
83       tcharm_tracelibs.addTracing(traceLibName);
84   CmiGetArgIntDesc(argv,"+tcharm_stacksize",&tcharm_stacksize,"Set the thread stack size (default 1MB)");
85   if (CkMyPe()!=0) { //Processor 0 eats "+vp<N>" and "-vp<N>" later:
86         int ignored;
87         while (CmiGetArgIntDesc(argv,"-vp",&ignored,NULL)) {}
88         while (CmiGetArgIntDesc(argv,"+vp",&ignored,NULL)) {}
89   }
90   if (CkMyPe()==0) { // Echo various debugging options:
91     if (tcharm_nomig) CmiPrintf("TCHARM> Disabling migration support, for debugging\n");
92     if (tcharm_nothreads) CmiPrintf("TCHARM> Disabling thread support, for debugging\n");
93   }
94   if (CkpvAccess(mapCreated)==0) {
95     if (0!=CmiGetArgString(argv, "+mapping", &mapping)){
96     }
97     CkpvAccess(mapCreated)=1;
98   }
99 }
100
101 void TCHARM_Api_trace(const char *routineName,const char *libraryName)
102 {
103         if (!tcharm_tracelibs.isTracing(libraryName)) return;
104         TCharm *tc=CtvAccess(_curTCharm);
105         char where[100];
106         if (tc==NULL) sprintf(where,"[serial context on %d]",CkMyPe());
107         else sprintf(where,"[%p> vp %d, p %d]",(void *)tc,tc->getElement(),CkMyPe());
108         CmiPrintf("%s Called routine %s\n",where,routineName);
109         CmiPrintStackTrace(1);
110         CmiPrintf("\n");
111 }
112
113 // register thread start functions to get a function handler
114 // this is portable across heterogeneous platforms, or on machines with
115 // random stack/function pointer
116
117 static CkVec<TCHARM_Thread_data_start_fn> threadFnTable;
118
119 int TCHARM_Register_thread_function(TCHARM_Thread_data_start_fn fn)
120 {
121   int idx = threadFnTable.size();
122   threadFnTable.push_back(fn);
123   return idx+1;                     // make 0 invalid number
124 }
125
126 TCHARM_Thread_data_start_fn getTCharmThreadFunction(int idx)
127 {
128   CmiAssert(idx > 0);
129   return threadFnTable[idx-1];
130 }
131
132 static void startTCharmThread(TCharmInitMsg *msg)
133 {
134         DBGX("thread started");
135         TCharm::activateThread();
136        TCHARM_Thread_data_start_fn threadFn = getTCharmThreadFunction(msg->threadFn);
137         threadFn(msg->data);
138         TCharm::deactivateThread();
139         CtvAccess(_curTCharm)->done();
140 }
141
142 TCharm::TCharm(TCharmInitMsg *initMsg_)
143 {
144   initMsg=initMsg_;
145   initMsg->opts.sanityCheck();
146   timeOffset=0.0;
147   threadGlobals=CtgCreate();
148   if (tcharm_nothreads)
149   { //Don't even make a new thread-- just use main thread
150     tid=CthSelf();
151   }
152   else /*Create a thread normally*/
153   {
154     if (tcharm_nomig) { /*Nonmigratable version, for debugging*/
155       tid=CthCreate((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
156     } else {
157       tid=CthCreateMigratable((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
158     }
159 #if CMK_BLUEGENE_CHARM
160     BgAttach(tid);
161     BgUnsetStartOutOfCore();
162 #endif
163   }
164   CtvAccessOther(tid,_curTCharm)=this;
165   isStopped=true;
166   resumeAfterMigration=false;
167         /* FAULT_EVAC*/
168         AsyncEvacuate(CmiTrue);
169   skipResume=false;
170   exitWhenDone=initMsg->opts.exitWhenDone;
171   isSelfDone = false;
172   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
173   threadInfo.thisElement=thisIndex;
174   threadInfo.numElements=initMsg->numElements;
175   if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
176         heapBlocks=CmiIsomallocBlockListNew();
177   else
178         heapBlocks=0;
179   nUd=0;
180   usesAtSync=CmiTrue;
181   run();
182 }
183
184 TCharm::TCharm(CkMigrateMessage *msg)
185         :CBase_TCharm(msg)
186 {
187   initMsg=NULL;
188   tid=NULL;
189   threadGlobals=NULL;
190   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
191         AsyncEvacuate(CmiTrue);
192   heapBlocks=0;
193 }
194
195 void checkPupMismatch(PUP::er &p,int expected,const char *where)
196 {
197         int v=expected;
198         p|v;
199         if (v!=expected) {
200                 CkError("FATAL ERROR> Mismatch %s pup routine\n",where);
201                 CkAbort("FATAL ERROR: Pup direction mismatch");
202         }
203 }
204
205 void TCharm::pup(PUP::er &p) {
206 //Pup superclass
207   ArrayElement1D::pup(p);
208
209   //BIGSIM_OOC DEBUGGING
210   //if(!p.isUnpacking()){
211   //  CmiPrintf("TCharm[%d] packing: ", thisIndex);
212   //  CthPrintThdStack(tid);
213   //}
214
215   checkPupMismatch(p,5134,"before TCHARM");
216   p(isStopped); p(resumeAfterMigration); p(exitWhenDone); p(isSelfDone); p(skipResume);
217   p(threadInfo.thisElement);
218   p(threadInfo.numElements);
219   
220   if (sema.size()>0){
221         CkAbort("TCharm::pup> Cannot migrate with unconsumed semaphores!\n");
222   }
223
224 #ifndef CMK_OPTIMIZE
225   DBG("Packing thread");
226   if (!isStopped && !CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
227     if(BgOutOfCoreFlag==0) //not doing out-of-core scheduling
228         CkAbort("Cannot pup a running thread.  You must suspend before migrating.\n");
229   }     
230   if (tcharm_nomig) CkAbort("Cannot migrate with the +tcharm_nomig option!\n");
231 #endif
232
233   //This seekBlock allows us to reorder the packing/unpacking--
234   // This is needed because the userData depends on the thread's stack
235   // and heap data both at pack and unpack time.
236   PUP::seekBlock s(p,2);
237   
238   if (p.isUnpacking())
239   {//In this case, unpack the thread & heap before the user data
240     s.seek(1);
241     pupThread(p);
242     //Restart our clock: set it up so packTime==CkWallTimer+timeOffset
243     double packTime;
244     p(packTime);
245     timeOffset=packTime-CkWallTimer();
246   }
247   
248 //Pack all user data
249   // Set up TCHARM context for use during user's pup routines:
250   CtvAccess(_curTCharm)=this;
251   activateThread();
252
253   s.seek(0);
254   checkPupMismatch(p,5135,"before TCHARM user data");
255   p(nUd);
256   for(int i=0;i<nUd;i++) {
257     if (p.isUnpacking()) ud[i].update(tid);
258     ud[i].pup(p);
259   }
260   checkPupMismatch(p,5137,"after TCHARM_Register user data");
261
262   if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
263     deactivateThread();
264   p|sud;           //  sud vector block can not be in isomalloc
265   checkPupMismatch(p,5138,"after TCHARM_Global user data");
266   
267   // Tear down TCHARM context after calling user pup routines
268   if (!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
269     deactivateThread();
270   CtvAccess(_curTCharm)=NULL;
271   
272   if (!p.isUnpacking())
273   {//In this case, pack the thread & heap after the user data
274     s.seek(1);
275     pupThread(p);
276     //Stop our clock:
277     double packTime=CkWallTimer()+timeOffset;
278     p(packTime);
279   }
280   
281   s.endBlock(); //End of seeking block
282   checkPupMismatch(p,5140,"after TCHARM");
283   
284   //BIGSIM_OOC DEBUGGING
285   //if(p.isUnpacking()){
286   //  CmiPrintf("TCharm[%d] unpacking: ", thisIndex);
287   //  CthPrintThdStack(tid);
288   //}
289
290 }
291
292 // Pup our thread and related data
293 void TCharm::pupThread(PUP::er &pc) {
294     pup_er p=(pup_er)&pc;
295     checkPupMismatch(pc,5138,"before TCHARM thread");
296     tid = CthPup(p, tid);
297     if (pc.isUnpacking()) {
298       CtvAccessOther(tid,_curTCharm)=this;
299 #if CMK_BLUEGENE_CHARM
300       BgAttach(tid);
301 #endif
302     }
303     if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
304       CmiIsomallocBlockListPup(p,&heapBlocks);
305     threadGlobals=CtgPup(p,threadGlobals);
306     checkPupMismatch(pc,5139,"after TCHARM thread");
307 }
308
309 //Pup one group of user data
310 void TCharm::UserData::pup(PUP::er &p)
311 {
312   pup_er pext=(pup_er)(&p);
313   p(mode);
314   switch(mode) {
315   case 'c': { /* C mode: userdata is on the stack, so keep address */
316 //     p((char*)&data,sizeof(data));
317      p(pos);
318      //FIXME: function pointers may not be valid across processors
319      p((char*)&cfn, sizeof(TCHARM_Pup_fn));
320      char *data = CthPointer(t, pos);
321      if (cfn) cfn(pext,data);
322      } break;
323   case 'g': { /* Global mode: zero out userdata on arrival */
324      if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
325      {
326         // keep the pointer value if using isomalloc, no need to use pup
327        p(pos);
328      }
329      else if (p.isUnpacking())      //  zero out userdata on arrival
330        pos=0;
331
332        //FIXME: function pointers may not be valid across processors
333      p((char*)&gfn, sizeof(TCHARM_Pup_global_fn));
334      if (gfn) gfn(pext);
335      } break;
336   default:
337      break;
338   };
339 }
340
341 TCharm::~TCharm()
342 {
343   //BIGSIM_OOC DEBUGGING
344   //CmiPrintf("TCharm destructor called with heapBlocks=%p!\n", heapBlocks);
345   
346   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
347   CthFree(tid);
348   CtgFree(threadGlobals);
349   delete initMsg;
350 }
351
352 void TCharm::migrateTo(int destPE) {
353         if (destPE==CkMyPe()) return;
354         // Make sure migrateMe gets called *after* we suspend:
355         thisProxy[thisIndex].migrateDelayed(destPE);
356 //      resumeAfterMigration=true;
357         suspend();
358 }
359 void TCharm::migrateDelayed(int destPE) {
360         migrateMe(destPE);
361 }
362 void TCharm::ckJustMigrated(void) {
363         ArrayElement::ckJustMigrated();
364         if (resumeAfterMigration) {
365                 resumeAfterMigration=false;
366                 resume(); //Start the thread running
367         }
368 }
369
370 void TCharm::ckJustRestored(void) {
371         //CkPrintf("call just restored from TCharm[%d]\n", thisIndex);
372         ArrayElement::ckJustRestored();
373         //if (resumeAfterMigration) {
374         //      resumeAfterMigration=false;
375         //      resume(); //Start the thread running
376         //}
377 }
378
379 /*
380         FAULT_EVAC
381
382         If a Tcharm object is about to migrate it should be suspended first
383 */
384 void TCharm::ckAboutToMigrate(void){
385         ArrayElement::ckAboutToMigrate();
386         resumeAfterMigration = true;
387         isStopped = true;
388 //      suspend();
389 }
390
391 // clear the data before restarting from disk
392 void TCharm::clear()
393 {
394   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
395   CthFree(tid);
396   delete initMsg;
397 }
398
399 //Register user data to be packed with the thread
400 int TCharm::add(const TCharm::UserData &d)
401 {
402   if (nUd>=maxUserData)
403     CkAbort("TCharm: Registered too many user data fields!\n");
404   int nu=nUd++;
405   ud[nu]=d;
406   return nu;
407 }
408 void *TCharm::lookupUserData(int i) {
409         if (i<0 || i>=nUd)
410                 CkAbort("Bad user data index passed to TCharmGetUserdata!\n");
411         return ud[i].getData();
412 }
413
414 //Start the thread running
415 void TCharm::run(void)
416 {
417   DBG("TCharm::run()");
418   if (tcharm_nothreads) {/*Call user routine directly*/
419           startTCharmThread(initMsg);
420   } 
421   else /* start the thread as usual */
422           start();
423 }
424
425 //Block the thread until start()ed again.
426 void TCharm::stop(void)
427 {
428 #ifndef CMK_OPTIMIZE
429   if (tid != CthSelf())
430     CkAbort("Called TCharm::stop from outside TCharm thread!\n");
431   if (tcharm_nothreads)
432     CkAbort("Cannot make blocking calls using +tcharm_nothreads!\n");
433 #endif
434   stopTiming();
435   isStopped=true;
436   DBG("thread suspended");
437
438   CthSuspend();
439 //      DBG("thread resumed");
440   /*SUBTLE: We have to do the get() because "this" may have changed
441     during a migration-suspend.  If you access *any* members
442     from this point onward, you'll cause heap corruption if
443     we're resuming from migration!  (OSL 2003/9/23)
444    */
445   TCharm *dis=TCharm::get();
446   dis->isStopped=false;
447   dis->startTiming();
448   //CkPrintf("[%d] Thread resumed  for tid %p\n",dis->thisIndex,dis->tid);
449 }
450
451 //Resume the waiting thread
452 void TCharm::start(void)
453 {
454   //  since this thread is scheduled, it is not a good idea to migrate 
455   isStopped=false;
456   DBG("thread resuming soon");
457   //CkPrintf("TCharm[%d]::start()\n", thisIndex);
458   //CmiPrintStackTrace(0);
459   CthAwaken(tid);
460 }
461
462 //Block our thread, schedule, and come back:
463 void TCharm::schedule(void) {
464   DBG("thread schedule");
465   start(); // Calls CthAwaken
466   stop(); // Calls CthSuspend
467 }
468
469 //Go to sync, block, possibly migrate, and then resume
470 void TCharm::migrate(void)
471 {
472 #if CMK_LBDB_ON
473   DBG("going to sync");
474   AtSync();
475   stop();
476 #else
477   DBG("skipping sync, because there is no load balancer");
478 #endif
479 }
480
481
482 void TCharm::evacuate(){
483         /*
484                 FAULT_EVAC
485         */
486         //CkClearAllArrayElementsCPP();
487         if(CpvAccess(startedEvac)){
488                 int nextPE = getNextPE(CkArrayIndex1D(thisIndex));
489 //              resumeAfterMigration=true;
490                 CcdCallFnAfter((CcdVoidFn)CkEmmigrateElement, (void *)myRec, 1);
491                 suspend();
492                 return;
493         }
494         return;
495
496 }
497
498 //calls atsync with async mode
499 void TCharm::async_migrate(void)
500 {
501 #if CMK_LBDB_ON
502   DBG("going to sync at async mode");
503   skipResume = true;            // we resume immediately
504   ReadyMigrate(false);
505   AtSync(0);
506   schedule();
507 //  allow_migrate();
508 #else
509   DBG("skipping sync, because there is no load balancer");
510 #endif
511 }
512
513 /*
514 Note:
515  thread can only migrate at the point when this is called
516 */
517 void TCharm::allow_migrate(void)
518 {
519 #if CMK_LBDB_ON
520 //  ReadyMigrate(true);
521   int nextPe = MigrateToPe();
522   if (nextPe != -1) {
523     migrateTo(nextPe);
524   }
525 #else
526   DBG("skipping sync, because there is no load balancer");
527 #endif
528 }
529
530 //Resume from sync: start the thread again
531 void TCharm::ResumeFromSync(void)
532 {
533   //if(isSelfDone) return;
534   //if (exitWhenDone) return; //for bigsim ooc execution
535   if (!skipResume) start();
536 }
537
538
539 /****** TcharmClient ******/
540 void TCharmClient1D::ckJustMigrated(void) {
541   ArrayElement1D::ckJustMigrated();
542   findThread();
543   tcharmClientInit();
544 }
545
546 void TCharmClient1D::pup(PUP::er &p) {
547   ArrayElement1D::pup(p);
548   p|threadProxy;
549 }
550
551 CkArrayID TCHARM_Get_threads(void) {
552         TCHARMAPI("TCHARM_Get_threads");
553         return TCharm::get()->getProxy();
554 }
555
556 /************* Startup/Shutdown Coordination Support ************/
557
558 // Useless values to reduce over:
559 int vals[2]={0,1};
560
561 //Called when we want to go to a barrier
562 void TCharm::barrier(void) {
563         //Contribute to a synchronizing reduction
564         CkCallback cb(index_t::atBarrier(0), thisProxy[0]);
565         contribute(sizeof(vals),&vals,CkReduction::sum_int,cb);
566 #if CMK_BLUEGENE_CHARM
567         void *curLog;           // store current log in timeline
568         _TRACE_BG_TLINE_END(&curLog);
569         TRACE_BG_AMPI_BREAK(NULL, "TCharm_Barrier_START", NULL, 0);
570 #endif
571         stop();
572 #if CMK_BLUEGENE_CHARM
573          _TRACE_BG_SET_INFO(NULL, "TCHARM_Barrier_END",  &curLog, 1);
574 #endif
575 }
576
577 //Called when we've reached the barrier
578 void TCharm::atBarrier(CkReductionMsg *m) {
579         DBGX("clients all at barrier");
580         delete m;
581         thisProxy.start(); //Just restart everybody
582 }
583
584 //Called when the thread is done running
585 void TCharm::done(void) {
586         //CmiPrintStackTrace(0);
587         DBG("TCharm thread "<<thisIndex<<" done")
588         if (exitWhenDone) {
589                 //Contribute to a synchronizing reduction
590                 CkCallback cb(index_t::atExit(0), thisProxy[0]);
591                 contribute(sizeof(vals),&vals,CkReduction::sum_int,cb);
592         }
593         isSelfDone = true;
594         stop();
595 }
596 //Called when all threads are done running
597 void TCharm::atExit(CkReductionMsg *m) {
598         DBGX("TCharm::atExit1> exiting");
599         delete m;
600         //thisProxy.unsetFlags();
601         CkExit();
602         //CkPrintf("After CkExit()!!!!!!!\n");
603 }
604
605 /************* Setup **************/
606
607 //Globals used to control setup process
608 static TCHARM_Fallback_setup_fn g_fallbackSetup=NULL;
609 void TCHARM_Set_fallback_setup(TCHARM_Fallback_setup_fn f)
610 {
611         g_fallbackSetup=f;
612 }
613 void TCHARM_Call_fallback_setup(void) {
614         if (g_fallbackSetup) 
615                 (g_fallbackSetup)();
616         else
617                 CkAbort("TCHARM: Unexpected fallback setup--missing TCHARM_User_setup routine?");
618 }
619
620 /************** User API ***************/
621 /**********************************
622 Callable from UserSetup:
623 */
624
625 // Read the command line to figure out how many threads to create:
626 CDECL int TCHARM_Get_num_chunks(void)
627 {
628         TCHARMAPI("TCHARM_Get_num_chunks");
629         if (CkMyPe()!=0) CkAbort("TCHARM_Get_num_chunks should only be called on PE 0 during setup!");
630         int nChunks=CkNumPes();
631         char **argv=CkGetArgv();
632         CmiGetArgIntDesc(argv,"-vp",&nChunks,"Set the total number of virtual processors");
633         CmiGetArgIntDesc(argv,"+vp",&nChunks,NULL);
634         lastNumChunks=nChunks;
635         return nChunks;
636 }
637 FDECL int FTN_NAME(TCHARM_GET_NUM_CHUNKS,tcharm_get_num_chunks)(void)
638 {
639         return TCHARM_Get_num_chunks();
640 }
641
642 // Fill out the default thread options:
643 TCHARM_Thread_options::TCHARM_Thread_options(int doDefault)
644 {
645         stackSize=0; /* default stacksize */
646         exitWhenDone=0; /* don't exit when done by default. */
647 }
648 void TCHARM_Thread_options::sanityCheck(void) {
649         if (stackSize<=0) stackSize=tcharm_stacksize;
650 }
651
652
653 TCHARM_Thread_options g_tcharmOptions(1);
654
655 /*Set the size of the thread stack*/
656 CDECL void TCHARM_Set_stack_size(int newStackSize)
657 {
658         TCHARMAPI("TCHARM_Set_stack_size");
659         g_tcharmOptions.stackSize=newStackSize;
660 }
661 FDECL void FTN_NAME(TCHARM_SET_STACK_SIZE,tcharm_set_stack_size)
662         (int *newSize)
663 { TCHARM_Set_stack_size(*newSize); }
664
665 CDECL void TCHARM_Set_exit(void) { g_tcharmOptions.exitWhenDone=1; }
666
667 /*Create a new array of threads, which will be bound to by subsequent libraries*/
668 CDECL void TCHARM_Create(int nThreads,
669                         int threadFn)
670 {
671         TCHARMAPI("TCHARM_Create");
672         TCHARM_Create_data(nThreads,
673                          threadFn,NULL,0);
674 }
675 FDECL void FTN_NAME(TCHARM_CREATE,tcharm_create)
676         (int *nThreads,int threadFn)
677 { TCHARM_Create(*nThreads,threadFn); }
678
679 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg);
680
681 /*As above, but pass along (arbitrary) data to threads*/
682 CDECL void TCHARM_Create_data(int nThreads,
683                   int threadFn,
684                   void *threadData,int threadDataLen)
685 {
686         TCHARMAPI("TCHARM_Create_data");
687         TCharmInitMsg *msg=new (threadDataLen,0) TCharmInitMsg(
688                 threadFn,g_tcharmOptions);
689         msg->numElements=nThreads;
690         memcpy(msg->data,threadData,threadDataLen);
691         TCHARM_Build_threads(msg);
692         
693         // Reset the thread options:
694         g_tcharmOptions=TCHARM_Thread_options(1);
695 }
696
697 FDECL void FTN_NAME(TCHARM_CREATE_DATA,tcharm_create_data)
698         (int *nThreads,
699                   int threadFn,
700                   void *threadData,int *threadDataLen)
701 { TCHARM_Create_data(*nThreads,threadFn,threadData,*threadDataLen); }
702
703 CkGroupID CkCreatePropMap(void);
704
705 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg)
706 {
707   char *tmp;
708   char **argv=CkGetArgv();
709   CkArrayOptions opts(msg->numElements);
710   CkAssert(CkpvAccess(mapCreated)==1);
711
712   if(haveConfigurableRRMap()){
713     CkPrintf("USING ConfigurableRRMap\n");
714     mapID=CProxy_ConfigurableRRMap::ckNew();
715   } else if(mapping==NULL){
716 #if CMK_BLUEGENE_CHARM
717     mapID=CProxy_BlockMap::ckNew();
718 #else
719     mapID=CkCreatePropMap();
720 #endif
721   }else if(0==strcmp(mapping,"BLOCK_MAP")){
722     CkPrintf("USING BLOCK_MAP\n");
723     mapID=CProxy_BlockMap::ckNew();
724   }else if(0==strcmp(mapping,"RR_MAP")){
725     CkPrintf("USING RR_MAP\n");
726     mapID=CProxy_RRMap::ckNew();
727   }else{  // "PROP_MAP" or anything else
728     mapID=CkCreatePropMap();
729   }
730   opts.setMap(mapID);
731   int nElem=msg->numElements; //<- save it because msg will be deleted.
732   return CProxy_TCharm::ckNew(msg,opts);
733 }
734
735 // Helper used when creating a new array bound to the TCHARM threads:
736 CkArrayOptions TCHARM_Attach_start(CkArrayID *retTCharmArray,int *retNumElts)
737 {
738         TCharm *tc=TCharm::get();
739         if (!tc)
740                 CkAbort("You must call TCHARM initialization routines from a TCHARM thread!");
741         int nElts=tc->getNumElements();
742       
743         //CmiPrintf("TCHARM Elements = %d\n", nElts);  
744       
745         if (retNumElts!=NULL) *retNumElts=nElts;
746         *retTCharmArray=tc->getProxy();
747         CkArrayOptions opts(nElts);
748         opts.bindTo(tc->getProxy());
749         return opts;
750 }
751
752 void TCHARM_Suspend(void) {
753         TCharm *tc=TCharm::get();
754         tc->suspend();
755 }
756
757 /***********************************
758 Callable from worker thread
759 */
760 CDECL int TCHARM_Element(void)
761
762         TCHARMAPI("TCHARM_Element");
763         return TCharm::get()->getElement();
764 }
765 CDECL int TCHARM_Num_elements(void)
766
767         TCHARMAPI("TCHARM_Num_elements");
768         return TCharm::get()->getNumElements();
769 }
770
771 FDECL int FTN_NAME(TCHARM_ELEMENT,tcharm_element)(void) 
772 { return TCHARM_Element();}
773 FDECL int FTN_NAME(TCHARM_NUM_ELEMENTS,tcharm_num_elements)(void) 
774 { return TCHARM_Num_elements();}
775
776 //Make sure this address will migrate with us when we move:
777 static void checkAddress(void *data)
778 {
779         if (tcharm_nomig||tcharm_nothreads) return; //Stack is not isomalloc'd
780         if (CmiThreadIs(CMI_THREAD_IS_ALIAS)||CmiThreadIs(CMI_THREAD_IS_STACKCOPY)) return; // memory alias thread
781         if (CmiIsomallocEnabled()) {
782           if (!CmiIsomallocInRange(data))
783             CkAbort("The UserData you register must be allocated on the stack!\n");
784         }
785         else 
786           CkPrintf("Warning> checkAddress failed because isomalloc not supported.\n");
787 }
788
789 /* Old "register"-based userdata: */
790 CDECL int TCHARM_Register(void *data,TCHARM_Pup_fn pfn)
791
792         TCHARMAPI("TCHARM_Register");
793         checkAddress(data);
794         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
795 }
796 FDECL int FTN_NAME(TCHARM_REGISTER,tcharm_register)
797         (void *data,TCHARM_Pup_fn pfn)
798
799         TCHARMAPI("TCHARM_Register");
800         checkAddress(data);
801         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
802 }
803
804 CDECL void *TCHARM_Get_userdata(int id)
805 {
806         TCHARMAPI("TCHARM_Get_userdata");
807         return TCharm::get()->lookupUserData(id);
808 }
809 FDECL void *FTN_NAME(TCHARM_GET_USERDATA,tcharm_get_userdata)(int *id)
810 { return TCHARM_Get_userdata(*id); }
811
812 /* New hardcoded-ID userdata: */
813 CDECL void TCHARM_Set_global(int globalID,void *new_value,TCHARM_Pup_global_fn pup_or_NULL)
814 {
815         TCHARMAPI("TCHARM_Set_global");
816         TCharm *tc=TCharm::get();
817         if (tc->sud.length()<=globalID)
818         { //We don't have room for this ID yet: make room
819                 int newLen=2*globalID;
820                 tc->sud.resize(newLen);
821         }
822         tc->sud[globalID]=TCharm::UserData(pup_or_NULL,tc->getThread(),new_value);
823 }
824 CDECL void *TCHARM_Get_global(int globalID)
825 {
826         //Skip TCHARMAPI("TCHARM_Get_global") because there's no dynamic allocation here,
827         // and this routine should be as fast as possible.
828         CkVec<TCharm::UserData> &v=TCharm::get()->sud;
829         if (v.length()<=globalID) return NULL; //Uninitialized global
830         return v[globalID].getData();
831 }
832
833 CDECL void TCHARM_Migrate(void)
834 {
835         TCHARMAPI("TCHARM_Migrate");
836         if (CthMigratable() == 0) {
837           CkPrintf("Warning> thread migration is not supported!\n");
838           return;
839         }
840         TCharm::get()->migrate();
841 }
842 FORTRAN_AS_C(TCHARM_MIGRATE,TCHARM_Migrate,tcharm_migrate,(void),())
843
844 CDECL void TCHARM_Async_Migrate(void)
845 {
846         TCHARMAPI("TCHARM_Async_Migrate");
847         TCharm::get()->async_migrate();
848 }
849 FORTRAN_AS_C(TCHARM_ASYNC_MIGRATE,TCHARM_Async_Migrate,tcharm_async_migrate,(void),())
850
851 CDECL void TCHARM_Allow_Migrate(void)
852 {
853         TCHARMAPI("TCHARM_Allow_Migrate");
854         TCharm::get()->allow_migrate();
855 }
856 FORTRAN_AS_C(TCHARM_ALLOW_MIGRATE,TCHARM_Allow_Migrate,tcharm_allow_migrate,(void),())
857
858 CDECL void TCHARM_Migrate_to(int destPE)
859 {
860         TCHARMAPI("TCHARM_Migrate_to");
861         TCharm::get()->migrateTo(destPE);
862 }
863
864 CDECL void TCHARM_Evacuate()
865 {
866         TCHARMAPI("TCHARM_Migrate_to");
867         TCharm::get()->evacuate();
868 }
869
870 FORTRAN_AS_C(TCHARM_MIGRATE_TO,TCHARM_Migrate_to,tcharm_migrate_to,
871         (int *destPE),(*destPE))
872
873 CDECL void TCHARM_Yield(void)
874 {
875         TCHARMAPI("TCHARM_Yield");
876         TCharm::get()->schedule();
877 }
878 FORTRAN_AS_C(TCHARM_YIELD,TCHARM_Yield,tcharm_yield,(void),())
879
880 CDECL void TCHARM_Barrier(void)
881 {
882         TCHARMAPI("TCHARM_Barrier");
883         TCharm::get()->barrier();
884 }
885 FORTRAN_AS_C(TCHARM_BARRIER,TCHARM_Barrier,tcharm_barrier,(void),())
886
887 CDECL void TCHARM_Done(void)
888 {
889         TCHARMAPI("TCHARM_Done");
890         TCharm *c=TCharm::getNULL();
891         if (!c) CkExit();
892         else c->done();
893 }
894 FORTRAN_AS_C(TCHARM_DONE,TCHARM_Done,tcharm_done,(void),())
895
896
897 CDECL double TCHARM_Wall_timer(void)
898 {
899   TCHARMAPI("TCHARM_Wall_timer");
900   TCharm *c=TCharm::getNULL();
901   if(!c) return CkWallTimer();
902   else { //Have to apply current thread's time offset
903     return CkWallTimer()+c->getTimeOffset();
904   }
905 }
906
907 #if 1
908 /*Include Fortran-style "iargc" and "getarg" routines.
909 These are needed to get access to the command-line arguments from Fortran.
910 */
911 FDECL int FTN_NAME(TCHARM_IARGC,tcharm_iargc)(void) {
912   TCHARMAPI("tcharm_iargc");
913   return CkGetArgc()-1;
914 }
915
916 FDECL void FTN_NAME(TCHARM_GETARG,tcharm_getarg)
917         (int *i_p,char *dest,int destLen)
918 {
919   TCHARMAPI("tcharm_getarg");
920   int i=*i_p;
921   if (i<0) CkAbort("tcharm_getarg called with negative argument!");
922   if (i>=CkGetArgc()) CkAbort("tcharm_getarg called with argument > iargc!");
923   const char *src=CkGetArgv()[i];
924   strcpy(dest,src);
925   for (i=strlen(dest);i<destLen;i++) dest[i]=' ';
926 }
927
928 #endif
929
930 //These silly routines are used for serial startup:
931 extern void _initCharm(int argc, char **argv);
932 CDECL void TCHARM_Init(int *argc,char ***argv) {
933         if (!tcharm_initted) {
934           ConverseInit(*argc, *argv, (CmiStartFn) _initCharm,1,1);
935           _initCharm(*argc,*argv);
936         }
937 }
938
939 FDECL void FTN_NAME(TCHARM_INIT,tcharm_init)(void)
940 {
941         int argc=1;
942         const char *argv_sto[2]={"foo",NULL};
943         char **argv=(char **)argv_sto;
944         TCHARM_Init(&argc,&argv);
945 }
946
947 /***********************************
948 * TCHARM Semaphores:
949 * The idea is one side "puts", the other side "gets"; 
950 * but the calls can come in any order--
951 * if the "get" comes first, it blocks until the put.
952 * This makes a convenient, race-condition-free way to do
953 * onetime initializations.  
954 */
955 /// Find this semaphore, or insert if there isn't one:
956 TCharm::TCharmSemaphore *TCharm::findSema(int id) {
957         for (int s=0;s<sema.size();s++)
958                 if (sema[s].id==id) 
959                         return &sema[s];
960         sema.push_back(TCharmSemaphore(id));
961         return &sema[sema.size()-1];
962 }
963 /// Remove this semaphore from the list
964 void TCharm::freeSema(TCharmSemaphore *doomed) {
965         int id=doomed->id;
966         for (int s=0;s<sema.size();s++)
967                 if (sema[s].id==id) {
968                         sema[s]=sema[sema.length()-1];
969                         sema.length()--;
970                         return;
971                 }
972         CkAbort("Tried to free nonexistent TCharm semaphore");
973 }
974
975 /// Block until this semaphore has data:
976 TCharm::TCharmSemaphore *TCharm::getSema(int id) {
977         TCharmSemaphore *s=findSema(id);
978         if (s->data==NULL) 
979         { //Semaphore isn't filled yet: wait until it is
980                 s->thread=CthSelf();
981                 suspend(); //Will be woken by semaPut
982                 // Semaphore may have moved-- find it again
983                 s=findSema(id);
984                 if (s->data==NULL) CkAbort("TCharm::semaGet awoken too early!");
985         }
986         return s;
987 }
988
989 /// Store data at the semaphore "id".
990 ///  The put can come before or after the get.
991 void TCharm::semaPut(int id,void *data) {
992         TCharmSemaphore *s=findSema(id);
993         if (s->data!=NULL) CkAbort("Duplicate calls to TCharm::semaPut!");
994         s->data=data;
995         DBG("semaPut "<<id<<" "<<data);
996         if (s->thread!=NULL) {//Awaken the thread
997                 s->thread=NULL;
998                 resume();
999         }
1000 }
1001
1002 /// Retreive data from the semaphore "id".
1003 ///  Blocks if the data is not immediately available.
1004 ///  Consumes the data, so another put will be required for the next get.
1005 void *TCharm::semaGet(int id) {
1006         TCharmSemaphore *s=getSema(id);
1007         void *ret=s->data;
1008         DBG("semaGet "<<id<<" "<<ret);
1009         // Now remove the semaphore from the list:
1010         freeSema(s);
1011         return ret;
1012 }
1013
1014 /// Retreive data from the semaphore "id".
1015 ///  Blocks if the data is not immediately available.
1016 void *TCharm::semaGets(int id) {
1017         TCharmSemaphore *s=getSema(id);
1018         return s->data;
1019 }
1020
1021 /// Retreive data from the semaphore "id", or returns NULL.
1022 void *TCharm::semaPeek(int id) {
1023         TCharmSemaphore *s=findSema(id);
1024         return s->data;
1025 }
1026
1027 /****** System Call support ******/
1028 /*
1029 TCHARM_System exists to work around a bug where Linux ia32
1030 glibc2.2.x with pthreads crashes at the fork() site when 
1031 called from a user-levelthread. 
1032
1033 The fix is to call system from the main thread, by 
1034 passing the request out of the thread to our array element 
1035 before calling system().
1036 */
1037
1038 CDECL int 
1039 TCHARM_System(const char *shell_command)
1040 {
1041         return TCharm::get()->system(shell_command);
1042 }
1043 int TCharm::system(const char *cmd)
1044 {
1045         int ret=-1778;
1046         callSystemStruct s;
1047         s.cmd=cmd;
1048         s.ret=&ret;
1049         thisProxy[thisIndex].callSystem(s);
1050         suspend();
1051         return ret;
1052 }
1053
1054 void TCharm::callSystem(const callSystemStruct &s)
1055 {
1056         *s.ret = ::system(s.cmd);
1057         resume();
1058 }
1059
1060
1061
1062 #include "tcharm.def.h"