More small changes
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #ifndef CMK_CHARE_USE_PTR
29 #include <map>
30 CpvDeclare(CkVec<void *>, chare_objs);
31 CpvDeclare(CkVec<int>, chare_types);
32 CpvDeclare(CkVec<VidBlock *>, vidblocks);
33
34 typedef std::map<int, CkChareID>  Vidblockmap;
35 CpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
36 #endif
37
38
39 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
40
41 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
42
43 int CMessage_CkMessage::__idx=-1;
44 int CMessage_CkArgMsg::__idx=0;
45 int CkIndex_Chare::__idx;
46 int CkIndex_Group::__idx;
47 int CkIndex_ArrayBase::__idx=-1;
48
49 extern int _defaultObjectQ;
50
51 void _initChareTables()
52 {
53 #ifndef CMK_CHARE_USE_PTR
54           /* chare and vidblock table */
55   CpvInitialize(CkVec<void *>, chare_objs);
56   CpvInitialize(CkVec<int>, chare_types);
57   CpvInitialize(CkVec<VidBlock *>, vidblocks);
58   CpvInitialize(Vidblockmap, vmap);
59 #endif
60 }
61
62 //Charm++ virtual functions: declaring these here results in a smaller executable
63 Chare::Chare(void) {
64   thishandle.onPE=CkMyPe();
65   thishandle.objPtr=this;
66 #ifndef CMK_CHARE_USE_PTR
67      // for plain chare, objPtr is actually the index to chare obj table
68   if (chareIdx >= 0) thishandle.objPtr=(void*)chareIdx;
69 #endif
70 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
71   mlogData = new ChareMlogData();
72   mlogData->objID.type = TypeChare;
73   mlogData->objID.data.chare.id = thishandle;
74 #endif
75 #if CMK_OBJECT_QUEUE_AVAILABLE
76   if (_defaultObjectQ)  CkEnableObjQ();
77 #endif
78 }
79
80 Chare::Chare(CkMigrateMessage* m) {
81   thishandle.onPE=CkMyPe();
82   thishandle.objPtr=this;
83
84 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
85         mlogData = NULL;
86 #endif
87
88 #if CMK_OBJECT_QUEUE_AVAILABLE
89   if (_defaultObjectQ)  CkEnableObjQ();
90 #endif
91 }
92
93 void Chare::CkEnableObjQ()
94 {
95 #if CMK_OBJECT_QUEUE_AVAILABLE
96   objQ.create();
97 #endif
98 }
99
100 Chare::~Chare() {
101 #ifndef CMK_CHARE_USE_PTR
102 /*
103   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
104 */
105   if (chareIdx != -1)
106   {
107     CmiAssert(CpvAccess(chare_objs)[chareIdx] == this);
108     CpvAccess(chare_objs)[chareIdx] = NULL;
109     Vidblockmap::iterator iter = CpvAccess(vmap).find(chareIdx);
110     if (iter != CpvAccess(vmap).end()) {
111       register CkChareID *pCid = (CkChareID *)
112         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
113       int srcPe = iter->second.onPE;
114       *pCid = iter->second;
115       register envelope *ret = UsrToEnv(pCid);
116       ret->setVidPtr(iter->second.objPtr);
117       ret->setSrcPe(CkMyPe());
118       CmiSetHandler(ret, _charmHandlerIdx);
119       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
120       CpvAccess(_qd)->create();
121       CpvAccess(vmap).erase(iter);
122     }
123   }
124 #endif
125 }
126
127 void Chare::pup(PUP::er &p)
128 {
129   p(thishandle.onPE);
130   thishandle.objPtr=(void *)this;
131 #ifndef CMK_CHARE_USE_PTR
132   p(chareIdx);
133   if (chareIdx != -1) thishandle.objPtr=(void*)chareIdx;
134 #endif
135 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
136         if(p.isUnpacking()){
137                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
138                 mlogData = new ChareMlogData();
139         }
140         mlogData->pup(p);
141 #endif
142 }
143
144 int Chare::ckGetChareType() const {
145   return -3;
146 }
147 char *Chare::ckDebugChareName(void) {
148   char buf[100];
149   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
150   return strdup(buf);
151 }
152 int Chare::ckDebugChareID(char *str, int limit) {
153   // pure chares for now do not have a valid ID
154   str[0] = 0;
155   return 1;
156 }
157 void Chare::ckDebugPup(PUP::er &p) {
158   pup(p);
159 }
160
161 /// This method is called before starting a [threaded] entry method.
162 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
163   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
164   traceAddThreadListeners(th, UsrToEnv(msg));
165 }
166
167
168 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
169   p.comment("Bytes");
170   int ts=UsrToEnv(msg)->getTotalsize();
171   int msgLen=ts-sizeof(envelope);
172   if (msgLen>0)
173     p((char*)msg,msgLen);
174 }
175
176 IrrGroup::IrrGroup(void) {
177   thisgroup = CkpvAccess(_currentGroup);
178 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
179         mlogData->objID.type = TypeGroup;
180         mlogData->objID.data.group.id = thisgroup;
181         mlogData->objID.data.group.onPE = CkMyPe();
182 #endif
183 }
184
185 IrrGroup::~IrrGroup() {
186   // remove the object pointer
187   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
188   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
189   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
190 }
191
192 void IrrGroup::pup(PUP::er &p)
193 {
194   Chare::pup(p);
195   p|thisgroup;
196 }
197
198 int IrrGroup::ckGetChareType() const {
199   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
200 }
201
202 int IrrGroup::ckDebugChareID(char *str, int limit) {
203   if (limit<5) return -1;
204   str[0] = 1;
205   *((int*)&str[1]) = thisgroup.idx;
206   return 5;
207 }
208
209 char *IrrGroup::ckDebugChareName() {
210   return strdup(_chareTable[ckGetChareType()]->name);
211 }
212
213 void IrrGroup::ckJustMigrated(void)
214 {
215 }
216
217 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
218   /* FIXME: **CW** not entirely sure what we should do here yet */
219 }
220
221 void Group::CkAddThreadListeners(CthThread th, void *msg) {
222   Chare::CkAddThreadListeners(th, msg);
223   CthSetThreadID(th, thisgroup.idx, 0, 0);
224 }
225
226 void Group::pup(PUP::er &p)
227 {
228   CkReductionMgr::pup(p);
229   reductionInfo.pup(p);
230 }
231
232 /**** Delegation Manager Group */
233 CkDelegateMgr::~CkDelegateMgr() { }
234
235 //Default delegator implementation: do not delegate-- send directly
236 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
237   { CkSendMsg(ep,m,c); }
238 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
239   { CkSendMsgBranch(ep,m,onPE,g); }
240 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
241   { CkBroadcastMsgBranch(ep,m,g); }
242 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
243   { CkSendMsgBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
244 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
245   { CkSendMsgNodeBranch(ep,m,onNode,g); }
246 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
247   { CkBroadcastMsgNodeBranch(ep,m,g); }
248 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
249   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
250 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,int onPE,CkArrayID a)
251 {
252         CProxyElement_ArrayBase ap(a,idx);
253         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
254 }
255 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,CkArrayID a)
256 {
257         CProxyElement_ArrayBase ap(a,idx);
258         ap.ckSend((CkArrayMessage *)m,ep);
259 }
260 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
261 {
262         CProxy_ArrayBase ap(a);
263         ap.ckBroadcast((CkArrayMessage *)m,ep);
264 }
265
266 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
267 {
268         CmiAbort("ArraySectionSend is not implemented!\n");
269 /*
270         CProxyElement_ArrayBase ap(a,idx);
271         ap.ckSend((CkArrayMessage *)m,ep);
272 */
273 }
274
275 /*** Proxy <-> delegator communication */
276 CkDelegateData::~CkDelegateData() {}
277
278 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
279   return pd; // default implementation ignores pup call
280 }
281
282 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
283    this tricky manual reference counting business... */
284
285 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
286         if (dPtr) dPtr->ref();
287         ckUndelegate();
288         delegatedMgr = dTo;
289         delegatedPtr = dPtr;
290 }
291 void CProxy::ckUndelegate(void) {
292         delegatedMgr=NULL;
293         if (delegatedPtr) delegatedPtr->unref();
294         delegatedPtr=NULL;
295 }
296
297 /// Copy constructor
298 CProxy::CProxy(const CProxy &src)
299     :delegatedMgr(src.delegatedMgr)
300 {
301     delegatedPtr = NULL;
302     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
303         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
304 }
305
306 /// Assignment operator
307 CProxy& CProxy::operator=(const CProxy &src) {
308         CkDelegateData *oldPtr=delegatedPtr;
309         ckUndelegate();
310         delegatedMgr=src.delegatedMgr;
311
312         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
313             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
314         else
315             delegatedPtr = NULL;
316
317         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
318         if (oldPtr) oldPtr->unref();
319         return *this;
320 }
321
322 void CProxy::pup(PUP::er &p) {
323       CkGroupID delegatedTo;
324       delegatedTo.setZero();
325       int isNodeGroup = 0;
326       if (!p.isUnpacking()) {
327         if (delegatedMgr) {
328           delegatedTo = delegatedMgr->CkGetGroupID();
329           isNodeGroup = delegatedMgr->isNodeGroup();
330         }
331       }
332       p|delegatedTo;
333       if (!delegatedTo.isZero()) {
334         p|isNodeGroup;
335         if (p.isUnpacking()) {
336           if (isNodeGroup)
337                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
338           else
339                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
340         }
341
342         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
343         if (p.isUnpacking() && delegatedPtr)
344             delegatedPtr->ref();
345       }
346 }
347
348 /**** Array sections */
349 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
350 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
351   _cookie.aid = aid;    \
352   _cookie.get_pe() = CkMyPe();  \
353   _elems = new CkArrayIndexMax[nElems]; \
354   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
355   pelist = NULL;        \
356   npes  = 0;    \
357 }
358
359 CKSECTIONID_CONSTRUCTOR_DEF(1D)
360 CKSECTIONID_CONSTRUCTOR_DEF(2D)
361 CKSECTIONID_CONSTRUCTOR_DEF(3D)
362 CKSECTIONID_CONSTRUCTOR_DEF(4D)
363 CKSECTIONID_CONSTRUCTOR_DEF(5D)
364 CKSECTIONID_CONSTRUCTOR_DEF(6D)
365 CKSECTIONID_CONSTRUCTOR_DEF(Max)
366
367 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
368   pelist = new int[npes];
369   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
370   _cookie.aid = gid;
371 }
372
373 CkSectionID::CkSectionID(const CkSectionID &sid) {
374   int i;
375   _cookie = sid._cookie;
376   _nElems = sid._nElems;
377   if (_nElems > 0) {
378     _elems = new CkArrayIndexMax[_nElems];
379     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
380   } else _elems = NULL;
381   npes = sid.npes;
382   if (npes > 0) {
383     pelist = new int[npes];
384     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
385   } else pelist = NULL;
386 }
387
388 void CkSectionID::operator=(const CkSectionID &sid) {
389   int i;
390   _cookie = sid._cookie;
391   _nElems = sid._nElems;
392   if (_nElems > 0) {
393     _elems = new CkArrayIndexMax[_nElems];
394     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
395   } else _elems = NULL;
396   npes = sid.npes;
397   if (npes > 0) {
398     pelist = new int[npes];
399     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
400   } else pelist = NULL;
401 }
402
403 void CkSectionID::pup(PUP::er &p) {
404     p | _cookie;
405     p(_nElems);
406     if (_nElems > 0) {
407       if (p.isUnpacking()) _elems = new CkArrayIndexMax[_nElems];
408       for (int i=0; i< _nElems; i++) p | _elems[i];
409       npes = 0;
410       pelist = NULL;
411     } else {
412       // If _nElems is zero, than this section describes processors instead of array elements
413       _elems = NULL;
414       p(npes);
415       if (p.isUnpacking()) pelist = new int[npes];
416       p(pelist, npes);
417     }
418 }
419
420 /**** Tiny random API routines */
421
422 #ifdef CMK_CUDA
423 void CUDACallbackManager(void *fn) {
424   if (fn != NULL) {
425     CkCallback *cb = (CkCallback*) fn;
426     cb->send();
427   }
428 }
429
430 #endif
431
432 extern "C"
433 void CkSetRefNum(void *msg, int ref)
434 {
435   UsrToEnv(msg)->setRef(ref);
436 }
437
438 extern "C"
439 int CkGetRefNum(void *msg)
440 {
441   return UsrToEnv(msg)->getRef();
442 }
443
444 extern "C"
445 int CkGetSrcPe(void *msg)
446 {
447   return UsrToEnv(msg)->getSrcPe();
448 }
449
450 extern "C"
451 int CkGetSrcNode(void *msg)
452 {
453   return CmiNodeOf(CkGetSrcPe(msg));
454 }
455
456 extern "C"
457 void *CkLocalBranch(CkGroupID gID) {
458   return _localBranch(gID);
459 }
460
461 static
462 void *_ckLocalNodeBranch(CkGroupID groupID) {
463   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
464   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
465   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
466   return retval;
467 }
468
469 extern "C"
470 void *CkLocalNodeBranch(CkGroupID groupID)
471 {
472   void *retval;
473   // we are called in a constructor
474   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
475     return CkpvAccess(_currentNodeGroupObj);
476   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
477   { // Nodegroup hasn't finished being created yet-- schedule...
478     CsdScheduler(0);
479   }
480   return retval;
481 }
482
483 extern "C"
484 void *CkLocalChare(const CkChareID *pCid)
485 {
486         int pe=pCid->onPE;
487         if (pe<0) { //A virtual chare ID
488                 if (pe!=(-(CkMyPe()+1)))
489                         return NULL;//VID block not on this PE
490                 VidBlock *v=(VidBlock *)pCid->objPtr;
491                 return v->getLocalChare();
492         }
493         else
494         { //An ordinary chare ID
495                 if (pe!=CkMyPe())
496                         return NULL;//Chare not on this PE
497                 return pCid->objPtr;
498         }
499 }
500
501 CkpvDeclare(char **,Ck_argv);
502 extern "C" char **CkGetArgv(void) {
503         return CkpvAccess(Ck_argv);
504 }
505 extern "C" int CkGetArgc(void) {
506         return CmiGetArgc(CkpvAccess(Ck_argv));
507 }
508
509 /******************** Basic support *****************/
510 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
511 {
512 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
513         CpvAccess(_currentObj) = (Chare *)obj;
514 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
515 #endif
516   //BIGSIM_OOC DEBUGGING
517   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
518   //fflush(stdout);
519 #ifndef CMK_OPTIMIZE
520   CpdBeforeEp(epIdx, obj, msg);
521 #endif
522   _entryTable[epIdx]->call(msg, obj);
523 #ifndef CMK_OPTIMIZE
524   CpdAfterEp(epIdx);
525 #endif
526   if (_entryTable[epIdx]->noKeep)
527   { /* Method doesn't keep/delete the message, so we have to: */
528     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
529   }
530 }
531 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
532 {
533   //BIGSIM_OOC DEBUGGING
534   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
535   //fflush(stdout);
536
537   void *deliverMsg;
538 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
539         CpvAccess(_currentObj) = (Chare *)obj;
540 #endif
541   if (_entryTable[epIdx]->noKeep)
542   { /* Deliver a read-only copy of the message */
543     deliverMsg=(void *)msg;
544   } else
545   { /* Method needs a copy of the message to keep/delete */
546     void *oldMsg=(void *)msg;
547     deliverMsg=CkCopyMsg(&oldMsg);
548 #ifndef CMK_OPTIMIZE
549     if (oldMsg!=msg)
550       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
551 #endif
552   }
553 #ifndef CMK_OPTIMIZE
554   CpdBeforeEp(epIdx, obj, (void*)msg);
555 #endif
556   _entryTable[epIdx]->call(deliverMsg, obj);
557 #ifndef CMK_OPTIMIZE
558   CpdAfterEp(epIdx);
559 #endif
560 }
561
562 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
563 {
564   register void *msg = EnvToUsr(env);
565   _SET_USED(env, 0);
566   CkDeliverMessageFree(epIdx,msg,obj);
567 }
568
569 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
570 {
571
572 #ifndef CMK_OPTIMIZE /* Consider tracing: */
573   if (_entryTable[epIdx]->traceEnabled) {
574     _TRACE_BEGIN_EXECUTE(env);
575     _invokeEntryNoTrace(epIdx,env,obj);
576     _TRACE_END_EXECUTE();
577   }
578   else
579 #endif
580     _invokeEntryNoTrace(epIdx,env,obj);
581
582 }
583
584 /********************* Creation ********************/
585
586 extern "C"
587 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
588 {
589   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
590   envelope *env = UsrToEnv(msg);
591   _CHECK_USED(env);
592   if(pCid == 0) {
593     env->setMsgtype(NewChareMsg);
594   } else {
595     pCid->onPE = (-(CkMyPe()+1));
596     //  pCid->magic = _GETIDX(cIdx);
597     pCid->objPtr = (void *) new VidBlock();
598     _MEMCHECK(pCid->objPtr);
599     env->setMsgtype(NewVChareMsg);
600     env->setVidPtr(pCid->objPtr);
601 #ifndef CMK_CHARE_USE_PTR
602     CpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
603     int idx = CpvAccess(vidblocks).size()-1;
604     pCid->objPtr = (void *)idx;
605     env->setVidPtr((void *)idx);
606 #endif
607   }
608   env->setEpIdx(eIdx);
609   env->setSrcPe(CkMyPe());
610   CmiSetHandler(env, _charmHandlerIdx);
611   _TRACE_CREATION_1(env);
612   CpvAccess(_qd)->create();
613   _STATS_RECORD_CREATE_CHARE_1();
614   _SET_USED(env, 1);
615   if(destPE == CK_PE_ANY)
616     env->setForAnyPE(1);
617   else
618     env->setForAnyPE(0);
619   _CldEnqueue(destPE, env, _infoIdx);
620   _TRACE_CREATION_DONE(1);
621 }
622
623 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
624 {
625   register int gIdx = _entryTable[epIdx]->chareIdx;
626   register void *obj = malloc(_chareTable[gIdx]->size);
627   _MEMCHECK(obj);
628   setMemoryTypeChare(obj);
629   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
630   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
631   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
632   CkpvAccess(_groupIDTable)->push_back(groupID);
633   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
634   if(ptrq) {
635     void *pending;
636     while((pending=ptrq->deq())!=0)
637       _CldEnqueue(CkMyPe(), pending, _infoIdx);
638 //    delete ptrq;
639       CkpvAccess(_groupTable)->find(groupID).clearPending();
640   }
641   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
642
643   CkpvAccess(_currentGroup) = groupID;
644   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
645 #ifndef CMK_CHARE_USE_PTR
646   ((Chare *)obj)->chareIdx = -1;
647 #endif
648   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
649   _STATS_RECORD_PROCESS_GROUP_1();
650 }
651
652 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
653 {
654   register int gIdx = _entryTable[epIdx]->chareIdx;
655   int objSize=_chareTable[gIdx]->size;
656   register void *obj = malloc(objSize);
657   _MEMCHECK(obj);
658   setMemoryTypeChare(obj);
659   CkpvAccess(_currentGroup) = groupID;
660
661 // Now that the NodeGroup is created, add it to the table.
662 //  NodeGroups can be accessed by multiple processors, so
663 //  this is in the opposite order from groups - invoking the constructor
664 //  before registering it.
665 // User may call CkLocalNodeBranch() inside the nodegroup constructor
666 //  store nodegroup into _currentNodeGroupObj
667   CkpvAccess(_currentNodeGroupObj) = obj;
668 #ifndef CMK_CHARE_USE_PTR
669   ((Chare *)obj)->chareIdx = -1;
670 #endif
671   _invokeEntryNoTrace(epIdx,env,obj);
672   CkpvAccess(_currentNodeGroupObj) = NULL;
673   _STATS_RECORD_PROCESS_NODE_GROUP_1();
674
675   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
676   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
677   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
678   CksvAccess(_nodeGroupIDTable).push_back(groupID);
679
680   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
681   if(ptrq) {
682     void *pending;
683     while((pending=ptrq->deq())!=0)
684       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
685 //    delete ptrq;
686       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
687   }
688   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
689 }
690
691 void _createGroup(CkGroupID groupID, envelope *env)
692 {
693   _CHECK_USED(env);
694   _SET_USED(env, 1);
695   register int epIdx = env->getEpIdx();
696   int gIdx = _entryTable[epIdx]->chareIdx;
697   CkNodeGroupID rednMgr;
698   if(_chareTable[gIdx]->isIrr == 0){
699                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
700                 rednMgr = rednMgrProxy;
701 //              rednMgrProxy.setAttachedGroup(groupID);
702   }else{
703         rednMgr.setZero();
704   }
705   env->setGroupNum(groupID);
706   env->setSrcPe(CkMyPe());
707   env->setRednMgr(rednMgr);
708   env->setGroupEpoch(CkpvAccess(_charmEpoch));
709
710   if(CkNumPes()>1) {
711     CkPackMessage(&env);
712     CmiSetHandler(env, _bocHandlerIdx);
713     _numInitMsgs++;
714     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
715     CpvAccess(_qd)->create(CkNumPes()-1);
716     CkUnpackMessage(&env);
717   }
718   _STATS_RECORD_CREATE_GROUP_1();
719   CkCreateLocalGroup(groupID, epIdx, env);
720 }
721
722 void _createNodeGroup(CkGroupID groupID, envelope *env)
723 {
724   _CHECK_USED(env);
725   _SET_USED(env, 1);
726   register int epIdx = env->getEpIdx();
727   env->setGroupNum(groupID);
728   env->setSrcPe(CkMyPe());
729   env->setGroupEpoch(CkpvAccess(_charmEpoch));
730   if(CkNumNodes()>1) {
731     CkPackMessage(&env);
732     CmiSetHandler(env, _bocHandlerIdx);
733     _numInitMsgs++;
734     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
735     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
736     CpvAccess(_qd)->create(CkNumNodes()-1);
737     CkUnpackMessage(&env);
738   }
739   _STATS_RECORD_CREATE_NODE_GROUP_1();
740   CkCreateLocalNodeGroup(groupID, epIdx, env);
741 }
742
743 // new _groupCreate
744
745 static CkGroupID _groupCreate(envelope *env)
746 {
747   register CkGroupID groupNum;
748
749   // check CkMyPe(). if it is 0 then idx is _numGroups++
750   // if not, then something else...
751   if(CkMyPe() == 0)
752      groupNum.idx = CkpvAccess(_numGroups)++;
753   else
754      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
755   _createGroup(groupNum, env);
756   return groupNum;
757 }
758
759 // new _nodeGroupCreate
760 static CkGroupID _nodeGroupCreate(envelope *env)
761 {
762   register CkGroupID groupNum;
763   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
764   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
765           groupNum.idx = CksvAccess(_numNodeGroups)++;
766    else
767           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
768   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
769   _createNodeGroup(groupNum, env);
770   return groupNum;
771 }
772
773 /**** generate the group idx when group is creator pe is not pe0
774  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
775  **** remaining bits contain the group creator processor number and
776  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
777
778 int _getGroupIdx(int numNodes,int myNode,int numGroups)
779 {
780         int idx;
781         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
782         int n = 32 - (x+1);                                     // number of bits remaining for the index
783         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
784                                                                 // then add the next available index
785         // of course this won't work when int is 8 bytes long on T3E
786         //idx |= 0x80000000;                                      // set the most significant bit to 1
787         idx = - idx;
788                                                                 // if int is not 32 bits, wouldn't this be wrong?
789         return idx;
790 }
791
792 extern "C"
793 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
794 {
795   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
796   register envelope *env = UsrToEnv(msg);
797   env->setMsgtype(BocInitMsg);
798   env->setEpIdx(eIdx);
799   env->setSrcPe(CkMyPe());
800   _TRACE_CREATION_N(env, CkNumPes());
801   CkGroupID gid = _groupCreate(env);
802   _TRACE_CREATION_DONE(1);
803   return gid;
804 }
805
806 extern "C"
807 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
808 {
809   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
810   register envelope *env = UsrToEnv(msg);
811   env->setMsgtype(NodeBocInitMsg);
812   env->setEpIdx(eIdx);
813   env->setSrcPe(CkMyPe());
814   _TRACE_CREATION_N(env, CkNumNodes());
815   CkGroupID gid = _nodeGroupCreate(env);
816   _TRACE_CREATION_DONE(1);
817   return gid;
818 }
819
820 static inline void *_allocNewChare(envelope *env, int &idx)
821 {
822   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
823   void *tmp=malloc(_chareTable[chareIdx]->size);
824   _MEMCHECK(tmp);
825 #ifndef CMK_CHARE_USE_PTR
826   CpvAccess(chare_objs).push_back(tmp);
827   CpvAccess(chare_types).push_back(chareIdx);
828   idx = CpvAccess(chare_objs).size()-1;
829 #endif
830   setMemoryTypeChare(tmp);
831   return tmp;
832 }
833
834 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
835 {
836   int idx;
837   register void *obj = _allocNewChare(env, idx);
838 #ifndef CMK_CHARE_USE_PTR
839   ((Chare *)obj)->chareIdx = idx;
840 #endif
841   _invokeEntry(env->getEpIdx(),env,obj);
842 }
843
844 void CkCreateLocalChare(int epIdx, envelope *env)
845 {
846   env->setEpIdx(epIdx);
847   _processNewChareMsg(NULL, env);
848 }
849
850 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
851 {
852   int idx;
853   register void *obj = _allocNewChare(env, idx);
854   register CkChareID *pCid = (CkChareID *)
855       _allocMsg(FillVidMsg, sizeof(CkChareID));
856   pCid->onPE = CkMyPe();
857 #ifndef CMK_CHARE_USE_PTR
858   pCid->objPtr = (void*)idx;
859 #else
860   pCid->objPtr = obj;
861 #endif
862   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
863   register envelope *ret = UsrToEnv(pCid);
864   ret->setVidPtr(env->getVidPtr());
865   register int srcPe = env->getSrcPe();
866   ret->setSrcPe(CkMyPe());
867   CmiSetHandler(ret, _charmHandlerIdx);
868   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
869 #ifndef CMK_CHARE_USE_PTR
870   // register the remote vidblock for deletion when chare is deleted
871   CkChareID vid;
872   vid.onPE = srcPe;
873   vid.objPtr = env->getVidPtr();
874   CpvAccess(vmap)[idx] = vid;    
875 #endif
876   CpvAccess(_qd)->create();
877 #ifndef CMK_CHARE_USE_PTR
878   ((Chare *)obj)->chareIdx = idx;
879 #endif
880   _invokeEntry(env->getEpIdx(),env,obj);
881 }
882
883 /************** Receive: Chares *************/
884
885 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
886 {
887   register int epIdx = env->getEpIdx();
888   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
889   register void *obj;
890   if (mainIdx != -1)  {           // mainchare
891     CmiAssert(CkMyPe()==0);
892     obj = _mainTable[mainIdx]->getObj();
893   }
894   else {
895 #ifndef CMK_CHARE_USE_PTR
896     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
897       obj = CpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
898     else
899       obj = env->getObjPtr();
900 #else
901     obj = env->getObjPtr();
902 #endif
903   }
904   _invokeEntry(epIdx,env,obj);
905 }
906
907 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
908 {
909   register int epIdx = env->getEpIdx();
910   register void *obj = env->getObjPtr();
911   _invokeEntry(epIdx,env,obj);
912 }
913
914 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
915 {
916 #ifndef CMK_CHARE_USE_PTR
917   register VidBlock *vptr = CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
918 #else
919   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
920   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
921 #endif
922   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
923   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
924   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
925   CmiFree(env);
926 }
927
928 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
929 {
930 #ifndef CMK_CHARE_USE_PTR
931   register VidBlock *vptr = CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
932 #else
933   VidBlock *vptr = (VidBlock *) env->getVidPtr();
934   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
935 #endif
936   _SET_USED(env, 1);
937   vptr->send(env);
938 }
939
940 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
941 {
942 #ifndef CMK_CHARE_USE_PTR
943   register VidBlock *vptr = CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
944   delete vptr;
945   CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
946 #endif
947   CmiFree(env);
948 }
949
950 /************** Receive: Groups ****************/
951
952 /**
953  Return a pointer to the local BOC of "groupID".
954  The message "env" passed in has some known dependency on this groupID
955  (either it is to be delivered to this BOC, or it depends on this BOC being there).
956  Therefore, if the return value is NULL, this function buffers the massage so that
957  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
958  The message passed in must have its handlers correctly set so that it can be
959  scheduled again.
960 */
961 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
962 {
963
964         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
965         IrrGroup *obj = ck->localBranch(groupID);
966         if (obj==NULL) { /* groupmember not yet created: stash message */
967                 ck->getGroupTable()->find(groupID).enqMsg(env);
968         }
969         else { /* will be able to process message */
970                 ck->process();
971         }
972         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
973         return obj;
974 }
975
976 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
977 {
978 #if CMK_LBDB_ON
979   // if there is a running obj being measured, stop it temporarily
980   LDObjHandle objHandle;
981   int objstopped = 0;
982   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
983   if (the_lbdb->RunningObject(&objHandle)) {
984     objstopped = 1;
985     the_lbdb->ObjectStop(objHandle);
986   }
987 #endif
988   _invokeEntry(epIdx,env,obj);
989 #if CMK_LBDB_ON
990   if (objstopped) the_lbdb->ObjectStart(objHandle);
991 #endif
992   _STATS_RECORD_PROCESS_BRANCH_1();
993 }
994
995 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
996 {
997   register CkGroupID groupID =  env->getGroupNum();
998   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
999   if(obj) {
1000     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1001   }
1002 }
1003
1004 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1005 {
1006   env->setMsgtype(ForChareMsg);
1007   env->setObjPtr(obj);
1008   _processForChareMsg(ck,env);
1009   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1010 }
1011
1012 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1013 {
1014   env->setEpIdx(epIdx);
1015   _deliverForNodeBocMsg(ck,env, obj);
1016 }
1017
1018 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1019 {
1020   register CkGroupID groupID = env->getGroupNum();
1021   register void *obj;
1022
1023   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1024   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1025   if(!obj) { // groupmember not yet created
1026 #if CMK_IMMEDIATE_MSG
1027     if (CmiIsImmediate(env))     // buffer immediate message
1028       CmiDelayImmediate();
1029     else
1030 #endif
1031     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1032     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1033     return;
1034   }
1035   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1036 #if CMK_IMMEDIATE_MSG
1037   if (!CmiIsImmediate(env))
1038 #endif
1039   ck->process();
1040   env->setMsgtype(ForChareMsg);
1041   env->setObjPtr(obj);
1042   _processForChareMsg(ck,env);
1043   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1044 }
1045
1046 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1047 {
1048   register CkGroupID groupID = env->getGroupNum();
1049   register int epIdx = env->getEpIdx();
1050   if (!env->getGroupDep().isZero()) {      // dependence
1051     CkGroupID dep = env->getGroupDep();
1052     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1053     if (obj == NULL) return;
1054   }
1055   else
1056     ck->process();
1057   CkCreateLocalGroup(groupID, epIdx, env);
1058 }
1059
1060 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1061 {
1062   register CkGroupID groupID = env->getGroupNum();
1063   register int epIdx = env->getEpIdx();
1064   CkCreateLocalNodeGroup(groupID, epIdx, env);
1065 }
1066
1067 /************** Receive: Arrays *************/
1068
1069 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1070   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1071   if (mgr) {
1072     _SET_USED(env, 0);
1073     mgr->insertElement((CkMessage *)EnvToUsr(env));
1074   }
1075 }
1076 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1077   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1078   if (mgr) {
1079     _SET_USED(env, 0);
1080     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1081   }
1082 }
1083
1084 //BIGSIM_OOC DEBUGGING
1085 #define TELLMSGTYPE(x) //x
1086
1087 /**
1088  * This is the main converse-level handler used by all of Charm++.
1089  *
1090  * \addtogroup CriticalPathFramework
1091  */
1092 void _processHandler(void *converseMsg,CkCoreState *ck)
1093 {
1094   register envelope *env = (envelope *) converseMsg;
1095
1096   MESSAGE_PHASE_CHECK(env);
1097
1098 //#if CMK_RECORD_REPLAY
1099   if (ck->watcher!=NULL) {
1100     if (!ck->watcher->processMessage(env,ck)) return;
1101   }
1102 //#endif
1103 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1104         Chare *obj=NULL;
1105         CkObjID sender;
1106         MCount SN;
1107         MlogEntry *entry=NULL;
1108         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1109         env->getMsgtype() == ForArrayEltMsg){
1110                 sender = env->sender;
1111                 SN = env->SN;
1112                 int result = preProcessReceivedMessage(env,&obj,&entry);
1113                 if(result == 0){
1114                         return;
1115                 }
1116         }
1117 #endif
1118
1119 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1120   //  CkPrintf("START\n");
1121   criticalPath_start(env);
1122 #endif
1123
1124
1125   switch(env->getMsgtype()) {
1126 // Group support
1127     case BocInitMsg :
1128       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1129       // QD processing moved inside _processBocInitMsg because it is conditional
1130       //ck->process(); 
1131       if(env->isPacked()) CkUnpackMessage(&env);
1132       _processBocInitMsg(ck,env);
1133       break;
1134     case NodeBocInitMsg :
1135       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1136       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1137       _processNodeBocInitMsg(ck,env);
1138       break;
1139     case ForBocMsg :
1140       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1141       // QD processing moved inside _processForBocMsg because it is conditional
1142       if(env->isPacked()) CkUnpackMessage(&env);
1143       _processForBocMsg(ck,env);
1144       // stats record moved inside _processForBocMsg because it is conditional
1145       break;
1146     case ForNodeBocMsg :
1147       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1148       // QD processing moved to _processForNodeBocMsg because it is conditional
1149       if(env->isPacked()) CkUnpackMessage(&env);
1150       _processForNodeBocMsg(ck,env);
1151       // stats record moved to _processForNodeBocMsg because it is conditional
1152       break;
1153
1154 // Array support
1155     case ArrayEltInitMsg:
1156       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1157       if(env->isPacked()) CkUnpackMessage(&env);
1158       _processArrayEltInitMsg(ck,env);
1159       break;
1160     case ForArrayEltMsg:
1161       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1162       if(env->isPacked()) CkUnpackMessage(&env);
1163       _processArrayEltMsg(ck,env);
1164       break;
1165
1166 // Chare support
1167     case NewChareMsg :
1168       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1169       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1170       _processNewChareMsg(ck,env);
1171       _STATS_RECORD_PROCESS_CHARE_1();
1172       break;
1173     case NewVChareMsg :
1174       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1175       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1176       _processNewVChareMsg(ck,env);
1177       _STATS_RECORD_PROCESS_CHARE_1();
1178       break;
1179     case ForChareMsg :
1180       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1181       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1182       _processForPlainChareMsg(ck,env);
1183       _STATS_RECORD_PROCESS_MSG_1();
1184       break;
1185     case ForVidMsg   :
1186       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1187       ck->process();
1188       _processForVidMsg(ck,env);
1189       break;
1190     case FillVidMsg  :
1191       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1192       ck->process();
1193       _processFillVidMsg(ck,env);
1194       break;
1195     case DeleteVidMsg  :
1196       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1197       ck->process();
1198       _processDeleteVidMsg(ck,env);
1199       break;
1200
1201     default:
1202       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1203   }
1204 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1205         if(obj != NULL){
1206                 postProcessReceivedMessage(obj,sender,SN,entry);
1207         }
1208 #endif
1209
1210
1211 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1212   criticalPath_end();
1213   //  CkPrintf("STOP\n");
1214 #endif
1215
1216
1217 }
1218
1219
1220 /******************** Message Send **********************/
1221
1222 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1223              int *queueing, int *priobits, unsigned int **prioptr)
1224 {
1225   register envelope *env = (envelope *)converseMsg;
1226   *pfn = (CldPackFn)CkPackMessage;
1227   *len = env->getTotalsize();
1228   *queueing = env->getQueueing();
1229   *priobits = env->getPriobits();
1230   *prioptr = (unsigned int *) env->getPrioPtr();
1231 }
1232
1233 void CkPackMessage(envelope **pEnv)
1234 {
1235   register envelope *env = *pEnv;
1236   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1237     register void *msg = EnvToUsr(env);
1238     _TRACE_BEGIN_PACK();
1239     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1240     _TRACE_END_PACK();
1241     env=UsrToEnv(msg);
1242     env->setPacked(1);
1243     *pEnv = env;
1244   }
1245 }
1246
1247 void CkUnpackMessage(envelope **pEnv)
1248 {
1249   register envelope *env = *pEnv;
1250   register int msgIdx = env->getMsgIdx();
1251   if(env->isPacked()) {
1252     register void *msg = EnvToUsr(env);
1253     _TRACE_BEGIN_UNPACK();
1254     msg = _msgTable[msgIdx]->unpack(msg);
1255     _TRACE_END_UNPACK();
1256     env=UsrToEnv(msg);
1257     env->setPacked(0);
1258     *pEnv = env;
1259   }
1260 }
1261
1262 //There's no reason for most messages to go through the Cld--
1263 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1264 // Thus these accellerated versions of the Cld calls.
1265
1266 static int index_objectQHandler;
1267 int index_tokenHandler;
1268 int index_skipCldHandler;
1269
1270 void _skipCldHandler(void *converseMsg)
1271 {
1272   register envelope *env = (envelope *)(converseMsg);
1273   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1274 #if CMK_GRID_QUEUE_AVAILABLE
1275   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1276     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1277                        env, env->getQueueing (), env->getPriobits (),
1278                        (unsigned int *) env->getPrioPtr ());
1279   } else {
1280     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1281                        env, env->getQueueing (), env->getPriobits (),
1282                        (unsigned int *) env->getPrioPtr ());
1283   }
1284 #else
1285   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1286         env, env->getQueueing(),env->getPriobits(),
1287         (unsigned int *)env->getPrioPtr());
1288 #endif
1289 }
1290
1291
1292 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1293 // Made non-static to be used by ckmessagelogging
1294 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1295 {
1296 #if CMK_REPLAYSYSTEM
1297   if (_replaySystem) {
1298     CmiFree(env);
1299     return;
1300   }
1301 #endif
1302   if(pe == CkMyPe() ){
1303     if(!CmiNodeAlive(CkMyPe())){
1304         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1305 //      return;
1306     }
1307   }
1308   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1309 #if CMK_OBJECT_QUEUE_AVAILABLE
1310     Chare *obj = CkFindObjectPtr(env);
1311     if (obj && obj->CkGetObjQueue().queue()) {
1312       _enqObjQueue(obj, env);
1313     }
1314     else
1315 #endif
1316     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1317         env, env->getQueueing(),env->getPriobits(),
1318         (unsigned int *)env->getPrioPtr());
1319   } else {
1320     CkPackMessage(&env);
1321     int len=env->getTotalsize();
1322     CmiSetXHandler(env,CmiGetHandler(env));
1323 #if CMK_OBJECT_QUEUE_AVAILABLE
1324     CmiSetHandler(env,index_objectQHandler);
1325 #else
1326     CmiSetHandler(env,index_skipCldHandler);
1327 #endif
1328     CmiSetInfo(env,infoFn);
1329     if (pe==CLD_BROADCAST) {
1330 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1331                         CmiSyncBroadcast(len, (char *)env);
1332 #else
1333                         CmiSyncBroadcastAndFree(len, (char *)env); 
1334 #endif
1335
1336 }
1337     else if (pe==CLD_BROADCAST_ALL) { 
1338 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1339                         CmiSyncBroadcastAll(len, (char *)env);
1340 #else
1341                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1342 #endif
1343
1344 }
1345     else{
1346 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1347                         CmiSyncSend(pe, len, (char *)env);
1348 #else
1349                         CmiSyncSendAndFree(pe, len, (char *)env);
1350 #endif
1351
1352                 }
1353   }
1354 }
1355
1356 #if CMK_BLUEGENE_CHARM
1357 #   define  _skipCldEnqueue   _CldEnqueue
1358 #endif
1359
1360 // by pass Charm++ priority queue, send as Converse message
1361 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1362 {
1363 #if CMK_REPLAYSYSTEM
1364   if (_replaySystem) {
1365     CmiFree(env);
1366     return;
1367   }
1368 #endif
1369   CkPackMessage(&env);
1370   int len=env->getTotalsize();
1371   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1372 }
1373
1374 static void _noCldEnqueue(int pe, envelope *env)
1375 {
1376 /*
1377   if (pe == CkMyPe()) {
1378     CmiHandleMessage(env);
1379   } else
1380 */
1381 #if CMK_REPLAYSYSTEM
1382   if (_replaySystem) {
1383     CmiFree(env);
1384     return;
1385   }
1386 #endif
1387   CkPackMessage(&env);
1388   int len=env->getTotalsize();
1389   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1390   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1391   else CmiSyncSendAndFree(pe, len, (char *)env);
1392 }
1393
1394 //static void _noCldNodeEnqueue(int node, envelope *env)
1395 //Made non-static to be used by ckmessagelogging
1396 void _noCldNodeEnqueue(int node, envelope *env)
1397 {
1398 /*
1399   if (node == CkMyNode()) {
1400     CmiHandleMessage(env);
1401   } else {
1402 */
1403 #if CMK_REPLAYSYSTEM
1404   if (_replaySystem) {
1405     CmiFree(env);
1406     return;
1407   }
1408 #endif
1409   CkPackMessage(&env);
1410   int len=env->getTotalsize();
1411   if (node==CLD_BROADCAST) { 
1412 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1413         CmiSyncNodeBroadcast(len, (char *)env);
1414 #else
1415         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1416 #endif
1417 }
1418   else if (node==CLD_BROADCAST_ALL) { 
1419 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1420                 CmiSyncNodeBroadcastAll(len, (char *)env);
1421 #else
1422                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1423 #endif
1424
1425 }
1426   else {
1427 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1428         CmiSyncNodeSend(node, len, (char *)env);
1429 #else
1430         CmiSyncNodeSendAndFree(node, len, (char *)env);
1431 #endif
1432   }
1433 }
1434
1435 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1436 {
1437   register envelope *env = UsrToEnv(msg);
1438   _CHECK_USED(env);
1439   _SET_USED(env, 1);
1440   env->setMsgtype(ForChareMsg);
1441   env->setEpIdx(eIdx);
1442   env->setSrcPe(CkMyPe());
1443 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1444   criticalPath_send(env);
1445   automaticallySetMessagePriority(env);
1446 #endif
1447 #ifndef CMK_OPTIMIZE
1448   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1449 #endif
1450 #if CMK_OBJECT_QUEUE_AVAILABLE
1451   CmiSetHandler(env, index_objectQHandler);
1452 #else
1453   CmiSetHandler(env, _charmHandlerIdx);
1454 #endif
1455   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1456     register int pe = -(pCid->onPE+1);
1457     if(pe==CkMyPe()) {
1458 #ifndef CMK_CHARE_USE_PTR
1459       VidBlock *vblk = CpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1460 #else
1461       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1462 #endif
1463       void *objPtr;
1464       if (NULL!=(objPtr=vblk->getLocalChare()))
1465       { //A ready local chare
1466         env->setObjPtr(objPtr);
1467         return pe;
1468       }
1469       else { //The vidblock is not ready-- forget it
1470         vblk->send(env);
1471         return -1;
1472       }
1473     } else { //Valid vidblock for another PE:
1474       env->setMsgtype(ForVidMsg);
1475       env->setVidPtr(pCid->objPtr);
1476       return pe;
1477     }
1478   }
1479   else {
1480     env->setObjPtr(pCid->objPtr);
1481     return pCid->onPE;
1482   }
1483 }
1484
1485 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1486 {
1487   int destPE = _prepareMsg(eIdx, msg, pCid);
1488   if (destPE != -1) {
1489     register envelope *env = UsrToEnv(msg);
1490 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1491     criticalPath_send(env);
1492     automaticallySetMessagePriority(env);
1493 #endif
1494     CmiBecomeImmediate(env);
1495   }
1496   return destPE;
1497 }
1498
1499 extern "C"
1500 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1501 {
1502   if (opts & CK_MSG_INLINE) {
1503     CkSendMsgInline(entryIdx, msg, pCid, opts);
1504     return;
1505   }
1506 #ifndef CMK_OPTIMIZE
1507   if (opts & CK_MSG_IMMEDIATE) {
1508     CmiAbort("Immediate message is not allowed in Chare!");
1509   }
1510 #endif
1511   register envelope *env = UsrToEnv(msg);
1512   int destPE=_prepareMsg(entryIdx,msg,pCid);
1513   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1514   // VidBlock was not yet filled). The problem is that the creation was never
1515   // traced later when the VidBlock was filled. One solution is to trace the
1516   // creation here, the other to trace it in VidBlock->msgDeliver().
1517   _TRACE_CREATION_1(env);
1518   if (destPE!=-1) {
1519     CpvAccess(_qd)->create();
1520     if (opts & CK_MSG_SKIP_OR_IMM)
1521       _noCldEnqueue(destPE, env);
1522     else
1523       _CldEnqueue(destPE, env, _infoIdx);
1524   }
1525   _TRACE_CREATION_DONE(1);
1526 }
1527
1528 extern "C"
1529 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1530 {
1531   if (pCid->onPE==CkMyPe())
1532   {
1533     if(!CmiNodeAlive(CkMyPe())){
1534         return;
1535     }
1536 #ifndef CMK_OPTIMIZE
1537     //Just in case we need to breakpoint or use the envelope in some way
1538     _prepareMsg(entryIndex,msg,pCid);
1539 #endif
1540                 //Just directly call the chare (skip QD handling & scheduler)
1541     register envelope *env = UsrToEnv(msg);
1542     if (env->isPacked()) CkUnpackMessage(&env);
1543     _STATS_RECORD_PROCESS_MSG_1();
1544     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1545   }
1546   else {
1547     //No way to inline a cross-processor message:
1548     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1549   }
1550 }
1551
1552 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1553 {
1554   register envelope *env = UsrToEnv(msg);
1555   CkNodeGroupID nodeRedMgr;
1556   _CHECK_USED(env);
1557   _SET_USED(env, 1);
1558   env->setMsgtype(type);
1559   env->setEpIdx(eIdx);
1560   env->setGroupNum(gID);
1561   env->setSrcPe(CkMyPe());
1562 #ifndef CMK_OPTIMIZE
1563   nodeRedMgr.setZero();
1564   env->setRednMgr(nodeRedMgr);
1565 #endif
1566 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1567   criticalPath_send(env);
1568   automaticallySetMessagePriority(env);
1569 #endif
1570 #ifndef CMK_OPTIMIZE
1571   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1572 #endif
1573   CmiSetHandler(env, _charmHandlerIdx);
1574   return env;
1575 }
1576
1577 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1578 {
1579   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1580 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1581   criticalPath_send(env);
1582   automaticallySetMessagePriority(env);
1583 #endif
1584   CmiBecomeImmediate(env);
1585   return env;
1586 }
1587
1588 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1589                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1590 {
1591   int numPes;
1592   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1593 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1594   sendTicketGroupRequest(env,pe,_infoIdx);
1595 #else
1596   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1597   _TRACE_CREATION_N(env, numPes);
1598   if (opts & CK_MSG_SKIP_OR_IMM)
1599     _noCldEnqueue(pe, env);
1600   else
1601     _skipCldEnqueue(pe, env, _infoIdx);
1602   _TRACE_CREATION_DONE(1);
1603 #endif
1604 }
1605
1606 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1607                            int npes, int *pes)
1608 {
1609   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1610   _TRACE_CREATION_MULTICAST(env, npes, pes);
1611   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1612   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1613 }
1614
1615 extern "C"
1616 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1617 {
1618 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1619   if (destPE==CkMyPe())
1620   {
1621     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1622     return;
1623   }
1624   //Can't inline-- send the usual way
1625   register envelope *env = UsrToEnv(msg);
1626   int numPes;
1627   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1628   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1629   _TRACE_CREATION_N(env, numPes);
1630   _noCldEnqueue(destPE, env);
1631   _STATS_RECORD_SEND_BRANCH_1();
1632   CkpvAccess(_coreState)->create();
1633   _TRACE_CREATION_DONE(1);
1634 #else
1635   // no support for immediate message, send inline
1636   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1637 #endif
1638 }
1639
1640 extern "C"
1641 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1642 {
1643   if (destPE==CkMyPe())
1644   {
1645     if(!CmiNodeAlive(CkMyPe())){
1646         return;
1647     }
1648     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1649     if (obj!=NULL)
1650     { //Just directly call the group:
1651 #ifndef CMK_OPTIMIZE
1652       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1653 #else
1654       envelope *env=UsrToEnv(msg);
1655 #endif
1656       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1657       return;
1658     }
1659   }
1660   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1661   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1662 }
1663
1664 extern "C"
1665 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1666 {
1667   if (opts & CK_MSG_INLINE) {
1668     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1669     return;
1670   }
1671   if (opts & CK_MSG_IMMEDIATE) {
1672     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1673     return;
1674   }
1675   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1676   _STATS_RECORD_SEND_BRANCH_1();
1677   CkpvAccess(_coreState)->create();
1678 }
1679
1680 extern "C"
1681 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1682 {
1683 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1684   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1685   _TRACE_CREATION_MULTICAST(env, npes, pes);
1686   _noCldEnqueueMulti(npes, pes, env);
1687   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1688 #else
1689   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1690   CpvAccess(_qd)->create(-npes);
1691 #endif
1692   _STATS_RECORD_SEND_BRANCH_N(npes);
1693   CpvAccess(_qd)->create(npes);
1694 }
1695
1696 extern "C"
1697 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1698 {
1699   if (opts & CK_MSG_IMMEDIATE) {
1700     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1701     return;
1702   }
1703     // normal mesg
1704   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1705   _STATS_RECORD_SEND_BRANCH_N(npes);
1706   CpvAccess(_qd)->create(npes);
1707 }
1708
1709 extern "C"
1710 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1711 {
1712   int npes;
1713   int *pes;
1714   if (opts & CK_MSG_IMMEDIATE) {
1715     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1716     return;
1717   }
1718     // normal mesg
1719   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1720   CmiLookupGroup(grp, &npes, &pes);
1721   _TRACE_CREATION_MULTICAST(env, npes, pes);
1722   _CldEnqueueGroup(grp, env, _infoIdx);
1723   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1724   _STATS_RECORD_SEND_BRANCH_N(npes);
1725   CpvAccess(_qd)->create(npes);
1726 }
1727
1728 extern "C"
1729 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1730 {
1731   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1732   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1733   CpvAccess(_qd)->create(CkNumPes());
1734 }
1735
1736 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1737                 int node=CLD_BROADCAST_ALL, int opts=0)
1738 {
1739   int numPes;
1740   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1741 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1742         sendTicketNodeGroupRequest(env,node,_infoIdx);
1743 #else
1744   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1745   _TRACE_CREATION_N(env, numPes);
1746   if (opts & CK_MSG_SKIP_OR_IMM) {
1747     _noCldNodeEnqueue(node, env);
1748     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1749       CkpvAccess(_coreState)->create(-numPes);
1750     }
1751   }
1752   else
1753     _CldNodeEnqueue(node, env, _infoIdx);
1754   _TRACE_CREATION_DONE(1);
1755 #endif
1756 }
1757
1758 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1759                            int npes, int *nodes)
1760 {
1761   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1762   _TRACE_CREATION_N(env, npes);
1763   for (int i=0; i<npes; i++) {
1764     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1765   }
1766   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1767 }
1768
1769 extern "C"
1770 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1771 {
1772 #if CMK_IMMEDIATE_MSG
1773   if (node==CkMyNode())
1774   {
1775     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1776     return;
1777   }
1778   //Can't inline-- send the usual way
1779   register envelope *env = UsrToEnv(msg);
1780   int numPes;
1781   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1782   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1783   _TRACE_CREATION_N(env, numPes);
1784   _noCldNodeEnqueue(node, env);
1785   _STATS_RECORD_SEND_BRANCH_1();
1786   /* immeidate message is invisible to QD */
1787 //  CkpvAccess(_coreState)->create();
1788   _TRACE_CREATION_DONE(1);
1789 #else
1790   // no support for immediate message, send inline
1791   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1792 #endif
1793 }
1794
1795 extern "C"
1796 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1797 {
1798   if (node==CkMyNode())
1799   {
1800     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1801     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1802     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1803     if (obj!=NULL)
1804     { //Just directly call the group:
1805 #ifndef CMK_OPTIMIZE
1806       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1807 #else
1808       envelope *env=UsrToEnv(msg);
1809 #endif
1810       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1811       return;
1812     }
1813   }
1814   //Can't inline-- send the usual way
1815   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1816 }
1817
1818 extern "C"
1819 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1820 {
1821   if (opts & CK_MSG_INLINE) {
1822     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1823     return;
1824   }
1825   if (opts & CK_MSG_IMMEDIATE) {
1826     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1827     return;
1828   }
1829   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1830   _STATS_RECORD_SEND_NODE_BRANCH_1();
1831   CkpvAccess(_coreState)->create();
1832 }
1833
1834 extern "C"
1835 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1836 {
1837 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1838   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1839   _noCldEnqueueMulti(npes, nodes, env);
1840 #else
1841   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1842   CpvAccess(_qd)->create(-npes);
1843 #endif
1844   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1845   CpvAccess(_qd)->create(npes);
1846 }
1847
1848 extern "C"
1849 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1850 {
1851   if (opts & CK_MSG_IMMEDIATE) {
1852     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1853     return;
1854   }
1855     // normal mesg
1856   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1857   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1858   CpvAccess(_qd)->create(npes);
1859 }
1860
1861 extern "C"
1862 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1863 {
1864   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1865   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1866   CpvAccess(_qd)->create(CkNumNodes());
1867 }
1868
1869 //Needed by delegation manager:
1870 extern "C"
1871 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1872 { return _prepareMsg(eIdx,msg,pCid); }
1873 extern "C"
1874 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1875 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1876 extern "C"
1877 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1878 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1879
1880 void _ckModuleInit(void) {
1881         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1882         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1883         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1884         CkpvInitialize(TokenPool*, _tokenPool);
1885         CkpvAccess(_tokenPool) = new TokenPool;
1886 }
1887
1888
1889 /************** Send: Arrays *************/
1890
1891 extern void CkArrayManagerInsert(int onPe,void *msg);
1892 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1893
1894 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1895 {
1896   _CHECK_USED(env);
1897   _SET_USED(env, 1);
1898   env->setMsgtype(type);
1899 #ifndef CMK_OPTIMIZE
1900   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1901 #endif
1902   CmiSetHandler(env, _charmHandlerIdx);
1903   CpvAccess(_qd)->create();
1904 }
1905
1906 extern "C"
1907 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1908   register envelope *env = UsrToEnv(msg);
1909   env->getsetArrayMgr()=aID;
1910   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1911   _CldEnqueue(pe, env, _infoIdx);
1912 }
1913
1914 extern "C"
1915 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1916   register envelope *env = UsrToEnv(msg);
1917   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1918 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1919    sendTicketArrayRequest(env,pe,_infoIdx);
1920 #else
1921   if (opts & CK_MSG_IMMEDIATE)
1922     CmiBecomeImmediate(env);
1923   if (opts & CK_MSG_SKIP_OR_IMM)
1924     _noCldEnqueue(pe, env);
1925   else
1926     _skipCldEnqueue(pe, env, _infoIdx);
1927 #endif
1928 }
1929
1930 class ElementDestroyer : public CkLocIterator {
1931 private:
1932         CkLocMgr *locMgr;
1933 public:
1934         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1935         void addLocation(CkLocation &loc) {
1936           loc.destroyAll();
1937         }
1938 };
1939
1940 void CkDeleteChares() {
1941   int i;
1942   int numGroups = CkpvAccess(_groupIDTable)->size();
1943
1944   // delete all array elements
1945   for(i=0;i<numGroups;i++) {
1946     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1947     if(obj && obj->isLocMgr())  {
1948       CkLocMgr *mgr = (CkLocMgr*)obj;
1949       ElementDestroyer destroyer(mgr);
1950       mgr->iterate(destroyer);
1951 printf("[%d] DELETE!\n", CkMyPe());
1952     }
1953   }
1954
1955   // delete all groups
1956   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1957   for(i=0;i<numGroups;i++) {
1958     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1959     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1960     if (obj) delete obj;
1961   }
1962   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1963
1964   // delete all node groups
1965   if (CkMyRank() == 0) {
1966     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1967     for(i=0;i<numNodeGroups;i++) {
1968       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
1969       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1970       if (obj) delete obj;
1971     }
1972   }
1973 }
1974
1975 //------------------- Message Watcher (record/replay) ----------------
1976
1977 #include "crc32.h"
1978
1979 CkpvDeclare(int, envelopeEventID);
1980 int _recplay_crc = 0;
1981 int _recplay_checksum = 0;
1982 int _recplay_logsize = 1024*1024;
1983
1984 CkMessageWatcher::~CkMessageWatcher() {}
1985
1986 class CkMessageRecorder : public CkMessageWatcher {
1987   char *buffer;
1988   unsigned int curpos;
1989 public:
1990   CkMessageRecorder(FILE *f_): curpos(0) { f=f_; buffer=new char[_recplay_logsize]; }
1991   ~CkMessageRecorder() {
1992     CkPrintf("closing log\n");
1993     flushLog();
1994     fprintf(f,"-1 -1 -1 ");
1995     fclose(f);
1996     delete[] buffer;
1997   }
1998
1999 private:
2000   void flushLog() {
2001     CkPrintf("flushing log\n");
2002     fprintf(f, "%s", buffer);
2003     curpos=0;
2004   }
2005   virtual CmiBool process(envelope *env,CkCoreState *ck) {
2006     if (env->getEvent()) {
2007       char tmp[128];
2008       bool wasPacked = env->isPacked();
2009       if (!wasPacked) CkPackMessage(&env);
2010       unsigned int crc1=0, crc2=0;
2011       if (_recplay_crc) {
2012         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2013         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2014         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2015       } else if (_recplay_checksum) {
2016         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2017         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2018       }
2019       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2);
2020       if (curpos > _recplay_logsize-128) flushLog();
2021       if (!wasPacked) CkUnpackMessage(&env);
2022     }
2023     return CmiTrue;
2024   }
2025   virtual int process(CthThreadToken *token,CkCoreState *ck) {
2026     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2027     if (curpos > _recplay_logsize-128) flushLog();
2028     return 1;
2029   }
2030 };
2031
2032 class CkMessageDetailRecorder : public CkMessageWatcher {
2033 public:
2034   CkMessageDetailRecorder(FILE *f_) {
2035     f=f_;
2036     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2037      * The value of 'x' is the pointer size.
2038      */
2039     CmiUInt2 little = sizeof(void*);
2040     fwrite(&little, 2, 1, f);
2041   }
2042   ~CkMessageDetailRecorder() {fclose(f);}
2043 private:
2044   virtual CmiBool process(envelope *env, CkCoreState *ck) {
2045     bool wasPacked = env->isPacked();
2046     if (!wasPacked) CkPackMessage(&env);
2047     CmiUInt4 size = env->getTotalsize();
2048     fwrite(&size, 4, 1, f);
2049     fwrite(env, env->getTotalsize(), 1, f);
2050     if (!wasPacked) CkUnpackMessage(&env);
2051     return CmiTrue;
2052   }
2053 };
2054
2055 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2056 #define REPLAYDEBUG(args) /* empty */
2057
2058 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2059 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2060
2061 class CkMessageReplay : public CkMessageWatcher {
2062   int counter;
2063         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2064         unsigned int crc1, crc2;
2065         /// Read the next message we need from the file:
2066         void getNext(void) {
2067           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2068           REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2069           if (nextSize > 0) {
2070             // We are reading a regular message
2071             if (3!=fscanf(f,"%d%x%x", &nexttype,&crc1,&crc2)) {
2072               CkAbort("CkMessageReplay> Syntax error reading replay file");
2073             }
2074           } else if (nextSize == -2) {
2075             // We are reading a special message (right now only thread awaken)
2076             // Nothing to do since we have already read all info
2077 #if CMK_BLUEGENE_CHARM
2078             return getNext();          // ignoring thread event is ok
2079 #endif
2080           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2081             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2082             CkAbort("CkMessageReplay> Unrecognized input");
2083           }
2084             /*
2085                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2086                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2087                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2088                 }
2089                 */
2090                 counter++;
2091         }
2092         /// If this is the next message we need, advance and return CmiTrue.
2093         CmiBool isNext(envelope *env) {
2094                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2095                 if (nextEvent!=env->getEvent()) return CmiFalse;
2096                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2097 #if ! CMK_BLUEGENE_CHARM
2098                 if (nextSize!=env->getTotalsize())
2099                 {
2100                         CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]\n", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
2101                         return CmiFalse;
2102                 }
2103                 if (_recplay_crc || _recplay_checksum) {
2104                   bool wasPacked = env->isPacked();
2105                   if (!wasPacked) CkPackMessage(&env);
2106                   if (_recplay_crc) {
2107                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2108                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2109                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2110                     if (crcnew1 != crc1) {
2111                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2112                     }
2113                     if (crcnew2 != crc2) {
2114                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2115                     }
2116                   } else if (_recplay_checksum) {
2117             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2118             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2119             if (crcnew1 != crc1) {
2120               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2121             }
2122             if (crcnew2 != crc2) {
2123               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2124             }               
2125                   }
2126                   if (!wasPacked) CkUnpackMessage(&env);
2127                 }
2128 #endif
2129                 return CmiTrue;
2130         }
2131         CmiBool isNext(CthThreadToken *token) {
2132           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2133           return CmiFalse;
2134         }
2135
2136         /// This is a (short) list of messages we aren't yet ready for:
2137         CkQ<envelope *> delayedMessages;
2138         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2139         CkQ<CthThreadToken *> delayedTokens;
2140
2141         /// Try to flush out any delayed messages
2142         void flush(void) {
2143           if (nextSize>0) {
2144                 int len=delayedMessages.length();
2145                 for (int i=0;i<len;i++) {
2146                         envelope *env=delayedMessages.deq();
2147                         if (isNext(env)) { /* this is the next message: process it */
2148                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2149                                 //CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
2150                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2151                                 return;
2152                         }
2153                         else /* Not ready yet-- put it back in the
2154                                 queue */
2155                           {
2156                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2157                                 delayedMessages.enq(env);
2158                           }
2159                 }
2160           } else if (nextSize==-2) {
2161             int len=delayedTokens.length();
2162             for (int i=0;i<len;++i) {
2163               CthThreadToken *token=delayedTokens.deq();
2164               if (isNext(token)) {
2165             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2166                 CsdEnqueueLifo((void*)token);
2167                 return;
2168               } else {
2169             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2170                 delayedTokens.enq(token);
2171               }
2172             }
2173           }
2174         }
2175
2176 public:
2177         CkMessageReplay(FILE *f_) {
2178           counter=0;
2179           f=f_;
2180           getNext();
2181           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2182           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2183         }
2184         ~CkMessageReplay() {fclose(f);}
2185
2186 private:
2187         virtual CmiBool process(envelope *env,CkCoreState *ck) {
2188           //CkAssert(*(int*)env == 0x34567890);
2189           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx());
2190                 if (env->getEvent() == 0) return CmiTrue;
2191                 if (isNext(env)) { /* This is the message we were expecting */
2192                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2193                         getNext(); /* Advance over this message */
2194                         flush(); /* try to process queued-up stuff */
2195                         return CmiTrue;
2196                 }
2197 #if CMK_SMP
2198                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2199                          // try next rank, we can't just buffer the msg and left
2200                          // we need to keep unprocessed msg on the fly
2201                         int nextpe = CkMyPe()+1;
2202                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2203                         nextpe = CkNodeFirst(CkMyNode());
2204                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2205                         return CmiFalse;
2206                 }
2207 #endif
2208                 else /*!isNext(env) */ {
2209                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()
2210                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2211                         delayedMessages.enq(env);
2212                         flush();
2213                         return CmiFalse;
2214                 }
2215         }
2216         virtual int process(CthThreadToken *token, CkCoreState *ck) {
2217       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2218           if (isNext(token)) {
2219         REPLAYDEBUG("Executing token: "<<token->serialNo)
2220             getNext();
2221             flush();
2222             return 1;
2223           } else {
2224         REPLAYDEBUG("Queueing token: "<<token->serialNo
2225             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2226             delayedTokens.enq(token);
2227             return 0;
2228           }
2229         }
2230 };
2231
2232 class CkMessageDetailReplay : public CkMessageWatcher {
2233   void *getNext() {
2234     CmiUInt4 size; size_t nread;
2235     if ((nread=fread(&size, 4, 1, f)) < 1) {
2236       if (feof(f)) return NULL;
2237       CkPrintf("Broken record file (metadata) got %d\n",nread);
2238       CkAbort("");
2239     }
2240     void *env = CmiAlloc(size);
2241     long tell = ftell(f);
2242     if ((nread=fread(env, size, 1, f)) < 1) {
2243       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2244       CkAbort("");
2245     }
2246     //*(int*)env = 0x34567890; // set first integer as magic
2247     return env;
2248   }
2249 public:
2250   CkMessageDetailReplay(FILE *f_) {
2251     f=f_;
2252     /* This must match what CkMessageDetailRecorder did */
2253     CmiUInt2 little;
2254     fread(&little, 2, 1, f);
2255     if (little != sizeof(void*)) {
2256       CkAbort("Replaying on a different architecture from which recording was done!");
2257     }
2258
2259     CsdEnqueue(getNext());
2260
2261     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2262   }
2263   virtual CmiBool process(envelope *env,CkCoreState *ck) {
2264     void *msg = getNext();
2265     if (msg != NULL) CsdEnqueue(msg);
2266     return CmiTrue;
2267   }
2268 };
2269
2270 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2271 #if ! CMK_BLUEGENE_CHARM
2272   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2273 #endif
2274   CkMessageReplay *replay = (CkMessageReplay*)rep;
2275   //CmiStartQD(CkMessageReplayQuiescence, replay);
2276 }
2277
2278 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2279   CkPrintf("[%d] Detailed replay finished. Exiting.\n",CkMyPe());
2280   ConverseExit();
2281 }
2282
2283 static int CpdExecuteThreadResume(CthThreadToken *token) {
2284   CkCoreState *ck = CkpvAccess(_coreState);
2285   if (ck->watcher!=NULL) {
2286     return ck->watcher->processThread(token,ck);
2287   }
2288   return 1;
2289 }
2290
2291 CpvCExtern(int, CthResumeNormalThreadIdx);
2292 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2293 {
2294   CthThread t = token->thread;
2295
2296   if(t == NULL){
2297     free(token);
2298     return;
2299   }
2300 #ifndef CMK_OPTIMIZE
2301 #if ! CMK_TRACE_IN_CHARM
2302   if(CpvAccess(traceOn))
2303     CthTraceResume(t);
2304 /*    if(CpvAccess(_traceCoreOn)) 
2305             resumeTraceCore();*/
2306 #endif
2307 #endif
2308   
2309   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2310   if (CpdExecuteThreadResume(token)) {
2311     CthResume(t);
2312   }
2313 }
2314
2315 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2316
2317 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2318
2319         int i;
2320         char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2321         strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2322         sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2323         FILE *f=fopen(fName,permissions);
2324         REPLAYDEBUG("openReplayfile "<<fName);
2325         if (f==NULL) {
2326                 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2327                         CkMyPe(),fName,permissions);
2328                 CkAbort("openReplayFile> Could not open replay file");
2329         }
2330         return f;
2331 }
2332
2333 #include "ckliststring.h"
2334 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2335     CmiBool forceReplay = CmiFalse;
2336     char *procs = NULL;
2337     _replaySystem = 0;
2338     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable crc32 for message record-replay")) {
2339       _recplay_crc = 1;
2340     }
2341     if (CmiGetArgFlagDesc(argv,"+recplay-checksum","Enable simple checksum for message record-replay")) {
2342       _recplay_checksum = 1;
2343     }
2344     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2345     REPLAYDEBUG("CkMessageWatcherInit ");
2346     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2347         CkListString list(procs);
2348         if (list.includes(CkMyPe())) {
2349           CpdSetInitializeMemory(1);
2350           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2351         }
2352     }
2353     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2354       if (CkMyPe() == 0) {
2355         CmiPrintf("Charm++> record mode.\n");
2356         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2357           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2358           _recplay_crc = _recplay_checksum = 0;
2359         }
2360       }
2361       CpdSetInitializeMemory(1);
2362       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2363       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2364     }
2365         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2366             forceReplay = CmiTrue;
2367             CpdSetInitializeMemory(1);
2368             // Set the parameters of the processor
2369 #if CMK_SHARED_VARS_UNAVAILABLE
2370             _Cmi_mype = atoi(procs);
2371             while (procs[0]!='/') procs++;
2372             procs++;
2373             _Cmi_numpes = atoi(procs);
2374 #else
2375             CkAbort("+replay-detail available only for non-SMP build");
2376 #endif
2377             _replaySystem = 1;
2378             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2379         }
2380         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2381           if (CkMyPe() == 0)  {
2382             CmiPrintf("Charm++> replay mode.\n");
2383             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2384               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2385               _recplay_crc = _recplay_checksum = 0;
2386             }
2387           }
2388           CpdSetInitializeMemory(1);
2389 #if ! CMK_BLUEGENE_CHARM
2390           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2391 #endif
2392           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2393         }
2394         if (_recplay_crc && _recplay_checksum) {
2395           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2396         }
2397 }
2398
2399 extern "C"
2400 int CkMessageToEpIdx(void *msg) {
2401         envelope *env=UsrToEnv(msg);
2402         int ep=env->getEpIdx();
2403         if (ep==CkIndex_CkArray::recvBroadcast(0))
2404                 return env->getsetArrayBcastEp();
2405         else
2406                 return ep;
2407 }
2408
2409 extern "C"
2410 int getCharmEnvelopeSize() {
2411   return sizeof(envelope);
2412 }
2413
2414
2415 #include "CkMarshall.def.h"
2416