srcpe is used for two purpose, one for plain chare creation, another mainly by projec...
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
14 #include "pathHistory.h"
15 void automaticallySetMessagePriority(envelope *env); // in control point framework.
16 #endif
17
18 #if CMK_LBDB_ON
19 #include "LBDatabase.h"
20 #endif // CMK_LBDB_ON
21
22 #ifndef CMK_CHARE_USE_PTR
23 #include <map>
24 CkpvDeclare(CkVec<void *>, chare_objs);
25 CkpvDeclare(CkVec<int>, chare_types);
26 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
27
28 typedef std::map<int, CkChareID>  Vidblockmap;
29 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
30 CkpvDeclare(int, currentChareIdx);
31 #endif
32
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 void _initChareTables()
47 {
48 #ifndef CMK_CHARE_USE_PTR
49           /* chare and vidblock table */
50   CkpvInitialize(CkVec<void *>, chare_objs);
51   CkpvInitialize(CkVec<int>, chare_types);
52   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
53   CkpvInitialize(Vidblockmap, vmap);
54   CkpvInitialize(int, currentChareIdx);
55   CkpvAccess(currentChareIdx) = -1;
56 #endif
57 }
58
59 //Charm++ virtual functions: declaring these here results in a smaller executable
60 Chare::Chare(void) {
61   thishandle.onPE=CkMyPe();
62   thishandle.objPtr=this;
63 #if CMK_ERROR_CHECKING
64   magic = CHARE_MAGIC;
65 #endif
66 #ifndef CMK_CHARE_USE_PTR
67      // for plain chare, objPtr is actually the index to chare obj table
68   if (CkpvAccess(currentChareIdx) >= 0) {
69     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
70   }
71   chareIdx = CkpvAccess(currentChareIdx);
72 #endif
73 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
74   mlogData = new ChareMlogData();
75   mlogData->objID.type = TypeChare;
76   mlogData->objID.data.chare.id = thishandle;
77 #endif
78 #if CMK_OBJECT_QUEUE_AVAILABLE
79   if (_defaultObjectQ)  CkEnableObjQ();
80 #endif
81 }
82
83 Chare::Chare(CkMigrateMessage* m) {
84   thishandle.onPE=CkMyPe();
85   thishandle.objPtr=this;
86 #if CMK_ERROR_CHECKING
87   magic = 0;
88 #endif
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 #if CMK_ERROR_CHECKING
149   p(magic);
150 #endif
151 }
152
153 int Chare::ckGetChareType() const {
154   return -3;
155 }
156 char *Chare::ckDebugChareName(void) {
157   char buf[100];
158   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
159   return strdup(buf);
160 }
161 int Chare::ckDebugChareID(char *str, int limit) {
162   // pure chares for now do not have a valid ID
163   str[0] = 0;
164   return 1;
165 }
166 void Chare::ckDebugPup(PUP::er &p) {
167   pup(p);
168 }
169
170 /// This method is called before starting a [threaded] entry method.
171 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
172   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
173   traceAddThreadListeners(th, UsrToEnv(msg));
174 }
175
176 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
177   p.comment("Bytes");
178   int ts=UsrToEnv(msg)->getTotalsize();
179   int msgLen=ts-sizeof(envelope);
180   if (msgLen>0)
181     p((char*)msg,msgLen);
182 }
183
184 IrrGroup::IrrGroup(void) {
185   thisgroup = CkpvAccess(_currentGroup);
186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
187         mlogData->objID.type = TypeGroup;
188         mlogData->objID.data.group.id = thisgroup;
189         mlogData->objID.data.group.onPE = CkMyPe();
190 #endif
191 }
192
193 IrrGroup::~IrrGroup() {
194   // remove the object pointer
195   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
196   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
197   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
198 }
199
200 void IrrGroup::pup(PUP::er &p)
201 {
202   Chare::pup(p);
203   p|thisgroup;
204 }
205
206 int IrrGroup::ckGetChareType() const {
207   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
208 }
209
210 int IrrGroup::ckDebugChareID(char *str, int limit) {
211   if (limit<5) return -1;
212   str[0] = 1;
213   *((int*)&str[1]) = thisgroup.idx;
214   return 5;
215 }
216
217 char *IrrGroup::ckDebugChareName() {
218   return strdup(_chareTable[ckGetChareType()]->name);
219 }
220
221 void IrrGroup::ckJustMigrated(void)
222 {
223 }
224
225 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
226   /* FIXME: **CW** not entirely sure what we should do here yet */
227 }
228
229 void Group::CkAddThreadListeners(CthThread th, void *msg) {
230   Chare::CkAddThreadListeners(th, msg);
231   CthSetThreadID(th, thisgroup.idx, 0, 0);
232 }
233
234 void Group::pup(PUP::er &p)
235 {
236   CkReductionMgr::pup(p);
237   reductionInfo.pup(p);
238 }
239
240 /**** Delegation Manager Group */
241 CkDelegateMgr::~CkDelegateMgr() { }
242
243 //Default delegator implementation: do not delegate-- send directly
244 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
245   { CkSendMsg(ep,m,c); }
246 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
247   { CkSendMsgBranch(ep,m,onPE,g); }
248 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
249   { CkBroadcastMsgBranch(ep,m,g); }
250 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
251   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
252 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
253   { CkSendMsgNodeBranch(ep,m,onNode,g); }
254 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
255   { CkBroadcastMsgNodeBranch(ep,m,g); }
256 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
257   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
258 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
259 {
260         CProxyElement_ArrayBase ap(a,idx);
261         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
262 }
263 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
264 {
265         CProxyElement_ArrayBase ap(a,idx);
266         ap.ckSend((CkArrayMessage *)m,ep);
267 }
268 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
269 {
270         CProxy_ArrayBase ap(a);
271         ap.ckBroadcast((CkArrayMessage *)m,ep);
272 }
273
274 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
275 {
276         CmiAbort("ArraySectionSend is not implemented!\n");
277 /*
278         CProxyElement_ArrayBase ap(a,idx);
279         ap.ckSend((CkArrayMessage *)m,ep);
280 */
281 }
282
283 /*** Proxy <-> delegator communication */
284 CkDelegateData::~CkDelegateData() {}
285
286 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
287   return pd; // default implementation ignores pup call
288 }
289
290 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
291    this tricky manual reference counting business... */
292
293 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
294         if (dPtr) dPtr->ref();
295         ckUndelegate();
296         delegatedMgr = dTo;
297         delegatedPtr = dPtr;
298         delegatedGroupId = delegatedMgr->CkGetGroupID();
299         isNodeGroup = delegatedMgr->isNodeGroup();
300 }
301 void CProxy::ckUndelegate(void) {
302         delegatedMgr=NULL;
303         delegatedGroupId.setZero();
304         if (delegatedPtr) delegatedPtr->unref();
305         delegatedPtr=NULL;
306 }
307
308 /// Copy constructor
309 CProxy::CProxy(const CProxy &src)
310   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
311    isNodeGroup(src.isNodeGroup) {
312     delegatedPtr = NULL;
313     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
314         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
315     }
316 }
317
318 /// Assignment operator
319 CProxy& CProxy::operator=(const CProxy &src) {
320         CkDelegateData *oldPtr=delegatedPtr;
321         ckUndelegate();
322         delegatedMgr=src.delegatedMgr;
323         delegatedGroupId = src.delegatedGroupId; 
324         isNodeGroup = src.isNodeGroup;
325
326         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
327             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
328         else
329             delegatedPtr = NULL;
330
331         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
332         if (oldPtr) oldPtr->unref();
333         return *this;
334 }
335
336 void CProxy::pup(PUP::er &p) {
337   if (!p.isUnpacking()) {
338     if (ckDelegatedTo() != NULL) {
339       delegatedGroupId = delegatedMgr->CkGetGroupID();
340       isNodeGroup = delegatedMgr->isNodeGroup();
341     }
342   }
343   p|delegatedGroupId;
344   if (!delegatedGroupId.isZero()) {
345     p|isNodeGroup;
346     if (p.isUnpacking()) {
347       delegatedMgr = ckDelegatedTo(); 
348     }
349
350     int migCtor, cIdx; 
351     if (!p.isUnpacking()) {
352       if (isNodeGroup) {
353         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
354         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
355         migCtor = _chareTable[cIdx]->migCtor; 
356         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
357       }
358       else  {
359         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
360         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
361         migCtor = _chareTable[cIdx]->migCtor; 
362         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
363       }         
364     }
365
366     p|migCtor;
367
368     // if delegated manager has not been created, construct a dummy
369     // object on which to call DelegatePointerPup
370     if (delegatedMgr == NULL) {
371
372       // create a dummy object for calling DelegatePointerPup
373       int objId = _entryTable[migCtor]->chareIdx; 
374       int objSize = _chareTable[objId]->size; 
375       void *obj = malloc(objSize); 
376       _entryTable[migCtor]->call(NULL, obj); 
377       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
378         ->DelegatePointerPup(p, delegatedPtr);           
379       free(obj);
380
381     }
382     else {
383
384       // delegated manager has been created, so we can use it
385       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
386
387     }
388
389     if (p.isUnpacking() && delegatedPtr) {
390       delegatedPtr->ref();
391     }
392   }
393 }
394
395 /**** Array sections */
396 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
397 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
398   _cookie.get_aid() = aid;      \
399   _cookie.get_pe() = CkMyPe();  \
400   _elems = new CkArrayIndex[nElems];    \
401   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
402   pelist = NULL;        \
403   npes  = 0;    \
404 }
405
406 CKSECTIONID_CONSTRUCTOR_DEF(1D)
407 CKSECTIONID_CONSTRUCTOR_DEF(2D)
408 CKSECTIONID_CONSTRUCTOR_DEF(3D)
409 CKSECTIONID_CONSTRUCTOR_DEF(4D)
410 CKSECTIONID_CONSTRUCTOR_DEF(5D)
411 CKSECTIONID_CONSTRUCTOR_DEF(6D)
412 CKSECTIONID_CONSTRUCTOR_DEF(Max)
413
414 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
415   pelist = new int[npes];
416   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
417   _cookie.get_aid() = gid;
418 }
419
420 CkSectionID::CkSectionID(const CkSectionID &sid) {
421   int i;
422   _cookie = sid._cookie;
423   _nElems = sid._nElems;
424   if (_nElems > 0) {
425     _elems = new CkArrayIndex[_nElems];
426     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
427   } else _elems = NULL;
428   npes = sid.npes;
429   if (npes > 0) {
430     pelist = new int[npes];
431     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
432   } else pelist = NULL;
433 }
434
435 void CkSectionID::operator=(const CkSectionID &sid) {
436   int i;
437   _cookie = sid._cookie;
438   _nElems = sid._nElems;
439   if (_nElems > 0) {
440     _elems = new CkArrayIndex[_nElems];
441     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
442   } else _elems = NULL;
443   npes = sid.npes;
444   if (npes > 0) {
445     pelist = new int[npes];
446     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
447   } else pelist = NULL;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451     p | _cookie;
452     p(_nElems);
453     if (_nElems > 0) {
454       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
455       for (int i=0; i< _nElems; i++) p | _elems[i];
456       npes = 0;
457       pelist = NULL;
458     } else {
459       // If _nElems is zero, than this section describes processors instead of array elements
460       _elems = NULL;
461       p(npes);
462       if (p.isUnpacking()) pelist = new int[npes];
463       p(pelist, npes);
464     }
465 }
466
467 /**** Tiny random API routines */
468
469 #ifdef CMK_CUDA
470 void CUDACallbackManager(void *fn) {
471   if (fn != NULL) {
472     CkCallback *cb = (CkCallback*) fn;
473     cb->send();
474   }
475 }
476
477 #endif
478
479 extern "C"
480 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
481 {
482   UsrToEnv(msg)->setRef(ref);
483 }
484
485 extern "C"
486 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
487 {
488   return UsrToEnv(msg)->getRef();
489 }
490
491 extern "C"
492 int CkGetSrcPe(void *msg)
493 {
494   return UsrToEnv(msg)->getSrcPe();
495 }
496
497 extern "C"
498 int CkGetSrcNode(void *msg)
499 {
500   return CmiNodeOf(CkGetSrcPe(msg));
501 }
502
503 extern "C"
504 void *CkLocalBranch(CkGroupID gID) {
505   return _localBranch(gID);
506 }
507
508 static
509 void *_ckLocalNodeBranch(CkGroupID groupID) {
510   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
511   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
512   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
513   return retval;
514 }
515
516 extern "C"
517 void *CkLocalNodeBranch(CkGroupID groupID)
518 {
519   void *retval;
520   // we are called in a constructor
521   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
522     return CkpvAccess(_currentNodeGroupObj);
523   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
524   { // Nodegroup hasn't finished being created yet-- schedule...
525     CsdScheduler(0);
526   }
527   return retval;
528 }
529
530 extern "C"
531 void *CkLocalChare(const CkChareID *pCid)
532 {
533         int pe=pCid->onPE;
534         if (pe<0) { //A virtual chare ID
535                 if (pe!=(-(CkMyPe()+1)))
536                         return NULL;//VID block not on this PE
537 #ifdef CMK_CHARE_USE_PTR
538                 VidBlock *v=(VidBlock *)pCid->objPtr;
539 #else
540                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
541 #endif
542                 return v->getLocalChareObj();
543         }
544         else
545         { //An ordinary chare ID
546                 if (pe!=CkMyPe())
547                         return NULL;//Chare not on this PE
548 #ifdef CMK_CHARE_USE_PTR
549                 return pCid->objPtr;
550 #else
551                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
552 #endif
553         }
554 }
555
556 CkpvDeclare(char**,Ck_argv);
557
558 extern "C" char **CkGetArgv(void) {
559         return CkpvAccess(Ck_argv);
560 }
561 extern "C" int CkGetArgc(void) {
562         return CmiGetArgc(CkpvAccess(Ck_argv));
563 }
564
565 /******************** Basic support *****************/
566 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
567 {
568 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
569         CpvAccess(_currentObj) = (Chare *)obj;
570 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
571 #endif
572   //BIGSIM_OOC DEBUGGING
573   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
574   //fflush(stdout);
575 #if CMK_CHARMDEBUG
576   CpdBeforeEp(epIdx, obj, msg);
577 #endif    
578   _entryTable[epIdx]->call(msg, obj);
579 #if CMK_CHARMDEBUG
580   CpdAfterEp(epIdx);
581 #endif
582   if (_entryTable[epIdx]->noKeep)
583   { /* Method doesn't keep/delete the message, so we have to: */
584     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
585   }
586 }
587 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
588 {
589   //BIGSIM_OOC DEBUGGING
590   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
591   //fflush(stdout);
592
593   void *deliverMsg;
594 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
595         CpvAccess(_currentObj) = (Chare *)obj;
596 #endif
597   if (_entryTable[epIdx]->noKeep)
598   { /* Deliver a read-only copy of the message */
599     deliverMsg=(void *)msg;
600   } else
601   { /* Method needs a copy of the message to keep/delete */
602     void *oldMsg=(void *)msg;
603     deliverMsg=CkCopyMsg(&oldMsg);
604 #if CMK_ERROR_CHECKING
605     if (oldMsg!=msg)
606       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
607 #endif
608   }
609 #if CMK_CHARMDEBUG
610   CpdBeforeEp(epIdx, obj, (void*)msg);
611 #endif
612   _entryTable[epIdx]->call(deliverMsg, obj);
613 #if CMK_CHARMDEBUG
614   CpdAfterEp(epIdx);
615 #endif
616 }
617
618 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
619 {
620   register void *msg = EnvToUsr(env);
621   _SET_USED(env, 0);
622   CkDeliverMessageFree(epIdx,msg,obj);
623 }
624
625 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
626 {
627
628 #if CMK_TRACE_ENABLED 
629   if (_entryTable[epIdx]->traceEnabled) {
630     _TRACE_BEGIN_EXECUTE(env);
631     _invokeEntryNoTrace(epIdx,env,obj);
632     _TRACE_END_EXECUTE();
633   }
634   else
635 #endif
636     _invokeEntryNoTrace(epIdx,env,obj);
637
638 }
639
640 /********************* Creation ********************/
641
642 extern "C"
643 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
644 {
645   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
646   envelope *env = UsrToEnv(msg);
647   _CHECK_USED(env);
648   if(pCid == 0) {
649     env->setMsgtype(NewChareMsg);
650   } else {
651     pCid->onPE = (-(CkMyPe()+1));
652     //  pCid->magic = _GETIDX(cIdx);
653     pCid->objPtr = (void *) new VidBlock();
654     _MEMCHECK(pCid->objPtr);
655     env->setMsgtype(NewVChareMsg);
656     env->setVidPtr(pCid->objPtr);
657 #ifndef CMK_CHARE_USE_PTR
658     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
659     int idx = CkpvAccess(vidblocks).size()-1;
660     pCid->objPtr = (void *)(CmiIntPtr)idx;
661     env->setVidPtr((void *)(CmiIntPtr)idx);
662 #endif
663   }
664   env->setEpIdx(eIdx);
665   env->setByPe(CkMyPe());
666   env->setSrcPe(CkMyPe());
667   CmiSetHandler(env, _charmHandlerIdx);
668   _TRACE_CREATION_1(env);
669   CpvAccess(_qd)->create();
670   _STATS_RECORD_CREATE_CHARE_1();
671   _SET_USED(env, 1);
672   if(destPE == CK_PE_ANY)
673     env->setForAnyPE(1);
674   else
675     env->setForAnyPE(0);
676   _CldEnqueue(destPE, env, _infoIdx);
677   _TRACE_CREATION_DONE(1);
678 }
679
680 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
681 {
682   register int gIdx = _entryTable[epIdx]->chareIdx;
683   register void *obj = malloc(_chareTable[gIdx]->size);
684   _MEMCHECK(obj);
685   setMemoryTypeChare(obj);
686   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
687   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
688   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
689   CkpvAccess(_groupIDTable)->push_back(groupID);
690   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
691   if(ptrq) {
692     void *pending;
693     while((pending=ptrq->deq())!=0)
694       _CldEnqueue(CkMyPe(), pending, _infoIdx);
695 //    delete ptrq;
696       CkpvAccess(_groupTable)->find(groupID).clearPending();
697   }
698   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
699
700   CkpvAccess(_currentGroup) = groupID;
701   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
702 #ifndef CMK_CHARE_USE_PTR
703   //((Chare *)obj)->chareIdx = -1;
704   CkpvAccess(currentChareIdx) = -1;
705 #endif
706   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
707   _STATS_RECORD_PROCESS_GROUP_1();
708 }
709
710 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
711 {
712   register int gIdx = _entryTable[epIdx]->chareIdx;
713   int objSize=_chareTable[gIdx]->size;
714   register void *obj = malloc(objSize);
715   _MEMCHECK(obj);
716   setMemoryTypeChare(obj);
717   CkpvAccess(_currentGroup) = groupID;
718
719 // Now that the NodeGroup is created, add it to the table.
720 //  NodeGroups can be accessed by multiple processors, so
721 //  this is in the opposite order from groups - invoking the constructor
722 //  before registering it.
723 // User may call CkLocalNodeBranch() inside the nodegroup constructor
724 //  store nodegroup into _currentNodeGroupObj
725   CkpvAccess(_currentNodeGroupObj) = obj;
726 #ifndef CMK_CHARE_USE_PTR
727   //((Chare *)obj)->chareIdx = -1;
728   CkpvAccess(currentChareIdx) = -1;
729 #endif
730   _invokeEntryNoTrace(epIdx,env,obj);
731   CkpvAccess(_currentNodeGroupObj) = NULL;
732   _STATS_RECORD_PROCESS_NODE_GROUP_1();
733
734   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
735   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
736   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
737   CksvAccess(_nodeGroupIDTable).push_back(groupID);
738
739   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
740   if(ptrq) {
741     void *pending;
742     while((pending=ptrq->deq())!=0)
743       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
744 //    delete ptrq;
745       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
746   }
747   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
748 }
749
750 void _createGroup(CkGroupID groupID, envelope *env)
751 {
752   _CHECK_USED(env);
753   _SET_USED(env, 1);
754   register int epIdx = env->getEpIdx();
755   int gIdx = _entryTable[epIdx]->chareIdx;
756   CkNodeGroupID rednMgr;
757   if(_chareTable[gIdx]->isIrr == 0){
758                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
759                 rednMgr = rednMgrProxy;
760 //              rednMgrProxy.setAttachedGroup(groupID);
761   }else{
762         rednMgr.setZero();
763   }
764   env->setGroupNum(groupID);
765   env->setSrcPe(CkMyPe());
766   env->setRednMgr(rednMgr);
767   env->setGroupEpoch(CkpvAccess(_charmEpoch));
768
769   if(CkNumPes()>1) {
770     CkPackMessage(&env);
771     CmiSetHandler(env, _bocHandlerIdx);
772     _numInitMsgs++;
773     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
774     CpvAccess(_qd)->create(CkNumPes()-1);
775     CkUnpackMessage(&env);
776   }
777   _STATS_RECORD_CREATE_GROUP_1();
778   CkCreateLocalGroup(groupID, epIdx, env);
779 }
780
781 void _createNodeGroup(CkGroupID groupID, envelope *env)
782 {
783   _CHECK_USED(env);
784   _SET_USED(env, 1);
785   register int epIdx = env->getEpIdx();
786   env->setGroupNum(groupID);
787   env->setSrcPe(CkMyPe());
788   env->setGroupEpoch(CkpvAccess(_charmEpoch));
789   if(CkNumNodes()>1) {
790     CkPackMessage(&env);
791     CmiSetHandler(env, _bocHandlerIdx);
792     _numInitMsgs++;
793     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
794     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
795     CpvAccess(_qd)->create(CkNumNodes()-1);
796     CkUnpackMessage(&env);
797   }
798   _STATS_RECORD_CREATE_NODE_GROUP_1();
799   CkCreateLocalNodeGroup(groupID, epIdx, env);
800 }
801
802 // new _groupCreate
803
804 static CkGroupID _groupCreate(envelope *env)
805 {
806   register CkGroupID groupNum;
807
808   // check CkMyPe(). if it is 0 then idx is _numGroups++
809   // if not, then something else...
810   if(CkMyPe() == 0)
811      groupNum.idx = CkpvAccess(_numGroups)++;
812   else
813      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
814   _createGroup(groupNum, env);
815   return groupNum;
816 }
817
818 // new _nodeGroupCreate
819 static CkGroupID _nodeGroupCreate(envelope *env)
820 {
821   register CkGroupID groupNum;
822   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
823   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
824           groupNum.idx = CksvAccess(_numNodeGroups)++;
825    else
826           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
827   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
828   _createNodeGroup(groupNum, env);
829   return groupNum;
830 }
831
832 /**** generate the group idx when group is creator pe is not pe0
833  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
834  **** remaining bits contain the group creator processor number and
835  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
836
837 int _getGroupIdx(int numNodes,int myNode,int numGroups)
838 {
839         int idx;
840         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
841         int n = 32 - (x+1);                                     // number of bits remaining for the index
842         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
843                                                                 // then add the next available index
844         // of course this won't work when int is 8 bytes long on T3E
845         //idx |= 0x80000000;                                      // set the most significant bit to 1
846         idx = - idx;
847                                                                 // if int is not 32 bits, wouldn't this be wrong?
848         return idx;
849 }
850
851 extern "C"
852 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
853 {
854   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
855   register envelope *env = UsrToEnv(msg);
856   env->setMsgtype(BocInitMsg);
857   env->setEpIdx(eIdx);
858   env->setSrcPe(CkMyPe());
859   _TRACE_CREATION_N(env, CkNumPes());
860   CkGroupID gid = _groupCreate(env);
861   _TRACE_CREATION_DONE(1);
862   return gid;
863 }
864
865 extern "C"
866 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
867 {
868   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
869   register envelope *env = UsrToEnv(msg);
870   env->setMsgtype(NodeBocInitMsg);
871   env->setEpIdx(eIdx);
872   env->setSrcPe(CkMyPe());
873   _TRACE_CREATION_N(env, CkNumNodes());
874   CkGroupID gid = _nodeGroupCreate(env);
875   _TRACE_CREATION_DONE(1);
876   return gid;
877 }
878
879 static inline void *_allocNewChare(envelope *env, int &idx)
880 {
881   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
882   void *tmp=malloc(_chareTable[chareIdx]->size);
883   _MEMCHECK(tmp);
884 #ifndef CMK_CHARE_USE_PTR
885   CkpvAccess(chare_objs).push_back(tmp);
886   CkpvAccess(chare_types).push_back(chareIdx);
887   idx = CkpvAccess(chare_objs).size()-1;
888 #endif
889   setMemoryTypeChare(tmp);
890   return tmp;
891 }
892
893 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
894 {
895   int idx;
896   register void *obj = _allocNewChare(env, idx);
897 #ifndef CMK_CHARE_USE_PTR
898   //((Chare *)obj)->chareIdx = idx;
899   CkpvAccess(currentChareIdx) = idx;
900 #endif
901   _invokeEntry(env->getEpIdx(),env,obj);
902 }
903
904 void CkCreateLocalChare(int epIdx, envelope *env)
905 {
906   env->setEpIdx(epIdx);
907   _processNewChareMsg(NULL, env);
908 }
909
910 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
911 {
912   int idx;
913   register void *obj = _allocNewChare(env, idx);
914   register CkChareID *pCid = (CkChareID *)
915       _allocMsg(FillVidMsg, sizeof(CkChareID));
916   pCid->onPE = CkMyPe();
917 #ifndef CMK_CHARE_USE_PTR
918   pCid->objPtr = (void*)(CmiIntPtr)idx;
919 #else
920   pCid->objPtr = obj;
921 #endif
922   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
923   register envelope *ret = UsrToEnv(pCid);
924   ret->setVidPtr(env->getVidPtr());
925   register int srcPe = env->getByPe();
926   ret->setSrcPe(CkMyPe());
927   CmiSetHandler(ret, _charmHandlerIdx);
928   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
929 #ifndef CMK_CHARE_USE_PTR
930   // register the remote vidblock for deletion when chare is deleted
931   CkChareID vid;
932   vid.onPE = srcPe;
933   vid.objPtr = env->getVidPtr();
934   CkpvAccess(vmap)[idx] = vid;    
935 #endif
936   CpvAccess(_qd)->create();
937 #ifndef CMK_CHARE_USE_PTR
938   //((Chare *)obj)->chareIdx = idx;
939   CkpvAccess(currentChareIdx) = idx;
940 #endif
941   _invokeEntry(env->getEpIdx(),env,obj);
942 }
943
944 /************** Receive: Chares *************/
945
946 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
947 {
948   register int epIdx = env->getEpIdx();
949   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
950   register void *obj;
951   if (mainIdx != -1)  {           // mainchare
952     CmiAssert(CkMyPe()==0);
953     obj = _mainTable[mainIdx]->getObj();
954   }
955   else {
956 #ifndef CMK_CHARE_USE_PTR
957     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
958       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
959     else
960       obj = env->getObjPtr();
961 #else
962     obj = env->getObjPtr();
963 #endif
964   }
965   _invokeEntry(epIdx,env,obj);
966 }
967
968 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
969 {
970   register int epIdx = env->getEpIdx();
971   register void *obj = env->getObjPtr();
972   _invokeEntry(epIdx,env,obj);
973 }
974
975 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
976 {
977 #ifndef CMK_CHARE_USE_PTR
978   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
979 #else
980   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
981   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
982 #endif
983   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
984   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
985   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
986   CmiFree(env);
987 }
988
989 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
990 {
991 #ifndef CMK_CHARE_USE_PTR
992   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
993 #else
994   VidBlock *vptr = (VidBlock *) env->getVidPtr();
995   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
996 #endif
997   _SET_USED(env, 1);
998   vptr->send(env);
999 }
1000
1001 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1002 {
1003 #ifndef CMK_CHARE_USE_PTR
1004   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1005   delete vptr;
1006   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1007 #endif
1008   CmiFree(env);
1009 }
1010
1011 /************** Receive: Groups ****************/
1012
1013 /**
1014  Return a pointer to the local BOC of "groupID".
1015  The message "env" passed in has some known dependency on this groupID
1016  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1017  Therefore, if the return value is NULL, this function buffers the massage so that
1018  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1019  The message passed in must have its handlers correctly set so that it can be
1020  scheduled again.
1021 */
1022 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1023 {
1024
1025         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1026         IrrGroup *obj = ck->localBranch(groupID);
1027         if (obj==NULL) { /* groupmember not yet created: stash message */
1028                 ck->getGroupTable()->find(groupID).enqMsg(env);
1029         }
1030         else { /* will be able to process message */
1031                 ck->process();
1032         }
1033         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1034         return obj;
1035 }
1036
1037 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1038 {
1039 #if CMK_LBDB_ON
1040   // if there is a running obj being measured, stop it temporarily
1041   LDObjHandle objHandle;
1042   int objstopped = 0;
1043   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1044   if (the_lbdb->RunningObject(&objHandle)) {
1045     objstopped = 1;
1046     the_lbdb->ObjectStop(objHandle);
1047   }
1048 #endif
1049   _invokeEntry(epIdx,env,obj);
1050 #if CMK_LBDB_ON
1051   if (objstopped) the_lbdb->ObjectStart(objHandle);
1052 #endif
1053   _STATS_RECORD_PROCESS_BRANCH_1();
1054 }
1055
1056 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1057 {
1058   register CkGroupID groupID =  env->getGroupNum();
1059   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1060   if(obj) {
1061     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1062   }
1063 }
1064
1065 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1066 {
1067   env->setMsgtype(ForChareMsg);
1068   env->setObjPtr(obj);
1069   _processForChareMsg(ck,env);
1070   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1071 }
1072
1073 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1074 {
1075   env->setEpIdx(epIdx);
1076   _deliverForNodeBocMsg(ck,env, obj);
1077 }
1078
1079 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1080 {
1081   register CkGroupID groupID = env->getGroupNum();
1082   register void *obj;
1083
1084   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1085   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1086   if(!obj) { // groupmember not yet created
1087 #if CMK_IMMEDIATE_MSG
1088     if (CmiIsImmediate(env))     // buffer immediate message
1089       CmiDelayImmediate();
1090     else
1091 #endif
1092     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1093     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1094     return;
1095   }
1096   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1097 #if CMK_IMMEDIATE_MSG
1098   if (!CmiIsImmediate(env))
1099 #endif
1100   ck->process();
1101   env->setMsgtype(ForChareMsg);
1102   env->setObjPtr(obj);
1103   _processForChareMsg(ck,env);
1104   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1105 }
1106
1107 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1108 {
1109   register CkGroupID groupID = env->getGroupNum();
1110   register int epIdx = env->getEpIdx();
1111   if (!env->getGroupDep().isZero()) {      // dependence
1112     CkGroupID dep = env->getGroupDep();
1113     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1114     if (obj == NULL) return;
1115   }
1116   else
1117     ck->process();
1118   CkCreateLocalGroup(groupID, epIdx, env);
1119 }
1120
1121 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1122 {
1123   register CkGroupID groupID = env->getGroupNum();
1124   register int epIdx = env->getEpIdx();
1125   CkCreateLocalNodeGroup(groupID, epIdx, env);
1126 }
1127
1128 /************** Receive: Arrays *************/
1129
1130 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1131   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1132   if (mgr) {
1133     _SET_USED(env, 0);
1134     mgr->insertElement((CkMessage *)EnvToUsr(env));
1135   }
1136 }
1137 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1138   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1139   if (mgr) {
1140     _SET_USED(env, 0);
1141     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1142   }
1143 }
1144
1145 //BIGSIM_OOC DEBUGGING
1146 #define TELLMSGTYPE(x) //x
1147
1148 /**
1149  * This is the main converse-level handler used by all of Charm++.
1150  *
1151  * \addtogroup CriticalPathFramework
1152  */
1153 void _processHandler(void *converseMsg,CkCoreState *ck)
1154 {
1155   register envelope *env = (envelope *) converseMsg;
1156
1157   MESSAGE_PHASE_CHECK(env);
1158
1159 //#if CMK_RECORD_REPLAY
1160   if (ck->watcher!=NULL) {
1161     if (!ck->watcher->processMessage(&env,ck)) return;
1162   }
1163 //#endif
1164 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1165         Chare *obj=NULL;
1166         CkObjID sender;
1167         MCount SN;
1168         MlogEntry *entry=NULL;
1169         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1170         env->getMsgtype() == ForArrayEltMsg){
1171                 sender = env->sender;
1172                 SN = env->SN;
1173                 int result = preProcessReceivedMessage(env,&obj,&entry);
1174                 if(result == 0){
1175                         return;
1176                 }
1177         }
1178 #endif
1179
1180 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1181   //  CkPrintf("START\n");
1182   criticalPath_start(env);
1183 #endif
1184
1185
1186   switch(env->getMsgtype()) {
1187 // Group support
1188     case BocInitMsg :
1189       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1190       // QD processing moved inside _processBocInitMsg because it is conditional
1191       //ck->process(); 
1192       if(env->isPacked()) CkUnpackMessage(&env);
1193       _processBocInitMsg(ck,env);
1194       break;
1195     case NodeBocInitMsg :
1196       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1197       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1198       _processNodeBocInitMsg(ck,env);
1199       break;
1200     case ForBocMsg :
1201       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1202       // QD processing moved inside _processForBocMsg because it is conditional
1203       if(env->isPacked()) CkUnpackMessage(&env);
1204       _processForBocMsg(ck,env);
1205       // stats record moved inside _processForBocMsg because it is conditional
1206       break;
1207     case ForNodeBocMsg :
1208       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1209       // QD processing moved to _processForNodeBocMsg because it is conditional
1210       if(env->isPacked()) CkUnpackMessage(&env);
1211       _processForNodeBocMsg(ck,env);
1212       // stats record moved to _processForNodeBocMsg because it is conditional
1213       break;
1214
1215 // Array support
1216     case ArrayEltInitMsg:
1217       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1218       if(env->isPacked()) CkUnpackMessage(&env);
1219       _processArrayEltInitMsg(ck,env);
1220       break;
1221     case ForArrayEltMsg:
1222       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1223       if(env->isPacked()) CkUnpackMessage(&env);
1224       _processArrayEltMsg(ck,env);
1225       break;
1226
1227 // Chare support
1228     case NewChareMsg :
1229       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1230       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1231       _processNewChareMsg(ck,env);
1232       _STATS_RECORD_PROCESS_CHARE_1();
1233       break;
1234     case NewVChareMsg :
1235       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1236       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1237       _processNewVChareMsg(ck,env);
1238       _STATS_RECORD_PROCESS_CHARE_1();
1239       break;
1240     case ForChareMsg :
1241       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1242       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1243       _processForPlainChareMsg(ck,env);
1244       _STATS_RECORD_PROCESS_MSG_1();
1245       break;
1246     case ForVidMsg   :
1247       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1248       ck->process();
1249       _processForVidMsg(ck,env);
1250       break;
1251     case FillVidMsg  :
1252       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1253       ck->process();
1254       _processFillVidMsg(ck,env);
1255       break;
1256     case DeleteVidMsg  :
1257       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1258       ck->process();
1259       _processDeleteVidMsg(ck,env);
1260       break;
1261
1262     default:
1263       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1264   }
1265 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1266         if(obj != NULL){
1267                 postProcessReceivedMessage(obj,sender,SN,entry);
1268         }
1269 #endif
1270
1271
1272 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1273   criticalPath_end();
1274   //  CkPrintf("STOP\n");
1275 #endif
1276
1277
1278 }
1279
1280
1281 /******************** Message Send **********************/
1282
1283 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1284              int *queueing, int *priobits, unsigned int **prioptr)
1285 {
1286   register envelope *env = (envelope *)converseMsg;
1287   *pfn = (CldPackFn)CkPackMessage;
1288   *len = env->getTotalsize();
1289   *queueing = env->getQueueing();
1290   *priobits = env->getPriobits();
1291   *prioptr = (unsigned int *) env->getPrioPtr();
1292 }
1293
1294 void CkPackMessage(envelope **pEnv)
1295 {
1296   register envelope *env = *pEnv;
1297   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1298     register void *msg = EnvToUsr(env);
1299     _TRACE_BEGIN_PACK();
1300     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1301     _TRACE_END_PACK();
1302     env=UsrToEnv(msg);
1303     env->setPacked(1);
1304     *pEnv = env;
1305   }
1306 }
1307
1308 void CkUnpackMessage(envelope **pEnv)
1309 {
1310   register envelope *env = *pEnv;
1311   register int msgIdx = env->getMsgIdx();
1312   if(env->isPacked()) {
1313     register void *msg = EnvToUsr(env);
1314     _TRACE_BEGIN_UNPACK();
1315     msg = _msgTable[msgIdx]->unpack(msg);
1316     _TRACE_END_UNPACK();
1317     env=UsrToEnv(msg);
1318     env->setPacked(0);
1319     *pEnv = env;
1320   }
1321 }
1322
1323 //There's no reason for most messages to go through the Cld--
1324 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1325 // Thus these accellerated versions of the Cld calls.
1326 #if CMK_OBJECT_QUEUE_AVAILABLE
1327 static int index_objectQHandler;
1328 #endif
1329 int index_tokenHandler;
1330 int index_skipCldHandler;
1331
1332 void _skipCldHandler(void *converseMsg)
1333 {
1334   register envelope *env = (envelope *)(converseMsg);
1335   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1336 #if CMK_GRID_QUEUE_AVAILABLE
1337   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1338     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1339                        env, env->getQueueing (), env->getPriobits (),
1340                        (unsigned int *) env->getPrioPtr ());
1341   } else {
1342     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1343                        env, env->getQueueing (), env->getPriobits (),
1344                        (unsigned int *) env->getPrioPtr ());
1345   }
1346 #else
1347   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1348         env, env->getQueueing(),env->getPriobits(),
1349         (unsigned int *)env->getPrioPtr());
1350 #endif
1351 }
1352
1353
1354 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1355 // Made non-static to be used by ckmessagelogging
1356 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1357 {
1358 #if CMK_CHARMDEBUG
1359   if (!ConverseDeliver(pe)) {
1360     CmiFree(env);
1361     return;
1362   }
1363 #endif
1364   if(pe == CkMyPe() ){
1365     if(!CmiNodeAlive(CkMyPe())){
1366         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1367 //      return;
1368     }
1369   }
1370   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1371 #if CMK_OBJECT_QUEUE_AVAILABLE
1372     Chare *obj = CkFindObjectPtr(env);
1373     if (obj && obj->CkGetObjQueue().queue()) {
1374       _enqObjQueue(obj, env);
1375     }
1376     else
1377 #endif
1378     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1379         env, env->getQueueing(),env->getPriobits(),
1380         (unsigned int *)env->getPrioPtr());
1381   } else {
1382     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1383       CkPackMessage(&env);
1384     int len=env->getTotalsize();
1385     CmiSetXHandler(env,CmiGetHandler(env));
1386 #if CMK_OBJECT_QUEUE_AVAILABLE
1387     CmiSetHandler(env,index_objectQHandler);
1388 #else
1389     CmiSetHandler(env,index_skipCldHandler);
1390 #endif
1391     CmiSetInfo(env,infoFn);
1392     if (pe==CLD_BROADCAST) {
1393 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1394                         CmiSyncBroadcast(len, (char *)env);
1395 #else
1396                         CmiSyncBroadcastAndFree(len, (char *)env); 
1397 #endif
1398
1399 }
1400     else if (pe==CLD_BROADCAST_ALL) { 
1401 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1402                         CmiSyncBroadcastAll(len, (char *)env);
1403 #else
1404                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1405 #endif
1406
1407 }
1408     else{
1409 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1410                         CmiSyncSend(pe, len, (char *)env);
1411 #else
1412                         CmiSyncSendAndFree(pe, len, (char *)env);
1413 #endif
1414
1415                 }
1416   }
1417 }
1418
1419 #if CMK_BIGSIM_CHARM
1420 #   define  _skipCldEnqueue   _CldEnqueue
1421 #endif
1422
1423 // by pass Charm++ priority queue, send as Converse message
1424 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1425 {
1426 #if CMK_CHARMDEBUG
1427   if (!ConverseDeliver(-1)) {
1428     CmiFree(env);
1429     return;
1430   }
1431 #endif
1432   CkPackMessage(&env);
1433   int len=env->getTotalsize();
1434   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1435 }
1436
1437 static void _noCldEnqueue(int pe, envelope *env)
1438 {
1439 /*
1440   if (pe == CkMyPe()) {
1441     CmiHandleMessage(env);
1442   } else
1443 */
1444 #if CMK_CHARMDEBUG
1445   if (!ConverseDeliver(pe)) {
1446     CmiFree(env);
1447     return;
1448   }
1449 #endif
1450   CkPackMessage(&env);
1451   int len=env->getTotalsize();
1452   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1453   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1454   else CmiSyncSendAndFree(pe, len, (char *)env);
1455 }
1456
1457 //static void _noCldNodeEnqueue(int node, envelope *env)
1458 //Made non-static to be used by ckmessagelogging
1459 void _noCldNodeEnqueue(int node, envelope *env)
1460 {
1461 /*
1462   if (node == CkMyNode()) {
1463     CmiHandleMessage(env);
1464   } else {
1465 */
1466 #if CMK_CHARMDEBUG
1467   if (!ConverseDeliver(node)) {
1468     CmiFree(env);
1469     return;
1470   }
1471 #endif
1472   CkPackMessage(&env);
1473   int len=env->getTotalsize();
1474   if (node==CLD_BROADCAST) { 
1475 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1476         CmiSyncNodeBroadcast(len, (char *)env);
1477 #else
1478         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1479 #endif
1480 }
1481   else if (node==CLD_BROADCAST_ALL) { 
1482 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1483                 CmiSyncNodeBroadcastAll(len, (char *)env);
1484 #else
1485                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1486 #endif
1487
1488 }
1489   else {
1490 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1491         CmiSyncNodeSend(node, len, (char *)env);
1492 #else
1493         CmiSyncNodeSendAndFree(node, len, (char *)env);
1494 #endif
1495   }
1496 }
1497
1498 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1499 {
1500   register envelope *env = UsrToEnv(msg);
1501   _CHECK_USED(env);
1502   _SET_USED(env, 1);
1503 #if CMK_REPLAYSYSTEM
1504   setEventID(env);
1505 #endif
1506   env->setMsgtype(ForChareMsg);
1507   env->setEpIdx(eIdx);
1508   env->setSrcPe(CkMyPe());
1509 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1510   criticalPath_send(env);
1511   automaticallySetMessagePriority(env);
1512 #endif
1513 #if CMK_CHARMDEBUG
1514   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1515 #endif
1516 #if CMK_OBJECT_QUEUE_AVAILABLE
1517   CmiSetHandler(env, index_objectQHandler);
1518 #else
1519   CmiSetHandler(env, _charmHandlerIdx);
1520 #endif
1521   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1522     register int pe = -(pCid->onPE+1);
1523     if(pe==CkMyPe()) {
1524 #ifndef CMK_CHARE_USE_PTR
1525       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1526 #else
1527       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1528 #endif
1529       void *objPtr;
1530       if (NULL!=(objPtr=vblk->getLocalChare()))
1531       { //A ready local chare
1532         env->setObjPtr(objPtr);
1533         return pe;
1534       }
1535       else { //The vidblock is not ready-- forget it
1536         vblk->send(env);
1537         return -1;
1538       }
1539     } else { //Valid vidblock for another PE:
1540       env->setMsgtype(ForVidMsg);
1541       env->setVidPtr(pCid->objPtr);
1542       return pe;
1543     }
1544   }
1545   else {
1546     env->setObjPtr(pCid->objPtr);
1547     return pCid->onPE;
1548   }
1549 }
1550
1551 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1552 {
1553   int destPE = _prepareMsg(eIdx, msg, pCid);
1554   if (destPE != -1) {
1555     register envelope *env = UsrToEnv(msg);
1556 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1557     criticalPath_send(env);
1558     automaticallySetMessagePriority(env);
1559 #endif
1560     CmiBecomeImmediate(env);
1561   }
1562   return destPE;
1563 }
1564
1565 extern "C"
1566 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1567 {
1568   if (opts & CK_MSG_INLINE) {
1569     CkSendMsgInline(entryIdx, msg, pCid, opts);
1570     return;
1571   }
1572 #if CMK_ERROR_CHECKING
1573   if (opts & CK_MSG_IMMEDIATE) {
1574     CmiAbort("Immediate message is not allowed in Chare!");
1575   }
1576 #endif
1577   register envelope *env = UsrToEnv(msg);
1578   int destPE=_prepareMsg(entryIdx,msg,pCid);
1579   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1580   // VidBlock was not yet filled). The problem is that the creation was never
1581   // traced later when the VidBlock was filled. One solution is to trace the
1582   // creation here, the other to trace it in VidBlock->msgDeliver().
1583   _TRACE_CREATION_1(env);
1584   if (destPE!=-1) {
1585     CpvAccess(_qd)->create();
1586     if (opts & CK_MSG_SKIP_OR_IMM)
1587       _noCldEnqueue(destPE, env);
1588     else
1589       _CldEnqueue(destPE, env, _infoIdx);
1590   }
1591   _TRACE_CREATION_DONE(1);
1592 }
1593
1594 extern "C"
1595 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1596 {
1597   if (pCid->onPE==CkMyPe())
1598   {
1599     if(!CmiNodeAlive(CkMyPe())){
1600         return;
1601     }
1602 #if CMK_CHARMDEBUG
1603     //Just in case we need to breakpoint or use the envelope in some way
1604     _prepareMsg(entryIndex,msg,pCid);
1605 #endif
1606                 //Just directly call the chare (skip QD handling & scheduler)
1607     register envelope *env = UsrToEnv(msg);
1608     if (env->isPacked()) CkUnpackMessage(&env);
1609     _STATS_RECORD_PROCESS_MSG_1();
1610     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1611   }
1612   else {
1613     //No way to inline a cross-processor message:
1614     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1615   }
1616 }
1617
1618 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1619 {
1620   register envelope *env = UsrToEnv(msg);
1621 #if CMK_ERROR_CHECKING
1622   CkNodeGroupID nodeRedMgr;
1623 #endif
1624   _CHECK_USED(env);
1625   _SET_USED(env, 1);
1626 #if CMK_REPLAYSYSTEM
1627   setEventID(env);
1628 #endif
1629   env->setMsgtype(type);
1630   env->setEpIdx(eIdx);
1631   env->setGroupNum(gID);
1632   env->setSrcPe(CkMyPe());
1633 #if CMK_ERROR_CHECKING
1634   nodeRedMgr.setZero();
1635   env->setRednMgr(nodeRedMgr);
1636 #endif
1637 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1638   criticalPath_send(env);
1639   automaticallySetMessagePriority(env);
1640 #endif
1641 #if CMK_CHARMDEBUG
1642   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1643 #endif
1644   CmiSetHandler(env, _charmHandlerIdx);
1645   return env;
1646 }
1647
1648 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1649 {
1650   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1651 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1652   criticalPath_send(env);
1653   automaticallySetMessagePriority(env);
1654 #endif
1655   CmiBecomeImmediate(env);
1656   return env;
1657 }
1658
1659 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1660                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1661 {
1662   int numPes;
1663   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1664 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1665   sendTicketGroupRequest(env,pe,_infoIdx);
1666 #else
1667   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1668   _TRACE_CREATION_N(env, numPes);
1669   if (opts & CK_MSG_SKIP_OR_IMM)
1670     _noCldEnqueue(pe, env);
1671   else
1672     _skipCldEnqueue(pe, env, _infoIdx);
1673   _TRACE_CREATION_DONE(1);
1674 #endif
1675 }
1676
1677 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1678                            int npes, int *pes)
1679 {
1680   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1681   _TRACE_CREATION_MULTICAST(env, npes, pes);
1682   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1683   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1684 }
1685
1686 extern "C"
1687 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1688 {
1689 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1690   if (destPE==CkMyPe())
1691   {
1692     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1693     return;
1694   }
1695   //Can't inline-- send the usual way
1696   register envelope *env = UsrToEnv(msg);
1697   int numPes;
1698   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1699   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1700   _TRACE_CREATION_N(env, numPes);
1701   _noCldEnqueue(destPE, env);
1702   _STATS_RECORD_SEND_BRANCH_1();
1703   CkpvAccess(_coreState)->create();
1704   _TRACE_CREATION_DONE(1);
1705 #else
1706   // no support for immediate message, send inline
1707   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1708 #endif
1709 }
1710
1711 extern "C"
1712 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1713 {
1714   if (destPE==CkMyPe())
1715   {
1716     if(!CmiNodeAlive(CkMyPe())){
1717         return;
1718     }
1719     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1720     if (obj!=NULL)
1721     { //Just directly call the group:
1722 #if CMK_ERROR_CHECKING
1723       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1724 #else
1725       envelope *env=UsrToEnv(msg);
1726 #endif
1727       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1728       return;
1729     }
1730   }
1731   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1732   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1733 }
1734
1735 extern "C"
1736 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1737 {
1738   if (opts & CK_MSG_INLINE) {
1739     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1740     return;
1741   }
1742   if (opts & CK_MSG_IMMEDIATE) {
1743     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1744     return;
1745   }
1746   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1747   _STATS_RECORD_SEND_BRANCH_1();
1748   CkpvAccess(_coreState)->create();
1749 }
1750
1751 extern "C"
1752 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1753 {
1754 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1755   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1756   _TRACE_CREATION_MULTICAST(env, npes, pes);
1757   _noCldEnqueueMulti(npes, pes, env);
1758   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1759 #else
1760   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1761   CpvAccess(_qd)->create(-npes);
1762 #endif
1763   _STATS_RECORD_SEND_BRANCH_N(npes);
1764   CpvAccess(_qd)->create(npes);
1765 }
1766
1767 extern "C"
1768 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1769 {
1770   if (opts & CK_MSG_IMMEDIATE) {
1771     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1772     return;
1773   }
1774     // normal mesg
1775   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1776   _STATS_RECORD_SEND_BRANCH_N(npes);
1777   CpvAccess(_qd)->create(npes);
1778 }
1779
1780 extern "C"
1781 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1782 {
1783   int npes;
1784   int *pes;
1785   if (opts & CK_MSG_IMMEDIATE) {
1786     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1787     return;
1788   }
1789     // normal mesg
1790   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1791   CmiLookupGroup(grp, &npes, &pes);
1792   _TRACE_CREATION_MULTICAST(env, npes, pes);
1793   _CldEnqueueGroup(grp, env, _infoIdx);
1794   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1795   _STATS_RECORD_SEND_BRANCH_N(npes);
1796   CpvAccess(_qd)->create(npes);
1797 }
1798
1799 extern "C"
1800 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1801 {
1802   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1803   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1804   CpvAccess(_qd)->create(CkNumPes());
1805 }
1806
1807 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1808                 int node=CLD_BROADCAST_ALL, int opts=0)
1809 {
1810   int numPes;
1811   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1812 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1813         sendTicketNodeGroupRequest(env,node,_infoIdx);
1814 #else
1815   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1816   _TRACE_CREATION_N(env, numPes);
1817   if (opts & CK_MSG_SKIP_OR_IMM) {
1818     _noCldNodeEnqueue(node, env);
1819     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1820       CkpvAccess(_coreState)->create(-numPes);
1821     }
1822   }
1823   else
1824     _CldNodeEnqueue(node, env, _infoIdx);
1825   _TRACE_CREATION_DONE(1);
1826 #endif
1827 }
1828
1829 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1830                            int npes, int *nodes)
1831 {
1832   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1833   _TRACE_CREATION_N(env, npes);
1834   for (int i=0; i<npes; i++) {
1835     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1836   }
1837   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1838 }
1839
1840 extern "C"
1841 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1842 {
1843 #if CMK_IMMEDIATE_MSG
1844   if (node==CkMyNode())
1845   {
1846     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1847     return;
1848   }
1849   //Can't inline-- send the usual way
1850   register envelope *env = UsrToEnv(msg);
1851   int numPes;
1852   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1853   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1854   _TRACE_CREATION_N(env, numPes);
1855   _noCldNodeEnqueue(node, env);
1856   _STATS_RECORD_SEND_BRANCH_1();
1857   /* immeidate message is invisible to QD */
1858 //  CkpvAccess(_coreState)->create();
1859   _TRACE_CREATION_DONE(1);
1860 #else
1861   // no support for immediate message, send inline
1862   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1863 #endif
1864 }
1865
1866 extern "C"
1867 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1868 {
1869   if (node==CkMyNode())
1870   {
1871     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1872     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1873     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1874     if (obj!=NULL)
1875     { //Just directly call the group:
1876 #if CMK_ERROR_CHECKING
1877       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1878 #else
1879       envelope *env=UsrToEnv(msg);
1880 #endif
1881       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1882       return;
1883     }
1884   }
1885   //Can't inline-- send the usual way
1886   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1887 }
1888
1889 extern "C"
1890 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1891 {
1892   if (opts & CK_MSG_INLINE) {
1893     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1894     return;
1895   }
1896   if (opts & CK_MSG_IMMEDIATE) {
1897     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1898     return;
1899   }
1900   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1901   _STATS_RECORD_SEND_NODE_BRANCH_1();
1902   CkpvAccess(_coreState)->create();
1903 }
1904
1905 extern "C"
1906 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1907 {
1908 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1909   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1910   _noCldEnqueueMulti(npes, nodes, env);
1911 #else
1912   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1913   CpvAccess(_qd)->create(-npes);
1914 #endif
1915   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1916   CpvAccess(_qd)->create(npes);
1917 }
1918
1919 extern "C"
1920 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1921 {
1922   if (opts & CK_MSG_IMMEDIATE) {
1923     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1924     return;
1925   }
1926     // normal mesg
1927   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1928   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1929   CpvAccess(_qd)->create(npes);
1930 }
1931
1932 extern "C"
1933 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1934 {
1935   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1936   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1937   CpvAccess(_qd)->create(CkNumNodes());
1938 }
1939
1940 //Needed by delegation manager:
1941 extern "C"
1942 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1943 { return _prepareMsg(eIdx,msg,pCid); }
1944 extern "C"
1945 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1946 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1947 extern "C"
1948 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1949 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1950
1951 void _ckModuleInit(void) {
1952         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1953 #if CMK_OBJECT_QUEUE_AVAILABLE
1954         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1955 #endif
1956         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1957         CkpvInitialize(TokenPool*, _tokenPool);
1958         CkpvAccess(_tokenPool) = new TokenPool;
1959 }
1960
1961
1962 /************** Send: Arrays *************/
1963
1964 extern void CkArrayManagerInsert(int onPe,void *msg);
1965 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1966
1967 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1968 {
1969   _CHECK_USED(env);
1970   _SET_USED(env, 1);
1971   env->setMsgtype(type);
1972 #if CMK_CHARMDEBUG
1973   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1974 #endif
1975   CmiSetHandler(env, _charmHandlerIdx);
1976   CpvAccess(_qd)->create();
1977 }
1978
1979 extern "C"
1980 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1981   register envelope *env = UsrToEnv(msg);
1982   env->getsetArrayMgr()=aID;
1983   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1984   _CldEnqueue(pe, env, _infoIdx);
1985 }
1986
1987 extern "C"
1988 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1989   register envelope *env = UsrToEnv(msg);
1990   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1991 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1992    sendTicketArrayRequest(env,pe,_infoIdx);
1993 #else
1994   if (opts & CK_MSG_IMMEDIATE)
1995     CmiBecomeImmediate(env);
1996   if (opts & CK_MSG_SKIP_OR_IMM)
1997     _noCldEnqueue(pe, env);
1998   else
1999     _skipCldEnqueue(pe, env, _infoIdx);
2000 #endif
2001 }
2002
2003 class ElementDestroyer : public CkLocIterator {
2004 private:
2005         CkLocMgr *locMgr;
2006 public:
2007         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2008         void addLocation(CkLocation &loc) {
2009           loc.destroyAll();
2010         }
2011 };
2012
2013 void CkDeleteChares() {
2014   int i;
2015   int numGroups = CkpvAccess(_groupIDTable)->size();
2016
2017   // delete all plain chares
2018 #ifndef CMK_CHARE_USE_PTR
2019   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2020         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2021         delete obj;
2022         CkpvAccess(chare_objs)[i] = NULL;
2023   }
2024   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2025         VidBlock *obj = CkpvAccess(vidblocks)[i];
2026         delete obj;
2027         CkpvAccess(vidblocks)[i] = NULL;
2028   }
2029 #endif
2030
2031   // delete all array elements
2032   for(i=0;i<numGroups;i++) {
2033     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2034     if(obj && obj->isLocMgr())  {
2035       CkLocMgr *mgr = (CkLocMgr*)obj;
2036       ElementDestroyer destroyer(mgr);
2037       mgr->iterate(destroyer);
2038     }
2039   }
2040
2041   // delete all groups
2042   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2043   for(i=0;i<numGroups;i++) {
2044     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2045     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2046     if (obj) delete obj;
2047   }
2048   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2049
2050   // delete all node groups
2051   if (CkMyRank() == 0) {
2052     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2053     for(i=0;i<numNodeGroups;i++) {
2054       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2055       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2056       if (obj) delete obj;
2057     }
2058   }
2059 }
2060
2061 //------------------- Message Watcher (record/replay) ----------------
2062
2063 #include "crc32.h"
2064
2065 CkpvDeclare(int, envelopeEventID);
2066 int _recplay_crc = 0;
2067 int _recplay_checksum = 0;
2068 int _recplay_logsize = 1024*1024;
2069
2070 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2071 #define REPLAYDEBUG(args) /* empty */
2072
2073 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2074
2075 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2076
2077 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2078
2079     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2080     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2081     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2082     FILE *f=fopen(fName,permissions);
2083     REPLAYDEBUG("openReplayfile "<<fName);
2084     if (f==NULL) {
2085         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2086             CkMyPe(),fName,permissions);
2087         CkAbort("openReplayFile> Could not open replay file");
2088     }
2089     return f;
2090 }
2091
2092 #include "BaseLB.h" /* For LBMigrateMsg message */
2093
2094 class CkMessageRecorder : public CkMessageWatcher {
2095   char *buffer;
2096   unsigned int curpos;
2097   bool firstOpen;
2098 public:
2099   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2100   ~CkMessageRecorder() {
2101     flushLog(0);
2102     fprintf(f,"-1 -1 -1 ");
2103     fclose(f);
2104     delete[] buffer;
2105 #if 0
2106     FILE *stsfp = fopen("sts", "w");
2107     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2108     traceWriteSTS(stsfp, 0);
2109     fclose(stsfp);
2110 #endif
2111     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2112   }
2113
2114 private:
2115   void flushLog(int verbose=1) {
2116     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2117     fprintf(f, "%s", buffer);
2118     curpos=0;
2119   }
2120   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2121     if ((*envptr)->getEvent()) {
2122       bool wasPacked = (*envptr)->isPacked();
2123       if (!wasPacked) CkPackMessage(envptr);
2124       envelope *env = *envptr;
2125       unsigned int crc1=0, crc2=0;
2126       if (_recplay_crc) {
2127         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2128         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2129         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2130       } else if (_recplay_checksum) {
2131         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2132         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2133       }
2134       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2135       if (curpos > _recplay_logsize-128) flushLog();
2136       if (!wasPacked) CkUnpackMessage(envptr);
2137     }
2138     return CmiTrue;
2139   }
2140   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2141     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2142     if (curpos > _recplay_logsize-128) flushLog();
2143     return CmiTrue;
2144   }
2145   
2146   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2147     FILE *f;
2148     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2149     else f = openReplayFile("ckreplay_",".lb","a");
2150     firstOpen = false;
2151     if (f != NULL) {
2152       PUP::toDisk p(f);
2153       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2154       (*msg)->pup(p);
2155       fclose(f);
2156     }
2157     return CmiTrue;
2158   }
2159 };
2160
2161 class CkMessageDetailRecorder : public CkMessageWatcher {
2162 public:
2163   CkMessageDetailRecorder(FILE *f_) {
2164     f=f_;
2165     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2166      * The value of 'x' is the pointer size.
2167      */
2168     CmiUInt2 little = sizeof(void*);
2169     fwrite(&little, 2, 1, f);
2170   }
2171   ~CkMessageDetailRecorder() {fclose(f);}
2172 private:
2173   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2174     bool wasPacked = (*envptr)->isPacked();
2175     if (!wasPacked) CkPackMessage(envptr);
2176     envelope *env = *envptr;
2177     CmiUInt4 size = env->getTotalsize();
2178     fwrite(&size, 4, 1, f);
2179     fwrite(env, env->getTotalsize(), 1, f);
2180     if (!wasPacked) CkUnpackMessage(envptr);
2181     return CmiTrue;
2182   }
2183 };
2184
2185 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2186 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2187
2188 #if CMK_BIGSIM_CHARM
2189 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2190                                    int pb,unsigned int *prio);
2191 #endif
2192
2193 class CkMessageReplay : public CkMessageWatcher {
2194   int counter;
2195         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2196         int nextEP;
2197         unsigned int crc1, crc2;
2198         FILE *lbFile;
2199         /// Read the next message we need from the file:
2200         void getNext(void) {
2201           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2202           if (nextSize > 0) {
2203             // We are reading a regular message
2204             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2205               CkAbort("CkMessageReplay> Syntax error reading replay file");
2206             }
2207             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2208           } else if (nextSize == -2) {
2209             // We are reading a special message (right now only thread awaken)
2210             // Nothing to do since we have already read all info
2211             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2212           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2213             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2214             CkAbort("CkMessageReplay> Unrecognized input");
2215           }
2216             /*
2217                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2218                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2219                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2220                 }
2221                 */
2222                 counter++;
2223         }
2224         /// If this is the next message we need, advance and return CmiTrue.
2225         CmiBool isNext(envelope *env) {
2226                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2227                 if (nextEvent!=env->getEvent()) return CmiFalse;
2228                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2229 #if 1
2230                 if (nextEP != env->getEpIdx()) {
2231                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2232                         return CmiFalse;
2233                 }
2234 #endif
2235 #if ! CMK_BIGSIM_CHARM
2236                 if (nextSize!=env->getTotalsize())
2237                 {
2238                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2239                         return CmiFalse;
2240                 }
2241                 if (_recplay_crc || _recplay_checksum) {
2242                   bool wasPacked = env->isPacked();
2243                   if (!wasPacked) CkPackMessage(&env);
2244                   if (_recplay_crc) {
2245                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2246                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2247                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2248                     if (crcnew1 != crc1) {
2249                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2250                     }
2251                     if (crcnew2 != crc2) {
2252                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2253                     }
2254                   } else if (_recplay_checksum) {
2255             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2256             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2257             if (crcnew1 != crc1) {
2258               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2259             }
2260             if (crcnew2 != crc2) {
2261               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2262             }               
2263                   }
2264                   if (!wasPacked) CkUnpackMessage(&env);
2265                 }
2266 #endif
2267                 return CmiTrue;
2268         }
2269         CmiBool isNext(CthThreadToken *token) {
2270           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2271           return CmiFalse;
2272         }
2273
2274         /// This is a (short) list of messages we aren't yet ready for:
2275         CkQ<envelope *> delayedMessages;
2276         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2277         CkQ<CthThreadToken *> delayedTokens;
2278
2279         /// Try to flush out any delayed messages
2280         void flush(void) {
2281           if (nextSize>0) {
2282                 int len=delayedMessages.length();
2283                 for (int i=0;i<len;i++) {
2284                         envelope *env=delayedMessages.deq();
2285                         if (isNext(env)) { /* this is the next message: process it */
2286                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2287                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2288                                 return;
2289                         }
2290                         else /* Not ready yet-- put it back in the
2291                                 queue */
2292                           {
2293                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2294                                 delayedMessages.enq(env);
2295                           }
2296                 }
2297           } else if (nextSize==-2) {
2298             int len=delayedTokens.length();
2299             for (int i=0;i<len;++i) {
2300               CthThreadToken *token=delayedTokens.deq();
2301               if (isNext(token)) {
2302             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2303 #if ! CMK_BIGSIM_CHARM
2304                 CsdEnqueueLifo((void*)token);
2305 #else
2306                 CthEnqueueBigSimThread(token,0,0,NULL);
2307 #endif
2308                 return;
2309               } else {
2310             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2311                 delayedTokens.enq(token);
2312               }
2313             }
2314           }
2315         }
2316
2317 public:
2318         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2319           counter=0;
2320           f=f_;
2321           getNext();
2322           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2323           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2324         }
2325         ~CkMessageReplay() {fclose(f);}
2326
2327 private:
2328         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2329           bool wasPacked = (*envptr)->isPacked();
2330           if (!wasPacked) CkPackMessage(envptr);
2331           envelope *env = *envptr;
2332           //CkAssert(*(int*)env == 0x34567890);
2333           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2334                 if (env->getEvent() == 0) return CmiTrue;
2335                 if (isNext(env)) { /* This is the message we were expecting */
2336                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2337                         getNext(); /* Advance over this message */
2338                         flush(); /* try to process queued-up stuff */
2339                         if (!wasPacked) CkUnpackMessage(envptr);
2340                         return CmiTrue;
2341                 }
2342 #if CMK_SMP
2343                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2344                          // try next rank, we can't just buffer the msg and left
2345                          // we need to keep unprocessed msg on the fly
2346                         int nextpe = CkMyPe()+1;
2347                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2348                         nextpe = CkNodeFirst(CkMyNode());
2349                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2350                         return CmiFalse;
2351                 }
2352 #endif
2353                 else /*!isNext(env) */ {
2354                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2355                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2356                         delayedMessages.enq(env);
2357                         flush();
2358                         return CmiFalse;
2359                 }
2360         }
2361         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2362       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2363           if (isNext(token)) {
2364         REPLAYDEBUG("Executing token: "<<token->serialNo)
2365             getNext();
2366             flush();
2367             return CmiTrue;
2368           } else {
2369         REPLAYDEBUG("Queueing token: "<<token->serialNo
2370             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2371             delayedTokens.enq(token);
2372             return CmiFalse;
2373           }
2374         }
2375
2376         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2377           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2378           if (lbFile != NULL) {
2379             int num_moves;
2380         PUP::fromDisk p(lbFile);
2381             p | num_moves;
2382             if (num_moves != (*msg)->n_moves) {
2383               delete *msg;
2384               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2385             }
2386             (*msg)->pup(p);
2387           }
2388           return CmiTrue;
2389         }
2390 };
2391
2392 class CkMessageDetailReplay : public CkMessageWatcher {
2393   void *getNext() {
2394     CmiUInt4 size; size_t nread;
2395     if ((nread=fread(&size, 4, 1, f)) < 1) {
2396       if (feof(f)) return NULL;
2397       CkPrintf("Broken record file (metadata) got %d\n",nread);
2398       CkAbort("");
2399     }
2400     void *env = CmiAlloc(size);
2401     long tell = ftell(f);
2402     if ((nread=fread(env, size, 1, f)) < 1) {
2403       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2404       CkAbort("");
2405     }
2406     //*(int*)env = 0x34567890; // set first integer as magic
2407     return env;
2408   }
2409 public:
2410   double starttime;
2411   CkMessageDetailReplay(FILE *f_) {
2412     f=f_;
2413     starttime=CkWallTimer();
2414     /* This must match what CkMessageDetailRecorder did */
2415     CmiUInt2 little;
2416     fread(&little, 2, 1, f);
2417     if (little != sizeof(void*)) {
2418       CkAbort("Replaying on a different architecture from which recording was done!");
2419     }
2420
2421     CsdEnqueue(getNext());
2422
2423     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2424   }
2425   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2426     void *msg = getNext();
2427     if (msg != NULL) CsdEnqueue(msg);
2428     return CmiTrue;
2429   }
2430 };
2431
2432 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2433 #if ! CMK_BIGSIM_CHARM
2434   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2435 #endif
2436   CkMessageReplay *replay = (CkMessageReplay*)rep;
2437   //CmiStartQD(CkMessageReplayQuiescence, replay);
2438 }
2439
2440 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2441   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2442   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2443   ConverseExit();
2444 }
2445
2446 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2447   CkCoreState *ck = CkpvAccess(_coreState);
2448   if (ck->watcher!=NULL) {
2449     return ck->watcher->processThread(token,ck);
2450   }
2451   return CmiTrue;
2452 }
2453
2454 CpvCExtern(int, CthResumeNormalThreadIdx);
2455 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2456 {
2457   CthThread t = token->thread;
2458
2459   if(t == NULL){
2460     free(token);
2461     return;
2462   }
2463 #if CMK_TRACE_ENABLED
2464 #if ! CMK_TRACE_IN_CHARM
2465   if(CpvAccess(traceOn))
2466     CthTraceResume(t);
2467 /*    if(CpvAccess(_traceCoreOn)) 
2468             resumeTraceCore();*/
2469 #endif
2470 #endif
2471   
2472   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2473   if (CpdExecuteThreadResume(token)) {
2474     CthResume(t);
2475   }
2476 }
2477
2478 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2479   CkCoreState *ck = CkpvAccess(_coreState);
2480   if (ck->watcher!=NULL) {
2481     ck->watcher->processLBMessage(msg, ck);
2482   }
2483 }
2484
2485 #if CMK_BIGSIM_CHARM
2486 CpvExtern(int      , CthResumeBigSimThreadIdx);
2487 #endif
2488
2489 #include "ckliststring.h"
2490 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2491     CmiArgGroup("Charm++","Record/Replay");
2492     CmiBool forceReplay = CmiFalse;
2493     char *procs = NULL;
2494     _replaySystem = 0;
2495     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2496       _recplay_crc = 1;
2497     }
2498     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2499       _recplay_checksum = 1;
2500     }
2501     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2502     REPLAYDEBUG("CkMessageWatcherInit ");
2503     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2504         CkListString list(procs);
2505         if (list.includes(CkMyPe())) {
2506           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2507           CpdSetInitializeMemory(1);
2508           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2509         }
2510     }
2511     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2512       if (CkMyPe() == 0) {
2513         CmiPrintf("Charm++> record mode.\n");
2514         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2515           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2516           _recplay_crc = _recplay_checksum = 0;
2517         }
2518       }
2519       CpdSetInitializeMemory(1);
2520       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2521       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2522     }
2523         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2524             forceReplay = CmiTrue;
2525             CpdSetInitializeMemory(1);
2526             // Set the parameters of the processor
2527 #if CMK_SHARED_VARS_UNAVAILABLE
2528             _Cmi_mype = atoi(procs);
2529             while (procs[0]!='/') procs++;
2530             procs++;
2531             _Cmi_numpes = atoi(procs);
2532 #else
2533             CkAbort("+replay-detail available only for non-SMP build");
2534 #endif
2535             _replaySystem = 1;
2536             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2537         }
2538         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2539           if (CkMyPe() == 0)  {
2540             CmiPrintf("Charm++> replay mode.\n");
2541             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2542               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2543               _recplay_crc = _recplay_checksum = 0;
2544             }
2545           }
2546           CpdSetInitializeMemory(1);
2547 #if ! CMK_BIGSIM_CHARM
2548           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2549 #else
2550           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2551 #endif
2552           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2553         }
2554         if (_recplay_crc && _recplay_checksum) {
2555           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2556         }
2557 }
2558
2559 extern "C"
2560 int CkMessageToEpIdx(void *msg) {
2561         envelope *env=UsrToEnv(msg);
2562         int ep=env->getEpIdx();
2563         if (ep==CkIndex_CkArray::recvBroadcast(0))
2564                 return env->getsetArrayBcastEp();
2565         else
2566                 return ep;
2567 }
2568
2569 extern "C"
2570 int getCharmEnvelopeSize() {
2571   return sizeof(envelope);
2572 }
2573
2574
2575 #include "CkMarshall.def.h"
2576