fixed a typo in previous checks that is caught only by xlc compiler.
[charm.git] / src / ck-core / init.C
1 /**
2 \addtogroup CkInit
3 \brief Controls the Charm++ startup process.
4
5 This file runs the entire Charm++ startup process.
6
7 The process begins with every processor finishing the 
8 Converse startup process and calling _initCharm.
9 This routine runs almost the entire Charm++ setup process.
10 It begins by setting up various Cpvs and subsystems.
11
12 The rank 0 processor of each node then does
13 the Charm++ registration, by calling the various _register
14 routines.  
15
16 Now processor 0:
17 <ol>
18 <li>Creates each mainchare, by allocating the chares
19  and calling their constructors with argc/argv.
20  This typically results in a number of chare/group creations.
21 <li>Sends off all readonly data to the other processors.
22 </ol>
23 After _initCharm, processor 0 immediately begins work.
24
25 The other processors, however, must wait until they recieve 
26 the readonly data and all group creations.  They do this by 
27 setting the _charmHandlerIdx to a special "_bufferHandler"
28 which simply saves all normal messages into a special queue.  
29
30 As the startup data (readonlies and group creations) streams
31 into _initHandler, it counts messages until it is fully 
32 initialized, then calls _initDone to clean out the queues 
33 and resume normal operation.  
34
35 Upon resume of normal operation, the user code is guaranteed that
36 all readonlies (both data and messages) have been set consistently
37 on all processors, and that the constructors for all nodegroups
38 and groups allocated from a mainchare have been called.
39
40 It is not guaranteed the order in which (node)groups allocated
41 outside of a mainchare are constructed, nor that the construction
42 will happen before other messages have been delivered by the scheduler.
43
44 Even though not exposed to the final users, the creation order of
45 groups and nodegroups allocated in mainchares is deterministic and
46 respects the following points:
47 <ul>
48 <li>On all processors, there is no guarantee of the order of creation
49 between (node)groups allocated from different mainchares;
50 <li>On processor zero, within a mainchare, all (node)groups are created
51 in the order specified by the source code (strictly), including array
52 allocation of initial elements;
53 <li>On processors other than zero, within a mainchare, the order
54 specified by the source code is maintained between different nodegroups
55 and between different groups;
56 <li>On processors other than zero, the ordering between groups and
57 nodegroups is NOT maintained, as all nodegroups are created before any
58 group is created.
59 </ul>
60
61 This process should not have race conditions, but it can
62 never be excluded...
63 */
64 /*@{*/
65
66 #include "ck.h"
67 #include "trace.h"
68
69 void CkRestartMain(const char* dirname);
70
71 #define  DEBUGF(x)     //CmiPrintf x;
72
73 #ifdef CALCULATE_HOPS
74 /** Turn on manually if you want to calculate hops from within Charm++
75 for some application **/
76
77 #include "TopoManager.h"
78
79 TopoManager *tmgr = NULL;
80 double hops = 0;
81
82 extern "C" void calculateTotalHops(int pe1, int pe2, int size) {
83   if(tmgr == NULL)
84     tmgr = new TopoManager();
85   hops += (tmgr->getHopsBetweenRanks(pe1, pe2) * size);
86 }
87
88 extern "C" void printTotalHops() {
89   CmiPrintf("TOTAL HOPS %lf\n", hops/(1024*1024) );
90 }
91 #endif
92
93 UChar _defaultQueueing = CK_QUEUEING_FIFO;
94
95 UInt  _printCS = 0;
96 UInt  _printSS = 0;
97
98 /**
99  * This value has the number of total initialization message a processor awaits.
100  * It is received on nodes other than zero together with the ROData message.
101  * Even though it is shared by all processors it is ok: it doesn't matter when and
102  * by who it is set, provided that it becomes equal to the number of awaited messages
103  * (which is always at least one ---the readonly data message).
104  */
105 UInt  _numExpectInitMsgs = 0;
106 /**
107  * This number is used only by processor zero to count how many messages it will
108  * send out for the initialization process. After the readonly data message is sent
109  * (containing this counter), its value becomes irrelevant.
110  */
111 UInt  _numInitMsgs = 0;
112 /**
113  * Count the number of nodegroups that have been created in mainchares.
114  * Since the nodegroup creation is executed by a single processor in a
115  * given node, this value must be seen by all processors in a node.
116  */
117 CksvDeclare(UInt,_numInitNodeMsgs);
118 int   _infoIdx;
119 int   _charmHandlerIdx;
120 int   _initHandlerIdx;
121 int   _roRestartHandlerIdx;
122 int   _bocHandlerIdx;
123 int   _qdHandlerIdx;
124 int   _qdCommHandlerIdx;
125 int   _triggerHandlerIdx;
126 int   _mainDone = 0;
127 static int   _triggersSent = 0;
128
129 CkOutStream ckout;
130 CkErrStream ckerr;
131 CkInStream  ckin;
132
133 CkpvDeclare(void*,       _currentChare);
134 CkpvDeclare(int,         _currentChareType);
135 CkpvDeclare(CkGroupID,   _currentGroup);
136 CkpvDeclare(void*,       _currentNodeGroupObj);
137 CkpvDeclare(CkGroupID,   _currentGroupRednMgr);
138 CkpvDeclare(GroupTable*, _groupTable);
139 CkpvDeclare(GroupIDTable*, _groupIDTable);
140 CkpvDeclare(CmiImmediateLockType, _groupTableImmLock);
141 CkpvDeclare(UInt, _numGroups);
142
143 CkpvDeclare(CkCoreState *, _coreState);
144
145 CksvDeclare(UInt, _numNodeGroups);
146 CksvDeclare(GroupTable*, _nodeGroupTable);
147 CksvDeclare(GroupIDTable, _nodeGroupIDTable);
148 CksvDeclare(CmiImmediateLockType, _nodeGroupTableImmLock);
149 CksvDeclare(CmiNodeLock, _nodeLock);
150 CksvStaticDeclare(PtrVec*,_nodeBocInitVec);
151 CkpvDeclare(int, _charmEpoch);
152
153 CkpvDeclare(Stats*, _myStats);
154 CkpvDeclare(MsgPool*, _msgPool);
155
156 CkpvDeclare(_CkOutStream*, _ckout);
157 CkpvDeclare(_CkErrStream*, _ckerr);
158
159 CkpvStaticDeclare(int,  _numInitsRecd);
160 CkpvStaticDeclare(PtrQ*, _buffQ);
161 CkpvStaticDeclare(PtrVec*, _bocInitVec);
162
163 #ifndef CMK_CHARE_USE_PTR
164 CpvExtern(CkVec<void *>, chare_objs);
165 CpvExtern(CkVec<int>, chare_types);
166 CpvExtern(CkVec<VidBlock *>, vidblocks);
167 #endif
168
169 /*
170         FAULT_EVAC
171 */
172 CpvDeclare(char *, _validProcessors);
173 CpvDeclare(char ,startedEvac);
174
175 int    _exitHandlerIdx;
176
177 static Stats** _allStats = 0;
178
179 static int   _numStatsRecd = 0;
180 static int   _exitStarted = 0;
181
182 static InitCallTable _initCallTable;
183
184 #ifndef CMK_OPTIMIZE
185 #define _STATS_ON(x) (x) = 1
186 #else
187 #define _STATS_ON(x) \
188           CmiPrintf("stats unavailable in optimized version. ignoring...\n");
189 #endif
190
191 // fault tolerance
192 typedef void (*CkFtFn)(const char *, CkArgMsg *);
193 static CkFtFn  faultFunc = NULL;
194 static char* _restartDir;
195
196 #ifdef _FAULT_MLOG_
197 int chkptPeriod=1000;
198 bool parallelRestart=false;
199 extern int BUFFER_TIME; //time spent waiting for buffered messages
200 #endif
201
202 // flag for killing processes 
203 extern int killFlag;
204 // file specifying the processes to be killed
205 extern char *killFile;
206 // function for reading the kill file
207 void readKillFile();
208
209
210 int _defaultObjectQ = 0;            // for obejct queue
211 int _ringexit = 0;                  // for charm exit
212 int _ringtoken = 8;
213
214
215 /*
216         FAULT_EVAC
217
218         flag which marks whether or not to trigger the processor shutdowns
219 */
220 static int _raiseEvac=0;
221 static char *_raiseEvacFile;
222 void processRaiseEvacFile(char *raiseEvacFile);
223
224 static inline void _parseCommandLineOpts(char **argv)
225 {
226   if (CmiGetArgFlagDesc(argv,"+cs", "Print extensive statistics at shutdown"))
227       _STATS_ON(_printCS);
228   if (CmiGetArgFlagDesc(argv,"+ss", "Print summary statistics at shutdown"))
229       _STATS_ON(_printSS);
230   if (CmiGetArgFlagDesc(argv,"+fifo", "Default to FIFO queuing"))
231       _defaultQueueing = CK_QUEUEING_FIFO;
232   if (CmiGetArgFlagDesc(argv,"+lifo", "Default to LIFO queuing"))
233       _defaultQueueing = CK_QUEUEING_LIFO;
234   if (CmiGetArgFlagDesc(argv,"+ififo", "Default to integer-prioritized FIFO queuing"))
235       _defaultQueueing = CK_QUEUEING_IFIFO;
236   if (CmiGetArgFlagDesc(argv,"+ilifo", "Default to integer-prioritized LIFO queuing"))
237       _defaultQueueing = CK_QUEUEING_ILIFO;
238   if (CmiGetArgFlagDesc(argv,"+bfifo", "Default to bitvector-prioritized FIFO queuing"))
239       _defaultQueueing = CK_QUEUEING_BFIFO;
240   if (CmiGetArgFlagDesc(argv,"+blifo", "Default to bitvector-prioritized LIFO queuing"))
241       _defaultQueueing = CK_QUEUEING_BLIFO;
242   if (CmiGetArgFlagDesc(argv,"+objq", "Default to use object queue for every obejct"))
243   {
244 #if CMK_OBJECT_QUEUE_AVAILABLE
245       _defaultObjectQ = 1;
246       if (CkMyPe()==0)
247         CmiPrintf("Charm++> Create object queue for every Charm object.\n");
248 #else
249       CmiAbort("Charm++> Object queue not enabled, recompile Charm++ with CMK_OBJECT_QUEUE_AVAILABLE defined to 1.");
250 #endif
251   }
252   if(CmiGetArgString(argv,"+restart",&_restartDir))
253       faultFunc = CkRestartMain;
254 #if __FAULT__
255   if (CmiGetArgFlagDesc(argv,"+restartaftercrash","restarting this processor after a crash")){  
256 # if CMK_MEM_CHECKPOINT
257       faultFunc = CkMemRestart;
258 # endif
259 #ifdef _FAULT_MLOG_
260             faultFunc = CkMlogRestart;
261 #endif
262       CmiPrintf("[%d] Restarting after crash \n",CmiMyPe());
263   }
264   // reading the killFile
265   if(CmiGetArgStringDesc(argv,"+killFile", &killFile,"Generates SIGKILL on specified processors")){
266     if(faultFunc == NULL){
267       //do not read the killfile if this is a restarting processor
268       killFlag = 1;
269       if(CmiMyPe() == 0){
270         printf("[%d] killFlag set to 1 for file %s\n",CkMyPe(),killFile);
271       }
272     }
273   }
274 #endif
275
276   // shut down program in ring fashion to allow projections output w/o IO error
277   if (CmiGetArgIntDesc(argv,"+ringexit",&_ringtoken, "Program exits in a ring fashion")) 
278   {
279     _ringexit = 1;
280     if (CkMyPe()==0)
281       CkPrintf("Charm++> Program shutdown in token ring (%d).\n", _ringtoken);
282     if (_ringtoken > CkNumPes())  _ringtoken = CkNumPes();
283   }
284         /*
285                 FAULT_EVAC
286
287                 if the argument +raiseevac is present then cause faults
288         */
289         if(CmiGetArgStringDesc(argv,"+raiseevac", &_raiseEvacFile,"Generates processor evacuation on random processors")){
290                 _raiseEvac = 1;
291         }
292 #ifdef _FAULT_MLOG_
293     if(!CmiGetArgIntDesc(argv,"+chkptPeriod",&chkptPeriod,"Set the checkpoint period for the message logging fault tolerance algorithm in seconds")){
294         chkptPeriod = 100;
295     }
296     if(CmiGetArgFlagDesc(argv,"+Parallelrestart", "Parallel Restart with message logging protocol")){
297         parallelRestart = true;
298     }
299     if(!CmiGetArgIntDesc(argv,"+mlog_local_buffer",&_maxBufferedMessages,"# of local messages buffered in the message logging protoocl")){
300         _maxBufferedMessages = 2;
301     }
302     if(!CmiGetArgIntDesc(argv,"+mlog_remote_buffer",&_maxBufferedTicketRequests,"# of remote ticket requests buffered in the message logging protoocl")){
303         _maxBufferedTicketRequests = 2;
304     }
305     if(!CmiGetArgIntDesc(argv,"+mlog_buffer_time",&BUFFER_TIME,"# Time spent waiting for messages to be buffered in the message logging protoocl")){
306         BUFFER_TIME = 2;
307     }
308
309 #endif  
310         /* Anytime migration flag */
311         isAnytimeMigration = CmiTrue;
312         if (CmiGetArgFlagDesc(argv,"+noAnytimeMigration","The program does not require support for anytime migration")) {
313           isAnytimeMigration = CmiFalse;
314         }
315 }
316
317 static void _bufferHandler(void *msg)
318 {
319   DEBUGF(("[%d] _bufferHandler called.\n", CkMyPe()));
320   CkpvAccess(_buffQ)->enq(msg);
321 }
322
323 static void _discardHandler(envelope *env)
324 {
325   DEBUGF(("[%d] _discardHandler called.\n", CkMyPe()));
326   CmiFree(env);
327 }
328
329 #ifndef CMK_OPTIMIZE
330 static inline void _printStats(void)
331 {
332   DEBUGF(("[%d] _printStats\n", CkMyPe()));
333   int i;
334   if(_printSS || _printCS) {
335     Stats *total = new Stats();
336     _MEMCHECK(total);
337     for(i=0;i<CkNumPes();i++)
338       total->combine(_allStats[i]);
339     CkPrintf("Charm Kernel Summary Statistics:\n");
340     for(i=0;i<CkNumPes();i++) {
341       CkPrintf("Proc %d: [%d created, %d processed]\n", i,
342                _allStats[i]->getCharesCreated(),
343                _allStats[i]->getCharesProcessed());
344     }
345     CkPrintf("Total Chares: [%d created, %d processed]\n",
346              total->getCharesCreated(), total->getCharesProcessed());
347   }
348   if(_printCS) {
349     CkPrintf("Charm Kernel Detailed Statistics (R=requested P=processed):\n\n");
350
351     CkPrintf("         Create    Mesgs     Create    Mesgs     Create    Mesgs\n");
352     CkPrintf("         Chare     for       Group     for       Nodegroup for\n");
353     CkPrintf("PE   R/P Mesgs     Chares    Mesgs     Groups    Mesgs     Nodegroups\n");
354     CkPrintf("---- --- --------- --------- --------- --------- --------- ----------\n");
355
356     for(i=0;i<CkNumPes();i++) {
357       CkPrintf("%4d  R  %9d %9d %9d %9d %9d %9d\n      P  %9d %9d %9d %9d %9d %9d\n",i,
358                _allStats[i]->getCharesCreated(),
359                _allStats[i]->getForCharesCreated(),
360                _allStats[i]->getGroupsCreated(),
361                _allStats[i]->getGroupMsgsCreated(),
362                _allStats[i]->getNodeGroupsCreated(),
363                _allStats[i]->getNodeGroupMsgsCreated(),
364                _allStats[i]->getCharesProcessed(),
365                _allStats[i]->getForCharesProcessed(),
366                _allStats[i]->getGroupsProcessed(),
367                _allStats[i]->getGroupMsgsProcessed(),
368                _allStats[i]->getNodeGroupsProcessed(),
369                _allStats[i]->getNodeGroupMsgsProcessed());
370     }
371   }
372 }
373 #else
374 static inline void _printStats(void) {}
375 #endif
376
377 static inline void _sendStats(void)
378 {
379   DEBUGF(("[%d] _sendStats\n", CkMyPe()));
380 #ifndef CMK_OPTIMIZE
381   envelope *env = UsrToEnv(CkpvAccess(_myStats));
382 #else
383   envelope *env = _allocEnv(StatMsg);
384 #endif
385   env->setSrcPe(CkMyPe());
386   CmiSetHandler(env, _exitHandlerIdx);
387   CmiSyncSendAndFree(0, env->getTotalsize(), (char *)env);
388 }
389
390 #ifdef _FAULT_MLOG_
391 extern void _messageLoggingExit();
392 #endif
393
394 static void _exitHandler(envelope *env)
395 {
396   DEBUGF(("exitHandler called on %d msgtype: %d\n", CkMyPe(), env->getMsgtype()));
397   switch(env->getMsgtype()) {
398     case ExitMsg:
399       CkAssert(CkMyPe()==0);
400       if(_exitStarted) {
401         CmiFree(env);
402         return;
403       }
404       _exitStarted = 1;
405       CkNumberHandler(_charmHandlerIdx,(CmiHandler)_discardHandler);
406       CkNumberHandler(_bocHandlerIdx, (CmiHandler)_discardHandler);
407       env->setMsgtype(ReqStatMsg);
408       env->setSrcPe(CkMyPe());
409       // if exit in ring, instead of broadcasting, send in ring
410       if (_ringexit){
411         DEBUGF(("[%d] Ring Exit \n",CkMyPe()));
412         const int stride = CkNumPes()/_ringtoken;
413         int pe = 0;
414         while (pe<CkNumPes()) {
415           CmiSyncSend(pe, env->getTotalsize(), (char *)env);
416           pe += stride;
417         }
418         CmiFree(env);
419       }else{
420         CmiSyncBroadcastAllAndFree(env->getTotalsize(), (char *)env);
421       } 
422       break;
423     case ReqStatMsg:
424 #ifdef _FAULT_MLOG_
425         _messageLoggingExit();
426 #endif
427       DEBUGF(("ReqStatMsg on %d\n", CkMyPe()));
428       CkNumberHandler(_charmHandlerIdx,(CmiHandler)_discardHandler);
429       CkNumberHandler(_bocHandlerIdx, (CmiHandler)_discardHandler);
430         /*FAULT_EVAC*/
431       if(CmiNodeAlive(CkMyPe())){
432          _sendStats();
433       } 
434       _mainDone = 1; // This is needed because the destructors for
435                      // readonly variables will be called when the program
436                      // exits. If the destructor is called while _mainDone
437                      // is 0, it will assume that the readonly variable was
438                      // declared locally. On all processors other than 0, 
439                      // _mainDone is never set to 1 before the program exits.
440 #ifndef CMK_OPTIMIZE
441       if (_ringexit) traceClose();
442 #endif
443       if (_ringexit) {
444         int stride = CkNumPes()/_ringtoken;
445         int pe = CkMyPe()+1;
446         if (pe < CkNumPes() && pe % stride != 0)
447           CmiSyncSendAndFree(pe, env->getTotalsize(), (char *)env);
448         else
449           CmiFree(env);
450       }
451       else
452         CmiFree(env);
453       if(CkMyPe()){
454         DEBUGF(("[%d] Calling converse exit \n",CkMyPe()));
455         ConverseExit();
456       } 
457       break;
458     case StatMsg:
459       CkAssert(CkMyPe()==0);
460 #ifndef CMK_OPTIMIZE
461       _allStats[env->getSrcPe()] = (Stats*) EnvToUsr(env);
462 #endif
463       _numStatsRecd++;
464       DEBUGF(("StatMsg on %d with %d\n", CkMyPe(), _numStatsRecd));
465                         /*FAULT_EVAC*/
466       if(_numStatsRecd==CkNumValidPes()) {
467         _printStats();
468         DEBUGF(("[%d] Calling converse exit \n",CkMyPe()));
469         ConverseExit();
470       }
471       break;
472     default:
473       CmiAbort("Internal Error(_exitHandler): Unknown-msg-type. Contact Developers.\n");
474   }
475 }
476
477 /**
478  * Create all groups in this processor (not called on processor zero).
479  * Notice that only groups created in mainchares are processed here;
480  * groups created later are processed as regular messages.
481  */
482 static inline void _processBufferedBocInits(void)
483 {
484   CkCoreState *ck = CkpvAccess(_coreState);
485   CkNumberHandlerEx(_bocHandlerIdx,(CmiHandlerEx)_processHandler, ck);
486   register int i = 0;
487   PtrVec &inits=*CkpvAccess(_bocInitVec);
488   register int len = inits.size();
489   for(i=1; i<len; i++) {
490     envelope *env = inits[i];
491     if(env==0) CkAbort("_processBufferedBocInits: empty message");
492     if(env->isPacked())
493       CkUnpackMessage(&env);
494     _processBocInitMsg(ck,env);
495   }
496   delete &inits;
497 }
498
499 /**
500  * Create all nodegroups in this node (called only by rank zero, and never on node zero).
501  * Notice that only nodegroups created in mainchares are processed here;
502  * nodegroups created later are processed as regular messages.
503  */
504 static inline void _processBufferedNodeBocInits(void)
505 {
506   CkCoreState *ck = CkpvAccess(_coreState);
507   register int i = 0;
508   PtrVec &inits=*CksvAccess(_nodeBocInitVec);
509   register int len = inits.size();
510   for(i=1; i<len; i++) {
511     envelope *env = inits[i];
512     if(env==0) CkAbort("_processBufferedNodeBocInits: empty message");
513     if(env->isPacked())
514       CkUnpackMessage(&env);
515     _processNodeBocInitMsg(ck,env);
516   }
517   delete &inits;
518 }
519
520 static inline void _processBufferedMsgs(void)
521 {
522   CkNumberHandlerEx(_charmHandlerIdx,(CmiHandlerEx)_processHandler,
523         CkpvAccess(_coreState));
524   envelope *env;
525   while(NULL!=(env=(envelope*)CkpvAccess(_buffQ)->deq())) {
526     if(env->getMsgtype()==NewChareMsg || env->getMsgtype()==NewVChareMsg) {
527       if(env->isForAnyPE())
528         CldEnqueue(CLD_ANYWHERE, env, _infoIdx);
529       else
530         CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
531     } else {
532       CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
533     }
534   }
535 }
536
537 static int _charmLoadEstimator(void)
538 {
539   return CkpvAccess(_buffQ)->length();
540 }
541
542 /**
543  * This function is used to send other processors on the same node a signal so
544  * they can check if their _initDone can be called: the reason for this is that
545  * the check at the end of _initHandler can fail due to a missing message containing
546  * a Nodegroup creation. When that message arrives only one processor will receive
547  * it, and thus if no notification is sent to the other processors in the node, they
548  * will never proceed.
549  */
550 static void _sendTriggers(void)
551 {
552   int i, num, first;
553   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
554   if (_triggersSent == 0)
555   {
556     _triggersSent++;
557     num = CmiMyNodeSize();
558     register envelope *env = _allocEnv(RODataMsg); // Notice that the type here is irrelevant
559     env->setSrcPe(CkMyPe());
560     CmiSetHandler(env, _triggerHandlerIdx);
561     first = CmiNodeFirst(CmiMyNode());
562     for (i=0; i < num; i++)
563       if(first+i != CkMyPe())
564         CmiSyncSend(first+i, env->getTotalsize(), (char *)env);
565     CmiFree(env);
566   }
567   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
568 }
569
570 /**
571  * This function (not a handler) is called once and only once per processor.
572  * It signals the processor that the initialization is done and regular messages
573  * can be processed.
574  *
575  * On processor zero it is called by _initCharm, on all other processors either
576  * by _initHandler or _triggerHandler (cannot be both).
577  * When fault-tolerance is active, it is called by the fault-tolerance scheme itself.
578  */
579 void _initDone(void)
580 {
581   DEBUGF(("[%d] _initDone.\n", CkMyPe()));
582   if (!_triggersSent) _sendTriggers();
583   CkNumberHandler(_triggerHandlerIdx, (CmiHandler)_discardHandler);
584   CmiNodeBarrier();
585   if(CkMyRank() == 0) {
586     _processBufferedNodeBocInits();
587   }
588   CmiNodeBarrier(); // wait for all nodegroups to be created
589   _processBufferedBocInits();
590   DEBUGF(("Reached CmiNodeBarrier(), pe = %d, rank = %d\n", CkMyPe(), CkMyRank()));
591   CmiNodeBarrier();
592   DEBUGF(("Crossed CmiNodeBarrier(), pe = %d, rank = %d\n", CkMyPe(), CkMyRank()));
593   _processBufferedMsgs();
594   CkpvAccess(_charmEpoch)=1;
595 }
596
597 /**
598  * Converse handler receiving a signal from another processors in the same node.
599  * (On _sendTrigger there is the explanation of why this is necessary)
600  * Simply check if with the NodeGroup processed by another processor we reached
601  * the expected count. Notice that it can only be called before _initDone: after
602  * _initDone, a message destined for this handler will go instead to the _discardHandler.
603  */
604 static void _triggerHandler(envelope *env)
605 {
606   if (_numExpectInitMsgs && CkpvAccess(_numInitsRecd) + CksvAccess(_numInitNodeMsgs) == _numExpectInitMsgs)
607   {
608     DEBUGF(("Calling Init Done from _triggerHandler\n"));
609     _initDone();
610   }
611   CmiFree(env);
612 }
613
614 static inline void _processROMsgMsg(envelope *env)
615 {
616   *((char **)(_readonlyMsgs[env->getRoIdx()]->pMsg))=(char *)EnvToUsr(env);
617 }
618
619 static inline void _processRODataMsg(envelope *env)
620 {
621   //Unpack each readonly:
622   PUP::fromMem pu((char *)EnvToUsr(env));
623   for(size_t i=0;i<_readonlyTable.size();i++) _readonlyTable[i]->pupData(pu);
624   CmiFree(env);
625 }
626
627 /**
628  * This is similar to the _initHandler, only that the Groups and Nodegroups are
629  * initialized from disk, so only one single message is expected.
630  *
631  * It is unclear how Readonly Messages are treated during restart... (if at all considered)
632  */
633 static void _roRestartHandler(void *msg)
634 {
635   CkAssert(CkMyPe()!=0);
636   register envelope *env = (envelope *) msg;
637   CkpvAccess(_numInitsRecd)++;
638   _numExpectInitMsgs = env->getCount();
639   _processRODataMsg(env);
640 }
641
642 /**
643  * This handler is used only during initialization. It receives messages from
644  * processor zero regarding Readonly Data (in one single message), Readonly Messages,
645  * Groups, and Nodegroups.
646  * The Readonly Data message also contains the total number of messages expected
647  * during the initialization phase.
648  * For Groups and Nodegroups, only messages with epoch=0 (meaning created from within
649  * a mainchare) are buffered for special creation, the other messages are buffered
650  * together with all the other regular messages by _bufferHandler (and will be flushed
651  * after all the initialization messages have been processed).
652  */
653 static void _initHandler(void *msg)
654 {
655   CkAssert(CkMyPe()!=0);
656   register envelope *env = (envelope *) msg;
657   switch (env->getMsgtype()) {
658     case BocInitMsg:
659       if (env->getGroupEpoch()==0) {
660         CkpvAccess(_numInitsRecd)++;
661         CpvAccess(_qd)->process();
662         CkpvAccess(_bocInitVec)->insert(env->getGroupNum().idx, env);
663       } else _bufferHandler(msg);
664       break;
665     case NodeBocInitMsg:
666       if (env->getGroupEpoch()==0) {
667         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
668         CksvAccess(_numInitNodeMsgs)++;
669         CksvAccess(_nodeBocInitVec)->insert(env->getGroupNum().idx, env);
670         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
671         CpvAccess(_qd)->process();
672       } else _bufferHandler(msg);
673       break;
674     case ROMsgMsg:
675       CkpvAccess(_numInitsRecd)++;
676       CpvAccess(_qd)->process();
677       if(env->isPacked()) CkUnpackMessage(&env);
678       _processROMsgMsg(env);
679       break;
680     case RODataMsg:
681       CkpvAccess(_numInitsRecd)++;
682       CpvAccess(_qd)->process();
683       _numExpectInitMsgs = env->getCount();
684       _processRODataMsg(env);
685       break;
686     default:
687       CmiAbort("Internal Error: Unknown-msg-type. Contact Developers.\n");
688   }
689         DEBUGF(("[%d,%.6lf] _numExpectInitMsgs %d CkpvAccess(_numInitsRecd)+CksvAccess(_numInitNodeMsgs) %d+%d\n",CmiMyPe(),CmiWallTimer(),_numExpectInitMsgs,CkpvAccess(_numInitsRecd),CksvAccess(_numInitNodeMsgs)));
690   if(_numExpectInitMsgs&&(CkpvAccess(_numInitsRecd)+CksvAccess(_numInitNodeMsgs)==_numExpectInitMsgs)) {
691     _initDone();
692   }
693 }
694
695 // CkExit: start the termination process, but
696 //   then drop into the scheduler so the user's
697 //   method never returns (which would be confusing).
698 extern "C"
699 void _CkExit(void) 
700 {
701   // Shuts down Converse handlers for the upper layers on this processor
702   //
703   CkNumberHandler(_charmHandlerIdx,(CmiHandler)_discardHandler);
704   CkNumberHandler(_bocHandlerIdx, (CmiHandler)_discardHandler);
705   DEBUGF(("[%d] CkExit - _exitStarted:%d %d\n", CkMyPe(), _exitStarted, _exitHandlerIdx));
706
707   if(CkMyPe()==0) {
708     if(_exitStarted)
709       CsdScheduler(-1);
710     envelope *env = _allocEnv(ReqStatMsg);
711     env->setSrcPe(CkMyPe());
712     CmiSetHandler(env, _exitHandlerIdx);
713                 /*FAULT_EVAC*/
714     CmiSyncBroadcastAllAndFree(env->getTotalsize(), (char *)env);
715   } else {
716     envelope *env = _allocEnv(ExitMsg);
717     env->setSrcPe(CkMyPe());
718     CmiSetHandler(env, _exitHandlerIdx);
719     CmiSyncSendAndFree(0, env->getTotalsize(), (char *)env);
720   }
721 #if ! CMK_BLUEGENE_THREAD
722   _TRACE_END_EXECUTE();
723   //Wait for stats, which will call ConverseExit when finished:
724   CsdScheduler(-1);
725 #endif
726 }
727
728 CkQ<CkExitFn> _CkExitFnVec;
729
730 // wrapper of CkExit
731 // traverse _CkExitFnVec to call registered user exit functions
732 // CkExitFn will call CkExit() when finished to make sure other
733 // registered functions get called.
734 extern "C"
735 void CkExit(void)
736 {
737         /*FAULT_EVAC*/
738         DEBUGF(("[%d] CkExit called \n",CkMyPe()));
739   if (!_CkExitFnVec.isEmpty()) {
740     CkExitFn fn = _CkExitFnVec.deq();
741     fn();
742   }
743   else
744     _CkExit();
745 }
746
747 /* This is a routine called in case the application is closing due to a signal.
748    Tear down structures that must be cleaned up even when unclean exit happens.
749    It is called by the machine layer whenever some problem occurs (it is thus up
750    to the machine layer to call this function). */
751 extern "C"
752 void EmergencyExit(void) {
753 #ifndef __BLUEGENE__
754   /* Delete _coreState to force any CkMessageWatcher to close down. */
755   delete CkpvAccess(_coreState);
756 #endif
757 }
758
759 static void _nullFn(void *, void *)
760 {
761   CmiAbort("Null-Method Called. Program may have Unregistered Module!!\n");
762 }
763
764 extern void _registerLBDatabase(void);
765 extern void _registerPathHistory(void);
766 extern void _registerExternalModules(char **argv);
767 extern void _ckModuleInit(void);
768 extern void _loadbalancerInit();
769 #if CMK_MEM_CHECKPOINT
770 extern void init_memcheckpt(char **argv);
771 #endif
772 extern "C" void initCharmProjections();
773 extern "C" void CmiInitCPUTopology(char **argv);
774 extern "C" void CmiInitCPUAffinity(char **argv);
775
776 void _registerInitCall(CkInitCallFn fn, int isNodeCall)
777 {
778   if (isNodeCall) _initCallTable.initNodeCalls.enq(fn);
779   else _initCallTable.initProcCalls.enq(fn);
780 }
781
782 void InitCallTable::enumerateInitCalls()
783 {
784   int i;
785 #ifdef __BLUEGENE__
786   if(BgNodeRank()==0)        // called only once on an emulating node
787 #else
788   if(CkMyRank()==0) 
789 #endif
790   {
791     for (i=0; i<initNodeCalls.length(); i++) initNodeCalls[i]();
792   }
793   // initproc may depend on initnode calls.
794   CmiNodeAllBarrier();
795   for (i=0; i<initProcCalls.length(); i++) initProcCalls[i]();
796 }
797
798 CpvCExtern(int, cpdSuspendStartup);
799 extern "C" void CpdFreeze(void);
800
801 extern int _dummy_dq;
802
803 extern "C" void initQd(char **argv)
804 {
805         CpvInitialize(QdState*, _qd);
806         CpvAccess(_qd) = new QdState();
807         if (CmiMyRank() == 0) {
808 #if !defined(CMK_CPV_IS_SMP) && !CMK_SHARED_VARS_UNIPROCESSOR
809         CpvAccessOther(_qd, 1) = new QdState(); // for i/o interrupt
810 #endif
811         }
812         _qdHandlerIdx = CmiRegisterHandler((CmiHandler)_qdHandler);
813         _qdCommHandlerIdx = CmiRegisterHandler((CmiHandler)_qdCommHandler);
814         if (CmiGetArgIntDesc(argv,"+qd",&_dummy_dq, "QD time in seconds")) {
815           if (CmiMyPe()==0)
816             CmiPrintf("Charm++> Fake QD using %d seconds.\n", _dummy_dq);
817         }
818 }
819
820 /**
821   This is the main charm setup routine.  It's called
822   on all processors after Converse initialization.
823   This routine gets passed to Converse from "main.C".
824   
825   The main purpose of this routine is to set up the objects
826   and Ckpv's used during a regular Charm run.  See the comment
827   at the top of the file for overall flow.
828 */
829 void _initCharm(int unused_argc, char **argv)
830
831         int inCommThread = (CmiMyRank() == CmiMyNodeSize());
832
833         DEBUGF(("[%d,%.6lf ] _initCharm started\n",CmiMyPe(),CmiWallTimer()));
834
835         CkpvInitialize(PtrQ*,_buffQ);
836         CkpvInitialize(PtrVec*,_bocInitVec);
837         CkpvInitialize(void*, _currentChare);
838         CkpvInitialize(int,   _currentChareType);
839         CkpvInitialize(CkGroupID, _currentGroup);
840         CkpvInitialize(void *, _currentNodeGroupObj);
841         CkpvInitialize(CkGroupID, _currentGroupRednMgr);
842         CkpvInitialize(GroupTable*, _groupTable);
843         CkpvInitialize(GroupIDTable*, _groupIDTable);
844         CkpvInitialize(CmiImmediateLockType, _groupTableImmLock);
845         CkpvInitialize(UInt, _numGroups);
846         CkpvInitialize(int, _numInitsRecd);
847         CkpvInitialize(char**, Ck_argv); CkpvAccess(Ck_argv)=argv;
848         CkpvInitialize(MsgPool*, _msgPool);
849         CkpvInitialize(CkCoreState *, _coreState);
850         /*
851                 Added for evacuation-sayantan
852         */
853 #ifndef __BLUEGENE__
854         CpvInitialize(char *,_validProcessors);
855         CpvInitialize(char ,startedEvac);
856 #endif
857         CpvInitialize(int,serializer);
858
859 #ifndef CMK_CHARE_USE_PTR
860           /* chare and vidblock table */
861         CpvInitialize(CkVec<void *>, chare_objs);
862         CpvInitialize(CkVec<int>, chare_types);
863         CpvInitialize(CkVec<VidBlock *>, vidblocks);
864 #endif
865
866         CksvInitialize(UInt, _numNodeGroups);
867         CksvInitialize(GroupTable*, _nodeGroupTable);
868         CksvInitialize(GroupIDTable, _nodeGroupIDTable);
869         CksvInitialize(CmiImmediateLockType, _nodeGroupTableImmLock);
870         CksvInitialize(CmiNodeLock, _nodeLock);
871         CksvInitialize(PtrVec*,_nodeBocInitVec);
872         CksvInitialize(UInt,_numInitNodeMsgs);
873         CkpvInitialize(int,_charmEpoch);
874         CkpvAccess(_charmEpoch)=0;
875
876         CkpvInitialize(_CkOutStream*, _ckout);
877         CkpvInitialize(_CkErrStream*, _ckerr);
878         CkpvInitialize(Stats*, _myStats);
879
880         CkpvAccess(_groupIDTable) = new GroupIDTable(0);
881         CkpvAccess(_groupTable) = new GroupTable;
882         CkpvAccess(_groupTable)->init();
883         CkpvAccess(_groupTableImmLock) = CmiCreateImmediateLock();
884         CkpvAccess(_numGroups) = 1; // make 0 an invalid group number
885         CkpvAccess(_buffQ) = new PtrQ();
886         CkpvAccess(_bocInitVec) = new PtrVec();
887
888         CkpvAccess(_currentNodeGroupObj) = NULL;
889
890         if(CkMyRank()==0)
891         {
892                 CksvAccess(_numNodeGroups) = 1; //make 0 an invalid group number
893                 CksvAccess(_numInitNodeMsgs) = 0;
894                 CksvAccess(_nodeLock) = CmiCreateLock();
895                 CksvAccess(_nodeGroupTable) = new GroupTable();
896                 CksvAccess(_nodeGroupTable)->init();
897                 CksvAccess(_nodeGroupTableImmLock) = CmiCreateImmediateLock();
898                 CksvAccess(_nodeBocInitVec) = new PtrVec();
899         }
900
901         CkCallbackInit();
902         
903         CmiNodeAllBarrier();
904
905 #if ! CMK_BLUEGENE_CHARM
906         initQd(argv);         // bigsim calls it in ConverseCommonInit
907 #endif
908
909         CkpvAccess(_coreState)=new CkCoreState();
910
911         CkpvAccess(_numInitsRecd) = 0;
912
913         CkpvAccess(_ckout) = new _CkOutStream();
914         CkpvAccess(_ckerr) = new _CkErrStream();
915
916         _charmHandlerIdx = CkRegisterHandler((CmiHandler)_bufferHandler);
917         _initHandlerIdx = CkRegisterHandler((CmiHandler)_initHandler);
918         _roRestartHandlerIdx = CkRegisterHandler((CmiHandler)_roRestartHandler);
919         _exitHandlerIdx = CkRegisterHandler((CmiHandler)_exitHandler);
920         _bocHandlerIdx = CkRegisterHandler((CmiHandler)_initHandler);
921         _infoIdx = CldRegisterInfoFn((CldInfoFn)_infoFn);
922         _triggerHandlerIdx = CkRegisterHandler((CmiHandler)_triggerHandler);
923         _ckModuleInit();
924
925         CldRegisterEstimator((CldEstimator)_charmLoadEstimator);
926
927         _futuresModuleInit(); // part of futures implementation is a converse module
928         _loadbalancerInit();
929         
930 #if CMK_MEM_CHECKPOINT
931         init_memcheckpt(argv);
932 #endif
933
934         initCharmProjections();
935 #if CMK_TRACE_IN_CHARM
936         // initialize trace module in ck
937         traceCharmInit(argv);
938 #endif
939         
940     CkpvInitialize(int, envelopeEventID);
941     CkpvAccess(envelopeEventID) = 0;
942         CkMessageWatcherInit(argv,CkpvAccess(_coreState));
943         
944         /**
945           The rank-0 processor of each node calls the 
946           translator-generated "_register" routines. 
947           
948           _register routines call the charm.h "CkRegister*" routines,
949           which record function pointers and class information for
950           all Charm entities, like Chares, Arrays, and readonlies.
951           
952           There's one _register routine generated for each
953           .ci file.  _register routines *must* be called in the 
954           same order on every node, and *must not* be called by 
955           multiple threads simultaniously.
956         */
957 #ifdef __BLUEGENE__
958         if(BgNodeRank()==0) 
959 #else
960         if(CkMyRank()==0)
961 #endif
962         {
963                 CmiArgGroup("Charm++",NULL);
964                 _parseCommandLineOpts(argv);
965                 _registerInit();
966                 CkRegisterMsg("System", 0, 0, CkFreeMsg, sizeof(int));
967                 CkRegisterChareInCharm(CkRegisterChare("null", 0, TypeChare));
968                 CkIndex_Chare::__idx=CkRegisterChare("Chare", sizeof(Chare), TypeChare);
969                 CkRegisterChareInCharm(CkIndex_Chare::__idx);
970                 CkIndex_Group::__idx=CkRegisterChare("Group", sizeof(Group), TypeGroup);
971                 CkRegisterChareInCharm(CkIndex_Group::__idx);
972                 CkRegisterEp("null", (CkCallFnPtr)_nullFn, 0, 0, 0+CK_EP_INTRINSIC);
973                 
974                 /**
975                   These _register calls are for the built-in
976                   Charm .ci files, like arrays and load balancing.
977                   If you add a .ci file to charm, you'll have to 
978                   add a call to the _register routine here, or make
979                   your library into a "-module".
980                 */
981                 _registerCkFutures();
982                 _registerCkArray();
983                 _registerLBDatabase();
984                 _registerCkCallback();
985                 _registertempo();
986                 _registerwaitqd();
987                 _registercharisma();
988                 _registerCkCheckpoint();
989 #if CMK_MEM_CHECKPOINT
990                 _registerCkMemCheckpoint();
991 #endif
992                 
993                 _registerPathHistory();
994
995                 /**
996                   _registerExternalModules is actually generated by 
997                   charmc at link time (as "moduleinit<pid>.C").  
998                   
999                   This generated routine calls the _register functions
1000                   for the .ci files of libraries linked using "-module".
1001                   This funny initialization is most useful for AMPI/FEM
1002                   programs, which don't have a .ci file and hence have
1003                   no other way to control the _register process.
1004                 */
1005                 _registerExternalModules(argv);
1006                 
1007                 /**
1008                   CkRegisterMainModule is generated by the (unique)
1009                   "mainmodule" .ci file.  It will include calls to 
1010                   register all the .ci files.
1011                 */
1012                 CkRegisterMainModule();
1013                 _registerDone();
1014         }
1015         CmiNodeAllBarrier();
1016
1017     // Execute the initcalls registered in modules
1018         _initCallTable.enumerateInitCalls();
1019
1020 #ifndef CMK_OPTIMIZE
1021 #ifdef __BLUEGENE__
1022         if(BgNodeRank()==0)
1023 #else
1024         if(CkMyRank()==0)
1025 #endif
1026           CpdFinishInitialization();
1027 #endif
1028
1029         //CmiNodeAllBarrier();
1030
1031         CkpvAccess(_myStats) = new Stats();
1032         CkpvAccess(_msgPool) = new MsgPool();
1033
1034         CmiNodeAllBarrier();
1035 #if CMK_SMP_TRACE_COMMTHREAD
1036         _TRACE_BEGIN_COMPUTATION();     
1037 #else
1038         if (!inCommThread) {
1039           _TRACE_BEGIN_COMPUTATION();
1040         }
1041 #endif
1042
1043 #ifdef ADAPT_SCHED_MEM
1044     if(CkMyRank()==0){
1045         memCriticalEntries = new int[numMemCriticalEntries];
1046         int memcnt=0;
1047         for(int i=0; i<_entryTable.size(); i++){
1048             if(_entryTable[i]->isMemCritical){
1049                 memCriticalEntries[memcnt++] = i;
1050             }
1051         }
1052     }
1053 #endif
1054
1055 #ifdef _FAULT_MLOG_
1056     _messageLoggingInit();
1057 #endif
1058
1059 #ifndef __BLUEGENE__
1060         /*
1061                 FAULT_EVAC
1062         */
1063         CpvAccess(_validProcessors) = new char[CkNumPes()];
1064         for(int vProc=0;vProc<CkNumPes();vProc++){
1065                 CpvAccess(_validProcessors)[vProc]=1;
1066         }
1067         CpvAccess(startedEvac) = 0;
1068         _ckEvacBcastIdx = CkRegisterHandler((CmiHandler)_ckEvacBcast);
1069         _ckAckEvacIdx = CkRegisterHandler((CmiHandler)_ckAckEvac);
1070 #endif
1071         CpvAccess(serializer) = 0;
1072
1073         evacuate = 0;
1074         CcdCallOnCondition(CcdSIGUSR1,(CcdVoidFn)CkDecideEvacPe,0);
1075 #ifdef _FAULT_MLOG_ 
1076     CcdCallOnCondition(CcdSIGUSR2,(CcdVoidFn)CkMlogRestart,0);
1077 #endif
1078
1079         if(_raiseEvac){
1080                 processRaiseEvacFile(_raiseEvacFile);
1081                 /*
1082                 if(CkMyPe() == 2){
1083                 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,(CcdVoidFn)CkDecideEvacPe,0);
1084                         CcdCallFnAfter((CcdVoidFn)CkDecideEvacPe, 0, 10000);
1085                 }
1086                 if(CkMyPe() == 3){
1087                         CcdCallFnAfter((CcdVoidFn)CkDecideEvacPe, 0, 10000);
1088                 }*/
1089         }       
1090         
1091         if (faultFunc == NULL) {         // this is not restart
1092             // these two are blocking calls for non-bigsim
1093 #if ! CMK_BLUEGENE_CHARM
1094           CmiInitCPUAffinity(argv);
1095 #endif
1096           CmiInitCPUTopology(argv);
1097         }
1098
1099         if (faultFunc) {
1100                 if (CkMyPe()==0) _allStats = new Stats*[CkNumPes()];
1101                 if (!inCommThread) {
1102                   CkArgMsg *msg = (CkArgMsg *)CkAllocMsg(0, sizeof(CkArgMsg), 0);
1103                   msg->argc = CmiGetArgc(argv);
1104                   msg->argv = argv;
1105                   faultFunc(_restartDir, msg);
1106                   CkFreeMsg(msg);
1107                 }
1108         }else if(CkMyPe()==0){
1109                 _allStats = new Stats*[CkNumPes()];
1110                 register size_t i, nMains=_mainTable.size();
1111                 for(i=0;i<nMains;i++)  /* Create all mainchares */
1112                 {
1113                         register int size = _chareTable[_mainTable[i]->chareIdx]->size;
1114                         register void *obj = malloc(size);
1115                         _MEMCHECK(obj);
1116                         _mainTable[i]->setObj(obj);
1117                         CkpvAccess(_currentChare) = obj;
1118                         CkpvAccess(_currentChareType) = _mainTable[i]->chareIdx;
1119                         register CkArgMsg *msg = (CkArgMsg *)CkAllocMsg(0, sizeof(CkArgMsg), 0);
1120                         msg->argc = CmiGetArgc(argv);
1121                         msg->argv = argv;
1122                         _entryTable[_mainTable[i]->entryIdx]->call(msg, obj);
1123 #ifdef _FAULT_MLOG_
1124             CpvAccess(_currentObj) = (Chare *)obj;
1125 #endif
1126                 }
1127                 _mainDone = 1;
1128
1129                 _STATS_RECORD_CREATE_CHARE_N(nMains);
1130                 _STATS_RECORD_PROCESS_CHARE_N(nMains);
1131
1132
1133
1134
1135                 for(i=0;i<_readonlyMsgs.size();i++) /* Send out readonly messages */
1136                 {
1137                         register void *roMsg = (void *) *((char **)(_readonlyMsgs[i]->pMsg));
1138                         if(roMsg==0)
1139                                 continue;
1140                         //Pack the message and send it to all other processors
1141                         register envelope *env = UsrToEnv(roMsg);
1142                         env->setSrcPe(CkMyPe());
1143                         env->setMsgtype(ROMsgMsg);
1144                         env->setRoIdx(i);
1145                         CmiSetHandler(env, _initHandlerIdx);
1146                         CkPackMessage(&env);
1147                         CmiSyncBroadcast(env->getTotalsize(), (char *)env);
1148                         CpvAccess(_qd)->create(CkNumPes()-1);
1149
1150                         //For processor 0, unpack and re-set the global
1151                         CkUnpackMessage(&env);
1152                         _processROMsgMsg(env);
1153                         _numInitMsgs++;
1154                 }
1155
1156                 //Determine the size of the RODataMessage
1157                 PUP::sizer ps;
1158                 for(i=0;i<_readonlyTable.size();i++) _readonlyTable[i]->pupData(ps);
1159
1160                 //Allocate and fill out the RODataMessage
1161                 envelope *env = _allocEnv(RODataMsg, ps.size());
1162                 PUP::toMem pp((char *)EnvToUsr(env));
1163                 for(i=0;i<_readonlyTable.size();i++) _readonlyTable[i]->pupData(pp);
1164
1165                 env->setCount(++_numInitMsgs);
1166                 env->setSrcPe(CkMyPe());
1167                 CmiSetHandler(env, _initHandlerIdx);
1168                 DEBUGF(("[%d,%.6lf] RODataMsg being sent of size %d \n",CmiMyPe(),CmiWallTimer(),env->getTotalsize()));
1169                 CmiSyncBroadcastAndFree(env->getTotalsize(), (char *)env);
1170                 CpvAccess(_qd)->create(CkNumPes()-1);
1171                 _initDone();
1172         }
1173
1174         DEBUGF(("[%d,%d%.6lf] inCommThread %d\n",CmiMyPe(),CmiMyRank(),CmiWallTimer(),inCommThread));
1175         // when I am a communication thread, I don't participate initDone.
1176         if (inCommThread) {
1177                 CkNumberHandlerEx(_bocHandlerIdx,(CmiHandlerEx)_processHandler,
1178                                         CkpvAccess(_coreState));
1179                 CkNumberHandlerEx(_charmHandlerIdx,(CmiHandlerEx)_processHandler
1180 ,
1181                                         CkpvAccess(_coreState));
1182         }
1183
1184 #if CMK_CCS_AVAILABLE
1185        if (CpvAccess(cpdSuspendStartup))
1186        { 
1187           //CmiPrintf("In Parallel Debugging mode .....\n");
1188           CpdFreeze();
1189        }
1190 #endif
1191
1192
1193 #if __FAULT__
1194         if(killFlag){                                                  
1195                 readKillFile();                                        
1196         }
1197 #endif
1198
1199 }
1200
1201 // this is needed because on o2k, f90 programs have to have main in
1202 // fortran90.
1203 extern "C" void fmain_(int *argc,char _argv[][80],int length[])
1204 {
1205   int i;
1206   char **argv = new char*[*argc+2];
1207
1208   for(i=0;i <= *argc;i++) {
1209     if (length[i] < 100) {
1210       _argv[i][length[i]]='\0';
1211       argv[i] = &(_argv[i][0]);
1212     } else {
1213       argv[i][0] = '\0';
1214     }
1215   }
1216   argv[*argc+1]=0;
1217
1218   ConverseInit(*argc, argv, (CmiStartFn) _initCharm, 0, 0);
1219 }
1220
1221 // user callable function to register an exit function, this function
1222 // will perform task of collecting of info from all pes to pe0, and call
1223 // CkExit() on pe0 again to recursively traverse the registered exitFn.
1224 // see trace-summary for an example.
1225 void registerExitFn(CkExitFn fn)
1226 {
1227   _CkExitFnVec.enq(fn);
1228 }
1229
1230 /*@}*/