Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / libs / ck-libs / tcharm / tcharm.C
1 /*
2 Threaded Charm++ "Framework Framework"
3
4 Orion Sky Lawlor, olawlor@acm.org, 11/19/2001
5  */
6 #include "tcharm_impl.h"
7 #include "tcharm.h"
8 #include "ckevacuation.h"
9 #include <ctype.h>
10
11 #if 0 
12     /*Many debugging statements:*/
13 #    define DBG(x) ckout<<"["<<thisIndex<<","<<CkMyPe()<<"] TCHARM> "<<x<<endl;
14 #    define DBGX(x) ckout<<"PE("<<CkMyPe()<<") TCHARM> "<<x<<endl;
15 #else
16     /*No debugging statements*/
17 #    define DBG(x) /*empty*/
18 #    define DBGX(x) /*empty*/
19 #endif
20
21 CtvDeclare(TCharm *,_curTCharm);
22
23 static int lastNumChunks=0;
24
25 class TCharmTraceLibList {
26         enum {maxLibs=20,maxLibNameLen=15};
27         //List of libraries we want to trace:
28         int curLibs;
29         char libNames[maxLibs][maxLibNameLen];
30         int checkIfTracing(const char *lib) const
31         {
32                 for (int i=0;i<curLibs;i++) 
33                         if (0==strcmp(lib,libNames[i]))
34                                 return 1;
35                 return 0;
36         }
37 public:
38         TCharmTraceLibList() {curLibs=0;}
39         void addTracing(const char *lib) 
40         { //We want to trace this library-- add its name to the list.
41                 CkPrintf("TCHARM> Will trace calls to library %s\n",lib);
42                 int i;
43                 for (i=0;0!=*lib;i++,lib++)
44                         libNames[curLibs][i]=tolower(*lib);
45                 libNames[curLibs][i]=0;
46                 // if already tracing, skip
47                 if (checkIfTracing(libNames[curLibs])) return;
48                 curLibs++;
49         }
50         inline int isTracing(const char *lib) const {
51                 if (curLibs==0) return 0; //Common case
52                 else return checkIfTracing(lib);
53         }
54 };
55 static TCharmTraceLibList tcharm_tracelibs;
56 static int tcharm_nomig=0, tcharm_nothreads=0;
57 static int tcharm_stacksize=1*1024*1024; /*Default stack size is 1MB*/
58 static int tcharm_initted=0;
59 CkpvDeclare(int, mapCreated);
60 static CkGroupID mapID;
61 static char* mapping = NULL;
62
63 void TCharm::nodeInit(void)
64 {
65 }
66
67 void TCharm::procInit(void)
68 {
69   CtvInitialize(TCharm *,_curTCharm);
70   CtvAccess(_curTCharm)=NULL;
71   tcharm_initted=1;
72   CtgInit();
73
74   CkpvInitialize(int, mapCreated);
75   CkpvAccess(mapCreated) = 0;
76
77   // called on every pe to eat these arguments
78   char **argv=CkGetArgv();
79   tcharm_nomig=CmiGetArgFlagDesc(argv,"+tcharm_nomig","Disable migration support (debugging)");
80   tcharm_nothreads=CmiGetArgFlagDesc(argv,"+tcharm_nothread","Disable thread support (debugging)");
81   tcharm_nothreads|=CmiGetArgFlagDesc(argv,"+tcharm_nothreads",NULL);
82   char *traceLibName=NULL;
83   while (CmiGetArgStringDesc(argv,"+tcharm_trace",&traceLibName,"Print each call to this library"))
84       tcharm_tracelibs.addTracing(traceLibName);
85   // CmiGetArgIntDesc(argv,"+tcharm_stacksize",&tcharm_stacksize,"Set the thread stack size (default 1MB)");
86   char *str;
87   if (CmiGetArgStringDesc(argv,"+tcharm_stacksize",&str,"Set the thread stack size (default 1MB)"))  {
88     if (strpbrk(str,"M")) {
89       sscanf(str, "%dM", &tcharm_stacksize);
90       tcharm_stacksize *= 1024*1024;
91     }
92     else if (strpbrk(str,"K")) {
93       sscanf(str, "%dK", &tcharm_stacksize);
94       tcharm_stacksize *= 1024;
95     }
96     else {
97       sscanf(str, "%d", &tcharm_stacksize);
98     }
99     if (CkMyPe() == 0)
100       CkPrintf("TCharm> stack size is set to %d.\n", tcharm_stacksize);
101   }
102   if (CkMyPe()!=0) { //Processor 0 eats "+vp<N>" and "-vp<N>" later:
103         int ignored;
104         while (CmiGetArgIntDesc(argv,"-vp",&ignored,NULL)) {}
105         while (CmiGetArgIntDesc(argv,"+vp",&ignored,NULL)) {}
106   }
107   if (CkMyPe()==0) { // Echo various debugging options:
108     if (tcharm_nomig) CmiPrintf("TCHARM> Disabling migration support, for debugging\n");
109     if (tcharm_nothreads) CmiPrintf("TCHARM> Disabling thread support, for debugging\n");
110   }
111   if (CkpvAccess(mapCreated)==0) {
112     if (0!=CmiGetArgString(argv, "+mapping", &mapping)){
113     }
114     CkpvAccess(mapCreated)=1;
115   }
116 }
117
118 void TCHARM_Api_trace(const char *routineName,const char *libraryName)
119 {
120         if (!tcharm_tracelibs.isTracing(libraryName)) return;
121         TCharm *tc=CtvAccess(_curTCharm);
122         char where[100];
123         if (tc==NULL) sprintf(where,"[serial context on %d]",CkMyPe());
124         else sprintf(where,"[%p> vp %d, p %d]",(void *)tc,tc->getElement(),CkMyPe());
125         CmiPrintf("%s Called routine %s\n",where,routineName);
126         CmiPrintStackTrace(1);
127         CmiPrintf("\n");
128 }
129
130 // register thread start functions to get a function handler
131 // this is portable across heterogeneous platforms, or on machines with
132 // random stack/function pointer
133
134 static CkVec<TCHARM_Thread_data_start_fn> threadFnTable;
135
136 int TCHARM_Register_thread_function(TCHARM_Thread_data_start_fn fn)
137 {
138   int idx = threadFnTable.size();
139   threadFnTable.push_back(fn);
140   return idx+1;                     // make 0 invalid number
141 }
142
143 TCHARM_Thread_data_start_fn getTCharmThreadFunction(int idx)
144 {
145   CmiAssert(idx > 0);
146   return threadFnTable[idx-1];
147 }
148
149 static void startTCharmThread(TCharmInitMsg *msg)
150 {
151         DBGX("thread started");
152         TCharm::activateThread();
153        TCHARM_Thread_data_start_fn threadFn = getTCharmThreadFunction(msg->threadFn);
154         threadFn(msg->data);
155         TCharm::deactivateThread();
156         CtvAccess(_curTCharm)->done();
157 }
158
159 TCharm::TCharm(TCharmInitMsg *initMsg_)
160 {
161   initMsg=initMsg_;
162   initMsg->opts.sanityCheck();
163   timeOffset=0.0;
164   threadGlobals=CtgCreate();
165   if (tcharm_nothreads)
166   { //Don't even make a new thread-- just use main thread
167     tid=CthSelf();
168   }
169   else /*Create a thread normally*/
170   {
171     if (tcharm_nomig) { /*Nonmigratable version, for debugging*/
172       tid=CthCreate((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
173     } else {
174       tid=CthCreateMigratable((CthVoidFn)startTCharmThread,initMsg,initMsg->opts.stackSize);
175     }
176 #if CMK_BIGSIM_CHARM
177     BgAttach(tid);
178     BgUnsetStartOutOfCore();
179 #endif
180   }
181   CtvAccessOther(tid,_curTCharm)=this;
182   isStopped=true;
183   resumeAfterMigration=false;
184         /* FAULT_EVAC*/
185         AsyncEvacuate(CmiTrue);
186   skipResume=false;
187   exitWhenDone=initMsg->opts.exitWhenDone;
188   isSelfDone = false;
189   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
190   threadInfo.thisElement=thisIndex;
191   threadInfo.numElements=initMsg->numElements;
192   if (1 || CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)) {
193         heapBlocks=CmiIsomallocBlockListNew(tid);
194   } else
195         heapBlocks=0;
196   nUd=0;
197   usesAtSync=CmiTrue;
198   run();
199 }
200
201 TCharm::TCharm(CkMigrateMessage *msg)
202         :CBase_TCharm(msg)
203 {
204   initMsg=NULL;
205   tid=NULL;
206   threadGlobals=NULL;
207   threadInfo.tProxy=CProxy_TCharm(thisArrayID);
208         AsyncEvacuate(CmiTrue);
209   heapBlocks=0;
210 }
211
212 void checkPupMismatch(PUP::er &p,int expected,const char *where)
213 {
214         int v=expected;
215         p|v;
216         if (v!=expected) {
217                 CkError("FATAL ERROR> Mismatch %s pup routine\n",where);
218                 CkAbort("FATAL ERROR: Pup direction mismatch");
219         }
220 }
221
222 void TCharm::pup(PUP::er &p) {
223 //Pup superclass
224   ArrayElement1D::pup(p);
225
226   //BIGSIM_OOC DEBUGGING
227   //if(!p.isUnpacking()){
228   //  CmiPrintf("TCharm[%d] packing: ", thisIndex);
229   //  CthPrintThdStack(tid);
230   //}
231
232   checkPupMismatch(p,5134,"before TCHARM");
233 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
234     if(!isStopped){
235 //      resumeAfterMigration = true;
236     }
237     isStopped = true;
238 #endif
239   p(isStopped); p(resumeAfterMigration); p(exitWhenDone); p(isSelfDone); p(skipResume);
240   p(threadInfo.thisElement);
241   p(threadInfo.numElements);
242   
243   if (sema.size()>0){
244         CkAbort("TCharm::pup> Cannot migrate with unconsumed semaphores!\n");
245   }
246
247 #ifndef CMK_OPTIMIZE
248   DBG("Packing thread");
249   if (!isStopped && !CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
250     if(_BgOutOfCoreFlag==0) //not doing out-of-core scheduling
251         CkAbort("Cannot pup a running thread.  You must suspend before migrating.\n");
252   }     
253   if (tcharm_nomig) CkAbort("Cannot migrate with the +tcharm_nomig option!\n");
254 #endif
255
256   //This seekBlock allows us to reorder the packing/unpacking--
257   // This is needed because the userData depends on the thread's stack
258   // and heap data both at pack and unpack time.
259   PUP::seekBlock s(p,2);
260   
261   if (p.isUnpacking())
262   {//In this case, unpack the thread & heap before the user data
263     s.seek(1);
264     pupThread(p);
265     //Restart our clock: set it up so packTime==CkWallTimer+timeOffset
266     double packTime;
267     p(packTime);
268     timeOffset=packTime-CkWallTimer();
269   }
270   
271 //Pack all user data
272   // Set up TCHARM context for use during user's pup routines:
273   CtvAccess(_curTCharm)=this;
274   activateThread();
275
276   s.seek(0);
277   checkPupMismatch(p,5135,"before TCHARM user data");
278   p(nUd);
279   for(int i=0;i<nUd;i++) {
280     if (p.isUnpacking()) ud[i].update(tid);
281     ud[i].pup(p);
282   }
283   checkPupMismatch(p,5137,"after TCHARM_Register user data");
284
285   if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
286     deactivateThread();
287   p|sud;           //  sud vector block can not be in isomalloc
288   checkPupMismatch(p,5138,"after TCHARM_Global user data");
289   
290   // Tear down TCHARM context after calling user pup routines
291   if (!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
292     deactivateThread();
293   CtvAccess(_curTCharm)=NULL;
294   
295   if (!p.isUnpacking())
296   {//In this case, pack the thread & heap after the user data
297     s.seek(1);
298     pupThread(p);
299     //Stop our clock:
300     double packTime=CkWallTimer()+timeOffset;
301     p(packTime);
302   }
303   
304   s.endBlock(); //End of seeking block
305   checkPupMismatch(p,5140,"after TCHARM");
306   
307   //BIGSIM_OOC DEBUGGING
308   //if(p.isUnpacking()){
309   //  CmiPrintf("TCharm[%d] unpacking: ", thisIndex);
310   //  CthPrintThdStack(tid);
311   //}
312
313 }
314
315 // Pup our thread and related data
316 void TCharm::pupThread(PUP::er &pc) {
317     pup_er p=(pup_er)&pc;
318     CthThread savedtid = tid;
319     checkPupMismatch(pc,5138,"before TCHARM thread");
320     tid = CthPup(p, tid);
321     if (pc.isUnpacking()) {
322       CtvAccessOther(tid,_curTCharm)=this;
323 #if CMK_BIGSIM_CHARM
324       BgAttach(tid);
325 #endif
326     }
327     if (1 || CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
328       CmiIsomallocBlockListPup(p,&heapBlocks,savedtid);
329     threadGlobals=CtgPup(p,threadGlobals);
330     checkPupMismatch(pc,5139,"after TCHARM thread");
331 }
332
333 //Pup one group of user data
334 void TCharm::UserData::pup(PUP::er &p)
335 {
336   pup_er pext=(pup_er)(&p);
337   p(mode);
338   switch(mode) {
339   case 'c': { /* C mode: userdata is on the stack, so keep address */
340 //     p((char*)&data,sizeof(data));
341      p(pos);
342      //FIXME: function pointers may not be valid across processors
343      p((char*)&cfn, sizeof(TCHARM_Pup_fn));
344      char *data = CthPointer(t, pos);
345      if (cfn) cfn(pext,data);
346      } break;
347   case 'g': { /* Global mode: zero out userdata on arrival */
348      if (CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
349      {
350         // keep the pointer value if using isomalloc, no need to use pup
351        p(pos);
352      }
353      else if (p.isUnpacking())      //  zero out userdata on arrival
354        pos=0;
355
356        //FIXME: function pointers may not be valid across processors
357      p((char*)&gfn, sizeof(TCHARM_Pup_global_fn));
358      if (gfn) gfn(pext);
359      } break;
360   default:
361      break;
362   };
363 }
364
365 TCharm::~TCharm()
366 {
367   //BIGSIM_OOC DEBUGGING
368   //CmiPrintf("TCharm destructor called with heapBlocks=%p!\n", heapBlocks);
369   
370 #if !USE_MEMPOOL_ISOMALLOC
371   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
372 #endif
373   CthFree(tid);
374   CtgFree(threadGlobals);
375   delete initMsg;
376 }
377
378 void TCharm::migrateTo(int destPE) {
379         if (destPE==CkMyPe()) return;
380         if (CthMigratable() == 0) {
381             CkPrintf("Warning> thread migration is not supported!\n");
382             return;
383         }
384         // Make sure migrateMe gets called *after* we suspend:
385         thisProxy[thisIndex].migrateDelayed(destPE);
386 //      resumeAfterMigration=true;
387         suspend();
388 }
389 void TCharm::migrateDelayed(int destPE) {
390         migrateMe(destPE);
391 }
392 void TCharm::ckJustMigrated(void) {
393         ArrayElement::ckJustMigrated();
394 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
395 //  resumeAfterMigration = true;
396 #endif
397         if (resumeAfterMigration) {
398                 resumeAfterMigration=false;
399                 resume(); //Start the thread running
400         }
401 }
402
403 void TCharm::ckJustRestored(void) {
404         //CkPrintf("call just restored from TCharm[%d]\n", thisIndex);
405         ArrayElement::ckJustRestored();
406         //if (resumeAfterMigration) {
407         //      resumeAfterMigration=false;
408         //      resume(); //Start the thread running
409         //}
410 }
411
412 /*
413         FAULT_EVAC
414
415         If a Tcharm object is about to migrate it should be suspended first
416 */
417 void TCharm::ckAboutToMigrate(void){
418         ArrayElement::ckAboutToMigrate();
419         resumeAfterMigration = true;
420         isStopped = true;
421 //      suspend();
422 }
423
424 // clear the data before restarting from disk
425 void TCharm::clear()
426 {
427   if (heapBlocks) CmiIsomallocBlockListDelete(heapBlocks);
428   CthFree(tid);
429   delete initMsg;
430 }
431
432 //Register user data to be packed with the thread
433 int TCharm::add(const TCharm::UserData &d)
434 {
435   if (nUd>=maxUserData)
436     CkAbort("TCharm: Registered too many user data fields!\n");
437   int nu=nUd++;
438   ud[nu]=d;
439   return nu;
440 }
441 void *TCharm::lookupUserData(int i) {
442         if (i<0 || i>=nUd)
443                 CkAbort("Bad user data index passed to TCharmGetUserdata!\n");
444         return ud[i].getData();
445 }
446
447 //Start the thread running
448 void TCharm::run(void)
449 {
450   DBG("TCharm::run()");
451   if (tcharm_nothreads) {/*Call user routine directly*/
452           startTCharmThread(initMsg);
453   } 
454   else /* start the thread as usual */
455           start();
456 }
457
458 //Block the thread until start()ed again.
459 void TCharm::stop(void)
460 {
461 #ifndef CMK_OPTIMIZE
462   if (tid != CthSelf())
463     CkAbort("Called TCharm::stop from outside TCharm thread!\n");
464   if (tcharm_nothreads)
465     CkAbort("Cannot make blocking calls using +tcharm_nothreads!\n");
466 #endif
467   stopTiming();
468   isStopped=true;
469   DBG("thread suspended");
470
471   CthSuspend();
472 //      DBG("thread resumed");
473   /*SUBTLE: We have to do the get() because "this" may have changed
474     during a migration-suspend.  If you access *any* members
475     from this point onward, you'll cause heap corruption if
476     we're resuming from migration!  (OSL 2003/9/23)
477    */
478   TCharm *dis=TCharm::get();
479 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
480 /*  CpvAccess(_currentObj) = dis;
481  *      printf("[%d] _currentObject set to TCharm index %d %p\n",CkMyPe(),dis->thisIndex,dis);*/
482 #endif
483   dis->isStopped=false;
484   dis->startTiming();
485   //CkPrintf("[%d] Thread resumed  for tid %p\n",dis->thisIndex,dis->tid);
486 }
487
488 //Resume the waiting thread
489 void TCharm::start(void)
490 {
491   //  since this thread is scheduled, it is not a good idea to migrate 
492   isStopped=false;
493   DBG("thread resuming soon");
494   //CkPrintf("TCharm[%d]::start()\n", thisIndex);
495   //CmiPrintStackTrace(0);
496 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
497 //CthAwakenPrio(tid, CQS_QUEUEING_BFIFO, 1, &prio);
498   CthAwaken(tid);
499 #else
500   CthAwaken(tid);
501 #endif
502 }
503
504 //Block our thread, schedule, and come back:
505 void TCharm::schedule(void) {
506   DBG("thread schedule");
507   start(); // Calls CthAwaken
508   stop(); // Calls CthSuspend
509 }
510
511 //Go to sync, block, possibly migrate, and then resume
512 void TCharm::migrate(void)
513 {
514 #if CMK_LBDB_ON
515   DBG("going to sync");
516   AtSync();
517   stop();
518 #else
519   DBG("skipping sync, because there is no load balancer");
520 #endif
521 }
522
523
524
525 void TCharm::evacuate(){
526         /*
527                 FAULT_EVAC
528         */
529         //CkClearAllArrayElementsCPP();
530         if(CkpvAccess(startedEvac)){
531                 int nextPE = getNextPE(CkArrayIndex1D(thisIndex));
532 //              resumeAfterMigration=true;
533                 CcdCallFnAfter((CcdVoidFn)CkEmmigrateElement, (void *)myRec, 1);
534                 suspend();
535                 return;
536         }
537         return;
538
539 }
540
541 //calls atsync with async mode
542 void TCharm::async_migrate(void)
543 {
544 #if CMK_LBDB_ON
545   DBG("going to sync at async mode");
546   skipResume = true;            // we resume immediately
547   ReadyMigrate(false);
548   AtSync(0);
549   schedule();
550 //  allow_migrate();
551 #else
552   DBG("skipping sync, because there is no load balancer");
553 #endif
554 }
555
556 /*
557 Note:
558  thread can only migrate at the point when this is called
559 */
560 void TCharm::allow_migrate(void)
561 {
562 #if CMK_LBDB_ON
563 //  ReadyMigrate(true);
564   int nextPe = MigrateToPe();
565   if (nextPe != -1) {
566     migrateTo(nextPe);
567   }
568 #else
569   DBG("skipping sync, because there is no load balancer");
570 #endif
571 }
572
573 //Resume from sync: start the thread again
574 void TCharm::ResumeFromSync(void)
575 {
576   //if(isSelfDone) return;
577   //if (exitWhenDone) return; //for bigsim ooc execution
578   if (!skipResume) start();
579 }
580
581
582 /****** TcharmClient ******/
583 void TCharmClient1D::ckJustMigrated(void) {
584   ArrayElement1D::ckJustMigrated();
585   findThread();
586   tcharmClientInit();
587 }
588
589 void TCharmClient1D::pup(PUP::er &p) {
590   ArrayElement1D::pup(p);
591   p|threadProxy;
592 }
593
594 CkArrayID TCHARM_Get_threads(void) {
595         TCHARMAPI("TCHARM_Get_threads");
596         return TCharm::get()->getProxy();
597 }
598
599 /************* Startup/Shutdown Coordination Support ************/
600
601 // Useless values to reduce over:
602 int _vals[2]={0,1};
603
604 //Called when we want to go to a barrier
605 void TCharm::barrier(void) {
606         //Contribute to a synchronizing reduction
607         CkCallback cb(index_t::atBarrier(0), thisProxy[0]);
608         contribute(sizeof(_vals),&_vals,CkReduction::sum_int,cb);
609 #if CMK_BIGSIM_CHARM
610         void *curLog;           // store current log in timeline
611         _TRACE_BG_TLINE_END(&curLog);
612         TRACE_BG_AMPI_BREAK(NULL, "TCharm_Barrier_START", NULL, 0, 1);
613 #endif
614         stop();
615 #if CMK_BIGSIM_CHARM
616          _TRACE_BG_SET_INFO(NULL, "TCHARM_Barrier_END",  &curLog, 1);
617 #endif
618 }
619
620 //Called when we've reached the barrier
621 void TCharm::atBarrier(CkReductionMsg *m) {
622         DBGX("clients all at barrier");
623         delete m;
624         thisProxy.start(); //Just restart everybody
625 }
626
627 //Called when the thread is done running
628 void TCharm::done(void) {
629         //CmiPrintStackTrace(0);
630         DBG("TCharm thread "<<thisIndex<<" done")
631         if (exitWhenDone) {
632                 //Contribute to a synchronizing reduction
633                 CkCallback cb(index_t::atExit(0), thisProxy[0]);
634                 contribute(sizeof(_vals),&_vals,CkReduction::sum_int,cb);
635         }
636         isSelfDone = true;
637         stop();
638 }
639 //Called when all threads are done running
640 void TCharm::atExit(CkReductionMsg *m) {
641         DBGX("TCharm::atExit1> exiting");
642         delete m;
643         //thisProxy.unsetFlags();
644         CkExit();
645         //CkPrintf("After CkExit()!!!!!!!\n");
646 }
647
648 /************* Setup **************/
649
650 //Globals used to control setup process
651 static TCHARM_Fallback_setup_fn g_fallbackSetup=NULL;
652 void TCHARM_Set_fallback_setup(TCHARM_Fallback_setup_fn f)
653 {
654         g_fallbackSetup=f;
655 }
656 void TCHARM_Call_fallback_setup(void) {
657         if (g_fallbackSetup) 
658                 (g_fallbackSetup)();
659         else
660                 CkAbort("TCHARM: Unexpected fallback setup--missing TCHARM_User_setup routine?");
661 }
662
663 /************** User API ***************/
664 /**********************************
665 Callable from UserSetup:
666 */
667
668 // Read the command line to figure out how many threads to create:
669 CDECL int TCHARM_Get_num_chunks(void)
670 {
671         TCHARMAPI("TCHARM_Get_num_chunks");
672         if (CkMyPe()!=0) CkAbort("TCHARM_Get_num_chunks should only be called on PE 0 during setup!");
673         int nChunks=CkNumPes();
674         char **argv=CkGetArgv();
675         CmiGetArgIntDesc(argv,"-vp",&nChunks,"Set the total number of virtual processors");
676         CmiGetArgIntDesc(argv,"+vp",&nChunks,NULL);
677         lastNumChunks=nChunks;
678         return nChunks;
679 }
680 FDECL int FTN_NAME(TCHARM_GET_NUM_CHUNKS,tcharm_get_num_chunks)(void)
681 {
682         return TCHARM_Get_num_chunks();
683 }
684
685 // Fill out the default thread options:
686 TCHARM_Thread_options::TCHARM_Thread_options(int doDefault)
687 {
688         stackSize=0; /* default stacksize */
689         exitWhenDone=0; /* don't exit when done by default. */
690 }
691 void TCHARM_Thread_options::sanityCheck(void) {
692         if (stackSize<=0) stackSize=tcharm_stacksize;
693 }
694
695
696 TCHARM_Thread_options g_tcharmOptions(1);
697
698 /*Set the size of the thread stack*/
699 CDECL void TCHARM_Set_stack_size(int newStackSize)
700 {
701         TCHARMAPI("TCHARM_Set_stack_size");
702         g_tcharmOptions.stackSize=newStackSize;
703 }
704 FDECL void FTN_NAME(TCHARM_SET_STACK_SIZE,tcharm_set_stack_size)
705         (int *newSize)
706 { TCHARM_Set_stack_size(*newSize); }
707
708 CDECL void TCHARM_Set_exit(void) { g_tcharmOptions.exitWhenDone=1; }
709
710 /*Create a new array of threads, which will be bound to by subsequent libraries*/
711 CDECL void TCHARM_Create(int nThreads,
712                         int threadFn)
713 {
714         TCHARMAPI("TCHARM_Create");
715         TCHARM_Create_data(nThreads,
716                          threadFn,NULL,0);
717 }
718 FDECL void FTN_NAME(TCHARM_CREATE,tcharm_create)
719         (int *nThreads,int threadFn)
720 { TCHARM_Create(*nThreads,threadFn); }
721
722 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg);
723
724 /*As above, but pass along (arbitrary) data to threads*/
725 CDECL void TCHARM_Create_data(int nThreads,
726                   int threadFn,
727                   void *threadData,int threadDataLen)
728 {
729         TCHARMAPI("TCHARM_Create_data");
730         TCharmInitMsg *msg=new (threadDataLen,0) TCharmInitMsg(
731                 threadFn,g_tcharmOptions);
732         msg->numElements=nThreads;
733         memcpy(msg->data,threadData,threadDataLen);
734         TCHARM_Build_threads(msg);
735         
736         // Reset the thread options:
737         g_tcharmOptions=TCHARM_Thread_options(1);
738 }
739
740 FDECL void FTN_NAME(TCHARM_CREATE_DATA,tcharm_create_data)
741         (int *nThreads,
742                   int threadFn,
743                   void *threadData,int *threadDataLen)
744 { TCHARM_Create_data(*nThreads,threadFn,threadData,*threadDataLen); }
745
746 CkGroupID CkCreatePropMap(void);
747
748 static CProxy_TCharm TCHARM_Build_threads(TCharmInitMsg *msg)
749 {
750   char *tmp;
751   char **argv=CkGetArgv();
752   CkArrayOptions opts(msg->numElements);
753   CkAssert(CkpvAccess(mapCreated)==1);
754
755   if(haveConfigurableRRMap()){
756     CkPrintf("USING ConfigurableRRMap\n");
757     mapID=CProxy_ConfigurableRRMap::ckNew();
758   } else if(mapping==NULL){
759 #if CMK_BIGSIM_CHARM
760     mapID=CProxy_BlockMap::ckNew();
761 #else
762 #if __FAULT__
763         mapID=CProxy_RRMap::ckNew();
764 #else
765     mapID=CkCreatePropMap();
766 #endif
767 #endif
768   } else if(0 == strcmp(mapping,"BLOCK_MAP")) {
769     CkPrintf("USING BLOCK_MAP\n");
770     mapID = CProxy_BlockMap::ckNew();
771   } else if(0 == strcmp(mapping,"RR_MAP")) {
772     CkPrintf("USING RR_MAP\n");
773     mapID = CProxy_RRMap::ckNew();
774   } else if(0 == strcmp(mapping,"MAPFILE")) {
775     CkPrintf("Reading map from file\n");
776     mapID = CProxy_ReadFileMap::ckNew();
777   } else {  // "PROP_MAP" or anything else
778     mapID = CkCreatePropMap();
779   }
780   opts.setMap(mapID);
781   int nElem=msg->numElements; //<- save it because msg will be deleted.
782   return CProxy_TCharm::ckNew(msg,opts);
783 }
784
785 // Helper used when creating a new array bound to the TCHARM threads:
786 CkArrayOptions TCHARM_Attach_start(CkArrayID *retTCharmArray,int *retNumElts)
787 {
788         TCharm *tc=TCharm::get();
789         if (!tc)
790                 CkAbort("You must call TCHARM initialization routines from a TCHARM thread!");
791         int nElts=tc->getNumElements();
792       
793         //CmiPrintf("TCHARM Elements = %d\n", nElts);  
794       
795         if (retNumElts!=NULL) *retNumElts=nElts;
796         *retTCharmArray=tc->getProxy();
797         CkArrayOptions opts(nElts);
798         opts.bindTo(tc->getProxy());
799         return opts;
800 }
801
802 void TCHARM_Suspend(void) {
803         TCharm *tc=TCharm::get();
804         tc->suspend();
805 }
806
807 /***********************************
808 Callable from worker thread
809 */
810 CDECL int TCHARM_Element(void)
811
812         TCHARMAPI("TCHARM_Element");
813         return TCharm::get()->getElement();
814 }
815 CDECL int TCHARM_Num_elements(void)
816
817         TCHARMAPI("TCHARM_Num_elements");
818         return TCharm::get()->getNumElements();
819 }
820
821 FDECL int FTN_NAME(TCHARM_ELEMENT,tcharm_element)(void) 
822 { return TCHARM_Element();}
823 FDECL int FTN_NAME(TCHARM_NUM_ELEMENTS,tcharm_num_elements)(void) 
824 { return TCHARM_Num_elements();}
825
826 //Make sure this address will migrate with us when we move:
827 static void checkAddress(void *data)
828 {
829         if (tcharm_nomig||tcharm_nothreads) return; //Stack is not isomalloc'd
830         if (CmiThreadIs(CMI_THREAD_IS_ALIAS)||CmiThreadIs(CMI_THREAD_IS_STACKCOPY)) return; // memory alias thread
831         if (CmiIsomallocEnabled()) {
832           if (!CmiIsomallocInRange(data))
833             CkAbort("The UserData you register must be allocated on the stack!\n");
834         }
835         else {
836           if(CkMyPe() == 0)
837             CkPrintf("Warning> checkAddress failed because isomalloc not supported.\n");
838         }
839 }
840
841 /* Old "register"-based userdata: */
842 CDECL int TCHARM_Register(void *data,TCHARM_Pup_fn pfn)
843
844         TCHARMAPI("TCHARM_Register");
845         checkAddress(data);
846         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
847 }
848 FDECL int FTN_NAME(TCHARM_REGISTER,tcharm_register)
849         (void *data,TCHARM_Pup_fn pfn)
850
851         TCHARMAPI("TCHARM_Register");
852         checkAddress(data);
853         return TCharm::get()->add(TCharm::UserData(pfn,TCharm::get()->getThread(),data));
854 }
855
856 CDECL void *TCHARM_Get_userdata(int id)
857 {
858         TCHARMAPI("TCHARM_Get_userdata");
859         return TCharm::get()->lookupUserData(id);
860 }
861 FDECL void *FTN_NAME(TCHARM_GET_USERDATA,tcharm_get_userdata)(int *id)
862 { return TCHARM_Get_userdata(*id); }
863
864 /* New hardcoded-ID userdata: */
865 CDECL void TCHARM_Set_global(int globalID,void *new_value,TCHARM_Pup_global_fn pup_or_NULL)
866 {
867         TCHARMAPI("TCHARM_Set_global");
868         TCharm *tc=TCharm::get();
869         if (tc->sud.length()<=globalID)
870         { //We don't have room for this ID yet: make room
871                 int newLen=2*globalID;
872                 tc->sud.resize(newLen);
873         }
874         tc->sud[globalID]=TCharm::UserData(pup_or_NULL,tc->getThread(),new_value);
875 }
876 CDECL void *TCHARM_Get_global(int globalID)
877 {
878         //Skip TCHARMAPI("TCHARM_Get_global") because there's no dynamic allocation here,
879         // and this routine should be as fast as possible.
880         CkVec<TCharm::UserData> &v=TCharm::get()->sud;
881         if (v.length()<=globalID) return NULL; //Uninitialized global
882         return v[globalID].getData();
883 }
884
885 CDECL void TCHARM_Migrate(void)
886 {
887         TCHARMAPI("TCHARM_Migrate");
888         if (CthMigratable() == 0) {
889           if(CkMyPe() == 0)
890             CkPrintf("Warning> thread migration is not supported!\n");
891           return;
892         }
893         TCharm::get()->migrate();
894 }
895 FORTRAN_AS_C(TCHARM_MIGRATE,TCHARM_Migrate,tcharm_migrate,(void),())
896
897 CDECL void TCHARM_Async_Migrate(void)
898 {
899         TCHARMAPI("TCHARM_Async_Migrate");
900         TCharm::get()->async_migrate();
901 }
902 FORTRAN_AS_C(TCHARM_ASYNC_MIGRATE,TCHARM_Async_Migrate,tcharm_async_migrate,(void),())
903
904 CDECL void TCHARM_Allow_Migrate(void)
905 {
906         TCHARMAPI("TCHARM_Allow_Migrate");
907         TCharm::get()->allow_migrate();
908 }
909 FORTRAN_AS_C(TCHARM_ALLOW_MIGRATE,TCHARM_Allow_Migrate,tcharm_allow_migrate,(void),())
910
911 CDECL void TCHARM_Migrate_to(int destPE)
912 {
913         TCHARMAPI("TCHARM_Migrate_to");
914         TCharm::get()->migrateTo(destPE);
915 }
916
917 CDECL void TCHARM_Evacuate()
918 {
919         TCHARMAPI("TCHARM_Migrate_to");
920         TCharm::get()->evacuate();
921 }
922
923 FORTRAN_AS_C(TCHARM_MIGRATE_TO,TCHARM_Migrate_to,tcharm_migrate_to,
924         (int *destPE),(*destPE))
925
926 CDECL void TCHARM_Yield(void)
927 {
928         TCHARMAPI("TCHARM_Yield");
929         TCharm::get()->schedule();
930 }
931 FORTRAN_AS_C(TCHARM_YIELD,TCHARM_Yield,tcharm_yield,(void),())
932
933 CDECL void TCHARM_Barrier(void)
934 {
935         TCHARMAPI("TCHARM_Barrier");
936         TCharm::get()->barrier();
937 }
938 FORTRAN_AS_C(TCHARM_BARRIER,TCHARM_Barrier,tcharm_barrier,(void),())
939
940 CDECL void TCHARM_Done(void)
941 {
942         TCHARMAPI("TCHARM_Done");
943         TCharm *c=TCharm::getNULL();
944         if (!c) CkExit();
945         else c->done();
946 }
947 FORTRAN_AS_C(TCHARM_DONE,TCHARM_Done,tcharm_done,(void),())
948
949
950 CDECL double TCHARM_Wall_timer(void)
951 {
952   TCHARMAPI("TCHARM_Wall_timer");
953   TCharm *c=TCharm::getNULL();
954   if(!c) return CkWallTimer();
955   else { //Have to apply current thread's time offset
956     return CkWallTimer()+c->getTimeOffset();
957   }
958 }
959
960 #if 1
961 /*Include Fortran-style "iargc" and "getarg" routines.
962 These are needed to get access to the command-line arguments from Fortran.
963 */
964 FDECL int FTN_NAME(TCHARM_IARGC,tcharm_iargc)(void) {
965   TCHARMAPI("tcharm_iargc");
966   return CkGetArgc()-1;
967 }
968
969 FDECL void FTN_NAME(TCHARM_GETARG,tcharm_getarg)
970         (int *i_p,char *dest,int destLen)
971 {
972   TCHARMAPI("tcharm_getarg");
973   int i=*i_p;
974   if (i<0) CkAbort("tcharm_getarg called with negative argument!");
975   if (i>=CkGetArgc()) CkAbort("tcharm_getarg called with argument > iargc!");
976   const char *src=CkGetArgv()[i];
977   strcpy(dest,src);
978   for (i=strlen(dest);i<destLen;i++) dest[i]=' ';
979 }
980
981 #endif
982
983 //These silly routines are used for serial startup:
984 extern void _initCharm(int argc, char **argv);
985 CDECL void TCHARM_Init(int *argc,char ***argv) {
986         if (!tcharm_initted) {
987           ConverseInit(*argc, *argv, (CmiStartFn) _initCharm,1,1);
988           _initCharm(*argc,*argv);
989         }
990 }
991
992 FDECL void FTN_NAME(TCHARM_INIT,tcharm_init)(void)
993 {
994         int argc=1;
995         const char *argv_sto[2]={"foo",NULL};
996         char **argv=(char **)argv_sto;
997         TCHARM_Init(&argc,&argv);
998 }
999
1000 /***********************************
1001 * TCHARM Semaphores:
1002 * The idea is one side "puts", the other side "gets"; 
1003 * but the calls can come in any order--
1004 * if the "get" comes first, it blocks until the put.
1005 * This makes a convenient, race-condition-free way to do
1006 * onetime initializations.  
1007 */
1008 /// Find this semaphore, or insert if there isn't one:
1009 TCharm::TCharmSemaphore *TCharm::findSema(int id) {
1010         for (int s=0;s<sema.size();s++)
1011                 if (sema[s].id==id) 
1012                         return &sema[s];
1013         sema.push_back(TCharmSemaphore(id));
1014         return &sema[sema.size()-1];
1015 }
1016 /// Remove this semaphore from the list
1017 void TCharm::freeSema(TCharmSemaphore *doomed) {
1018         int id=doomed->id;
1019         for (int s=0;s<sema.size();s++)
1020                 if (sema[s].id==id) {
1021                         sema[s]=sema[sema.length()-1];
1022                         sema.length()--;
1023                         return;
1024                 }
1025         CkAbort("Tried to free nonexistent TCharm semaphore");
1026 }
1027
1028 /// Block until this semaphore has data:
1029 TCharm::TCharmSemaphore *TCharm::getSema(int id) {
1030         TCharmSemaphore *s=findSema(id);
1031         if (s->data==NULL) 
1032         { //Semaphore isn't filled yet: wait until it is
1033                 s->thread=CthSelf();
1034                 suspend(); //Will be woken by semaPut
1035                 // Semaphore may have moved-- find it again
1036                 s=findSema(id);
1037                 if (s->data==NULL) CkAbort("TCharm::semaGet awoken too early!");
1038         }
1039         return s;
1040 }
1041
1042 /// Store data at the semaphore "id".
1043 ///  The put can come before or after the get.
1044 void TCharm::semaPut(int id,void *data) {
1045         TCharmSemaphore *s=findSema(id);
1046         if (s->data!=NULL) CkAbort("Duplicate calls to TCharm::semaPut!");
1047         s->data=data;
1048         DBG("semaPut "<<id<<" "<<data);
1049         if (s->thread!=NULL) {//Awaken the thread
1050                 s->thread=NULL;
1051                 resume();
1052         }
1053 }
1054
1055 /// Retreive data from the semaphore "id".
1056 ///  Blocks if the data is not immediately available.
1057 ///  Consumes the data, so another put will be required for the next get.
1058 void *TCharm::semaGet(int id) {
1059         TCharmSemaphore *s=getSema(id);
1060         void *ret=s->data;
1061         DBG("semaGet "<<id<<" "<<ret);
1062         // Now remove the semaphore from the list:
1063         freeSema(s);
1064         return ret;
1065 }
1066
1067 /// Retreive data from the semaphore "id".
1068 ///  Blocks if the data is not immediately available.
1069 void *TCharm::semaGets(int id) {
1070         TCharmSemaphore *s=getSema(id);
1071         return s->data;
1072 }
1073
1074 /// Retreive data from the semaphore "id", or returns NULL.
1075 void *TCharm::semaPeek(int id) {
1076         TCharmSemaphore *s=findSema(id);
1077         return s->data;
1078 }
1079
1080 /****** System Call support ******/
1081 /*
1082 TCHARM_System exists to work around a bug where Linux ia32
1083 glibc2.2.x with pthreads crashes at the fork() site when 
1084 called from a user-levelthread. 
1085
1086 The fix is to call system from the main thread, by 
1087 passing the request out of the thread to our array element 
1088 before calling system().
1089 */
1090
1091 CDECL int 
1092 TCHARM_System(const char *shell_command)
1093 {
1094         return TCharm::get()->system(shell_command);
1095 }
1096 int TCharm::system(const char *cmd)
1097 {
1098         int ret=-1778;
1099         callSystemStruct s;
1100         s.cmd=cmd;
1101         s.ret=&ret;
1102         thisProxy[thisIndex].callSystem(s);
1103         suspend();
1104         return ret;
1105 }
1106
1107 void TCharm::callSystem(const callSystemStruct &s)
1108 {
1109         *s.ret = ::system(s.cmd);
1110         resume();
1111 }
1112
1113
1114
1115 #include "tcharm.def.h"