Adding fault tolerance support for individual chares. Upon message reception, determi...
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
14 #include "pathHistory.h"
15 void automaticallySetMessagePriority(envelope *env); // in control point framework.
16 #endif
17
18 #if CMK_LBDB_ON
19 #include "LBDatabase.h"
20 #endif // CMK_LBDB_ON
21
22 #ifndef CMK_CHARE_USE_PTR
23 #include <map>
24 CkpvDeclare(CkVec<void *>, chare_objs);
25 CkpvDeclare(CkVec<int>, chare_types);
26 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
27
28 typedef std::map<int, CkChareID>  Vidblockmap;
29 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
30 CkpvDeclare(int, currentChareIdx);
31 #endif
32
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 void _initChareTables()
47 {
48 #ifndef CMK_CHARE_USE_PTR
49           /* chare and vidblock table */
50   CkpvInitialize(CkVec<void *>, chare_objs);
51   CkpvInitialize(CkVec<int>, chare_types);
52   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
53   CkpvInitialize(Vidblockmap, vmap);
54   CkpvInitialize(int, currentChareIdx);
55   CkpvAccess(currentChareIdx) = -1;
56 #endif
57 }
58
59 //Charm++ virtual functions: declaring these here results in a smaller executable
60 Chare::Chare(void) {
61   thishandle.onPE=CkMyPe();
62   thishandle.objPtr=this;
63 #if CMK_ERROR_CHECKING
64   magic = CHARE_MAGIC;
65 #endif
66 #ifndef CMK_CHARE_USE_PTR
67      // for plain chare, objPtr is actually the index to chare obj table
68   if (CkpvAccess(currentChareIdx) >= 0) {
69     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
70   }
71   chareIdx = CkpvAccess(currentChareIdx);
72 #endif
73 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
74   mlogData = new ChareMlogData();
75   mlogData->objID.type = TypeChare;
76   mlogData->objID.data.chare.id = thishandle;
77 #endif
78 #if CMK_OBJECT_QUEUE_AVAILABLE
79   if (_defaultObjectQ)  CkEnableObjQ();
80 #endif
81 }
82
83 Chare::Chare(CkMigrateMessage* m) {
84   thishandle.onPE=CkMyPe();
85   thishandle.objPtr=this;
86 #if CMK_ERROR_CHECKING
87   magic = 0;
88 #endif
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 #if CMK_ERROR_CHECKING
149   p(magic);
150 #endif
151 }
152
153 int Chare::ckGetChareType() const {
154   return -3;
155 }
156 char *Chare::ckDebugChareName(void) {
157   char buf[100];
158   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
159   return strdup(buf);
160 }
161 int Chare::ckDebugChareID(char *str, int limit) {
162   // pure chares for now do not have a valid ID
163   str[0] = 0;
164   return 1;
165 }
166 void Chare::ckDebugPup(PUP::er &p) {
167   pup(p);
168 }
169
170 /// This method is called before starting a [threaded] entry method.
171 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
172   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
173   traceAddThreadListeners(th, UsrToEnv(msg));
174 }
175
176 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
177   p.comment("Bytes");
178   int ts=UsrToEnv(msg)->getTotalsize();
179   int msgLen=ts-sizeof(envelope);
180   if (msgLen>0)
181     p((char*)msg,msgLen);
182 }
183
184 IrrGroup::IrrGroup(void) {
185   thisgroup = CkpvAccess(_currentGroup);
186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
187         mlogData->objID.type = TypeGroup;
188         mlogData->objID.data.group.id = thisgroup;
189         mlogData->objID.data.group.onPE = CkMyPe();
190 #endif
191 }
192
193 IrrGroup::~IrrGroup() {
194   // remove the object pointer
195   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
196   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
197   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
198 }
199
200 void IrrGroup::pup(PUP::er &p)
201 {
202   Chare::pup(p);
203   p|thisgroup;
204 }
205
206 int IrrGroup::ckGetChareType() const {
207   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
208 }
209
210 int IrrGroup::ckDebugChareID(char *str, int limit) {
211   if (limit<5) return -1;
212   str[0] = 1;
213   *((int*)&str[1]) = thisgroup.idx;
214   return 5;
215 }
216
217 char *IrrGroup::ckDebugChareName() {
218   return strdup(_chareTable[ckGetChareType()]->name);
219 }
220
221 void IrrGroup::ckJustMigrated(void)
222 {
223 }
224
225 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
226   /* FIXME: **CW** not entirely sure what we should do here yet */
227 }
228
229 void Group::CkAddThreadListeners(CthThread th, void *msg) {
230   Chare::CkAddThreadListeners(th, msg);
231   CthSetThreadID(th, thisgroup.idx, 0, 0);
232 }
233
234 void Group::pup(PUP::er &p)
235 {
236   CkReductionMgr::pup(p);
237   reductionInfo.pup(p);
238 }
239
240 /**** Delegation Manager Group */
241 CkDelegateMgr::~CkDelegateMgr() { }
242
243 //Default delegator implementation: do not delegate-- send directly
244 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
245   { CkSendMsg(ep,m,c); }
246 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
247   { CkSendMsgBranch(ep,m,onPE,g); }
248 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
249   { CkBroadcastMsgBranch(ep,m,g); }
250 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
251   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
252 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
253   { CkSendMsgNodeBranch(ep,m,onNode,g); }
254 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
255   { CkBroadcastMsgNodeBranch(ep,m,g); }
256 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
257   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
258 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
259 {
260         CProxyElement_ArrayBase ap(a,idx);
261         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
262 }
263 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
264 {
265         CProxyElement_ArrayBase ap(a,idx);
266         ap.ckSend((CkArrayMessage *)m,ep);
267 }
268 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
269 {
270         CProxy_ArrayBase ap(a);
271         ap.ckBroadcast((CkArrayMessage *)m,ep);
272 }
273
274 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
275 {
276         CmiAbort("ArraySectionSend is not implemented!\n");
277 /*
278         CProxyElement_ArrayBase ap(a,idx);
279         ap.ckSend((CkArrayMessage *)m,ep);
280 */
281 }
282
283 /*** Proxy <-> delegator communication */
284 CkDelegateData::~CkDelegateData() {}
285
286 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
287   return pd; // default implementation ignores pup call
288 }
289
290 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
291    this tricky manual reference counting business... */
292
293 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
294         if (dPtr) dPtr->ref();
295         ckUndelegate();
296         delegatedMgr = dTo;
297         delegatedPtr = dPtr;
298         delegatedGroupId = delegatedMgr->CkGetGroupID();
299         isNodeGroup = delegatedMgr->isNodeGroup();
300 }
301 void CProxy::ckUndelegate(void) {
302         delegatedMgr=NULL;
303         delegatedGroupId.setZero();
304         if (delegatedPtr) delegatedPtr->unref();
305         delegatedPtr=NULL;
306 }
307
308 /// Copy constructor
309 CProxy::CProxy(const CProxy &src)
310   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
311    isNodeGroup(src.isNodeGroup) {
312     delegatedPtr = NULL;
313     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
314         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
315     }
316 }
317
318 /// Assignment operator
319 CProxy& CProxy::operator=(const CProxy &src) {
320         CkDelegateData *oldPtr=delegatedPtr;
321         ckUndelegate();
322         delegatedMgr=src.delegatedMgr;
323         delegatedGroupId = src.delegatedGroupId; 
324         isNodeGroup = src.isNodeGroup;
325
326         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
327             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
328         else
329             delegatedPtr = NULL;
330
331         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
332         if (oldPtr) oldPtr->unref();
333         return *this;
334 }
335
336 void CProxy::pup(PUP::er &p) {
337   if (!p.isUnpacking()) {
338     if (ckDelegatedTo() != NULL) {
339       delegatedGroupId = delegatedMgr->CkGetGroupID();
340       isNodeGroup = delegatedMgr->isNodeGroup();
341     }
342   }
343   p|delegatedGroupId;
344   if (!delegatedGroupId.isZero()) {
345     p|isNodeGroup;
346     if (p.isUnpacking()) {
347       delegatedMgr = ckDelegatedTo(); 
348     }
349
350     int migCtor, cIdx; 
351     if (!p.isUnpacking()) {
352       if (isNodeGroup) {
353         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
354         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
355         migCtor = _chareTable[cIdx]->migCtor; 
356         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
357       }
358       else  {
359         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
360         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
361         migCtor = _chareTable[cIdx]->migCtor; 
362         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
363       }         
364     }
365
366     p|migCtor;
367
368     // if delegated manager has not been created, construct a dummy
369     // object on which to call DelegatePointerPup
370     if (delegatedMgr == NULL) {
371
372       // create a dummy object for calling DelegatePointerPup
373       int objId = _entryTable[migCtor]->chareIdx; 
374       int objSize = _chareTable[objId]->size; 
375       void *obj = malloc(objSize); 
376       _entryTable[migCtor]->call(NULL, obj); 
377       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
378         ->DelegatePointerPup(p, delegatedPtr);           
379       free(obj);
380
381     }
382     else {
383
384       // delegated manager has been created, so we can use it
385       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
386
387     }
388
389     if (p.isUnpacking() && delegatedPtr) {
390       delegatedPtr->ref();
391     }
392   }
393 }
394
395 /**** Array sections */
396 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
397 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
398   _cookie.get_aid() = aid;      \
399   _cookie.get_pe() = CkMyPe();  \
400   _elems = new CkArrayIndex[nElems];    \
401   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
402   pelist = NULL;        \
403   npes  = 0;    \
404 }
405
406 CKSECTIONID_CONSTRUCTOR_DEF(1D)
407 CKSECTIONID_CONSTRUCTOR_DEF(2D)
408 CKSECTIONID_CONSTRUCTOR_DEF(3D)
409 CKSECTIONID_CONSTRUCTOR_DEF(4D)
410 CKSECTIONID_CONSTRUCTOR_DEF(5D)
411 CKSECTIONID_CONSTRUCTOR_DEF(6D)
412 CKSECTIONID_CONSTRUCTOR_DEF(Max)
413
414 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
415   pelist = new int[npes];
416   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
417   _cookie.get_aid() = gid;
418 }
419
420 CkSectionID::CkSectionID(const CkSectionID &sid) {
421   int i;
422   _cookie = sid._cookie;
423   _nElems = sid._nElems;
424   if (_nElems > 0) {
425     _elems = new CkArrayIndex[_nElems];
426     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
427   } else _elems = NULL;
428   npes = sid.npes;
429   if (npes > 0) {
430     pelist = new int[npes];
431     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
432   } else pelist = NULL;
433 }
434
435 void CkSectionID::operator=(const CkSectionID &sid) {
436   int i;
437   _cookie = sid._cookie;
438   _nElems = sid._nElems;
439   if (_nElems > 0) {
440     _elems = new CkArrayIndex[_nElems];
441     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
442   } else _elems = NULL;
443   npes = sid.npes;
444   if (npes > 0) {
445     pelist = new int[npes];
446     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
447   } else pelist = NULL;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451     p | _cookie;
452     p(_nElems);
453     if (_nElems > 0) {
454       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
455       for (int i=0; i< _nElems; i++) p | _elems[i];
456       npes = 0;
457       pelist = NULL;
458     } else {
459       // If _nElems is zero, than this section describes processors instead of array elements
460       _elems = NULL;
461       p(npes);
462       if (p.isUnpacking()) pelist = new int[npes];
463       p(pelist, npes);
464     }
465 }
466
467 /**** Tiny random API routines */
468
469 #ifdef CMK_CUDA
470 void CUDACallbackManager(void *fn) {
471   if (fn != NULL) {
472     CkCallback *cb = (CkCallback*) fn;
473     cb->send();
474   }
475 }
476
477 #endif
478
479 extern "C"
480 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
481 {
482   UsrToEnv(msg)->setRef(ref);
483 }
484
485 extern "C"
486 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
487 {
488   return UsrToEnv(msg)->getRef();
489 }
490
491 extern "C"
492 int CkGetSrcPe(void *msg)
493 {
494   return UsrToEnv(msg)->getSrcPe();
495 }
496
497 extern "C"
498 int CkGetSrcNode(void *msg)
499 {
500   return CmiNodeOf(CkGetSrcPe(msg));
501 }
502
503 extern "C"
504 void *CkLocalBranch(CkGroupID gID) {
505   return _localBranch(gID);
506 }
507
508 static
509 void *_ckLocalNodeBranch(CkGroupID groupID) {
510   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
511   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
512   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
513   return retval;
514 }
515
516 extern "C"
517 void *CkLocalNodeBranch(CkGroupID groupID)
518 {
519   void *retval;
520   // we are called in a constructor
521   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
522     return CkpvAccess(_currentNodeGroupObj);
523   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
524   { // Nodegroup hasn't finished being created yet-- schedule...
525     CsdScheduler(0);
526   }
527   return retval;
528 }
529
530 extern "C"
531 void *CkLocalChare(const CkChareID *pCid)
532 {
533         int pe=pCid->onPE;
534         if (pe<0) { //A virtual chare ID
535                 if (pe!=(-(CkMyPe()+1)))
536                         return NULL;//VID block not on this PE
537 #ifdef CMK_CHARE_USE_PTR
538                 VidBlock *v=(VidBlock *)pCid->objPtr;
539 #else
540                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
541 #endif
542                 return v->getLocalChareObj();
543         }
544         else
545         { //An ordinary chare ID
546                 if (pe!=CkMyPe())
547                         return NULL;//Chare not on this PE
548 #ifdef CMK_CHARE_USE_PTR
549                 return pCid->objPtr;
550 #else
551                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
552 #endif
553         }
554 }
555
556 CkpvDeclare(char**,Ck_argv);
557
558 extern "C" char **CkGetArgv(void) {
559         return CkpvAccess(Ck_argv);
560 }
561 extern "C" int CkGetArgc(void) {
562         return CmiGetArgc(CkpvAccess(Ck_argv));
563 }
564
565 /******************** Basic support *****************/
566 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
567 {
568 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
569         CpvAccess(_currentObj) = (Chare *)obj;
570 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
571 #endif
572   //BIGSIM_OOC DEBUGGING
573   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
574   //fflush(stdout);
575 #if CMK_CHARMDEBUG
576   CpdBeforeEp(epIdx, obj, msg);
577 #endif    
578   _entryTable[epIdx]->call(msg, obj);
579 #if CMK_CHARMDEBUG
580   CpdAfterEp(epIdx);
581 #endif
582   if (_entryTable[epIdx]->noKeep)
583   { /* Method doesn't keep/delete the message, so we have to: */
584     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
585   }
586 }
587 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
588 {
589   //BIGSIM_OOC DEBUGGING
590   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
591   //fflush(stdout);
592
593   void *deliverMsg;
594 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
595         CpvAccess(_currentObj) = (Chare *)obj;
596 #endif
597   if (_entryTable[epIdx]->noKeep)
598   { /* Deliver a read-only copy of the message */
599     deliverMsg=(void *)msg;
600   } else
601   { /* Method needs a copy of the message to keep/delete */
602     void *oldMsg=(void *)msg;
603     deliverMsg=CkCopyMsg(&oldMsg);
604 #if CMK_ERROR_CHECKING
605     if (oldMsg!=msg)
606       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
607 #endif
608   }
609 #if CMK_CHARMDEBUG
610   CpdBeforeEp(epIdx, obj, (void*)msg);
611 #endif
612   _entryTable[epIdx]->call(deliverMsg, obj);
613 #if CMK_CHARMDEBUG
614   CpdAfterEp(epIdx);
615 #endif
616 }
617
618 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
619 {
620   register void *msg = EnvToUsr(env);
621   _SET_USED(env, 0);
622   CkDeliverMessageFree(epIdx,msg,obj);
623 }
624
625 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
626 {
627
628 #if CMK_TRACE_ENABLED 
629   if (_entryTable[epIdx]->traceEnabled) {
630     _TRACE_BEGIN_EXECUTE(env);
631     _invokeEntryNoTrace(epIdx,env,obj);
632     _TRACE_END_EXECUTE();
633   }
634   else
635 #endif
636     _invokeEntryNoTrace(epIdx,env,obj);
637
638 }
639
640 /********************* Creation ********************/
641
642 extern "C"
643 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
644 {
645   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
646   envelope *env = UsrToEnv(msg);
647   _CHECK_USED(env);
648   if(pCid == 0) {
649     env->setMsgtype(NewChareMsg);
650   } else {
651     pCid->onPE = (-(CkMyPe()+1));
652     //  pCid->magic = _GETIDX(cIdx);
653     pCid->objPtr = (void *) new VidBlock();
654     _MEMCHECK(pCid->objPtr);
655     env->setMsgtype(NewVChareMsg);
656     env->setVidPtr(pCid->objPtr);
657 #ifndef CMK_CHARE_USE_PTR
658     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
659     int idx = CkpvAccess(vidblocks).size()-1;
660     pCid->objPtr = (void *)(CmiIntPtr)idx;
661     env->setVidPtr((void *)(CmiIntPtr)idx);
662 #endif
663   }
664   env->setEpIdx(eIdx);
665   env->setByPe(CkMyPe());
666   env->setSrcPe(CkMyPe());
667   CmiSetHandler(env, _charmHandlerIdx);
668   _TRACE_CREATION_1(env);
669   CpvAccess(_qd)->create();
670   _STATS_RECORD_CREATE_CHARE_1();
671   _SET_USED(env, 1);
672   if(destPE == CK_PE_ANY)
673     env->setForAnyPE(1);
674   else
675     env->setForAnyPE(0);
676   _CldEnqueue(destPE, env, _infoIdx);
677   _TRACE_CREATION_DONE(1);
678 }
679
680 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
681 {
682   register int gIdx = _entryTable[epIdx]->chareIdx;
683   register void *obj = malloc(_chareTable[gIdx]->size);
684   _MEMCHECK(obj);
685   setMemoryTypeChare(obj);
686   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
687   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
688   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
689   CkpvAccess(_groupIDTable)->push_back(groupID);
690   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
691   if(ptrq) {
692     void *pending;
693     while((pending=ptrq->deq())!=0)
694       _CldEnqueue(CkMyPe(), pending, _infoIdx);
695 //    delete ptrq;
696       CkpvAccess(_groupTable)->find(groupID).clearPending();
697   }
698   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
699
700   CkpvAccess(_currentGroup) = groupID;
701   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
702 #ifndef CMK_CHARE_USE_PTR
703   //((Chare *)obj)->chareIdx = -1;
704   CkpvAccess(currentChareIdx) = -1;
705 #endif
706   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
707   _STATS_RECORD_PROCESS_GROUP_1();
708 }
709
710 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
711 {
712   register int gIdx = _entryTable[epIdx]->chareIdx;
713   int objSize=_chareTable[gIdx]->size;
714   register void *obj = malloc(objSize);
715   _MEMCHECK(obj);
716   setMemoryTypeChare(obj);
717   CkpvAccess(_currentGroup) = groupID;
718
719 // Now that the NodeGroup is created, add it to the table.
720 //  NodeGroups can be accessed by multiple processors, so
721 //  this is in the opposite order from groups - invoking the constructor
722 //  before registering it.
723 // User may call CkLocalNodeBranch() inside the nodegroup constructor
724 //  store nodegroup into _currentNodeGroupObj
725   CkpvAccess(_currentNodeGroupObj) = obj;
726 #ifndef CMK_CHARE_USE_PTR
727   //((Chare *)obj)->chareIdx = -1;
728   CkpvAccess(currentChareIdx) = -1;
729 #endif
730   _invokeEntryNoTrace(epIdx,env,obj);
731   CkpvAccess(_currentNodeGroupObj) = NULL;
732   _STATS_RECORD_PROCESS_NODE_GROUP_1();
733
734   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
735   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
736   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
737   CksvAccess(_nodeGroupIDTable).push_back(groupID);
738
739   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
740   if(ptrq) {
741     void *pending;
742     while((pending=ptrq->deq())!=0)
743       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
744 //    delete ptrq;
745       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
746   }
747   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
748 }
749
750 void _createGroup(CkGroupID groupID, envelope *env)
751 {
752   _CHECK_USED(env);
753   _SET_USED(env, 1);
754   register int epIdx = env->getEpIdx();
755   int gIdx = _entryTable[epIdx]->chareIdx;
756   CkNodeGroupID rednMgr;
757   if(_chareTable[gIdx]->isIrr == 0){
758                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
759                 rednMgr = rednMgrProxy;
760 //              rednMgrProxy.setAttachedGroup(groupID);
761   }else{
762         rednMgr.setZero();
763   }
764   env->setGroupNum(groupID);
765   env->setSrcPe(CkMyPe());
766   env->setRednMgr(rednMgr);
767   env->setGroupEpoch(CkpvAccess(_charmEpoch));
768
769   if(CkNumPes()>1) {
770     CkPackMessage(&env);
771     CmiSetHandler(env, _bocHandlerIdx);
772     _numInitMsgs++;
773     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
774     CpvAccess(_qd)->create(CkNumPes()-1);
775     CkUnpackMessage(&env);
776   }
777   _STATS_RECORD_CREATE_GROUP_1();
778   CkCreateLocalGroup(groupID, epIdx, env);
779 }
780
781 void _createNodeGroup(CkGroupID groupID, envelope *env)
782 {
783   _CHECK_USED(env);
784   _SET_USED(env, 1);
785   register int epIdx = env->getEpIdx();
786   env->setGroupNum(groupID);
787   env->setSrcPe(CkMyPe());
788   env->setGroupEpoch(CkpvAccess(_charmEpoch));
789   if(CkNumNodes()>1) {
790     CkPackMessage(&env);
791     CmiSetHandler(env, _bocHandlerIdx);
792     _numInitMsgs++;
793     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
794     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
795     CpvAccess(_qd)->create(CkNumNodes()-1);
796     CkUnpackMessage(&env);
797   }
798   _STATS_RECORD_CREATE_NODE_GROUP_1();
799   CkCreateLocalNodeGroup(groupID, epIdx, env);
800 }
801
802 // new _groupCreate
803
804 static CkGroupID _groupCreate(envelope *env)
805 {
806   register CkGroupID groupNum;
807
808   // check CkMyPe(). if it is 0 then idx is _numGroups++
809   // if not, then something else...
810   if(CkMyPe() == 0)
811      groupNum.idx = CkpvAccess(_numGroups)++;
812   else
813      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
814   _createGroup(groupNum, env);
815   return groupNum;
816 }
817
818 // new _nodeGroupCreate
819 static CkGroupID _nodeGroupCreate(envelope *env)
820 {
821   register CkGroupID groupNum;
822   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
823   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
824           groupNum.idx = CksvAccess(_numNodeGroups)++;
825    else
826           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
827   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
828   _createNodeGroup(groupNum, env);
829   return groupNum;
830 }
831
832 /**** generate the group idx when group is creator pe is not pe0
833  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
834  **** remaining bits contain the group creator processor number and
835  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
836
837 int _getGroupIdx(int numNodes,int myNode,int numGroups)
838 {
839         int idx;
840         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
841         int n = 32 - (x+1);                                     // number of bits remaining for the index
842         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
843                                                                 // then add the next available index
844         // of course this won't work when int is 8 bytes long on T3E
845         //idx |= 0x80000000;                                      // set the most significant bit to 1
846         idx = - idx;
847                                                                 // if int is not 32 bits, wouldn't this be wrong?
848         return idx;
849 }
850
851 extern "C"
852 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
853 {
854   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
855   register envelope *env = UsrToEnv(msg);
856   env->setMsgtype(BocInitMsg);
857   env->setEpIdx(eIdx);
858   env->setSrcPe(CkMyPe());
859   _TRACE_CREATION_N(env, CkNumPes());
860   CkGroupID gid = _groupCreate(env);
861   _TRACE_CREATION_DONE(1);
862   return gid;
863 }
864
865 extern "C"
866 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
867 {
868   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
869   register envelope *env = UsrToEnv(msg);
870   env->setMsgtype(NodeBocInitMsg);
871   env->setEpIdx(eIdx);
872   env->setSrcPe(CkMyPe());
873   _TRACE_CREATION_N(env, CkNumNodes());
874   CkGroupID gid = _nodeGroupCreate(env);
875   _TRACE_CREATION_DONE(1);
876   return gid;
877 }
878
879 static inline void *_allocNewChare(envelope *env, int &idx)
880 {
881   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
882   void *tmp=malloc(_chareTable[chareIdx]->size);
883   _MEMCHECK(tmp);
884 #ifndef CMK_CHARE_USE_PTR
885   CkpvAccess(chare_objs).push_back(tmp);
886   CkpvAccess(chare_types).push_back(chareIdx);
887   idx = CkpvAccess(chare_objs).size()-1;
888 #endif
889   setMemoryTypeChare(tmp);
890   return tmp;
891 }
892
893 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
894 {
895   int idx;
896   register void *obj = _allocNewChare(env, idx);
897 #ifndef CMK_CHARE_USE_PTR
898   //((Chare *)obj)->chareIdx = idx;
899   CkpvAccess(currentChareIdx) = idx;
900 #endif
901   _invokeEntry(env->getEpIdx(),env,obj);
902 }
903
904 void CkCreateLocalChare(int epIdx, envelope *env)
905 {
906   env->setEpIdx(epIdx);
907   _processNewChareMsg(NULL, env);
908 }
909
910 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
911 {
912   int idx;
913   register void *obj = _allocNewChare(env, idx);
914   register CkChareID *pCid = (CkChareID *)
915       _allocMsg(FillVidMsg, sizeof(CkChareID));
916   pCid->onPE = CkMyPe();
917 #ifndef CMK_CHARE_USE_PTR
918   pCid->objPtr = (void*)(CmiIntPtr)idx;
919 #else
920   pCid->objPtr = obj;
921 #endif
922   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
923   register envelope *ret = UsrToEnv(pCid);
924   ret->setVidPtr(env->getVidPtr());
925   register int srcPe = env->getByPe();
926   ret->setSrcPe(CkMyPe());
927   CmiSetHandler(ret, _charmHandlerIdx);
928   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
929 #ifndef CMK_CHARE_USE_PTR
930   // register the remote vidblock for deletion when chare is deleted
931   CkChareID vid;
932   vid.onPE = srcPe;
933   vid.objPtr = env->getVidPtr();
934   CkpvAccess(vmap)[idx] = vid;    
935 #endif
936   CpvAccess(_qd)->create();
937 #ifndef CMK_CHARE_USE_PTR
938   //((Chare *)obj)->chareIdx = idx;
939   CkpvAccess(currentChareIdx) = idx;
940 #endif
941   _invokeEntry(env->getEpIdx(),env,obj);
942 }
943
944 /************** Receive: Chares *************/
945
946 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
947 {
948   register int epIdx = env->getEpIdx();
949   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
950   register void *obj;
951   if (mainIdx != -1)  {           // mainchare
952     CmiAssert(CkMyPe()==0);
953     obj = _mainTable[mainIdx]->getObj();
954   }
955   else {
956 #ifndef CMK_CHARE_USE_PTR
957     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
958       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
959     else
960       obj = env->getObjPtr();
961 #else
962     obj = env->getObjPtr();
963 #endif
964   }
965   _invokeEntry(epIdx,env,obj);
966 }
967
968 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
969 {
970   register int epIdx = env->getEpIdx();
971   register void *obj = env->getObjPtr();
972   _invokeEntry(epIdx,env,obj);
973 }
974
975 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
976 {
977 #ifndef CMK_CHARE_USE_PTR
978   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
979 #else
980   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
981   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
982 #endif
983   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
984   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
985   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
986   CmiFree(env);
987 }
988
989 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
990 {
991 #ifndef CMK_CHARE_USE_PTR
992   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
993 #else
994   VidBlock *vptr = (VidBlock *) env->getVidPtr();
995   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
996 #endif
997   _SET_USED(env, 1);
998   vptr->send(env);
999 }
1000
1001 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1002 {
1003 #ifndef CMK_CHARE_USE_PTR
1004   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1005   delete vptr;
1006   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1007 #endif
1008   CmiFree(env);
1009 }
1010
1011 /************** Receive: Groups ****************/
1012
1013 /**
1014  Return a pointer to the local BOC of "groupID".
1015  The message "env" passed in has some known dependency on this groupID
1016  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1017  Therefore, if the return value is NULL, this function buffers the massage so that
1018  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1019  The message passed in must have its handlers correctly set so that it can be
1020  scheduled again.
1021 */
1022 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1023 {
1024
1025         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1026         IrrGroup *obj = ck->localBranch(groupID);
1027         if (obj==NULL) { /* groupmember not yet created: stash message */
1028                 ck->getGroupTable()->find(groupID).enqMsg(env);
1029         }
1030         else { /* will be able to process message */
1031                 ck->process();
1032         }
1033         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1034         return obj;
1035 }
1036
1037 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1038 {
1039 #if CMK_LBDB_ON
1040   // if there is a running obj being measured, stop it temporarily
1041   LDObjHandle objHandle;
1042   int objstopped = 0;
1043   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1044   if (the_lbdb->RunningObject(&objHandle)) {
1045     objstopped = 1;
1046     the_lbdb->ObjectStop(objHandle);
1047   }
1048 #endif
1049   _invokeEntry(epIdx,env,obj);
1050 #if CMK_LBDB_ON
1051   if (objstopped) the_lbdb->ObjectStart(objHandle);
1052 #endif
1053   _STATS_RECORD_PROCESS_BRANCH_1();
1054 }
1055
1056 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1057 {
1058   register CkGroupID groupID =  env->getGroupNum();
1059   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1060   if(obj) {
1061     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1062   }
1063 }
1064
1065 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1066 {
1067   env->setMsgtype(ForChareMsg);
1068   env->setObjPtr(obj);
1069   _processForChareMsg(ck,env);
1070   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1071 }
1072
1073 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1074 {
1075   env->setEpIdx(epIdx);
1076   _deliverForNodeBocMsg(ck,env, obj);
1077 }
1078
1079 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1080 {
1081   register CkGroupID groupID = env->getGroupNum();
1082   register void *obj;
1083
1084   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1085   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1086   if(!obj) { // groupmember not yet created
1087 #if CMK_IMMEDIATE_MSG
1088     if (CmiIsImmediate(env))     // buffer immediate message
1089       CmiDelayImmediate();
1090     else
1091 #endif
1092     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1093     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1094     return;
1095   }
1096   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1097 #if CMK_IMMEDIATE_MSG
1098   if (!CmiIsImmediate(env))
1099 #endif
1100   ck->process();
1101   env->setMsgtype(ForChareMsg);
1102   env->setObjPtr(obj);
1103   _processForChareMsg(ck,env);
1104   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1105 }
1106
1107 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1108 {
1109   register CkGroupID groupID = env->getGroupNum();
1110   register int epIdx = env->getEpIdx();
1111   if (!env->getGroupDep().isZero()) {      // dependence
1112     CkGroupID dep = env->getGroupDep();
1113     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1114     if (obj == NULL) return;
1115   }
1116   else
1117     ck->process();
1118   CkCreateLocalGroup(groupID, epIdx, env);
1119 }
1120
1121 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1122 {
1123   register CkGroupID groupID = env->getGroupNum();
1124   register int epIdx = env->getEpIdx();
1125   CkCreateLocalNodeGroup(groupID, epIdx, env);
1126 }
1127
1128 /************** Receive: Arrays *************/
1129
1130 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1131   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1132   if (mgr) {
1133     _SET_USED(env, 0);
1134     mgr->insertElement((CkMessage *)EnvToUsr(env));
1135   }
1136 }
1137 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1138   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1139   if (mgr) {
1140     _SET_USED(env, 0);
1141     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1142   }
1143 }
1144
1145 //BIGSIM_OOC DEBUGGING
1146 #define TELLMSGTYPE(x) //x
1147
1148 /**
1149  * This is the main converse-level handler used by all of Charm++.
1150  *
1151  * \addtogroup CriticalPathFramework
1152  */
1153 void _processHandler(void *converseMsg,CkCoreState *ck)
1154 {
1155   register envelope *env = (envelope *) converseMsg;
1156
1157   MESSAGE_PHASE_CHECK(env);
1158
1159 //#if CMK_RECORD_REPLAY
1160   if (ck->watcher!=NULL) {
1161     if (!ck->watcher->processMessage(&env,ck)) return;
1162   }
1163 //#endif
1164 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1165         Chare *obj=NULL;
1166         CkObjID sender;
1167         MCount SN;
1168         MlogEntry *entry=NULL;
1169         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1170         env->getMsgtype() == ForArrayEltMsg || env->getMsgtype() == ForChareMsg){
1171                 sender = env->sender;
1172                 SN = env->SN;
1173                 int result = preProcessReceivedMessage(env,&obj,&entry);
1174                 if(result == 0){
1175                         return;
1176                 }
1177         }
1178 #endif
1179
1180 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1181   //  CkPrintf("START\n");
1182   criticalPath_start(env);
1183 #endif
1184
1185
1186   switch(env->getMsgtype()) {
1187 // Group support
1188     case BocInitMsg :
1189       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1190       // QD processing moved inside _processBocInitMsg because it is conditional
1191       //ck->process(); 
1192       if(env->isPacked()) CkUnpackMessage(&env);
1193       _processBocInitMsg(ck,env);
1194       break;
1195     case NodeBocInitMsg :
1196       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1197       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1198       _processNodeBocInitMsg(ck,env);
1199       break;
1200     case ForBocMsg :
1201       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1202       // QD processing moved inside _processForBocMsg because it is conditional
1203       if(env->isPacked()) CkUnpackMessage(&env);
1204       _processForBocMsg(ck,env);
1205       // stats record moved inside _processForBocMsg because it is conditional
1206       break;
1207     case ForNodeBocMsg :
1208       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1209       // QD processing moved to _processForNodeBocMsg because it is conditional
1210       if(env->isPacked()) CkUnpackMessage(&env);
1211       _processForNodeBocMsg(ck,env);
1212       // stats record moved to _processForNodeBocMsg because it is conditional
1213       break;
1214
1215 // Array support
1216     case ArrayEltInitMsg:
1217       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1218       if(env->isPacked()) CkUnpackMessage(&env);
1219       _processArrayEltInitMsg(ck,env);
1220       break;
1221     case ForArrayEltMsg:
1222       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1223       if(env->isPacked()) CkUnpackMessage(&env);
1224       _processArrayEltMsg(ck,env);
1225       break;
1226
1227 // Chare support
1228     case NewChareMsg :
1229       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1230       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1231       _processNewChareMsg(ck,env);
1232       _STATS_RECORD_PROCESS_CHARE_1();
1233       break;
1234     case NewVChareMsg :
1235       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1236       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1237       _processNewVChareMsg(ck,env);
1238       _STATS_RECORD_PROCESS_CHARE_1();
1239       break;
1240     case ForChareMsg :
1241       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1242       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1243       _processForPlainChareMsg(ck,env);
1244       _STATS_RECORD_PROCESS_MSG_1();
1245       break;
1246     case ForVidMsg   :
1247       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1248       ck->process();
1249       _processForVidMsg(ck,env);
1250       break;
1251     case FillVidMsg  :
1252       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1253       ck->process();
1254       _processFillVidMsg(ck,env);
1255       break;
1256     case DeleteVidMsg  :
1257       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1258       ck->process();
1259       _processDeleteVidMsg(ck,env);
1260       break;
1261
1262     default:
1263       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1264   }
1265 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1266         if(obj != NULL){
1267                 postProcessReceivedMessage(obj,sender,SN,entry);
1268         }
1269 #endif
1270
1271
1272 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1273   criticalPath_end();
1274   //  CkPrintf("STOP\n");
1275 #endif
1276
1277
1278 }
1279
1280
1281 /******************** Message Send **********************/
1282
1283 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1284              int *queueing, int *priobits, unsigned int **prioptr)
1285 {
1286   register envelope *env = (envelope *)converseMsg;
1287   *pfn = (CldPackFn)CkPackMessage;
1288   *len = env->getTotalsize();
1289   *queueing = env->getQueueing();
1290   *priobits = env->getPriobits();
1291   *prioptr = (unsigned int *) env->getPrioPtr();
1292 }
1293
1294 void CkPackMessage(envelope **pEnv)
1295 {
1296   register envelope *env = *pEnv;
1297   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1298     register void *msg = EnvToUsr(env);
1299     _TRACE_BEGIN_PACK();
1300     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1301     _TRACE_END_PACK();
1302     env=UsrToEnv(msg);
1303     env->setPacked(1);
1304     *pEnv = env;
1305   }
1306 }
1307
1308 void CkUnpackMessage(envelope **pEnv)
1309 {
1310   register envelope *env = *pEnv;
1311   register int msgIdx = env->getMsgIdx();
1312   if(env->isPacked()) {
1313     register void *msg = EnvToUsr(env);
1314     _TRACE_BEGIN_UNPACK();
1315     msg = _msgTable[msgIdx]->unpack(msg);
1316     _TRACE_END_UNPACK();
1317     env=UsrToEnv(msg);
1318     env->setPacked(0);
1319     *pEnv = env;
1320   }
1321 }
1322
1323 //There's no reason for most messages to go through the Cld--
1324 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1325 // Thus these accellerated versions of the Cld calls.
1326 #if CMK_OBJECT_QUEUE_AVAILABLE
1327 static int index_objectQHandler;
1328 #endif
1329 int index_tokenHandler;
1330 int index_skipCldHandler;
1331
1332 void _skipCldHandler(void *converseMsg)
1333 {
1334   register envelope *env = (envelope *)(converseMsg);
1335   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1336 #if CMK_GRID_QUEUE_AVAILABLE
1337   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1338     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1339                        env, env->getQueueing (), env->getPriobits (),
1340                        (unsigned int *) env->getPrioPtr ());
1341   } else {
1342     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1343                        env, env->getQueueing (), env->getPriobits (),
1344                        (unsigned int *) env->getPrioPtr ());
1345   }
1346 #else
1347   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1348         env, env->getQueueing(),env->getPriobits(),
1349         (unsigned int *)env->getPrioPtr());
1350 #endif
1351 }
1352
1353
1354 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1355 // Made non-static to be used by ckmessagelogging
1356 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1357 {
1358 #if CMK_CHARMDEBUG
1359   if (!ConverseDeliver(pe)) {
1360     CmiFree(env);
1361     return;
1362   }
1363 #endif
1364   if(pe == CkMyPe() ){
1365     if(!CmiNodeAlive(CkMyPe())){
1366         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1367 //      return;
1368     }
1369   }
1370   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1371 #if CMK_OBJECT_QUEUE_AVAILABLE
1372     Chare *obj = CkFindObjectPtr(env);
1373     if (obj && obj->CkGetObjQueue().queue()) {
1374       _enqObjQueue(obj, env);
1375     }
1376     else
1377 #endif
1378     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1379         env, env->getQueueing(),env->getPriobits(),
1380         (unsigned int *)env->getPrioPtr());
1381 #if CMK_PERSISTENT_COMM
1382     CmiPersistentOneSend();
1383 #endif
1384   } else {
1385     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1386       CkPackMessage(&env);
1387     int len=env->getTotalsize();
1388     CmiSetXHandler(env,CmiGetHandler(env));
1389 #if CMK_OBJECT_QUEUE_AVAILABLE
1390     CmiSetHandler(env,index_objectQHandler);
1391 #else
1392     CmiSetHandler(env,index_skipCldHandler);
1393 #endif
1394     CmiSetInfo(env,infoFn);
1395     if (pe==CLD_BROADCAST) {
1396 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1397                         CmiSyncBroadcast(len, (char *)env);
1398 #else
1399                         CmiSyncBroadcastAndFree(len, (char *)env); 
1400 #endif
1401
1402 }
1403     else if (pe==CLD_BROADCAST_ALL) { 
1404 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1405                         CmiSyncBroadcastAll(len, (char *)env);
1406 #else
1407                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1408 #endif
1409
1410 }
1411     else{
1412 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1413                         CmiSyncSend(pe, len, (char *)env);
1414 #else
1415                         CmiSyncSendAndFree(pe, len, (char *)env);
1416 #endif
1417
1418                 }
1419   }
1420 }
1421
1422 #if CMK_BIGSIM_CHARM
1423 #   define  _skipCldEnqueue   _CldEnqueue
1424 #endif
1425
1426 // by pass Charm++ priority queue, send as Converse message
1427 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1428 {
1429 #if CMK_CHARMDEBUG
1430   if (!ConverseDeliver(-1)) {
1431     CmiFree(env);
1432     return;
1433   }
1434 #endif
1435   CkPackMessage(&env);
1436   int len=env->getTotalsize();
1437   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1438 }
1439
1440 static void _noCldEnqueue(int pe, envelope *env)
1441 {
1442 /*
1443   if (pe == CkMyPe()) {
1444     CmiHandleMessage(env);
1445   } else
1446 */
1447 #if CMK_CHARMDEBUG
1448   if (!ConverseDeliver(pe)) {
1449     CmiFree(env);
1450     return;
1451   }
1452 #endif
1453   CkPackMessage(&env);
1454   int len=env->getTotalsize();
1455   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1456   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1457   else CmiSyncSendAndFree(pe, len, (char *)env);
1458 }
1459
1460 //static void _noCldNodeEnqueue(int node, envelope *env)
1461 //Made non-static to be used by ckmessagelogging
1462 void _noCldNodeEnqueue(int node, envelope *env)
1463 {
1464 /*
1465   if (node == CkMyNode()) {
1466     CmiHandleMessage(env);
1467   } else {
1468 */
1469 #if CMK_CHARMDEBUG
1470   if (!ConverseDeliver(node)) {
1471     CmiFree(env);
1472     return;
1473   }
1474 #endif
1475   CkPackMessage(&env);
1476   int len=env->getTotalsize();
1477   if (node==CLD_BROADCAST) { 
1478 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1479         CmiSyncNodeBroadcast(len, (char *)env);
1480 #else
1481         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1482 #endif
1483 }
1484   else if (node==CLD_BROADCAST_ALL) { 
1485 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1486                 CmiSyncNodeBroadcastAll(len, (char *)env);
1487 #else
1488                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1489 #endif
1490
1491 }
1492   else {
1493 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1494         CmiSyncNodeSend(node, len, (char *)env);
1495 #else
1496         CmiSyncNodeSendAndFree(node, len, (char *)env);
1497 #endif
1498   }
1499 }
1500
1501 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1502 {
1503   register envelope *env = UsrToEnv(msg);
1504   _CHECK_USED(env);
1505   _SET_USED(env, 1);
1506 #if CMK_REPLAYSYSTEM
1507   setEventID(env);
1508 #endif
1509   env->setMsgtype(ForChareMsg);
1510   env->setEpIdx(eIdx);
1511   env->setSrcPe(CkMyPe());
1512 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1513   criticalPath_send(env);
1514   automaticallySetMessagePriority(env);
1515 #endif
1516 #if CMK_CHARMDEBUG
1517   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1518 #endif
1519 #if CMK_OBJECT_QUEUE_AVAILABLE
1520   CmiSetHandler(env, index_objectQHandler);
1521 #else
1522   CmiSetHandler(env, _charmHandlerIdx);
1523 #endif
1524   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1525     register int pe = -(pCid->onPE+1);
1526     if(pe==CkMyPe()) {
1527 #ifndef CMK_CHARE_USE_PTR
1528       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1529 #else
1530       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1531 #endif
1532       void *objPtr;
1533       if (NULL!=(objPtr=vblk->getLocalChare()))
1534       { //A ready local chare
1535         env->setObjPtr(objPtr);
1536         return pe;
1537       }
1538       else { //The vidblock is not ready-- forget it
1539         vblk->send(env);
1540         return -1;
1541       }
1542     } else { //Valid vidblock for another PE:
1543       env->setMsgtype(ForVidMsg);
1544       env->setVidPtr(pCid->objPtr);
1545       return pe;
1546     }
1547   }
1548   else {
1549     env->setObjPtr(pCid->objPtr);
1550     return pCid->onPE;
1551   }
1552 }
1553
1554 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1555 {
1556   int destPE = _prepareMsg(eIdx, msg, pCid);
1557   if (destPE != -1) {
1558     register envelope *env = UsrToEnv(msg);
1559 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1560     criticalPath_send(env);
1561     automaticallySetMessagePriority(env);
1562 #endif
1563     CmiBecomeImmediate(env);
1564   }
1565   return destPE;
1566 }
1567
1568 extern "C"
1569 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1570 {
1571   if (opts & CK_MSG_INLINE) {
1572     CkSendMsgInline(entryIdx, msg, pCid, opts);
1573     return;
1574   }
1575 #if CMK_ERROR_CHECKING
1576   if (opts & CK_MSG_IMMEDIATE) {
1577     CmiAbort("Immediate message is not allowed in Chare!");
1578   }
1579 #endif
1580   register envelope *env = UsrToEnv(msg);
1581   int destPE=_prepareMsg(entryIdx,msg,pCid);
1582   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1583   // VidBlock was not yet filled). The problem is that the creation was never
1584   // traced later when the VidBlock was filled. One solution is to trace the
1585   // creation here, the other to trace it in VidBlock->msgDeliver().
1586 #if defined(_FAULT_CAUSAL_)
1587         sendChareMsg(env,destPE,_infoIdx,pCid);
1588 #else
1589   _TRACE_CREATION_1(env);
1590   if (destPE!=-1) {
1591     CpvAccess(_qd)->create();
1592     if (opts & CK_MSG_SKIP_OR_IMM)
1593       _noCldEnqueue(destPE, env);
1594     else
1595       _CldEnqueue(destPE, env, _infoIdx);
1596   }
1597   _TRACE_CREATION_DONE(1);
1598 #endif
1599 }
1600
1601 extern "C"
1602 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1603 {
1604   if (pCid->onPE==CkMyPe())
1605   {
1606     if(!CmiNodeAlive(CkMyPe())){
1607         return;
1608     }
1609 #if CMK_CHARMDEBUG
1610     //Just in case we need to breakpoint or use the envelope in some way
1611     _prepareMsg(entryIndex,msg,pCid);
1612 #endif
1613                 //Just directly call the chare (skip QD handling & scheduler)
1614     register envelope *env = UsrToEnv(msg);
1615     if (env->isPacked()) CkUnpackMessage(&env);
1616     _STATS_RECORD_PROCESS_MSG_1();
1617     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1618   }
1619   else {
1620     //No way to inline a cross-processor message:
1621     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1622   }
1623 }
1624
1625 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1626 {
1627   register envelope *env = UsrToEnv(msg);
1628 #if CMK_ERROR_CHECKING
1629   CkNodeGroupID nodeRedMgr;
1630 #endif
1631   _CHECK_USED(env);
1632   _SET_USED(env, 1);
1633 #if CMK_REPLAYSYSTEM
1634   setEventID(env);
1635 #endif
1636   env->setMsgtype(type);
1637   env->setEpIdx(eIdx);
1638   env->setGroupNum(gID);
1639   env->setSrcPe(CkMyPe());
1640 #if CMK_ERROR_CHECKING
1641   nodeRedMgr.setZero();
1642   env->setRednMgr(nodeRedMgr);
1643 #endif
1644 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1645   criticalPath_send(env);
1646   automaticallySetMessagePriority(env);
1647 #endif
1648 #if CMK_CHARMDEBUG
1649   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1650 #endif
1651   CmiSetHandler(env, _charmHandlerIdx);
1652   return env;
1653 }
1654
1655 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1656 {
1657   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1658 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1659   criticalPath_send(env);
1660   automaticallySetMessagePriority(env);
1661 #endif
1662   CmiBecomeImmediate(env);
1663   return env;
1664 }
1665
1666 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1667                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1668 {
1669   int numPes;
1670   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1671 #if defined(_FAULT_MLOG_) 
1672   sendTicketGroupRequest(env,pe,_infoIdx);
1673 #elif defined(_FAULT_CAUSAL_)
1674         sendGroupMsg(env,pe,_infoIdx);
1675 #else
1676   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1677   _TRACE_CREATION_N(env, numPes);
1678   if (opts & CK_MSG_SKIP_OR_IMM)
1679     _noCldEnqueue(pe, env);
1680   else
1681     _skipCldEnqueue(pe, env, _infoIdx);
1682   _TRACE_CREATION_DONE(1);
1683 #endif
1684 }
1685
1686 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1687                            int npes, int *pes)
1688 {
1689   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1690   _TRACE_CREATION_MULTICAST(env, npes, pes);
1691   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1692   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1693 }
1694
1695 extern "C"
1696 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1697 {
1698 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1699   if (destPE==CkMyPe())
1700   {
1701     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1702     return;
1703   }
1704   //Can't inline-- send the usual way
1705   register envelope *env = UsrToEnv(msg);
1706   int numPes;
1707   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1708   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1709   _TRACE_CREATION_N(env, numPes);
1710   _noCldEnqueue(destPE, env);
1711   _STATS_RECORD_SEND_BRANCH_1();
1712   CkpvAccess(_coreState)->create();
1713   _TRACE_CREATION_DONE(1);
1714 #else
1715   // no support for immediate message, send inline
1716   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1717 #endif
1718 }
1719
1720 extern "C"
1721 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1722 {
1723   if (destPE==CkMyPe())
1724   {
1725     if(!CmiNodeAlive(CkMyPe())){
1726         return;
1727     }
1728     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1729     if (obj!=NULL)
1730     { //Just directly call the group:
1731 #if CMK_ERROR_CHECKING
1732       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1733 #else
1734       envelope *env=UsrToEnv(msg);
1735 #endif
1736       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1737       return;
1738     }
1739   }
1740   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1741   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1742 }
1743
1744 extern "C"
1745 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1746 {
1747   if (opts & CK_MSG_INLINE) {
1748     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1749     return;
1750   }
1751   if (opts & CK_MSG_IMMEDIATE) {
1752     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1753     return;
1754   }
1755   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1756   _STATS_RECORD_SEND_BRANCH_1();
1757   CkpvAccess(_coreState)->create();
1758 }
1759
1760 extern "C"
1761 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1762 {
1763 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1764   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1765   _TRACE_CREATION_MULTICAST(env, npes, pes);
1766   _noCldEnqueueMulti(npes, pes, env);
1767   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1768 #else
1769   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1770   CpvAccess(_qd)->create(-npes);
1771 #endif
1772   _STATS_RECORD_SEND_BRANCH_N(npes);
1773   CpvAccess(_qd)->create(npes);
1774 }
1775
1776 extern "C"
1777 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1778 {
1779   if (opts & CK_MSG_IMMEDIATE) {
1780     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1781     return;
1782   }
1783     // normal mesg
1784   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1785   _STATS_RECORD_SEND_BRANCH_N(npes);
1786   CpvAccess(_qd)->create(npes);
1787 }
1788
1789 extern "C"
1790 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1791 {
1792   int npes;
1793   int *pes;
1794   if (opts & CK_MSG_IMMEDIATE) {
1795     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1796     return;
1797   }
1798     // normal mesg
1799   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1800   CmiLookupGroup(grp, &npes, &pes);
1801   _TRACE_CREATION_MULTICAST(env, npes, pes);
1802   _CldEnqueueGroup(grp, env, _infoIdx);
1803   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1804   _STATS_RECORD_SEND_BRANCH_N(npes);
1805   CpvAccess(_qd)->create(npes);
1806 }
1807
1808 extern "C"
1809 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1810 {
1811   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1812   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1813   CpvAccess(_qd)->create(CkNumPes());
1814 }
1815
1816 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1817                 int node=CLD_BROADCAST_ALL, int opts=0)
1818 {
1819   int numPes;
1820   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1821 #if defined(_FAULT_MLOG_)
1822         sendTicketNodeGroupRequest(env,node,_infoIdx);
1823 #elif defined(_FAULT_CAUSAL_)
1824         sendNodeGroupMsg(env,node,_infoIdx);
1825 #else
1826   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1827   _TRACE_CREATION_N(env, numPes);
1828   if (opts & CK_MSG_SKIP_OR_IMM) {
1829     _noCldNodeEnqueue(node, env);
1830     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1831       CkpvAccess(_coreState)->create(-numPes);
1832     }
1833   }
1834   else
1835     _CldNodeEnqueue(node, env, _infoIdx);
1836   _TRACE_CREATION_DONE(1);
1837 #endif
1838 }
1839
1840 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1841                            int npes, int *nodes)
1842 {
1843   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1844   _TRACE_CREATION_N(env, npes);
1845   for (int i=0; i<npes; i++) {
1846     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1847   }
1848   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1849 }
1850
1851 extern "C"
1852 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1853 {
1854 #if CMK_IMMEDIATE_MSG
1855   if (node==CkMyNode())
1856   {
1857     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1858     return;
1859   }
1860   //Can't inline-- send the usual way
1861   register envelope *env = UsrToEnv(msg);
1862   int numPes;
1863   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1864   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1865   _TRACE_CREATION_N(env, numPes);
1866   _noCldNodeEnqueue(node, env);
1867   _STATS_RECORD_SEND_BRANCH_1();
1868   /* immeidate message is invisible to QD */
1869 //  CkpvAccess(_coreState)->create();
1870   _TRACE_CREATION_DONE(1);
1871 #else
1872   // no support for immediate message, send inline
1873   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1874 #endif
1875 }
1876
1877 extern "C"
1878 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1879 {
1880   if (node==CkMyNode())
1881   {
1882     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1883     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1884     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1885     if (obj!=NULL)
1886     { //Just directly call the group:
1887 #if CMK_ERROR_CHECKING
1888       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1889 #else
1890       envelope *env=UsrToEnv(msg);
1891 #endif
1892       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1893       return;
1894     }
1895   }
1896   //Can't inline-- send the usual way
1897   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1898 }
1899
1900 extern "C"
1901 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1902 {
1903   if (opts & CK_MSG_INLINE) {
1904     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1905     return;
1906   }
1907   if (opts & CK_MSG_IMMEDIATE) {
1908     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1909     return;
1910   }
1911   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1912   _STATS_RECORD_SEND_NODE_BRANCH_1();
1913   CkpvAccess(_coreState)->create();
1914 }
1915
1916 extern "C"
1917 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1918 {
1919 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1920   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1921   _noCldEnqueueMulti(npes, nodes, env);
1922 #else
1923   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1924   CpvAccess(_qd)->create(-npes);
1925 #endif
1926   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1927   CpvAccess(_qd)->create(npes);
1928 }
1929
1930 extern "C"
1931 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1932 {
1933   if (opts & CK_MSG_IMMEDIATE) {
1934     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1935     return;
1936   }
1937     // normal mesg
1938   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1939   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1940   CpvAccess(_qd)->create(npes);
1941 }
1942
1943 extern "C"
1944 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1945 {
1946   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1947   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1948   CpvAccess(_qd)->create(CkNumNodes());
1949 }
1950
1951 //Needed by delegation manager:
1952 extern "C"
1953 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1954 { return _prepareMsg(eIdx,msg,pCid); }
1955 extern "C"
1956 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1957 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1958 extern "C"
1959 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1960 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1961
1962 void _ckModuleInit(void) {
1963         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1964 #if CMK_OBJECT_QUEUE_AVAILABLE
1965         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1966 #endif
1967         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1968         CkpvInitialize(TokenPool*, _tokenPool);
1969         CkpvAccess(_tokenPool) = new TokenPool;
1970 }
1971
1972
1973 /************** Send: Arrays *************/
1974
1975 extern void CkArrayManagerInsert(int onPe,void *msg);
1976 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1977
1978 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1979 {
1980   _CHECK_USED(env);
1981   _SET_USED(env, 1);
1982   env->setMsgtype(type);
1983 #if CMK_CHARMDEBUG
1984   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1985 #endif
1986   CmiSetHandler(env, _charmHandlerIdx);
1987   CpvAccess(_qd)->create();
1988 }
1989
1990 extern "C"
1991 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1992   register envelope *env = UsrToEnv(msg);
1993   env->getsetArrayMgr()=aID;
1994   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1995   _CldEnqueue(pe, env, _infoIdx);
1996 }
1997
1998 extern "C"
1999 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2000   register envelope *env = UsrToEnv(msg);
2001   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2002 #if defined(_FAULT_MLOG_)
2003    sendTicketArrayRequest(env,pe,_infoIdx);
2004 #elif defined(_FAULT_CAUSAL_)
2005         sendArrayMsg(env,pe,_infoIdx);
2006 #else
2007   if (opts & CK_MSG_IMMEDIATE)
2008     CmiBecomeImmediate(env);
2009   if (opts & CK_MSG_SKIP_OR_IMM)
2010     _noCldEnqueue(pe, env);
2011   else
2012     _skipCldEnqueue(pe, env, _infoIdx);
2013 #endif
2014 }
2015
2016 class ElementDestroyer : public CkLocIterator {
2017 private:
2018         CkLocMgr *locMgr;
2019 public:
2020         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2021         void addLocation(CkLocation &loc) {
2022           loc.destroyAll();
2023         }
2024 };
2025
2026 void CkDeleteChares() {
2027   int i;
2028   int numGroups = CkpvAccess(_groupIDTable)->size();
2029
2030   // delete all plain chares
2031 #ifndef CMK_CHARE_USE_PTR
2032   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2033         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2034         delete obj;
2035         CkpvAccess(chare_objs)[i] = NULL;
2036   }
2037   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2038         VidBlock *obj = CkpvAccess(vidblocks)[i];
2039         delete obj;
2040         CkpvAccess(vidblocks)[i] = NULL;
2041   }
2042 #endif
2043
2044   // delete all array elements
2045   for(i=0;i<numGroups;i++) {
2046     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2047     if(obj && obj->isLocMgr())  {
2048       CkLocMgr *mgr = (CkLocMgr*)obj;
2049       ElementDestroyer destroyer(mgr);
2050       mgr->iterate(destroyer);
2051     }
2052   }
2053
2054   // delete all groups
2055   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2056   for(i=0;i<numGroups;i++) {
2057     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2058     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2059     if (obj) delete obj;
2060   }
2061   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2062
2063   // delete all node groups
2064   if (CkMyRank() == 0) {
2065     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2066     for(i=0;i<numNodeGroups;i++) {
2067       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2068       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2069       if (obj) delete obj;
2070     }
2071   }
2072 }
2073
2074 //------------------- Message Watcher (record/replay) ----------------
2075
2076 #include "crc32.h"
2077
2078 CkpvDeclare(int, envelopeEventID);
2079 int _recplay_crc = 0;
2080 int _recplay_checksum = 0;
2081 int _recplay_logsize = 1024*1024;
2082
2083 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2084 #define REPLAYDEBUG(args) /* empty */
2085
2086 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2087
2088 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2089
2090 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2091
2092     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2093     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2094     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2095     FILE *f=fopen(fName,permissions);
2096     REPLAYDEBUG("openReplayfile "<<fName);
2097     if (f==NULL) {
2098         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2099             CkMyPe(),fName,permissions);
2100         CkAbort("openReplayFile> Could not open replay file");
2101     }
2102     return f;
2103 }
2104
2105 #include "BaseLB.h" /* For LBMigrateMsg message */
2106
2107 class CkMessageRecorder : public CkMessageWatcher {
2108   char *buffer;
2109   unsigned int curpos;
2110   bool firstOpen;
2111 public:
2112   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2113   ~CkMessageRecorder() {
2114     flushLog(0);
2115     fprintf(f,"-1 -1 -1 ");
2116     fclose(f);
2117     delete[] buffer;
2118 #if 0
2119     FILE *stsfp = fopen("sts", "w");
2120     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2121     traceWriteSTS(stsfp, 0);
2122     fclose(stsfp);
2123 #endif
2124     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2125   }
2126
2127 private:
2128   void flushLog(int verbose=1) {
2129     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2130     fprintf(f, "%s", buffer);
2131     curpos=0;
2132   }
2133   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2134     if ((*envptr)->getEvent()) {
2135       bool wasPacked = (*envptr)->isPacked();
2136       if (!wasPacked) CkPackMessage(envptr);
2137       envelope *env = *envptr;
2138       unsigned int crc1=0, crc2=0;
2139       if (_recplay_crc) {
2140         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2141         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2142         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2143       } else if (_recplay_checksum) {
2144         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2145         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2146       }
2147       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2148       if (curpos > _recplay_logsize-128) flushLog();
2149       if (!wasPacked) CkUnpackMessage(envptr);
2150     }
2151     return CmiTrue;
2152   }
2153   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2154     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2155     if (curpos > _recplay_logsize-128) flushLog();
2156     return CmiTrue;
2157   }
2158   
2159   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2160     FILE *f;
2161     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2162     else f = openReplayFile("ckreplay_",".lb","a");
2163     firstOpen = false;
2164     if (f != NULL) {
2165       PUP::toDisk p(f);
2166       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2167       (*msg)->pup(p);
2168       fclose(f);
2169     }
2170     return CmiTrue;
2171   }
2172 };
2173
2174 class CkMessageDetailRecorder : public CkMessageWatcher {
2175 public:
2176   CkMessageDetailRecorder(FILE *f_) {
2177     f=f_;
2178     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2179      * The value of 'x' is the pointer size.
2180      */
2181     CmiUInt2 little = sizeof(void*);
2182     fwrite(&little, 2, 1, f);
2183   }
2184   ~CkMessageDetailRecorder() {fclose(f);}
2185 private:
2186   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2187     bool wasPacked = (*envptr)->isPacked();
2188     if (!wasPacked) CkPackMessage(envptr);
2189     envelope *env = *envptr;
2190     CmiUInt4 size = env->getTotalsize();
2191     fwrite(&size, 4, 1, f);
2192     fwrite(env, env->getTotalsize(), 1, f);
2193     if (!wasPacked) CkUnpackMessage(envptr);
2194     return CmiTrue;
2195   }
2196 };
2197
2198 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2199 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2200
2201 #if CMK_BIGSIM_CHARM
2202 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2203                                    int pb,unsigned int *prio);
2204 #endif
2205
2206 class CkMessageReplay : public CkMessageWatcher {
2207   int counter;
2208         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2209         int nextEP;
2210         unsigned int crc1, crc2;
2211         FILE *lbFile;
2212         /// Read the next message we need from the file:
2213         void getNext(void) {
2214           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2215           if (nextSize > 0) {
2216             // We are reading a regular message
2217             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2218               CkAbort("CkMessageReplay> Syntax error reading replay file");
2219             }
2220             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2221           } else if (nextSize == -2) {
2222             // We are reading a special message (right now only thread awaken)
2223             // Nothing to do since we have already read all info
2224             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2225           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2226             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2227             CkAbort("CkMessageReplay> Unrecognized input");
2228           }
2229             /*
2230                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2231                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2232                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2233                 }
2234                 */
2235                 counter++;
2236         }
2237         /// If this is the next message we need, advance and return CmiTrue.
2238         CmiBool isNext(envelope *env) {
2239                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2240                 if (nextEvent!=env->getEvent()) return CmiFalse;
2241                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2242 #if 1
2243                 if (nextEP != env->getEpIdx()) {
2244                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2245                         return CmiFalse;
2246                 }
2247 #endif
2248 #if ! CMK_BIGSIM_CHARM
2249                 if (nextSize!=env->getTotalsize())
2250                 {
2251                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2252                         return CmiFalse;
2253                 }
2254                 if (_recplay_crc || _recplay_checksum) {
2255                   bool wasPacked = env->isPacked();
2256                   if (!wasPacked) CkPackMessage(&env);
2257                   if (_recplay_crc) {
2258                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2259                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2260                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2261                     if (crcnew1 != crc1) {
2262                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2263                     }
2264                     if (crcnew2 != crc2) {
2265                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2266                     }
2267                   } else if (_recplay_checksum) {
2268             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2269             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2270             if (crcnew1 != crc1) {
2271               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2272             }
2273             if (crcnew2 != crc2) {
2274               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2275             }               
2276                   }
2277                   if (!wasPacked) CkUnpackMessage(&env);
2278                 }
2279 #endif
2280                 return CmiTrue;
2281         }
2282         CmiBool isNext(CthThreadToken *token) {
2283           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2284           return CmiFalse;
2285         }
2286
2287         /// This is a (short) list of messages we aren't yet ready for:
2288         CkQ<envelope *> delayedMessages;
2289         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2290         CkQ<CthThreadToken *> delayedTokens;
2291
2292         /// Try to flush out any delayed messages
2293         void flush(void) {
2294           if (nextSize>0) {
2295                 int len=delayedMessages.length();
2296                 for (int i=0;i<len;i++) {
2297                         envelope *env=delayedMessages.deq();
2298                         if (isNext(env)) { /* this is the next message: process it */
2299                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2300                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2301                                 return;
2302                         }
2303                         else /* Not ready yet-- put it back in the
2304                                 queue */
2305                           {
2306                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2307                                 delayedMessages.enq(env);
2308                           }
2309                 }
2310           } else if (nextSize==-2) {
2311             int len=delayedTokens.length();
2312             for (int i=0;i<len;++i) {
2313               CthThreadToken *token=delayedTokens.deq();
2314               if (isNext(token)) {
2315             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2316 #if ! CMK_BIGSIM_CHARM
2317                 CsdEnqueueLifo((void*)token);
2318 #else
2319                 CthEnqueueBigSimThread(token,0,0,NULL);
2320 #endif
2321                 return;
2322               } else {
2323             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2324                 delayedTokens.enq(token);
2325               }
2326             }
2327           }
2328         }
2329
2330 public:
2331         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2332           counter=0;
2333           f=f_;
2334           getNext();
2335           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2336           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2337         }
2338         ~CkMessageReplay() {fclose(f);}
2339
2340 private:
2341         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2342           bool wasPacked = (*envptr)->isPacked();
2343           if (!wasPacked) CkPackMessage(envptr);
2344           envelope *env = *envptr;
2345           //CkAssert(*(int*)env == 0x34567890);
2346           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2347                 if (env->getEvent() == 0) return CmiTrue;
2348                 if (isNext(env)) { /* This is the message we were expecting */
2349                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2350                         getNext(); /* Advance over this message */
2351                         flush(); /* try to process queued-up stuff */
2352                         if (!wasPacked) CkUnpackMessage(envptr);
2353                         return CmiTrue;
2354                 }
2355 #if CMK_SMP
2356                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2357                          // try next rank, we can't just buffer the msg and left
2358                          // we need to keep unprocessed msg on the fly
2359                         int nextpe = CkMyPe()+1;
2360                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2361                         nextpe = CkNodeFirst(CkMyNode());
2362                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2363                         return CmiFalse;
2364                 }
2365 #endif
2366                 else /*!isNext(env) */ {
2367                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2368                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2369                         delayedMessages.enq(env);
2370                         flush();
2371                         return CmiFalse;
2372                 }
2373         }
2374         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2375       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2376           if (isNext(token)) {
2377         REPLAYDEBUG("Executing token: "<<token->serialNo)
2378             getNext();
2379             flush();
2380             return CmiTrue;
2381           } else {
2382         REPLAYDEBUG("Queueing token: "<<token->serialNo
2383             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2384             delayedTokens.enq(token);
2385             return CmiFalse;
2386           }
2387         }
2388
2389         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2390           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2391           if (lbFile != NULL) {
2392             int num_moves;
2393         PUP::fromDisk p(lbFile);
2394             p | num_moves;
2395             if (num_moves != (*msg)->n_moves) {
2396               delete *msg;
2397               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2398             }
2399             (*msg)->pup(p);
2400           }
2401           return CmiTrue;
2402         }
2403 };
2404
2405 class CkMessageDetailReplay : public CkMessageWatcher {
2406   void *getNext() {
2407     CmiUInt4 size; size_t nread;
2408     if ((nread=fread(&size, 4, 1, f)) < 1) {
2409       if (feof(f)) return NULL;
2410       CkPrintf("Broken record file (metadata) got %d\n",nread);
2411       CkAbort("");
2412     }
2413     void *env = CmiAlloc(size);
2414     long tell = ftell(f);
2415     if ((nread=fread(env, size, 1, f)) < 1) {
2416       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2417       CkAbort("");
2418     }
2419     //*(int*)env = 0x34567890; // set first integer as magic
2420     return env;
2421   }
2422 public:
2423   double starttime;
2424   CkMessageDetailReplay(FILE *f_) {
2425     f=f_;
2426     starttime=CkWallTimer();
2427     /* This must match what CkMessageDetailRecorder did */
2428     CmiUInt2 little;
2429     fread(&little, 2, 1, f);
2430     if (little != sizeof(void*)) {
2431       CkAbort("Replaying on a different architecture from which recording was done!");
2432     }
2433
2434     CsdEnqueue(getNext());
2435
2436     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2437   }
2438   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2439     void *msg = getNext();
2440     if (msg != NULL) CsdEnqueue(msg);
2441     return CmiTrue;
2442   }
2443 };
2444
2445 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2446 #if ! CMK_BIGSIM_CHARM
2447   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2448 #endif
2449   CkMessageReplay *replay = (CkMessageReplay*)rep;
2450   //CmiStartQD(CkMessageReplayQuiescence, replay);
2451 }
2452
2453 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2454   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2455   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2456   ConverseExit();
2457 }
2458
2459 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2460   CkCoreState *ck = CkpvAccess(_coreState);
2461   if (ck->watcher!=NULL) {
2462     return ck->watcher->processThread(token,ck);
2463   }
2464   return CmiTrue;
2465 }
2466
2467 CpvCExtern(int, CthResumeNormalThreadIdx);
2468 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2469 {
2470   CthThread t = token->thread;
2471
2472   if(t == NULL){
2473     free(token);
2474     return;
2475   }
2476 #if CMK_TRACE_ENABLED
2477 #if ! CMK_TRACE_IN_CHARM
2478   if(CpvAccess(traceOn))
2479     CthTraceResume(t);
2480 /*    if(CpvAccess(_traceCoreOn)) 
2481             resumeTraceCore();*/
2482 #endif
2483 #endif
2484   
2485   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2486   if (CpdExecuteThreadResume(token)) {
2487     CthResume(t);
2488   }
2489 }
2490
2491 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2492   CkCoreState *ck = CkpvAccess(_coreState);
2493   if (ck->watcher!=NULL) {
2494     ck->watcher->processLBMessage(msg, ck);
2495   }
2496 }
2497
2498 #if CMK_BIGSIM_CHARM
2499 CpvExtern(int      , CthResumeBigSimThreadIdx);
2500 #endif
2501
2502 #include "ckliststring.h"
2503 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2504     CmiArgGroup("Charm++","Record/Replay");
2505     CmiBool forceReplay = CmiFalse;
2506     char *procs = NULL;
2507     _replaySystem = 0;
2508     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2509       _recplay_crc = 1;
2510     }
2511     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2512       _recplay_checksum = 1;
2513     }
2514     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2515     REPLAYDEBUG("CkMessageWatcherInit ");
2516     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2517         CkListString list(procs);
2518         if (list.includes(CkMyPe())) {
2519           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2520           CpdSetInitializeMemory(1);
2521           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2522         }
2523     }
2524     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2525       if (CkMyPe() == 0) {
2526         CmiPrintf("Charm++> record mode.\n");
2527         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2528           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2529           _recplay_crc = _recplay_checksum = 0;
2530         }
2531       }
2532       CpdSetInitializeMemory(1);
2533       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2534       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2535     }
2536         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2537             forceReplay = CmiTrue;
2538             CpdSetInitializeMemory(1);
2539             // Set the parameters of the processor
2540 #if CMK_SHARED_VARS_UNAVAILABLE
2541             _Cmi_mype = atoi(procs);
2542             while (procs[0]!='/') procs++;
2543             procs++;
2544             _Cmi_numpes = atoi(procs);
2545 #else
2546             CkAbort("+replay-detail available only for non-SMP build");
2547 #endif
2548             _replaySystem = 1;
2549             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2550         }
2551         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2552           if (CkMyPe() == 0)  {
2553             CmiPrintf("Charm++> replay mode.\n");
2554             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2555               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2556               _recplay_crc = _recplay_checksum = 0;
2557             }
2558           }
2559           CpdSetInitializeMemory(1);
2560 #if ! CMK_BIGSIM_CHARM
2561           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2562 #else
2563           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2564 #endif
2565           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2566         }
2567         if (_recplay_crc && _recplay_checksum) {
2568           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2569         }
2570 }
2571
2572 extern "C"
2573 int CkMessageToEpIdx(void *msg) {
2574         envelope *env=UsrToEnv(msg);
2575         int ep=env->getEpIdx();
2576         if (ep==CkIndex_CkArray::recvBroadcast(0))
2577                 return env->getsetArrayBcastEp();
2578         else
2579                 return ep;
2580 }
2581
2582 extern "C"
2583 int getCharmEnvelopeSize() {
2584   return sizeof(envelope);
2585 }
2586
2587 extern "C"
2588 int isCharmEnvelope(void *msg) {
2589       // best efford guessing if this is a charm envelope
2590     envelope *e = (envelope *)msg;
2591     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
2592     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
2593     if (e->getTotalsize() < sizeof(envelope)) return 0;
2594     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
2595 #if CMK_SMP
2596     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
2597 #else
2598     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()) return 0;
2599 #endif
2600     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
2601     return 1;
2602 }
2603
2604
2605 #include "CkMarshall.def.h"
2606