tcharm initization sends a function pointer in the tcharmInitMsg to broadcast to...
[charm.git] / src / libs / ck-libs / tcharm / tcharm.C
1 /*
2 Threaded Charm++ "Framework Framework"
3
4 Orion Sky Lawlor, olawlor@acm.org, 11/19/2001
5  */
6 #include "tcharm_impl.h"
7 #include "tcharm.h"
8 #include <ctype.h>
9
10 #if 0
11     /*Many debugging statements:*/
12 #    define DBG(x) ckout<<"["<<thisIndex<<","<<CkMyPe()<<"] TCHARM> "<<x<<endl;
13 #    define DBGX(x) ckout<<"PE("<<CkMyPe()<<") TCHARM> "<<x<<endl;
14 #else
15     /*No debugging statements*/
16 #    define DBG(x) /*empty*/
17 #    define DBGX(x) /*empty*/
18 #endif
19
20 CtvDeclare(TCharm *,_curTCharm);
21
22 static int lastNumChunks=0;
23
24 class TCharmTraceLibList {
25         enum {maxLibs=20,maxLibNameLen=15};
26         //List of libraries we want to trace:
27         int curLibs;
28         char libNames[maxLibs][maxLibNameLen];
29         int checkIfTracing(const char *lib) const
30         {
31                 for (int i=0;i<curLibs;i++) 
32                         if (0==strcmp(lib,libNames[i]))
33                                 return 1;
34                 return 0;
35         }
36 public:
37         TCharmTraceLibList() {curLibs=0;}
38         void addTracing(const char *lib) 
39         { //We want to trace this library-- add its name to the list.
40                 CkPrintf("TCHARM> Will trace calls to library %s\n",lib);
41                 int i;
42                 for (i=0;0!=*lib;i++,lib++)
43                         libNames[curLibs][i]=tolower(*lib);
44                 libNames[curLibs][i]=0;
45                 // if already tracing, skip
46                 if (checkIfTracing(libNames[curLibs])) return;
47                 curLibs++;
48         }
49         inline int isTracing(const char *lib) const {
50                 if (curLibs==0) return 0; //Common case
51                 else return checkIfTracing(lib);
52         }
53 };
54 static TCharmTraceLibList tcharm_tracelibs;
55 static int tcharm_nomig=0, tcharm_nothreads=0;
56 static int tcharm_stacksize=1*1024*1024; /*Default stack size is 1MB*/
57 static int tcharm_initted=0;
58 CkpvDeclare(int, mapCreated);
59 static CkGroupID mapID;
60 static char* mapping = NULL;
61
62 void TCharm::nodeInit(void)
63 {
64 }
65
66 void TCharm::procInit(void)
67 {
68   CtvInitialize(TCharm *,_curTCharm);
69   CtvAccess(_curTCharm)=NULL;
70   tcharm_initted=1;
71   CtgInit();
72
73   CkpvInitialize(int, mapCreated);
74   CkpvAccess(mapCreated) = 0;
75
76   // called on every pe to eat these arguments
77   char **argv=CkGetArgv();
78   tcharm_nomig=CmiGetArgFlagDesc(argv,"+tcharm_nomig","Disable migration support (debugging)");
79   tcharm_nothreads=CmiGetArgFlagDesc(argv,"+tcharm_nothread","Disable thread support (debugging)");
80   tcharm_nothreads|=CmiGetArgFlagDesc(argv,"+tcharm_nothreads",NULL);
81   char *traceLibName=NULL;
82   while (CmiGetArgStringDesc(argv,"+tcharm_trace",&traceLibName,"Print each call to this library"))
83       tcharm_tracelibs.addTracing(traceLibName);
84   CmiGetArgIntDesc(argv,"+tcharm_stacksize",&tcharm_stacksize,"Set the thread stack size (default 1MB)");
85   if (CkMyPe()!=0) { //Processor 0 eats "+vp<N>" and "-vp<N>" later:
86         int ignored;
87         while (CmiGetArgIntDesc(argv,"-vp",&ignored,NULL)) {}
88         while (CmiGetArgIntDesc(argv,"+vp",&ignored,NULL)) {}
89   }
90   if (CkMyPe()==0) { // Echo various debugging options:
91     if (tcharm_nomig) CmiPrintf("TCHARM> Disabling migration support, for debugging\n");
92     if (tcharm_nothreads) CmiPrintf("TCHARM> Disabling thread support, for debugging\n");
93   }
94   if (CkpvAccess(mapCreated)==0) {
95     if (0!=CmiGetArgString(argv, "+mapping", &mapping)){
96     }
97     CkpvAccess(mapCreated)=1;
98   }
99 }
100
101 void TCHARM_Api_trace(const char *routineName,const char *libraryName)
102 {
103         if (!tcharm_tracelibs.isTracing(libraryName)) return;
104         TCharm *tc=CtvAccess(_curTCharm);
105         char where[100];
106         if (tc==NULL) sprintf(where,"[serial context on %d]",CkMyPe());
107         else sprintf(where,"[%p> vp %d, p %d]",(void *)tc,tc->getElement(),CkMyPe());
108         CmiPrintf("%s Called routine %s\n",where,routineName);
109         CmiPrintStackTrace(1);
110         CmiPrintf("\n");
111 }
112
113 // register thread start functions to get a function handler
114 // this is portable across heterogeneous platforms, or on machines with
115 // random stack/function pointer
116
117 static CkVec<TCHARM_Thread_data_start_fn> threadFnTable;
118
119 int TCHARM_Register_thread_function(TCHARM_Thread_data_start_fn fn)
120 {
121   int idx = threadFnTable.size();
122   threadFnTable.push_back(fn);
123   return idx+1;                     // make 0 invalid number
124 }
125
126 TCHARM_Thread_data_start_fn getTCharmThreadFunction(int idx)
127 {
128   CmiAssert(idx > 0);
129   return threadFnTable[idx-1];
130 }
131
132 static void startTCharmThread(TCharmInitMsg *msg)
133 {
134         DBGX("thread started");
135         TCharm::activateThread();
136         TCHARM_Thread_data_start_fn threadFn = getTCharmThreadFunction(msg->threadFn);
137         threadFn(msg->data);
138         TCharm::deactivateThread();
139         CtvAccess(_curTCharm)->done();
140 }
141
142 TCharm::TCharm(TCharmInitMsg *initMsg_)
143 {
144   initMsg=initMsg_;
145   initMsg->opts.sanityCheck();
146   timeOffset=0.0;
147   threadGlobals=CtgCreate();
148   if (tcharm_nothreads)
149   { //Don't even make a new thread-- just use main thread
150     tid=CthSelf();
151   }
152   else /*Create a thread normally*/
153   {
154     if (tcharm_nomig) { /*Nonmigratable version, for debugging*/
155       tid=CthCreate((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
156     } else {
157       tid=CthCreateMigratable((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
158     }
159 #if CMK_BLUEGENE_CHARM
160     BgAttach(tid);
161 #endif
162   }
163   CtvAccessOther(tid,_curTCharm)=this;
164   isStopped=true;
165   resumeAfterMigration=false;
166         /* FAULT_EVAC*/
167         AsyncEvacuate(CmiTrue);
168   skipResume=false;
169   exitWhenDone=initMsg->opts.exitWhenDone;
170   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
171   threadInfo.thisElement=thisIndex;
172   threadInfo.numElements=initMsg->numElements;
173   if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
174         heapBlocks=CmiIsomallocBlockListNew();
175   else
176         heapBlocks=0;
177   nUd=0;
178   usesAtSync=CmiTrue;
179   run();
180 }
181
182 TCharm::TCharm(CkMigrateMessage *msg)
183         :CBase_TCharm(msg)
184 {
185   initMsg=NULL;
186   tid=NULL;
187   threadGlobals=NULL;
188   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
189         AsyncEvacuate(CmiTrue);
190   heapBlocks=0;
191 }
192
193 void checkPupMismatch(PUP::er &p,int expected,const char *where)
194 {
195         int v=expected;
196         p|v;
197         if (v!=expected) {
198                 CkError("FATAL ERROR> Mismatch %s pup routine\n",where);
199                 CkAbort("FATAL ERROR: Pup direction mismatch");
200         }
201 }
202
203 void TCharm::pup(PUP::er &p) {
204 //Pup superclass
205   ArrayElement1D::pup(p);
206
207   checkPupMismatch(p,5134,"before TCHARM");
208   p(isStopped); p(resumeAfterMigration); p(exitWhenDone); p(skipResume);
209   p(threadInfo.thisElement);
210   p(threadInfo.numElements);
211   
212   if (sema.size()>0) 
213         CkAbort("TCharm::pup> Cannot migrate with unconsumed semaphores!\n");
214
215 #ifndef CMK_OPTIMIZE
216   DBG("Packing thread");
217   if (!isStopped && !CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
218     CkAbort("Cannot pup a running thread.  You must suspend before migrating.\n");
219         }       
220   if (tcharm_nomig) CkAbort("Cannot migrate with the +tcharm_nomig option!\n");
221 #endif
222
223   //This seekBlock allows us to reorder the packing/unpacking--
224   // This is needed because the userData depends on the thread's stack
225   // and heap data both at pack and unpack time.
226   PUP::seekBlock s(p,2);
227   
228   if (p.isUnpacking())
229   {//In this case, unpack the thread & heap before the user data
230     s.seek(1);
231     pupThread(p);
232     //Restart our clock: set it up so packTime==CkWallTimer+timeOffset
233     double packTime;
234     p(packTime);
235     timeOffset=packTime-CkWallTimer();
236   }
237   
238 //Pack all user data
239   // Set up TCHARM context for use during user's pup routines:
240   CtvAccess(_curTCharm)=this;
241   activateThread();
242
243   s.seek(0);
244   checkPupMismatch(p,5135,"before TCHARM user data");
245   p(nUd);
246   for(int i=0;i<nUd;i++) {
247     if (p.isUnpacking()) ud[i].update(tid);
248     ud[i].pup(p);
249   }
250   checkPupMismatch(p,5137,"after TCHARM_Register user data");
251
252   if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
253     deactivateThread();
254   p|sud;           //  sud vector block can not be in isomalloc
255   checkPupMismatch(p,5138,"after TCHARM_Global user data");
256   
257   // Tear down TCHARM context after calling user pup routines
258   if (!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
259     deactivateThread();
260   CtvAccess(_curTCharm)=NULL;
261   
262   if (!p.isUnpacking())
263   {//In this case, pack the thread & heap after the user data
264     s.seek(1);
265     pupThread(p);
266     //Stop our clock:
267     double packTime=CkWallTimer()+timeOffset;
268     p(packTime);
269   }
270   
271   s.endBlock(); //End of seeking block
272   checkPupMismatch(p,5140,"after TCHARM");
273 }
274
275 // Pup our thread and related data
276 void TCharm::pupThread(PUP::er &pc) {
277     pup_er p=(pup_er)&pc;
278     checkPupMismatch(pc,5138,"before TCHARM thread");
279     tid = CthPup(p, tid);
280     if (pc.isUnpacking()) {
281       CtvAccessOther(tid,_curTCharm)=this;
282 #if CMK_BLUEGENE_CHARM
283       BgAttach(tid);
284 #endif
285     }
286     if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
287       CmiIsomallocBlockListPup(p,&heapBlocks);
288     threadGlobals=CtgPup(p,threadGlobals);
289     checkPupMismatch(pc,5139,"after TCHARM thread");
290 }
291
292 //Pup one group of user data
293 void TCharm::UserData::pup(PUP::er &p)
294 {
295   pup_er pext=(pup_er)(&p);
296   p(mode);
297   switch(mode) {
298   case 'c': { /* C mode: userdata is on the stack, so keep address */
299 //     p((char*)&data,sizeof(data));
300      p(pos);
301      //FIXME: function pointers may not be valid across processors
302      p((char*)&cfn, sizeof(TCHARM_Pup_fn));
303      char *data = CthPointer(t, pos);
304      if (cfn) cfn(pext,data);
305      } break;
306   case 'g': { /* Global mode: zero out userdata on arrival */
307      if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
308      {
309         // keep the pointer value if using isomalloc, no need to use pup
310        p(pos);
311      }
312      else if (p.isUnpacking())      //  zero out userdata on arrival
313        pos=0;
314
315        //FIXME: function pointers may not be valid across processors
316      p((char*)&gfn, sizeof(TCHARM_Pup_global_fn));
317      if (gfn) gfn(pext);
318      } break;
319   default:
320      break;
321   };
322 }
323
324 TCharm::~TCharm()
325 {
326   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
327   CthFree(tid);
328   CtgFree(threadGlobals);
329   delete initMsg;
330 }
331
332 void TCharm::migrateTo(int destPE) {
333         if (destPE==CkMyPe()) return;
334         // Make sure migrateMe gets called *after* we suspend:
335         thisProxy[thisIndex].migrateDelayed(destPE);
336 //      resumeAfterMigration=true;
337         suspend();
338 }
339 void TCharm::migrateDelayed(int destPE) {
340         migrateMe(destPE);
341 }
342 void TCharm::ckJustMigrated(void) {
343         ArrayElement::ckJustMigrated();
344         if (resumeAfterMigration) {
345                 resumeAfterMigration=false;
346                 resume(); //Start the thread running
347         }
348 }
349
350 /*
351         FAULT_EVAC
352
353         If a Tcharm object is about to migrate it should be suspended first
354 */
355 void TCharm::ckAboutToMigrate(void){
356         ArrayElement::ckAboutToMigrate();
357         resumeAfterMigration = true;
358         isStopped = true;
359 //      suspend();
360 }
361
362 // clear the data before restarting from disk
363 void TCharm::clear()
364 {
365   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
366   CthFree(tid);
367   delete initMsg;
368 }
369
370 //Register user data to be packed with the thread
371 int TCharm::add(const TCharm::UserData &d)
372 {
373   if (nUd>=maxUserData)
374     CkAbort("TCharm: Registered too many user data fields!\n");
375   int nu=nUd++;
376   ud[nu]=d;
377   return nu;
378 }
379 void *TCharm::lookupUserData(int i) {
380         if (i<0 || i>=nUd)
381                 CkAbort("Bad user data index passed to TCharmGetUserdata!\n");
382         return ud[i].getData();
383 }
384
385 //Start the thread running
386 void TCharm::run(void)
387 {
388   DBG("TCharm::run()");
389   if (tcharm_nothreads) {/*Call user routine directly*/
390           startTCharmThread(initMsg);
391   } 
392   else /* start the thread as usual */
393           start();
394 }
395
396 //Block the thread until start()ed again.
397 void TCharm::stop(void)
398 {
399 #ifndef CMK_OPTIMIZE
400   if (tid != CthSelf())
401     CkAbort("Called TCharm::stop from outside TCharm thread!\n");
402   if (tcharm_nothreads)
403     CkAbort("Cannot make blocking calls using +tcharm_nothreads!\n");
404 #endif
405   stopTiming();
406   isStopped=true;
407   DBG("thread suspended");
408   CthSuspend();
409 //      DBG("thread resumed");
410   /*SUBTLE: We have to do the get() because "this" may have changed
411     during a migration-suspend.  If you access *any* members
412     from this point onward, you'll cause heap corruption if
413     we're resuming from migration!  (OSL 2003/9/23)
414    */
415   TCharm *dis=TCharm::get();
416   dis->isStopped=false;
417   dis->startTiming();
418 //      printf("[%d] Thread resumed  for tid %p\n",dis->thisIndex,dis->tid);
419 }
420
421 //Resume the waiting thread
422 void TCharm::start(void)
423 {
424   //  since this thread is scheduled, it is not a good idea to migrate 
425   isStopped=false;
426   DBG("thread resuming soon");
427   CthAwaken(tid);
428 }
429
430 //Block our thread, schedule, and come back:
431 void TCharm::schedule(void) {
432   DBG("thread schedule");
433   start(); // Calls CthAwaken
434   stop(); // Calls CthSuspend
435 }
436
437 //Go to sync, block, possibly migrate, and then resume
438 void TCharm::migrate(void)
439 {
440 #if CMK_LBDB_ON
441   DBG("going to sync");
442   AtSync();
443   stop();
444 #else
445   DBG("skipping sync, because there is no load balancer");
446 #endif
447 }
448
449
450 void TCharm::evacuate(){
451         /*
452                 FAULT_EVAC
453         */
454         //CkClearAllArrayElementsCPP();
455         if(CpvAccess(startedEvac)){
456                 int nextPE = getNextPE(CkArrayIndex1D(thisIndex));
457 //              resumeAfterMigration=true;
458                 CcdCallFnAfter((CcdVoidFn)CkEmmigrateElement, (void *)myRec, 1);
459                 suspend();
460                 return;
461         }
462         return;
463
464 }
465
466 //calls atsync with async mode
467 void TCharm::async_migrate(void)
468 {
469 #if CMK_LBDB_ON
470   DBG("going to sync at async mode");
471   skipResume = true;            // we resume immediately
472   ReadyMigrate(false);
473   AtSync(0);
474   schedule();
475 //  allow_migrate();
476 #else
477   DBG("skipping sync, because there is no load balancer");
478 #endif
479 }
480
481 /*
482 Note:
483  thread can only migrate at the point when this is called
484 */
485 void TCharm::allow_migrate(void)
486 {
487 #if CMK_LBDB_ON
488 //  ReadyMigrate(true);
489   int nextPe = MigrateToPe();
490   if (nextPe != -1) {
491     migrateTo(nextPe);
492   }
493 #else
494   DBG("skipping sync, because there is no load balancer");
495 #endif
496 }
497
498 //Resume from sync: start the thread again
499 void TCharm::ResumeFromSync(void)
500 {
501   if (!skipResume) start();
502 }
503
504
505 /****** TcharmClient ******/
506 void TCharmClient1D::ckJustMigrated(void) {
507   ArrayElement1D::ckJustMigrated();
508   findThread();
509   tcharmClientInit();
510 }
511
512 void TCharmClient1D::pup(PUP::er &p) {
513   ArrayElement1D::pup(p);
514   p|threadProxy;
515 }
516
517 CkArrayID TCHARM_Get_threads(void) {
518         TCHARMAPI("TCHARM_Get_threads");
519         return TCharm::get()->getProxy();
520 }
521
522 /************* Startup/Shutdown Coordination Support ************/
523
524 // Useless values to reduce over:
525 int vals[2]={0,1};
526
527 //Called when we want to go to a barrier
528 void TCharm::barrier(void) {
529         //Contribute to a synchronizing reduction
530         CkCallback cb(index_t::atBarrier(0), thisProxy[0]);
531         contribute(sizeof(vals),&vals,CkReduction::sum_int,cb);
532 #if CMK_BLUEGENE_CHARM
533         void *curLog;           // store current log in timeline
534         _TRACE_BG_TLINE_END(&curLog);
535         TRACE_BG_AMPI_BREAK(NULL, "TCharm_Barrier_START", NULL, 0);
536 #endif
537         stop();
538 #if CMK_BLUEGENE_CHARM
539          _TRACE_BG_SET_INFO(NULL, "TCHARM_Barrier_END",  &curLog, 1);
540 #endif
541 }
542
543 //Called when we've reached the barrier
544 void TCharm::atBarrier(CkReductionMsg *m) {
545         DBGX("clients all at barrier");
546         delete m;
547         thisProxy.start(); //Just restart everybody
548 }
549
550 //Called when the thread is done running
551 void TCharm::done(void) {
552         DBG("TCharm thread "<<thisIndex<<" done")
553         if (exitWhenDone) {
554                 //Contribute to a synchronizing reduction
555                 CkCallback cb(index_t::atExit(0), thisProxy[0]);
556                 contribute(sizeof(vals),&vals,CkReduction::sum_int,cb);
557         }
558         stop();
559 }
560 //Called when all threads are done running
561 void TCharm::atExit(CkReductionMsg *m) {
562         DBGX("TCharm::atExit> exiting");
563         delete m;
564         CkExit();
565 }
566
567
568 /************* Setup **************/
569
570 //Globals used to control setup process
571 static TCHARM_Fallback_setup_fn g_fallbackSetup=NULL;
572 void TCHARM_Set_fallback_setup(TCHARM_Fallback_setup_fn f)
573 {
574         g_fallbackSetup=f;
575 }
576 void TCHARM_Call_fallback_setup(void) {
577         if (g_fallbackSetup) 
578                 (g_fallbackSetup)();
579         else
580                 CkAbort("TCHARM: Unexpected fallback setup--missing TCHARM_User_setup routine?");
581 }
582
583 /************** User API ***************/
584 /**********************************
585 Callable from UserSetup:
586 */
587
588 // Read the command line to figure out how many threads to create:
589 CDECL int TCHARM_Get_num_chunks(void)
590 {
591         TCHARMAPI("TCHARM_Get_num_chunks");
592         if (CkMyPe()!=0) CkAbort("TCHARM_Get_num_chunks should only be called on PE 0 during setup!");
593         int nChunks=CkNumPes();
594         char **argv=CkGetArgv();
595         CmiGetArgIntDesc(argv,"-vp",&nChunks,"Set the total number of virtual processors");
596         CmiGetArgIntDesc(argv,"+vp",&nChunks,NULL);
597         lastNumChunks=nChunks;
598         return nChunks;
599 }
600 FDECL int FTN_NAME(TCHARM_GET_NUM_CHUNKS,tcharm_get_num_chunks)(void)
601 {
602         return TCHARM_Get_num_chunks();
603 }
604
605 // Fill out the default thread options:
606 TCHARM_Thread_options::TCHARM_Thread_options(int doDefault)
607 {
608         stackSize=0; /* default stacksize */
609         exitWhenDone=0; /* don't exit when done by default. */
610 }
611 void TCHARM_Thread_options::sanityCheck(void) {
612         if (stackSize<=0) stackSize=tcharm_stacksize;
613 }
614
615
616 TCHARM_Thread_options g_tcharmOptions(1);
617
618 /*Set the size of the thread stack*/
619 CDECL void TCHARM_Set_stack_size(int newStackSize)
620 {
621         TCHARMAPI("TCHARM_Set_stack_size");
622         g_tcharmOptions.stackSize=newStackSize;
623 }
624 FDECL void FTN_NAME(TCHARM_SET_STACK_SIZE,tcharm_set_stack_size)
625         (int *newSize)
626 { TCHARM_Set_stack_size(*newSize); }
627
628 CDECL void TCHARM_Set_exit(void) { g_tcharmOptions.exitWhenDone=1; }
629
630 /*Create a new array of threads, which will be bound to by subsequent libraries*/
631 CDECL void TCHARM_Create(int nThreads,
632                         int threadFn)
633 {
634         TCHARMAPI("TCHARM_Create");
635         TCHARM_Create_data(nThreads,
636                          threadFn,NULL,0);
637 }
638 FDECL void FTN_NAME(TCHARM_CREATE,tcharm_create)
639         (int *nThreads,int threadFn)
640 { TCHARM_Create(*nThreads,threadFn); }
641
642 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg);
643
644 /*As above, but pass along (arbitrary) data to threads*/
645 CDECL void TCHARM_Create_data(int nThreads,
646                   int threadFn,
647                   void *threadData,int threadDataLen)
648 {
649         TCHARMAPI("TCHARM_Create_data");
650         TCharmInitMsg *msg=new (threadDataLen,0) TCharmInitMsg(
651                 threadFn,g_tcharmOptions);
652         msg->numElements=nThreads;
653         memcpy(msg->data,threadData,threadDataLen);
654         TCHARM_Build_threads(msg);
655         
656         // Reset the thread options:
657         g_tcharmOptions=TCHARM_Thread_options(1);
658 }
659
660 FDECL void FTN_NAME(TCHARM_CREATE_DATA,tcharm_create_data)
661         (int *nThreads,
662                   int threadFn,
663                   void *threadData,int *threadDataLen)
664 { TCHARM_Create_data(*nThreads,threadFn,threadData,*threadDataLen); }
665
666 CkGroupID CkCreatePropMap(void);
667
668 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg)
669 {
670   CkArrayOptions opts(msg->numElements);
671   CkAssert(CkpvAccess(mapCreated)==1);
672   if(mapping==NULL){
673 #if CMK_BLUEGENE_CHARM
674     mapID=CProxy_BlockMap::ckNew();
675 #else
676     mapID=CkCreatePropMap();
677 #endif
678   }else if(0==strcmp(mapping,"BLOCK_MAP")){
679     mapID=CProxy_BlockMap::ckNew();
680   }else if(0==strcmp(mapping,"RR_MAP")){
681     mapID=CProxy_RRMap::ckNew();
682   }else{  // "PROP_MAP" or anything else
683     mapID=CkCreatePropMap();
684   }
685   opts.setMap(mapID);
686   int nElem=msg->numElements; //<- save it because msg will be deleted.
687   return CProxy_TCharm::ckNew(msg,opts);
688 }
689
690 // Helper used when creating a new array bound to the TCHARM threads:
691 CkArrayOptions TCHARM_Attach_start(CkArrayID *retTCharmArray,int *retNumElts)
692 {
693         TCharm *tc=TCharm::get();
694         if (!tc)
695                 CkAbort("You must call TCHARM initialization routines from a TCHARM thread!");
696         int nElts=tc->getNumElements();
697         if (retNumElts!=NULL) *retNumElts=nElts;
698         *retTCharmArray=tc->getProxy();
699         CkArrayOptions opts(nElts);
700         opts.bindTo(tc->getProxy());
701         return opts;
702 }
703
704 void TCHARM_Suspend(void) {
705         TCharm *tc=TCharm::get();
706         tc->suspend();
707 }
708
709 /***********************************
710 Callable from worker thread
711 */
712 CDECL int TCHARM_Element(void)
713
714         TCHARMAPI("TCHARM_Element");
715         return TCharm::get()->getElement();
716 }
717 CDECL int TCHARM_Num_elements(void)
718
719         TCHARMAPI("TCHARM_Num_elements");
720         return TCharm::get()->getNumElements();
721 }
722
723 FDECL int FTN_NAME(TCHARM_ELEMENT,tcharm_element)(void) 
724 { return TCHARM_Element();}
725 FDECL int FTN_NAME(TCHARM_NUM_ELEMENTS,tcharm_num_elements)(void) 
726 { return TCHARM_Num_elements();}
727
728 //Make sure this address will migrate with us when we move:
729 static void checkAddress(void *data)
730 {
731         if (tcharm_nomig||tcharm_nothreads) return; //Stack is not isomalloc'd
732         if (CmiThreadIs(CMI_THREAD_IS_ALIAS)||CmiThreadIs(CMI_THREAD_IS_STACKCOPY)) return; // memory alias thread
733         if (CmiIsomallocEnabled()) {
734           if (!CmiIsomallocInRange(data))
735             CkAbort("The UserData you register must be allocated on the stack!\n");
736         }
737         else 
738           CkPrintf("Warning> checkAddress failed because isomalloc not supported.\n");
739 }
740
741 /* Old "register"-based userdata: */
742 CDECL int TCHARM_Register(void *data,TCHARM_Pup_fn pfn)
743
744         TCHARMAPI("TCHARM_Register");
745         checkAddress(data);
746         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
747 }
748 FDECL int FTN_NAME(TCHARM_REGISTER,tcharm_register)
749         (void *data,TCHARM_Pup_fn pfn)
750
751         TCHARMAPI("TCHARM_Register");
752         checkAddress(data);
753         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
754 }
755
756 CDECL void *TCHARM_Get_userdata(int id)
757 {
758         TCHARMAPI("TCHARM_Get_userdata");
759         return TCharm::get()->lookupUserData(id);
760 }
761 FDECL void *FTN_NAME(TCHARM_GET_USERDATA,tcharm_get_userdata)(int *id)
762 { return TCHARM_Get_userdata(*id); }
763
764 /* New hardcoded-ID userdata: */
765 CDECL void TCHARM_Set_global(int globalID,void *new_value,TCHARM_Pup_global_fn pup_or_NULL)
766 {
767         TCHARMAPI("TCHARM_Set_global");
768         TCharm *tc=TCharm::get();
769         if (tc->sud.length()<=globalID)
770         { //We don't have room for this ID yet: make room
771                 int newLen=2*globalID;
772                 tc->sud.resize(newLen);
773         }
774         tc->sud[globalID]=TCharm::UserData(pup_or_NULL,tc->getThread(),new_value);
775 }
776 CDECL void *TCHARM_Get_global(int globalID)
777 {
778         //Skip TCHARMAPI("TCHARM_Get_global") because there's no dynamic allocation here,
779         // and this routine should be as fast as possible.
780         CkVec<TCharm::UserData> &v=TCharm::get()->sud;
781         if (v.length()<=globalID) return NULL; //Uninitialized global
782         return v[globalID].getData();
783 }
784
785 CDECL void TCHARM_Migrate(void)
786 {
787         TCHARMAPI("TCHARM_Migrate");
788         if (CthMigratable() == 0) {
789           CkPrintf("Warning> thread migration is not supported!\n");
790           return;
791         }
792         TCharm::get()->migrate();
793 }
794 FORTRAN_AS_C(TCHARM_MIGRATE,TCHARM_Migrate,tcharm_migrate,(void),())
795
796 CDECL void TCHARM_Async_Migrate(void)
797 {
798         TCHARMAPI("TCHARM_Async_Migrate");
799         TCharm::get()->async_migrate();
800 }
801 FORTRAN_AS_C(TCHARM_ASYNC_MIGRATE,TCHARM_Async_Migrate,tcharm_async_migrate,(void),())
802
803 CDECL void TCHARM_Allow_Migrate(void)
804 {
805         TCHARMAPI("TCHARM_Allow_Migrate");
806         TCharm::get()->allow_migrate();
807 }
808 FORTRAN_AS_C(TCHARM_ALLOW_MIGRATE,TCHARM_Allow_Migrate,tcharm_allow_migrate,(void),())
809
810 CDECL void TCHARM_Migrate_to(int destPE)
811 {
812         TCHARMAPI("TCHARM_Migrate_to");
813         TCharm::get()->migrateTo(destPE);
814 }
815
816 CDECL void TCHARM_Evacuate()
817 {
818         TCHARMAPI("TCHARM_Migrate_to");
819         TCharm::get()->evacuate();
820 }
821
822 FORTRAN_AS_C(TCHARM_MIGRATE_TO,TCHARM_Migrate_to,tcharm_migrate_to,
823         (int *destPE),(*destPE))
824
825 CDECL void TCHARM_Yield(void)
826 {
827         TCHARMAPI("TCHARM_Yield");
828         TCharm::get()->schedule();
829 }
830 FORTRAN_AS_C(TCHARM_YIELD,TCHARM_Yield,tcharm_yield,(void),())
831
832 CDECL void TCHARM_Barrier(void)
833 {
834         TCHARMAPI("TCHARM_Barrier");
835         TCharm::get()->barrier();
836 }
837 FORTRAN_AS_C(TCHARM_BARRIER,TCHARM_Barrier,tcharm_barrier,(void),())
838
839 CDECL void TCHARM_Done(void)
840 {
841         TCHARMAPI("TCHARM_Done");
842         TCharm *c=TCharm::getNULL();
843         if (!c) CkExit();
844         else c->done();
845 }
846 FORTRAN_AS_C(TCHARM_DONE,TCHARM_Done,tcharm_done,(void),())
847
848
849 CDECL double TCHARM_Wall_timer(void)
850 {
851   TCHARMAPI("TCHARM_Wall_timer");
852   TCharm *c=TCharm::getNULL();
853   if(!c) return CkWallTimer();
854   else { //Have to apply current thread's time offset
855     return CkWallTimer()+c->getTimeOffset();
856   }
857 }
858
859 #if 1
860 /*Include Fortran-style "iargc" and "getarg" routines.
861 These are needed to get access to the command-line arguments from Fortran.
862 */
863 FDECL int FTN_NAME(TCHARM_IARGC,tcharm_iargc)(void) {
864   TCHARMAPI("tcharm_iargc");
865   return CkGetArgc()-1;
866 }
867
868 FDECL void FTN_NAME(TCHARM_GETARG,tcharm_getarg)
869         (int *i_p,char *dest,int destLen)
870 {
871   TCHARMAPI("tcharm_getarg");
872   int i=*i_p;
873   if (i<0) CkAbort("tcharm_getarg called with negative argument!");
874   if (i>=CkGetArgc()) CkAbort("tcharm_getarg called with argument > iargc!");
875   const char *src=CkGetArgv()[i];
876   strcpy(dest,src);
877   for (i=strlen(dest);i<destLen;i++) dest[i]=' ';
878 }
879
880 #endif
881
882 //These silly routines are used for serial startup:
883 extern void _initCharm(int argc, char **argv);
884 CDECL void TCHARM_Init(int *argc,char ***argv) {
885         if (!tcharm_initted) {
886           ConverseInit(*argc, *argv, (CmiStartFn) _initCharm,1,1);
887           _initCharm(*argc,*argv);
888         }
889 }
890
891 FDECL void FTN_NAME(TCHARM_INIT,tcharm_init)(void)
892 {
893         int argc=1;
894         const char *argv_sto[2]={"foo",NULL};
895         char **argv=(char **)argv_sto;
896         TCHARM_Init(&argc,&argv);
897 }
898
899 /***********************************
900 * TCHARM Semaphores:
901 * The idea is one side "puts", the other side "gets"; 
902 * but the calls can come in any order--
903 * if the "get" comes first, it blocks until the put.
904 * This makes a convenient, race-condition-free way to do
905 * onetime initializations.  
906 */
907 /// Find this semaphore, or insert if there isn't one:
908 TCharm::TCharmSemaphore *TCharm::findSema(int id) {
909         for (int s=0;s<sema.size();s++)
910                 if (sema[s].id==id) 
911                         return &sema[s];
912         sema.push_back(TCharmSemaphore(id));
913         return &sema[sema.size()-1];
914 }
915 /// Remove this semaphore from the list
916 void TCharm::freeSema(TCharmSemaphore *doomed) {
917         int id=doomed->id;
918         for (int s=0;s<sema.size();s++)
919                 if (sema[s].id==id) {
920                         sema[s]=sema[sema.length()-1];
921                         sema.length()--;
922                         return;
923                 }
924         CkAbort("Tried to free nonexistent TCharm semaphore");
925 }
926
927 /// Block until this semaphore has data:
928 TCharm::TCharmSemaphore *TCharm::getSema(int id) {
929         TCharmSemaphore *s=findSema(id);
930         if (s->data==NULL) 
931         { //Semaphore isn't filled yet: wait until it is
932                 s->thread=CthSelf();
933                 suspend(); //Will be woken by semaPut
934                 // Semaphore may have moved-- find it again
935                 s=findSema(id);
936                 if (s->data==NULL) CkAbort("TCharm::semaGet awoken too early!");
937         }
938         return s;
939 }
940
941 /// Store data at the semaphore "id".
942 ///  The put can come before or after the get.
943 void TCharm::semaPut(int id,void *data) {
944         TCharmSemaphore *s=findSema(id);
945         if (s->data!=NULL) CkAbort("Duplicate calls to TCharm::semaPut!");
946         s->data=data;
947         DBG("semaPut "<<id<<" "<<data);
948         if (s->thread!=NULL) {//Awaken the thread
949                 s->thread=NULL;
950                 resume();
951         }
952 }
953
954 /// Retreive data from the semaphore "id".
955 ///  Blocks if the data is not immediately available.
956 ///  Consumes the data, so another put will be required for the next get.
957 void *TCharm::semaGet(int id) {
958         TCharmSemaphore *s=getSema(id);
959         void *ret=s->data;
960         DBG("semaGet "<<id<<" "<<ret);
961         // Now remove the semaphore from the list:
962         freeSema(s);
963         return ret;
964 }
965
966 /// Retreive data from the semaphore "id".
967 ///  Blocks if the data is not immediately available.
968 void *TCharm::semaGets(int id) {
969         TCharmSemaphore *s=getSema(id);
970         return s->data;
971 }
972
973 /// Retreive data from the semaphore "id", or returns NULL.
974 void *TCharm::semaPeek(int id) {
975         TCharmSemaphore *s=findSema(id);
976         return s->data;
977 }
978
979 /****** System Call support ******/
980 /*
981 TCHARM_System exists to work around a bug where Linux ia32
982 glibc2.2.x with pthreads crashes at the fork() site when 
983 called from a user-levelthread. 
984
985 The fix is to call system from the main thread, by 
986 passing the request out of the thread to our array element 
987 before calling system().
988 */
989
990 CDECL int 
991 TCHARM_System(const char *shell_command)
992 {
993         return TCharm::get()->system(shell_command);
994 }
995 int TCharm::system(const char *cmd)
996 {
997         int ret=-1778;
998         callSystemStruct s;
999         s.cmd=cmd;
1000         s.ret=&ret;
1001         thisProxy[thisIndex].callSystem(s);
1002         suspend();
1003         return ret;
1004 }
1005
1006 void TCharm::callSystem(const callSystemStruct &s)
1007 {
1008         *s.ret = ::system(s.cmd);
1009         resume();
1010 }
1011
1012
1013
1014 #include "tcharm.def.h"