d9e89fd18bf22e1c2a47ddf614f46e930eaf9400
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
14 #include "pathHistory.h"
15 void automaticallySetMessagePriority(envelope *env); // in control point framework.
16 #endif
17
18 #if CMK_LBDB_ON
19 #include "LBDatabase.h"
20 #endif // CMK_LBDB_ON
21
22 #ifndef CMK_CHARE_USE_PTR
23 #include <map>
24 CkpvDeclare(CkVec<void *>, chare_objs);
25 CkpvDeclare(CkVec<int>, chare_types);
26 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
27
28 typedef std::map<int, CkChareID>  Vidblockmap;
29 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
30 CkpvDeclare(int, currentChareIdx);
31 #endif
32
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 void _initChareTables()
47 {
48 #ifndef CMK_CHARE_USE_PTR
49           /* chare and vidblock table */
50   CkpvInitialize(CkVec<void *>, chare_objs);
51   CkpvInitialize(CkVec<int>, chare_types);
52   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
53   CkpvInitialize(Vidblockmap, vmap);
54   CkpvInitialize(int, currentChareIdx);
55   CkpvAccess(currentChareIdx) = -1;
56 #endif
57 }
58
59 //Charm++ virtual functions: declaring these here results in a smaller executable
60 Chare::Chare(void) {
61   thishandle.onPE=CkMyPe();
62   thishandle.objPtr=this;
63 #if CMK_ERROR_CHECKING
64   magic = CHARE_MAGIC;
65 #endif
66 #ifndef CMK_CHARE_USE_PTR
67      // for plain chare, objPtr is actually the index to chare obj table
68   if (CkpvAccess(currentChareIdx) >= 0) {
69     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
70   }
71   chareIdx = CkpvAccess(currentChareIdx);
72 #endif
73 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
74   mlogData = new ChareMlogData();
75   mlogData->objID.type = TypeChare;
76   mlogData->objID.data.chare.id = thishandle;
77 #endif
78 #if CMK_OBJECT_QUEUE_AVAILABLE
79   if (_defaultObjectQ)  CkEnableObjQ();
80 #endif
81 }
82
83 Chare::Chare(CkMigrateMessage* m) {
84   thishandle.onPE=CkMyPe();
85   thishandle.objPtr=this;
86 #if CMK_ERROR_CHECKING
87   magic = 0;
88 #endif
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 #if CMK_ERROR_CHECKING
149   p(magic);
150 #endif
151 }
152
153 int Chare::ckGetChareType() const {
154   return -3;
155 }
156 char *Chare::ckDebugChareName(void) {
157   char buf[100];
158   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
159   return strdup(buf);
160 }
161 int Chare::ckDebugChareID(char *str, int limit) {
162   // pure chares for now do not have a valid ID
163   str[0] = 0;
164   return 1;
165 }
166 void Chare::ckDebugPup(PUP::er &p) {
167   pup(p);
168 }
169
170 /// This method is called before starting a [threaded] entry method.
171 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
172   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
173   traceAddThreadListeners(th, UsrToEnv(msg));
174 }
175
176 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
177   p.comment("Bytes");
178   int ts=UsrToEnv(msg)->getTotalsize();
179   int msgLen=ts-sizeof(envelope);
180   if (msgLen>0)
181     p((char*)msg,msgLen);
182 }
183
184 IrrGroup::IrrGroup(void) {
185   thisgroup = CkpvAccess(_currentGroup);
186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
187         mlogData->objID.type = TypeGroup;
188         mlogData->objID.data.group.id = thisgroup;
189         mlogData->objID.data.group.onPE = CkMyPe();
190 #endif
191 }
192
193 IrrGroup::~IrrGroup() {
194   // remove the object pointer
195   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
196   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
197   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
198 }
199
200 void IrrGroup::pup(PUP::er &p)
201 {
202   Chare::pup(p);
203   p|thisgroup;
204 }
205
206 int IrrGroup::ckGetChareType() const {
207   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
208 }
209
210 int IrrGroup::ckDebugChareID(char *str, int limit) {
211   if (limit<5) return -1;
212   str[0] = 1;
213   *((int*)&str[1]) = thisgroup.idx;
214   return 5;
215 }
216
217 char *IrrGroup::ckDebugChareName() {
218   return strdup(_chareTable[ckGetChareType()]->name);
219 }
220
221 void IrrGroup::ckJustMigrated(void)
222 {
223 }
224
225 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
226   /* FIXME: **CW** not entirely sure what we should do here yet */
227 }
228
229 void Group::CkAddThreadListeners(CthThread th, void *msg) {
230   Chare::CkAddThreadListeners(th, msg);
231   CthSetThreadID(th, thisgroup.idx, 0, 0);
232 }
233
234 void Group::pup(PUP::er &p)
235 {
236   CkReductionMgr::pup(p);
237   reductionInfo.pup(p);
238 }
239
240 /**** Delegation Manager Group */
241 CkDelegateMgr::~CkDelegateMgr() { }
242
243 //Default delegator implementation: do not delegate-- send directly
244 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
245   { CkSendMsg(ep,m,c); }
246 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
247   { CkSendMsgBranch(ep,m,onPE,g); }
248 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
249   { CkBroadcastMsgBranch(ep,m,g); }
250 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
251   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
252 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
253   { CkSendMsgNodeBranch(ep,m,onNode,g); }
254 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
255   { CkBroadcastMsgNodeBranch(ep,m,g); }
256 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
257   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
258 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
259 {
260         CProxyElement_ArrayBase ap(a,idx);
261         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
262 }
263 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
264 {
265         CProxyElement_ArrayBase ap(a,idx);
266         ap.ckSend((CkArrayMessage *)m,ep);
267 }
268 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
269 {
270         CProxy_ArrayBase ap(a);
271         ap.ckBroadcast((CkArrayMessage *)m,ep);
272 }
273
274 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
275 {
276         CmiAbort("ArraySectionSend is not implemented!\n");
277 /*
278         CProxyElement_ArrayBase ap(a,idx);
279         ap.ckSend((CkArrayMessage *)m,ep);
280 */
281 }
282
283 /*** Proxy <-> delegator communication */
284 CkDelegateData::~CkDelegateData() {}
285
286 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
287   return pd; // default implementation ignores pup call
288 }
289
290 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
291    this tricky manual reference counting business... */
292
293 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
294         if (dPtr) dPtr->ref();
295         ckUndelegate();
296         delegatedMgr = dTo;
297         delegatedPtr = dPtr;
298         delegatedGroupId = delegatedMgr->CkGetGroupID();
299         isNodeGroup = delegatedMgr->isNodeGroup();
300 }
301 void CProxy::ckUndelegate(void) {
302         delegatedMgr=NULL;
303         delegatedGroupId.setZero();
304         if (delegatedPtr) delegatedPtr->unref();
305         delegatedPtr=NULL;
306 }
307
308 /// Copy constructor
309 CProxy::CProxy(const CProxy &src)
310   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
311    isNodeGroup(src.isNodeGroup) {
312     delegatedPtr = NULL;
313     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
314         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
315     }
316 }
317
318 /// Assignment operator
319 CProxy& CProxy::operator=(const CProxy &src) {
320         CkDelegateData *oldPtr=delegatedPtr;
321         ckUndelegate();
322         delegatedMgr=src.delegatedMgr;
323         delegatedGroupId = src.delegatedGroupId; 
324         isNodeGroup = src.isNodeGroup;
325
326         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
327             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
328         else
329             delegatedPtr = NULL;
330
331         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
332         if (oldPtr) oldPtr->unref();
333         return *this;
334 }
335
336 void CProxy::pup(PUP::er &p) {
337   if (!p.isUnpacking()) {
338     if (ckDelegatedTo() != NULL) {
339       delegatedGroupId = delegatedMgr->CkGetGroupID();
340       isNodeGroup = delegatedMgr->isNodeGroup();
341     }
342   }
343   p|delegatedGroupId;
344   if (!delegatedGroupId.isZero()) {
345     p|isNodeGroup;
346     if (p.isUnpacking()) {
347       delegatedMgr = ckDelegatedTo(); 
348     }
349
350     int migCtor, cIdx; 
351     if (!p.isUnpacking()) {
352       if (isNodeGroup) {
353         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
354         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
355         migCtor = _chareTable[cIdx]->migCtor; 
356         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
357       }
358       else  {
359         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
360         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
361         migCtor = _chareTable[cIdx]->migCtor; 
362         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
363       }         
364     }
365
366     p|migCtor;
367
368     // if delegated manager has not been created, construct a dummy
369     // object on which to call DelegatePointerPup
370     if (delegatedMgr == NULL) {
371
372       // create a dummy object for calling DelegatePointerPup
373       int objId = _entryTable[migCtor]->chareIdx; 
374       int objSize = _chareTable[objId]->size; 
375       void *obj = malloc(objSize); 
376       _entryTable[migCtor]->call(NULL, obj); 
377       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
378         ->DelegatePointerPup(p, delegatedPtr);           
379       free(obj);
380
381     }
382     else {
383
384       // delegated manager has been created, so we can use it
385       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
386
387     }
388
389     if (p.isUnpacking() && delegatedPtr) {
390       delegatedPtr->ref();
391     }
392   }
393 }
394
395 /**** Array sections */
396 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
397 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
398   _cookie.get_aid() = aid;      \
399   _cookie.get_pe() = CkMyPe();  \
400   _elems = new CkArrayIndex[nElems];    \
401   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
402   pelist = NULL;        \
403   npes  = 0;    \
404 }
405
406 CKSECTIONID_CONSTRUCTOR_DEF(1D)
407 CKSECTIONID_CONSTRUCTOR_DEF(2D)
408 CKSECTIONID_CONSTRUCTOR_DEF(3D)
409 CKSECTIONID_CONSTRUCTOR_DEF(4D)
410 CKSECTIONID_CONSTRUCTOR_DEF(5D)
411 CKSECTIONID_CONSTRUCTOR_DEF(6D)
412 CKSECTIONID_CONSTRUCTOR_DEF(Max)
413
414 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
415   pelist = new int[npes];
416   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
417   _cookie.get_aid() = gid;
418 }
419
420 CkSectionID::CkSectionID(const CkSectionID &sid) {
421   int i;
422   _cookie = sid._cookie;
423   _nElems = sid._nElems;
424   if (_nElems > 0) {
425     _elems = new CkArrayIndex[_nElems];
426     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
427   } else _elems = NULL;
428   npes = sid.npes;
429   if (npes > 0) {
430     pelist = new int[npes];
431     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
432   } else pelist = NULL;
433 }
434
435 void CkSectionID::operator=(const CkSectionID &sid) {
436   int i;
437   _cookie = sid._cookie;
438   _nElems = sid._nElems;
439   if (_nElems > 0) {
440     _elems = new CkArrayIndex[_nElems];
441     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
442   } else _elems = NULL;
443   npes = sid.npes;
444   if (npes > 0) {
445     pelist = new int[npes];
446     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
447   } else pelist = NULL;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451     p | _cookie;
452     p(_nElems);
453     if (_nElems > 0) {
454       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
455       for (int i=0; i< _nElems; i++) p | _elems[i];
456       npes = 0;
457       pelist = NULL;
458     } else {
459       // If _nElems is zero, than this section describes processors instead of array elements
460       _elems = NULL;
461       p(npes);
462       if (p.isUnpacking()) pelist = new int[npes];
463       p(pelist, npes);
464     }
465 }
466
467 /**** Tiny random API routines */
468
469 #ifdef CMK_CUDA
470 void CUDACallbackManager(void *fn) {
471   if (fn != NULL) {
472     CkCallback *cb = (CkCallback*) fn;
473     cb->send();
474   }
475 }
476
477 #endif
478
479 extern "C"
480 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
481 {
482   UsrToEnv(msg)->setRef(ref);
483 }
484
485 extern "C"
486 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
487 {
488   return UsrToEnv(msg)->getRef();
489 }
490
491 extern "C"
492 int CkGetSrcPe(void *msg)
493 {
494   return UsrToEnv(msg)->getSrcPe();
495 }
496
497 extern "C"
498 int CkGetSrcNode(void *msg)
499 {
500   return CmiNodeOf(CkGetSrcPe(msg));
501 }
502
503 extern "C"
504 void *CkLocalBranch(CkGroupID gID) {
505   return _localBranch(gID);
506 }
507
508 static
509 void *_ckLocalNodeBranch(CkGroupID groupID) {
510   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
511   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
512   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
513   return retval;
514 }
515
516 extern "C"
517 void *CkLocalNodeBranch(CkGroupID groupID)
518 {
519   void *retval;
520   // we are called in a constructor
521   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
522     return CkpvAccess(_currentNodeGroupObj);
523   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
524   { // Nodegroup hasn't finished being created yet-- schedule...
525     CsdScheduler(0);
526   }
527   return retval;
528 }
529
530 extern "C"
531 void *CkLocalChare(const CkChareID *pCid)
532 {
533         int pe=pCid->onPE;
534         if (pe<0) { //A virtual chare ID
535                 if (pe!=(-(CkMyPe()+1)))
536                         return NULL;//VID block not on this PE
537 #ifdef CMK_CHARE_USE_PTR
538                 VidBlock *v=(VidBlock *)pCid->objPtr;
539 #else
540                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
541 #endif
542                 return v->getLocalChareObj();
543         }
544         else
545         { //An ordinary chare ID
546                 if (pe!=CkMyPe())
547                         return NULL;//Chare not on this PE
548 #ifdef CMK_CHARE_USE_PTR
549                 return pCid->objPtr;
550 #else
551                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
552 #endif
553         }
554 }
555
556 CkpvDeclare(char**,Ck_argv);
557
558 extern "C" char **CkGetArgv(void) {
559         return CkpvAccess(Ck_argv);
560 }
561 extern "C" int CkGetArgc(void) {
562         return CmiGetArgc(CkpvAccess(Ck_argv));
563 }
564
565 /******************** Basic support *****************/
566 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
567 {
568 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
569         CpvAccess(_currentObj) = (Chare *)obj;
570 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
571 #endif
572   //BIGSIM_OOC DEBUGGING
573   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
574   //fflush(stdout);
575 #if CMK_CHARMDEBUG
576   CpdBeforeEp(epIdx, obj, msg);
577 #endif    
578   _entryTable[epIdx]->call(msg, obj);
579 #if CMK_CHARMDEBUG
580   CpdAfterEp(epIdx);
581 #endif
582   if (_entryTable[epIdx]->noKeep)
583   { /* Method doesn't keep/delete the message, so we have to: */
584     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
585   }
586 }
587 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
588 {
589   //BIGSIM_OOC DEBUGGING
590   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
591   //fflush(stdout);
592
593   void *deliverMsg;
594 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
595         CpvAccess(_currentObj) = (Chare *)obj;
596 #endif
597   if (_entryTable[epIdx]->noKeep)
598   { /* Deliver a read-only copy of the message */
599     deliverMsg=(void *)msg;
600   } else
601   { /* Method needs a copy of the message to keep/delete */
602     void *oldMsg=(void *)msg;
603     deliverMsg=CkCopyMsg(&oldMsg);
604 #if CMK_ERROR_CHECKING
605     if (oldMsg!=msg)
606       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
607 #endif
608   }
609 #if CMK_CHARMDEBUG
610   CpdBeforeEp(epIdx, obj, (void*)msg);
611 #endif
612   _entryTable[epIdx]->call(deliverMsg, obj);
613 #if CMK_CHARMDEBUG
614   CpdAfterEp(epIdx);
615 #endif
616 }
617
618 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
619 {
620   register void *msg = EnvToUsr(env);
621   _SET_USED(env, 0);
622   CkDeliverMessageFree(epIdx,msg,obj);
623 }
624
625 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
626 {
627
628 #if CMK_TRACE_ENABLED 
629   if (_entryTable[epIdx]->traceEnabled) {
630     _TRACE_BEGIN_EXECUTE(env);
631     _invokeEntryNoTrace(epIdx,env,obj);
632     _TRACE_END_EXECUTE();
633   }
634   else
635 #endif
636     _invokeEntryNoTrace(epIdx,env,obj);
637
638 }
639
640 /********************* Creation ********************/
641
642 extern "C"
643 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
644 {
645   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
646   envelope *env = UsrToEnv(msg);
647   _CHECK_USED(env);
648   if(pCid == 0) {
649     env->setMsgtype(NewChareMsg);
650   } else {
651     pCid->onPE = (-(CkMyPe()+1));
652     //  pCid->magic = _GETIDX(cIdx);
653     pCid->objPtr = (void *) new VidBlock();
654     _MEMCHECK(pCid->objPtr);
655     env->setMsgtype(NewVChareMsg);
656     env->setVidPtr(pCid->objPtr);
657 #ifndef CMK_CHARE_USE_PTR
658     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
659     int idx = CkpvAccess(vidblocks).size()-1;
660     pCid->objPtr = (void *)(CmiIntPtr)idx;
661     env->setVidPtr((void *)(CmiIntPtr)idx);
662 #endif
663   }
664   env->setEpIdx(eIdx);
665   env->setByPe(CkMyPe());
666   env->setSrcPe(CkMyPe());
667   CmiSetHandler(env, _charmHandlerIdx);
668   _TRACE_CREATION_1(env);
669   CpvAccess(_qd)->create();
670   _STATS_RECORD_CREATE_CHARE_1();
671   _SET_USED(env, 1);
672   if(destPE == CK_PE_ANY)
673     env->setForAnyPE(1);
674   else
675     env->setForAnyPE(0);
676   _CldEnqueue(destPE, env, _infoIdx);
677   _TRACE_CREATION_DONE(1);
678 }
679
680 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
681 {
682   register int gIdx = _entryTable[epIdx]->chareIdx;
683   register void *obj = malloc(_chareTable[gIdx]->size);
684   _MEMCHECK(obj);
685   setMemoryTypeChare(obj);
686   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
687   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
688   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
689   CkpvAccess(_groupIDTable)->push_back(groupID);
690   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
691   if(ptrq) {
692     void *pending;
693     while((pending=ptrq->deq())!=0)
694       _CldEnqueue(CkMyPe(), pending, _infoIdx);
695 //    delete ptrq;
696       CkpvAccess(_groupTable)->find(groupID).clearPending();
697   }
698   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
699
700   CkpvAccess(_currentGroup) = groupID;
701   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
702 #ifndef CMK_CHARE_USE_PTR
703   //((Chare *)obj)->chareIdx = -1;
704   CkpvAccess(currentChareIdx) = -1;
705 #endif
706   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
707   _STATS_RECORD_PROCESS_GROUP_1();
708 }
709
710 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
711 {
712   register int gIdx = _entryTable[epIdx]->chareIdx;
713   int objSize=_chareTable[gIdx]->size;
714   register void *obj = malloc(objSize);
715   _MEMCHECK(obj);
716   setMemoryTypeChare(obj);
717   CkpvAccess(_currentGroup) = groupID;
718
719 // Now that the NodeGroup is created, add it to the table.
720 //  NodeGroups can be accessed by multiple processors, so
721 //  this is in the opposite order from groups - invoking the constructor
722 //  before registering it.
723 // User may call CkLocalNodeBranch() inside the nodegroup constructor
724 //  store nodegroup into _currentNodeGroupObj
725   CkpvAccess(_currentNodeGroupObj) = obj;
726 #ifndef CMK_CHARE_USE_PTR
727   //((Chare *)obj)->chareIdx = -1;
728   CkpvAccess(currentChareIdx) = -1;
729 #endif
730   _invokeEntryNoTrace(epIdx,env,obj);
731   CkpvAccess(_currentNodeGroupObj) = NULL;
732   _STATS_RECORD_PROCESS_NODE_GROUP_1();
733
734   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
735   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
736   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
737   CksvAccess(_nodeGroupIDTable).push_back(groupID);
738
739   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
740   if(ptrq) {
741     void *pending;
742     while((pending=ptrq->deq())!=0)
743       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
744 //    delete ptrq;
745       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
746   }
747   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
748 }
749
750 void _createGroup(CkGroupID groupID, envelope *env)
751 {
752   _CHECK_USED(env);
753   _SET_USED(env, 1);
754   register int epIdx = env->getEpIdx();
755   int gIdx = _entryTable[epIdx]->chareIdx;
756   CkNodeGroupID rednMgr;
757   if(_chareTable[gIdx]->isIrr == 0){
758                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
759                 rednMgr = rednMgrProxy;
760 //              rednMgrProxy.setAttachedGroup(groupID);
761   }else{
762         rednMgr.setZero();
763   }
764   env->setGroupNum(groupID);
765   env->setSrcPe(CkMyPe());
766   env->setRednMgr(rednMgr);
767   env->setGroupEpoch(CkpvAccess(_charmEpoch));
768
769   if(CkNumPes()>1) {
770     CkPackMessage(&env);
771     CmiSetHandler(env, _bocHandlerIdx);
772     _numInitMsgs++;
773     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
774     CpvAccess(_qd)->create(CkNumPes()-1);
775     CkUnpackMessage(&env);
776   }
777   _STATS_RECORD_CREATE_GROUP_1();
778   CkCreateLocalGroup(groupID, epIdx, env);
779 }
780
781 void _createNodeGroup(CkGroupID groupID, envelope *env)
782 {
783   _CHECK_USED(env);
784   _SET_USED(env, 1);
785   register int epIdx = env->getEpIdx();
786   env->setGroupNum(groupID);
787   env->setSrcPe(CkMyPe());
788   env->setGroupEpoch(CkpvAccess(_charmEpoch));
789   if(CkNumNodes()>1) {
790     CkPackMessage(&env);
791     CmiSetHandler(env, _bocHandlerIdx);
792     _numInitMsgs++;
793     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
794     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
795     CpvAccess(_qd)->create(CkNumNodes()-1);
796     CkUnpackMessage(&env);
797   }
798   _STATS_RECORD_CREATE_NODE_GROUP_1();
799   CkCreateLocalNodeGroup(groupID, epIdx, env);
800 }
801
802 // new _groupCreate
803
804 static CkGroupID _groupCreate(envelope *env)
805 {
806   register CkGroupID groupNum;
807
808   // check CkMyPe(). if it is 0 then idx is _numGroups++
809   // if not, then something else...
810   if(CkMyPe() == 0)
811      groupNum.idx = CkpvAccess(_numGroups)++;
812   else
813      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
814   _createGroup(groupNum, env);
815   return groupNum;
816 }
817
818 // new _nodeGroupCreate
819 static CkGroupID _nodeGroupCreate(envelope *env)
820 {
821   register CkGroupID groupNum;
822   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
823   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
824           groupNum.idx = CksvAccess(_numNodeGroups)++;
825    else
826           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
827   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
828   _createNodeGroup(groupNum, env);
829   return groupNum;
830 }
831
832 /**** generate the group idx when group is creator pe is not pe0
833  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
834  **** remaining bits contain the group creator processor number and
835  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
836
837 int _getGroupIdx(int numNodes,int myNode,int numGroups)
838 {
839         int idx;
840         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
841         int n = 32 - (x+1);                                     // number of bits remaining for the index
842         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
843                                                                 // then add the next available index
844         // of course this won't work when int is 8 bytes long on T3E
845         //idx |= 0x80000000;                                      // set the most significant bit to 1
846         idx = - idx;
847                                                                 // if int is not 32 bits, wouldn't this be wrong?
848         return idx;
849 }
850
851 extern "C"
852 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
853 {
854   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
855   register envelope *env = UsrToEnv(msg);
856   env->setMsgtype(BocInitMsg);
857   env->setEpIdx(eIdx);
858   env->setSrcPe(CkMyPe());
859   _TRACE_CREATION_N(env, CkNumPes());
860   CkGroupID gid = _groupCreate(env);
861   _TRACE_CREATION_DONE(1);
862   return gid;
863 }
864
865 extern "C"
866 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
867 {
868   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
869   register envelope *env = UsrToEnv(msg);
870   env->setMsgtype(NodeBocInitMsg);
871   env->setEpIdx(eIdx);
872   env->setSrcPe(CkMyPe());
873   _TRACE_CREATION_N(env, CkNumNodes());
874   CkGroupID gid = _nodeGroupCreate(env);
875   _TRACE_CREATION_DONE(1);
876   return gid;
877 }
878
879 static inline void *_allocNewChare(envelope *env, int &idx)
880 {
881   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
882   void *tmp=malloc(_chareTable[chareIdx]->size);
883   _MEMCHECK(tmp);
884 #ifndef CMK_CHARE_USE_PTR
885   CkpvAccess(chare_objs).push_back(tmp);
886   CkpvAccess(chare_types).push_back(chareIdx);
887   idx = CkpvAccess(chare_objs).size()-1;
888 #endif
889   setMemoryTypeChare(tmp);
890   return tmp;
891 }
892
893 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
894 {
895   int idx;
896   register void *obj = _allocNewChare(env, idx);
897 #ifndef CMK_CHARE_USE_PTR
898   //((Chare *)obj)->chareIdx = idx;
899   CkpvAccess(currentChareIdx) = idx;
900 #endif
901   _invokeEntry(env->getEpIdx(),env,obj);
902 }
903
904 void CkCreateLocalChare(int epIdx, envelope *env)
905 {
906   env->setEpIdx(epIdx);
907   _processNewChareMsg(NULL, env);
908 }
909
910 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
911 {
912   int idx;
913   register void *obj = _allocNewChare(env, idx);
914   register CkChareID *pCid = (CkChareID *)
915       _allocMsg(FillVidMsg, sizeof(CkChareID));
916   pCid->onPE = CkMyPe();
917 #ifndef CMK_CHARE_USE_PTR
918   pCid->objPtr = (void*)(CmiIntPtr)idx;
919 #else
920   pCid->objPtr = obj;
921 #endif
922   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
923   register envelope *ret = UsrToEnv(pCid);
924   ret->setVidPtr(env->getVidPtr());
925   register int srcPe = env->getByPe();
926   ret->setSrcPe(CkMyPe());
927   CmiSetHandler(ret, _charmHandlerIdx);
928   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
929 #ifndef CMK_CHARE_USE_PTR
930   // register the remote vidblock for deletion when chare is deleted
931   CkChareID vid;
932   vid.onPE = srcPe;
933   vid.objPtr = env->getVidPtr();
934   CkpvAccess(vmap)[idx] = vid;    
935 #endif
936   CpvAccess(_qd)->create();
937 #ifndef CMK_CHARE_USE_PTR
938   //((Chare *)obj)->chareIdx = idx;
939   CkpvAccess(currentChareIdx) = idx;
940 #endif
941   _invokeEntry(env->getEpIdx(),env,obj);
942 }
943
944 /************** Receive: Chares *************/
945
946 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
947 {
948   register int epIdx = env->getEpIdx();
949   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
950   register void *obj;
951   if (mainIdx != -1)  {           // mainchare
952     CmiAssert(CkMyPe()==0);
953     obj = _mainTable[mainIdx]->getObj();
954   }
955   else {
956 #ifndef CMK_CHARE_USE_PTR
957     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
958       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
959     else
960       obj = env->getObjPtr();
961 #else
962     obj = env->getObjPtr();
963 #endif
964   }
965   _invokeEntry(epIdx,env,obj);
966 }
967
968 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
969 {
970   register int epIdx = env->getEpIdx();
971   register void *obj = env->getObjPtr();
972   _invokeEntry(epIdx,env,obj);
973 }
974
975 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
976 {
977 #ifndef CMK_CHARE_USE_PTR
978   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
979 #else
980   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
981   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
982 #endif
983   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
984   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
985   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
986   CmiFree(env);
987 }
988
989 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
990 {
991 #ifndef CMK_CHARE_USE_PTR
992   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
993 #else
994   VidBlock *vptr = (VidBlock *) env->getVidPtr();
995   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
996 #endif
997   _SET_USED(env, 1);
998   vptr->send(env);
999 }
1000
1001 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1002 {
1003 #ifndef CMK_CHARE_USE_PTR
1004   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1005   delete vptr;
1006   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1007 #endif
1008   CmiFree(env);
1009 }
1010
1011 /************** Receive: Groups ****************/
1012
1013 /**
1014  Return a pointer to the local BOC of "groupID".
1015  The message "env" passed in has some known dependency on this groupID
1016  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1017  Therefore, if the return value is NULL, this function buffers the massage so that
1018  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1019  The message passed in must have its handlers correctly set so that it can be
1020  scheduled again.
1021 */
1022 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1023 {
1024
1025         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1026         IrrGroup *obj = ck->localBranch(groupID);
1027         if (obj==NULL) { /* groupmember not yet created: stash message */
1028                 ck->getGroupTable()->find(groupID).enqMsg(env);
1029         }
1030         else { /* will be able to process message */
1031                 ck->process();
1032         }
1033         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1034         return obj;
1035 }
1036
1037 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1038 {
1039 #if CMK_LBDB_ON
1040   // if there is a running obj being measured, stop it temporarily
1041   LDObjHandle objHandle;
1042   int objstopped = 0;
1043   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1044   if (the_lbdb->RunningObject(&objHandle)) {
1045     objstopped = 1;
1046     the_lbdb->ObjectStop(objHandle);
1047   }
1048 #endif
1049   _invokeEntry(epIdx,env,obj);
1050 #if CMK_LBDB_ON
1051   if (objstopped) the_lbdb->ObjectStart(objHandle);
1052 #endif
1053   _STATS_RECORD_PROCESS_BRANCH_1();
1054 }
1055
1056 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1057 {
1058   register CkGroupID groupID =  env->getGroupNum();
1059   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1060   if(obj) {
1061     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1062   }
1063 }
1064
1065 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1066 {
1067   env->setMsgtype(ForChareMsg);
1068   env->setObjPtr(obj);
1069   _processForChareMsg(ck,env);
1070   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1071 }
1072
1073 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1074 {
1075   env->setEpIdx(epIdx);
1076   _deliverForNodeBocMsg(ck,env, obj);
1077 }
1078
1079 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1080 {
1081   register CkGroupID groupID = env->getGroupNum();
1082   register void *obj;
1083
1084   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1085   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1086   if(!obj) { // groupmember not yet created
1087 #if CMK_IMMEDIATE_MSG
1088     if (CmiIsImmediate(env))     // buffer immediate message
1089       CmiDelayImmediate();
1090     else
1091 #endif
1092     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1093     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1094     return;
1095   }
1096   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1097 #if CMK_IMMEDIATE_MSG
1098   if (!CmiIsImmediate(env))
1099 #endif
1100   ck->process();
1101   env->setMsgtype(ForChareMsg);
1102   env->setObjPtr(obj);
1103   _processForChareMsg(ck,env);
1104   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1105 }
1106
1107 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1108 {
1109   register CkGroupID groupID = env->getGroupNum();
1110   register int epIdx = env->getEpIdx();
1111   if (!env->getGroupDep().isZero()) {      // dependence
1112     CkGroupID dep = env->getGroupDep();
1113     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1114     if (obj == NULL) return;
1115   }
1116   else
1117     ck->process();
1118   CkCreateLocalGroup(groupID, epIdx, env);
1119 }
1120
1121 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1122 {
1123   register CkGroupID groupID = env->getGroupNum();
1124   register int epIdx = env->getEpIdx();
1125   CkCreateLocalNodeGroup(groupID, epIdx, env);
1126 }
1127
1128 /************** Receive: Arrays *************/
1129
1130 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1131   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1132   if (mgr) {
1133     _SET_USED(env, 0);
1134     mgr->insertElement((CkMessage *)EnvToUsr(env));
1135   }
1136 }
1137 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1138   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1139   if (mgr) {
1140     _SET_USED(env, 0);
1141     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1142   }
1143 }
1144
1145 //BIGSIM_OOC DEBUGGING
1146 #define TELLMSGTYPE(x) //x
1147
1148 /**
1149  * This is the main converse-level handler used by all of Charm++.
1150  *
1151  * \addtogroup CriticalPathFramework
1152  */
1153 void _processHandler(void *converseMsg,CkCoreState *ck)
1154 {
1155   register envelope *env = (envelope *) converseMsg;
1156
1157   MESSAGE_PHASE_CHECK(env);
1158
1159 //#if CMK_RECORD_REPLAY
1160   if (ck->watcher!=NULL) {
1161     if (!ck->watcher->processMessage(&env,ck)) return;
1162   }
1163 //#endif
1164 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1165         Chare *obj=NULL;
1166         CkObjID sender;
1167         MCount SN;
1168         MlogEntry *entry=NULL;
1169         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1170         env->getMsgtype() == ForArrayEltMsg){
1171                 sender = env->sender;
1172                 SN = env->SN;
1173                 int result = preProcessReceivedMessage(env,&obj,&entry);
1174                 if(result == 0){
1175                         return;
1176                 }
1177         }
1178 #endif
1179
1180 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1181   //  CkPrintf("START\n");
1182   criticalPath_start(env);
1183 #endif
1184
1185
1186   switch(env->getMsgtype()) {
1187 // Group support
1188     case BocInitMsg :
1189       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1190       // QD processing moved inside _processBocInitMsg because it is conditional
1191       //ck->process(); 
1192       if(env->isPacked()) CkUnpackMessage(&env);
1193       _processBocInitMsg(ck,env);
1194       break;
1195     case NodeBocInitMsg :
1196       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1197       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1198       _processNodeBocInitMsg(ck,env);
1199       break;
1200     case ForBocMsg :
1201       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1202       // QD processing moved inside _processForBocMsg because it is conditional
1203       if(env->isPacked()) CkUnpackMessage(&env);
1204       _processForBocMsg(ck,env);
1205       // stats record moved inside _processForBocMsg because it is conditional
1206       break;
1207     case ForNodeBocMsg :
1208       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1209       // QD processing moved to _processForNodeBocMsg because it is conditional
1210       if(env->isPacked()) CkUnpackMessage(&env);
1211       _processForNodeBocMsg(ck,env);
1212       // stats record moved to _processForNodeBocMsg because it is conditional
1213       break;
1214
1215 // Array support
1216     case ArrayEltInitMsg:
1217       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1218       if(env->isPacked()) CkUnpackMessage(&env);
1219       _processArrayEltInitMsg(ck,env);
1220       break;
1221     case ForArrayEltMsg:
1222       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1223       if(env->isPacked()) CkUnpackMessage(&env);
1224       _processArrayEltMsg(ck,env);
1225       break;
1226
1227 // Chare support
1228     case NewChareMsg :
1229       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1230       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1231       _processNewChareMsg(ck,env);
1232       _STATS_RECORD_PROCESS_CHARE_1();
1233       break;
1234     case NewVChareMsg :
1235       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1236       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1237       _processNewVChareMsg(ck,env);
1238       _STATS_RECORD_PROCESS_CHARE_1();
1239       break;
1240     case ForChareMsg :
1241       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1242       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1243       _processForPlainChareMsg(ck,env);
1244       _STATS_RECORD_PROCESS_MSG_1();
1245       break;
1246     case ForVidMsg   :
1247       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1248       ck->process();
1249       _processForVidMsg(ck,env);
1250       break;
1251     case FillVidMsg  :
1252       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1253       ck->process();
1254       _processFillVidMsg(ck,env);
1255       break;
1256     case DeleteVidMsg  :
1257       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1258       ck->process();
1259       _processDeleteVidMsg(ck,env);
1260       break;
1261
1262     default:
1263       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1264   }
1265 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1266         if(obj != NULL){
1267                 postProcessReceivedMessage(obj,sender,SN,entry);
1268         }
1269 #endif
1270
1271
1272 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1273   criticalPath_end();
1274   //  CkPrintf("STOP\n");
1275 #endif
1276
1277
1278 }
1279
1280
1281 /******************** Message Send **********************/
1282
1283 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1284              int *queueing, int *priobits, unsigned int **prioptr)
1285 {
1286   register envelope *env = (envelope *)converseMsg;
1287   *pfn = (CldPackFn)CkPackMessage;
1288   *len = env->getTotalsize();
1289   *queueing = env->getQueueing();
1290   *priobits = env->getPriobits();
1291   *prioptr = (unsigned int *) env->getPrioPtr();
1292 }
1293
1294 void CkPackMessage(envelope **pEnv)
1295 {
1296   register envelope *env = *pEnv;
1297   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1298     register void *msg = EnvToUsr(env);
1299     _TRACE_BEGIN_PACK();
1300     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1301     _TRACE_END_PACK();
1302     env=UsrToEnv(msg);
1303     env->setPacked(1);
1304     *pEnv = env;
1305   }
1306 }
1307
1308 void CkUnpackMessage(envelope **pEnv)
1309 {
1310   register envelope *env = *pEnv;
1311   register int msgIdx = env->getMsgIdx();
1312   if(env->isPacked()) {
1313     register void *msg = EnvToUsr(env);
1314     _TRACE_BEGIN_UNPACK();
1315     msg = _msgTable[msgIdx]->unpack(msg);
1316     _TRACE_END_UNPACK();
1317     env=UsrToEnv(msg);
1318     env->setPacked(0);
1319     *pEnv = env;
1320   }
1321 }
1322
1323 //There's no reason for most messages to go through the Cld--
1324 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1325 // Thus these accellerated versions of the Cld calls.
1326 #if CMK_OBJECT_QUEUE_AVAILABLE
1327 static int index_objectQHandler;
1328 #endif
1329 int index_tokenHandler;
1330 int index_skipCldHandler;
1331
1332 void _skipCldHandler(void *converseMsg)
1333 {
1334   register envelope *env = (envelope *)(converseMsg);
1335   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1336 #if CMK_GRID_QUEUE_AVAILABLE
1337   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1338     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1339                        env, env->getQueueing (), env->getPriobits (),
1340                        (unsigned int *) env->getPrioPtr ());
1341   } else {
1342     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1343                        env, env->getQueueing (), env->getPriobits (),
1344                        (unsigned int *) env->getPrioPtr ());
1345   }
1346 #else
1347   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1348         env, env->getQueueing(),env->getPriobits(),
1349         (unsigned int *)env->getPrioPtr());
1350 #endif
1351 }
1352
1353
1354 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1355 // Made non-static to be used by ckmessagelogging
1356 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1357 {
1358 #if CMK_CHARMDEBUG
1359   if (!ConverseDeliver(pe)) {
1360     CmiFree(env);
1361     return;
1362   }
1363 #endif
1364   if(pe == CkMyPe() ){
1365     if(!CmiNodeAlive(CkMyPe())){
1366         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1367 //      return;
1368     }
1369   }
1370   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1371 #if CMK_OBJECT_QUEUE_AVAILABLE
1372     Chare *obj = CkFindObjectPtr(env);
1373     if (obj && obj->CkGetObjQueue().queue()) {
1374       _enqObjQueue(obj, env);
1375     }
1376     else
1377 #endif
1378     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1379         env, env->getQueueing(),env->getPriobits(),
1380         (unsigned int *)env->getPrioPtr());
1381 #if CMK_PERSISTENT_COMM
1382     CmiPersistentOneSend();
1383 #endif
1384   } else {
1385     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1386       CkPackMessage(&env);
1387     int len=env->getTotalsize();
1388     CmiSetXHandler(env,CmiGetHandler(env));
1389 #if CMK_OBJECT_QUEUE_AVAILABLE
1390     CmiSetHandler(env,index_objectQHandler);
1391 #else
1392     CmiSetHandler(env,index_skipCldHandler);
1393 #endif
1394     CmiSetInfo(env,infoFn);
1395     if (pe==CLD_BROADCAST) {
1396 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1397                         CmiSyncBroadcast(len, (char *)env);
1398 #else
1399                         CmiSyncBroadcastAndFree(len, (char *)env); 
1400 #endif
1401
1402 }
1403     else if (pe==CLD_BROADCAST_ALL) { 
1404 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1405                         CmiSyncBroadcastAll(len, (char *)env);
1406 #else
1407                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1408 #endif
1409
1410 }
1411     else{
1412 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1413                         CmiSyncSend(pe, len, (char *)env);
1414 #else
1415                         CmiSyncSendAndFree(pe, len, (char *)env);
1416 #endif
1417
1418                 }
1419   }
1420 }
1421
1422 #if CMK_BIGSIM_CHARM
1423 #   define  _skipCldEnqueue   _CldEnqueue
1424 #endif
1425
1426 // by pass Charm++ priority queue, send as Converse message
1427 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1428 {
1429 #if CMK_CHARMDEBUG
1430   if (!ConverseDeliver(-1)) {
1431     CmiFree(env);
1432     return;
1433   }
1434 #endif
1435   CkPackMessage(&env);
1436   int len=env->getTotalsize();
1437   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1438 }
1439
1440 static void _noCldEnqueue(int pe, envelope *env)
1441 {
1442 /*
1443   if (pe == CkMyPe()) {
1444     CmiHandleMessage(env);
1445   } else
1446 */
1447 #if CMK_CHARMDEBUG
1448   if (!ConverseDeliver(pe)) {
1449     CmiFree(env);
1450     return;
1451   }
1452 #endif
1453   CkPackMessage(&env);
1454   int len=env->getTotalsize();
1455   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1456   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1457   else CmiSyncSendAndFree(pe, len, (char *)env);
1458 }
1459
1460 //static void _noCldNodeEnqueue(int node, envelope *env)
1461 //Made non-static to be used by ckmessagelogging
1462 void _noCldNodeEnqueue(int node, envelope *env)
1463 {
1464 /*
1465   if (node == CkMyNode()) {
1466     CmiHandleMessage(env);
1467   } else {
1468 */
1469 #if CMK_CHARMDEBUG
1470   if (!ConverseDeliver(node)) {
1471     CmiFree(env);
1472     return;
1473   }
1474 #endif
1475   CkPackMessage(&env);
1476   int len=env->getTotalsize();
1477   if (node==CLD_BROADCAST) { 
1478 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1479         CmiSyncNodeBroadcast(len, (char *)env);
1480 #else
1481         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1482 #endif
1483 }
1484   else if (node==CLD_BROADCAST_ALL) { 
1485 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1486                 CmiSyncNodeBroadcastAll(len, (char *)env);
1487 #else
1488                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1489 #endif
1490
1491 }
1492   else {
1493 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1494         CmiSyncNodeSend(node, len, (char *)env);
1495 #else
1496         CmiSyncNodeSendAndFree(node, len, (char *)env);
1497 #endif
1498   }
1499 }
1500
1501 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1502 {
1503   register envelope *env = UsrToEnv(msg);
1504   _CHECK_USED(env);
1505   _SET_USED(env, 1);
1506 #if CMK_REPLAYSYSTEM
1507   setEventID(env);
1508 #endif
1509   env->setMsgtype(ForChareMsg);
1510   env->setEpIdx(eIdx);
1511   env->setSrcPe(CkMyPe());
1512 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1513   criticalPath_send(env);
1514   automaticallySetMessagePriority(env);
1515 #endif
1516 #if CMK_CHARMDEBUG
1517   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1518 #endif
1519 #if CMK_OBJECT_QUEUE_AVAILABLE
1520   CmiSetHandler(env, index_objectQHandler);
1521 #else
1522   CmiSetHandler(env, _charmHandlerIdx);
1523 #endif
1524   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1525     register int pe = -(pCid->onPE+1);
1526     if(pe==CkMyPe()) {
1527 #ifndef CMK_CHARE_USE_PTR
1528       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1529 #else
1530       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1531 #endif
1532       void *objPtr;
1533       if (NULL!=(objPtr=vblk->getLocalChare()))
1534       { //A ready local chare
1535         env->setObjPtr(objPtr);
1536         return pe;
1537       }
1538       else { //The vidblock is not ready-- forget it
1539         vblk->send(env);
1540         return -1;
1541       }
1542     } else { //Valid vidblock for another PE:
1543       env->setMsgtype(ForVidMsg);
1544       env->setVidPtr(pCid->objPtr);
1545       return pe;
1546     }
1547   }
1548   else {
1549     env->setObjPtr(pCid->objPtr);
1550     return pCid->onPE;
1551   }
1552 }
1553
1554 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1555 {
1556   int destPE = _prepareMsg(eIdx, msg, pCid);
1557   if (destPE != -1) {
1558     register envelope *env = UsrToEnv(msg);
1559 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1560     criticalPath_send(env);
1561     automaticallySetMessagePriority(env);
1562 #endif
1563     CmiBecomeImmediate(env);
1564   }
1565   return destPE;
1566 }
1567
1568 extern "C"
1569 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1570 {
1571   if (opts & CK_MSG_INLINE) {
1572     CkSendMsgInline(entryIdx, msg, pCid, opts);
1573     return;
1574   }
1575 #if CMK_ERROR_CHECKING
1576   if (opts & CK_MSG_IMMEDIATE) {
1577     CmiAbort("Immediate message is not allowed in Chare!");
1578   }
1579 #endif
1580   register envelope *env = UsrToEnv(msg);
1581   int destPE=_prepareMsg(entryIdx,msg,pCid);
1582   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1583   // VidBlock was not yet filled). The problem is that the creation was never
1584   // traced later when the VidBlock was filled. One solution is to trace the
1585   // creation here, the other to trace it in VidBlock->msgDeliver().
1586   _TRACE_CREATION_1(env);
1587   if (destPE!=-1) {
1588     CpvAccess(_qd)->create();
1589     if (opts & CK_MSG_SKIP_OR_IMM)
1590       _noCldEnqueue(destPE, env);
1591     else
1592       _CldEnqueue(destPE, env, _infoIdx);
1593   }
1594   _TRACE_CREATION_DONE(1);
1595 }
1596
1597 extern "C"
1598 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1599 {
1600   if (pCid->onPE==CkMyPe())
1601   {
1602     if(!CmiNodeAlive(CkMyPe())){
1603         return;
1604     }
1605 #if CMK_CHARMDEBUG
1606     //Just in case we need to breakpoint or use the envelope in some way
1607     _prepareMsg(entryIndex,msg,pCid);
1608 #endif
1609                 //Just directly call the chare (skip QD handling & scheduler)
1610     register envelope *env = UsrToEnv(msg);
1611     if (env->isPacked()) CkUnpackMessage(&env);
1612     _STATS_RECORD_PROCESS_MSG_1();
1613     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1614   }
1615   else {
1616     //No way to inline a cross-processor message:
1617     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1618   }
1619 }
1620
1621 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1622 {
1623   register envelope *env = UsrToEnv(msg);
1624 #if CMK_ERROR_CHECKING
1625   CkNodeGroupID nodeRedMgr;
1626 #endif
1627   _CHECK_USED(env);
1628   _SET_USED(env, 1);
1629 #if CMK_REPLAYSYSTEM
1630   setEventID(env);
1631 #endif
1632   env->setMsgtype(type);
1633   env->setEpIdx(eIdx);
1634   env->setGroupNum(gID);
1635   env->setSrcPe(CkMyPe());
1636 #if CMK_ERROR_CHECKING
1637   nodeRedMgr.setZero();
1638   env->setRednMgr(nodeRedMgr);
1639 #endif
1640 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1641   criticalPath_send(env);
1642   automaticallySetMessagePriority(env);
1643 #endif
1644 #if CMK_CHARMDEBUG
1645   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1646 #endif
1647   CmiSetHandler(env, _charmHandlerIdx);
1648   return env;
1649 }
1650
1651 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1652 {
1653   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1654 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1655   criticalPath_send(env);
1656   automaticallySetMessagePriority(env);
1657 #endif
1658   CmiBecomeImmediate(env);
1659   return env;
1660 }
1661
1662 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1663                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1664 {
1665   int numPes;
1666   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1667 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1668   sendTicketGroupRequest(env,pe,_infoIdx);
1669 #else
1670   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1671   _TRACE_CREATION_N(env, numPes);
1672   if (opts & CK_MSG_SKIP_OR_IMM)
1673     _noCldEnqueue(pe, env);
1674   else
1675     _skipCldEnqueue(pe, env, _infoIdx);
1676   _TRACE_CREATION_DONE(1);
1677 #endif
1678 }
1679
1680 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1681                            int npes, int *pes)
1682 {
1683   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1684   _TRACE_CREATION_MULTICAST(env, npes, pes);
1685   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1686   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1687 }
1688
1689 extern "C"
1690 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1691 {
1692 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1693   if (destPE==CkMyPe())
1694   {
1695     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1696     return;
1697   }
1698   //Can't inline-- send the usual way
1699   register envelope *env = UsrToEnv(msg);
1700   int numPes;
1701   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1702   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1703   _TRACE_CREATION_N(env, numPes);
1704   _noCldEnqueue(destPE, env);
1705   _STATS_RECORD_SEND_BRANCH_1();
1706   CkpvAccess(_coreState)->create();
1707   _TRACE_CREATION_DONE(1);
1708 #else
1709   // no support for immediate message, send inline
1710   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1711 #endif
1712 }
1713
1714 extern "C"
1715 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1716 {
1717   if (destPE==CkMyPe())
1718   {
1719     if(!CmiNodeAlive(CkMyPe())){
1720         return;
1721     }
1722     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1723     if (obj!=NULL)
1724     { //Just directly call the group:
1725 #if CMK_ERROR_CHECKING
1726       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1727 #else
1728       envelope *env=UsrToEnv(msg);
1729 #endif
1730       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1731       return;
1732     }
1733   }
1734   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1735   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1736 }
1737
1738 extern "C"
1739 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1740 {
1741   if (opts & CK_MSG_INLINE) {
1742     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1743     return;
1744   }
1745   if (opts & CK_MSG_IMMEDIATE) {
1746     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1747     return;
1748   }
1749   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1750   _STATS_RECORD_SEND_BRANCH_1();
1751   CkpvAccess(_coreState)->create();
1752 }
1753
1754 extern "C"
1755 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1756 {
1757 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1758   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1759   _TRACE_CREATION_MULTICAST(env, npes, pes);
1760   _noCldEnqueueMulti(npes, pes, env);
1761   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1762 #else
1763   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1764   CpvAccess(_qd)->create(-npes);
1765 #endif
1766   _STATS_RECORD_SEND_BRANCH_N(npes);
1767   CpvAccess(_qd)->create(npes);
1768 }
1769
1770 extern "C"
1771 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1772 {
1773   if (opts & CK_MSG_IMMEDIATE) {
1774     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1775     return;
1776   }
1777     // normal mesg
1778   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1779   _STATS_RECORD_SEND_BRANCH_N(npes);
1780   CpvAccess(_qd)->create(npes);
1781 }
1782
1783 extern "C"
1784 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1785 {
1786   int npes;
1787   int *pes;
1788   if (opts & CK_MSG_IMMEDIATE) {
1789     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1790     return;
1791   }
1792     // normal mesg
1793   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1794   CmiLookupGroup(grp, &npes, &pes);
1795   _TRACE_CREATION_MULTICAST(env, npes, pes);
1796   _CldEnqueueGroup(grp, env, _infoIdx);
1797   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1798   _STATS_RECORD_SEND_BRANCH_N(npes);
1799   CpvAccess(_qd)->create(npes);
1800 }
1801
1802 extern "C"
1803 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1804 {
1805   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1806   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1807   CpvAccess(_qd)->create(CkNumPes());
1808 }
1809
1810 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1811                 int node=CLD_BROADCAST_ALL, int opts=0)
1812 {
1813   int numPes;
1814   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1815 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1816         sendTicketNodeGroupRequest(env,node,_infoIdx);
1817 #else
1818   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1819   _TRACE_CREATION_N(env, numPes);
1820   if (opts & CK_MSG_SKIP_OR_IMM) {
1821     _noCldNodeEnqueue(node, env);
1822     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1823       CkpvAccess(_coreState)->create(-numPes);
1824     }
1825   }
1826   else
1827     _CldNodeEnqueue(node, env, _infoIdx);
1828   _TRACE_CREATION_DONE(1);
1829 #endif
1830 }
1831
1832 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1833                            int npes, int *nodes)
1834 {
1835   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1836   _TRACE_CREATION_N(env, npes);
1837   for (int i=0; i<npes; i++) {
1838     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1839   }
1840   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1841 }
1842
1843 extern "C"
1844 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1845 {
1846 #if CMK_IMMEDIATE_MSG
1847   if (node==CkMyNode())
1848   {
1849     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1850     return;
1851   }
1852   //Can't inline-- send the usual way
1853   register envelope *env = UsrToEnv(msg);
1854   int numPes;
1855   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1856   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1857   _TRACE_CREATION_N(env, numPes);
1858   _noCldNodeEnqueue(node, env);
1859   _STATS_RECORD_SEND_BRANCH_1();
1860   /* immeidate message is invisible to QD */
1861 //  CkpvAccess(_coreState)->create();
1862   _TRACE_CREATION_DONE(1);
1863 #else
1864   // no support for immediate message, send inline
1865   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1866 #endif
1867 }
1868
1869 extern "C"
1870 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1871 {
1872   if (node==CkMyNode())
1873   {
1874     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1875     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1876     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1877     if (obj!=NULL)
1878     { //Just directly call the group:
1879 #if CMK_ERROR_CHECKING
1880       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1881 #else
1882       envelope *env=UsrToEnv(msg);
1883 #endif
1884       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1885       return;
1886     }
1887   }
1888   //Can't inline-- send the usual way
1889   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1890 }
1891
1892 extern "C"
1893 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1894 {
1895   if (opts & CK_MSG_INLINE) {
1896     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1897     return;
1898   }
1899   if (opts & CK_MSG_IMMEDIATE) {
1900     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1901     return;
1902   }
1903   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1904   _STATS_RECORD_SEND_NODE_BRANCH_1();
1905   CkpvAccess(_coreState)->create();
1906 }
1907
1908 extern "C"
1909 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1910 {
1911 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1912   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1913   _noCldEnqueueMulti(npes, nodes, env);
1914 #else
1915   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1916   CpvAccess(_qd)->create(-npes);
1917 #endif
1918   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1919   CpvAccess(_qd)->create(npes);
1920 }
1921
1922 extern "C"
1923 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1924 {
1925   if (opts & CK_MSG_IMMEDIATE) {
1926     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1927     return;
1928   }
1929     // normal mesg
1930   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1931   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1932   CpvAccess(_qd)->create(npes);
1933 }
1934
1935 extern "C"
1936 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1937 {
1938   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1939   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1940   CpvAccess(_qd)->create(CkNumNodes());
1941 }
1942
1943 //Needed by delegation manager:
1944 extern "C"
1945 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1946 { return _prepareMsg(eIdx,msg,pCid); }
1947 extern "C"
1948 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1949 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1950 extern "C"
1951 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1952 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1953
1954 void _ckModuleInit(void) {
1955         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1956 #if CMK_OBJECT_QUEUE_AVAILABLE
1957         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1958 #endif
1959         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1960         CkpvInitialize(TokenPool*, _tokenPool);
1961         CkpvAccess(_tokenPool) = new TokenPool;
1962 }
1963
1964
1965 /************** Send: Arrays *************/
1966
1967 extern void CkArrayManagerInsert(int onPe,void *msg);
1968 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1969
1970 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1971 {
1972   _CHECK_USED(env);
1973   _SET_USED(env, 1);
1974   env->setMsgtype(type);
1975 #if CMK_CHARMDEBUG
1976   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1977 #endif
1978   CmiSetHandler(env, _charmHandlerIdx);
1979   CpvAccess(_qd)->create();
1980 }
1981
1982 extern "C"
1983 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1984   register envelope *env = UsrToEnv(msg);
1985   env->getsetArrayMgr()=aID;
1986   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1987   _CldEnqueue(pe, env, _infoIdx);
1988 }
1989
1990 extern "C"
1991 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1992   register envelope *env = UsrToEnv(msg);
1993   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1994 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1995    sendTicketArrayRequest(env,pe,_infoIdx);
1996 #else
1997   if (opts & CK_MSG_IMMEDIATE)
1998     CmiBecomeImmediate(env);
1999   if (opts & CK_MSG_SKIP_OR_IMM)
2000     _noCldEnqueue(pe, env);
2001   else
2002     _skipCldEnqueue(pe, env, _infoIdx);
2003 #endif
2004 }
2005
2006 class ElementDestroyer : public CkLocIterator {
2007 private:
2008         CkLocMgr *locMgr;
2009 public:
2010         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2011         void addLocation(CkLocation &loc) {
2012           loc.destroyAll();
2013         }
2014 };
2015
2016 void CkDeleteChares() {
2017   int i;
2018   int numGroups = CkpvAccess(_groupIDTable)->size();
2019
2020   // delete all plain chares
2021 #ifndef CMK_CHARE_USE_PTR
2022   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2023         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2024         delete obj;
2025         CkpvAccess(chare_objs)[i] = NULL;
2026   }
2027   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2028         VidBlock *obj = CkpvAccess(vidblocks)[i];
2029         delete obj;
2030         CkpvAccess(vidblocks)[i] = NULL;
2031   }
2032 #endif
2033
2034   // delete all array elements
2035   for(i=0;i<numGroups;i++) {
2036     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2037     if(obj && obj->isLocMgr())  {
2038       CkLocMgr *mgr = (CkLocMgr*)obj;
2039       ElementDestroyer destroyer(mgr);
2040       mgr->iterate(destroyer);
2041     }
2042   }
2043
2044   // delete all groups
2045   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2046   for(i=0;i<numGroups;i++) {
2047     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2048     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2049     if (obj) delete obj;
2050   }
2051   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2052
2053   // delete all node groups
2054   if (CkMyRank() == 0) {
2055     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2056     for(i=0;i<numNodeGroups;i++) {
2057       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2058       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2059       if (obj) delete obj;
2060     }
2061   }
2062 }
2063
2064 //------------------- Message Watcher (record/replay) ----------------
2065
2066 #include "crc32.h"
2067
2068 CkpvDeclare(int, envelopeEventID);
2069 int _recplay_crc = 0;
2070 int _recplay_checksum = 0;
2071 int _recplay_logsize = 1024*1024;
2072
2073 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2074 #define REPLAYDEBUG(args) /* empty */
2075
2076 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2077
2078 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2079
2080 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2081
2082     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2083     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2084     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2085     FILE *f=fopen(fName,permissions);
2086     REPLAYDEBUG("openReplayfile "<<fName);
2087     if (f==NULL) {
2088         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2089             CkMyPe(),fName,permissions);
2090         CkAbort("openReplayFile> Could not open replay file");
2091     }
2092     return f;
2093 }
2094
2095 #include "BaseLB.h" /* For LBMigrateMsg message */
2096
2097 class CkMessageRecorder : public CkMessageWatcher {
2098   char *buffer;
2099   unsigned int curpos;
2100   bool firstOpen;
2101 public:
2102   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2103   ~CkMessageRecorder() {
2104     flushLog(0);
2105     fprintf(f,"-1 -1 -1 ");
2106     fclose(f);
2107     delete[] buffer;
2108 #if 0
2109     FILE *stsfp = fopen("sts", "w");
2110     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2111     traceWriteSTS(stsfp, 0);
2112     fclose(stsfp);
2113 #endif
2114     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2115   }
2116
2117 private:
2118   void flushLog(int verbose=1) {
2119     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2120     fprintf(f, "%s", buffer);
2121     curpos=0;
2122   }
2123   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2124     if ((*envptr)->getEvent()) {
2125       bool wasPacked = (*envptr)->isPacked();
2126       if (!wasPacked) CkPackMessage(envptr);
2127       envelope *env = *envptr;
2128       unsigned int crc1=0, crc2=0;
2129       if (_recplay_crc) {
2130         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2131         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2132         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2133       } else if (_recplay_checksum) {
2134         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2135         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2136       }
2137       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2138       if (curpos > _recplay_logsize-128) flushLog();
2139       if (!wasPacked) CkUnpackMessage(envptr);
2140     }
2141     return CmiTrue;
2142   }
2143   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2144     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2145     if (curpos > _recplay_logsize-128) flushLog();
2146     return CmiTrue;
2147   }
2148   
2149   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2150     FILE *f;
2151     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2152     else f = openReplayFile("ckreplay_",".lb","a");
2153     firstOpen = false;
2154     if (f != NULL) {
2155       PUP::toDisk p(f);
2156       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2157       (*msg)->pup(p);
2158       fclose(f);
2159     }
2160     return CmiTrue;
2161   }
2162 };
2163
2164 class CkMessageDetailRecorder : public CkMessageWatcher {
2165 public:
2166   CkMessageDetailRecorder(FILE *f_) {
2167     f=f_;
2168     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2169      * The value of 'x' is the pointer size.
2170      */
2171     CmiUInt2 little = sizeof(void*);
2172     fwrite(&little, 2, 1, f);
2173   }
2174   ~CkMessageDetailRecorder() {fclose(f);}
2175 private:
2176   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2177     bool wasPacked = (*envptr)->isPacked();
2178     if (!wasPacked) CkPackMessage(envptr);
2179     envelope *env = *envptr;
2180     CmiUInt4 size = env->getTotalsize();
2181     fwrite(&size, 4, 1, f);
2182     fwrite(env, env->getTotalsize(), 1, f);
2183     if (!wasPacked) CkUnpackMessage(envptr);
2184     return CmiTrue;
2185   }
2186 };
2187
2188 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2189 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2190
2191 #if CMK_BIGSIM_CHARM
2192 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2193                                    int pb,unsigned int *prio);
2194 #endif
2195
2196 class CkMessageReplay : public CkMessageWatcher {
2197   int counter;
2198         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2199         int nextEP;
2200         unsigned int crc1, crc2;
2201         FILE *lbFile;
2202         /// Read the next message we need from the file:
2203         void getNext(void) {
2204           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2205           if (nextSize > 0) {
2206             // We are reading a regular message
2207             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2208               CkAbort("CkMessageReplay> Syntax error reading replay file");
2209             }
2210             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2211           } else if (nextSize == -2) {
2212             // We are reading a special message (right now only thread awaken)
2213             // Nothing to do since we have already read all info
2214             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2215           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2216             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2217             CkAbort("CkMessageReplay> Unrecognized input");
2218           }
2219             /*
2220                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2221                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2222                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2223                 }
2224                 */
2225                 counter++;
2226         }
2227         /// If this is the next message we need, advance and return CmiTrue.
2228         CmiBool isNext(envelope *env) {
2229                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2230                 if (nextEvent!=env->getEvent()) return CmiFalse;
2231                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2232 #if 1
2233                 if (nextEP != env->getEpIdx()) {
2234                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2235                         return CmiFalse;
2236                 }
2237 #endif
2238 #if ! CMK_BIGSIM_CHARM
2239                 if (nextSize!=env->getTotalsize())
2240                 {
2241                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2242                         return CmiFalse;
2243                 }
2244                 if (_recplay_crc || _recplay_checksum) {
2245                   bool wasPacked = env->isPacked();
2246                   if (!wasPacked) CkPackMessage(&env);
2247                   if (_recplay_crc) {
2248                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2249                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2250                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2251                     if (crcnew1 != crc1) {
2252                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2253                     }
2254                     if (crcnew2 != crc2) {
2255                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2256                     }
2257                   } else if (_recplay_checksum) {
2258             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2259             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2260             if (crcnew1 != crc1) {
2261               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2262             }
2263             if (crcnew2 != crc2) {
2264               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2265             }               
2266                   }
2267                   if (!wasPacked) CkUnpackMessage(&env);
2268                 }
2269 #endif
2270                 return CmiTrue;
2271         }
2272         CmiBool isNext(CthThreadToken *token) {
2273           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2274           return CmiFalse;
2275         }
2276
2277         /// This is a (short) list of messages we aren't yet ready for:
2278         CkQ<envelope *> delayedMessages;
2279         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2280         CkQ<CthThreadToken *> delayedTokens;
2281
2282         /// Try to flush out any delayed messages
2283         void flush(void) {
2284           if (nextSize>0) {
2285                 int len=delayedMessages.length();
2286                 for (int i=0;i<len;i++) {
2287                         envelope *env=delayedMessages.deq();
2288                         if (isNext(env)) { /* this is the next message: process it */
2289                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2290                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2291                                 return;
2292                         }
2293                         else /* Not ready yet-- put it back in the
2294                                 queue */
2295                           {
2296                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2297                                 delayedMessages.enq(env);
2298                           }
2299                 }
2300           } else if (nextSize==-2) {
2301             int len=delayedTokens.length();
2302             for (int i=0;i<len;++i) {
2303               CthThreadToken *token=delayedTokens.deq();
2304               if (isNext(token)) {
2305             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2306 #if ! CMK_BIGSIM_CHARM
2307                 CsdEnqueueLifo((void*)token);
2308 #else
2309                 CthEnqueueBigSimThread(token,0,0,NULL);
2310 #endif
2311                 return;
2312               } else {
2313             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2314                 delayedTokens.enq(token);
2315               }
2316             }
2317           }
2318         }
2319
2320 public:
2321         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2322           counter=0;
2323           f=f_;
2324           getNext();
2325           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2326           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2327         }
2328         ~CkMessageReplay() {fclose(f);}
2329
2330 private:
2331         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2332           bool wasPacked = (*envptr)->isPacked();
2333           if (!wasPacked) CkPackMessage(envptr);
2334           envelope *env = *envptr;
2335           //CkAssert(*(int*)env == 0x34567890);
2336           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2337                 if (env->getEvent() == 0) return CmiTrue;
2338                 if (isNext(env)) { /* This is the message we were expecting */
2339                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2340                         getNext(); /* Advance over this message */
2341                         flush(); /* try to process queued-up stuff */
2342                         if (!wasPacked) CkUnpackMessage(envptr);
2343                         return CmiTrue;
2344                 }
2345 #if CMK_SMP
2346                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2347                          // try next rank, we can't just buffer the msg and left
2348                          // we need to keep unprocessed msg on the fly
2349                         int nextpe = CkMyPe()+1;
2350                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2351                         nextpe = CkNodeFirst(CkMyNode());
2352                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2353                         return CmiFalse;
2354                 }
2355 #endif
2356                 else /*!isNext(env) */ {
2357                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2358                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2359                         delayedMessages.enq(env);
2360                         flush();
2361                         return CmiFalse;
2362                 }
2363         }
2364         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2365       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2366           if (isNext(token)) {
2367         REPLAYDEBUG("Executing token: "<<token->serialNo)
2368             getNext();
2369             flush();
2370             return CmiTrue;
2371           } else {
2372         REPLAYDEBUG("Queueing token: "<<token->serialNo
2373             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2374             delayedTokens.enq(token);
2375             return CmiFalse;
2376           }
2377         }
2378
2379         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2380           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2381           if (lbFile != NULL) {
2382             int num_moves;
2383         PUP::fromDisk p(lbFile);
2384             p | num_moves;
2385             if (num_moves != (*msg)->n_moves) {
2386               delete *msg;
2387               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2388             }
2389             (*msg)->pup(p);
2390           }
2391           return CmiTrue;
2392         }
2393 };
2394
2395 class CkMessageDetailReplay : public CkMessageWatcher {
2396   void *getNext() {
2397     CmiUInt4 size; size_t nread;
2398     if ((nread=fread(&size, 4, 1, f)) < 1) {
2399       if (feof(f)) return NULL;
2400       CkPrintf("Broken record file (metadata) got %d\n",nread);
2401       CkAbort("");
2402     }
2403     void *env = CmiAlloc(size);
2404     long tell = ftell(f);
2405     if ((nread=fread(env, size, 1, f)) < 1) {
2406       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2407       CkAbort("");
2408     }
2409     //*(int*)env = 0x34567890; // set first integer as magic
2410     return env;
2411   }
2412 public:
2413   double starttime;
2414   CkMessageDetailReplay(FILE *f_) {
2415     f=f_;
2416     starttime=CkWallTimer();
2417     /* This must match what CkMessageDetailRecorder did */
2418     CmiUInt2 little;
2419     fread(&little, 2, 1, f);
2420     if (little != sizeof(void*)) {
2421       CkAbort("Replaying on a different architecture from which recording was done!");
2422     }
2423
2424     CsdEnqueue(getNext());
2425
2426     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2427   }
2428   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2429     void *msg = getNext();
2430     if (msg != NULL) CsdEnqueue(msg);
2431     return CmiTrue;
2432   }
2433 };
2434
2435 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2436 #if ! CMK_BIGSIM_CHARM
2437   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2438 #endif
2439   CkMessageReplay *replay = (CkMessageReplay*)rep;
2440   //CmiStartQD(CkMessageReplayQuiescence, replay);
2441 }
2442
2443 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2444   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2445   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2446   ConverseExit();
2447 }
2448
2449 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2450   CkCoreState *ck = CkpvAccess(_coreState);
2451   if (ck->watcher!=NULL) {
2452     return ck->watcher->processThread(token,ck);
2453   }
2454   return CmiTrue;
2455 }
2456
2457 CpvCExtern(int, CthResumeNormalThreadIdx);
2458 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2459 {
2460   CthThread t = token->thread;
2461
2462   if(t == NULL){
2463     free(token);
2464     return;
2465   }
2466 #if CMK_TRACE_ENABLED
2467 #if ! CMK_TRACE_IN_CHARM
2468   if(CpvAccess(traceOn))
2469     CthTraceResume(t);
2470 /*    if(CpvAccess(_traceCoreOn)) 
2471             resumeTraceCore();*/
2472 #endif
2473 #endif
2474   
2475   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2476   if (CpdExecuteThreadResume(token)) {
2477     CthResume(t);
2478   }
2479 }
2480
2481 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2482   CkCoreState *ck = CkpvAccess(_coreState);
2483   if (ck->watcher!=NULL) {
2484     ck->watcher->processLBMessage(msg, ck);
2485   }
2486 }
2487
2488 #if CMK_BIGSIM_CHARM
2489 CpvExtern(int      , CthResumeBigSimThreadIdx);
2490 #endif
2491
2492 #include "ckliststring.h"
2493 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2494     CmiArgGroup("Charm++","Record/Replay");
2495     CmiBool forceReplay = CmiFalse;
2496     char *procs = NULL;
2497     _replaySystem = 0;
2498     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2499       _recplay_crc = 1;
2500     }
2501     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2502       _recplay_checksum = 1;
2503     }
2504     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2505     REPLAYDEBUG("CkMessageWatcherInit ");
2506     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2507         CkListString list(procs);
2508         if (list.includes(CkMyPe())) {
2509           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2510           CpdSetInitializeMemory(1);
2511           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2512         }
2513     }
2514     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2515       if (CkMyPe() == 0) {
2516         CmiPrintf("Charm++> record mode.\n");
2517         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2518           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2519           _recplay_crc = _recplay_checksum = 0;
2520         }
2521       }
2522       CpdSetInitializeMemory(1);
2523       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2524       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2525     }
2526         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2527             forceReplay = CmiTrue;
2528             CpdSetInitializeMemory(1);
2529             // Set the parameters of the processor
2530 #if CMK_SHARED_VARS_UNAVAILABLE
2531             _Cmi_mype = atoi(procs);
2532             while (procs[0]!='/') procs++;
2533             procs++;
2534             _Cmi_numpes = atoi(procs);
2535 #else
2536             CkAbort("+replay-detail available only for non-SMP build");
2537 #endif
2538             _replaySystem = 1;
2539             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2540         }
2541         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2542           if (CkMyPe() == 0)  {
2543             CmiPrintf("Charm++> replay mode.\n");
2544             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2545               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2546               _recplay_crc = _recplay_checksum = 0;
2547             }
2548           }
2549           CpdSetInitializeMemory(1);
2550 #if ! CMK_BIGSIM_CHARM
2551           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2552 #else
2553           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2554 #endif
2555           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2556         }
2557         if (_recplay_crc && _recplay_checksum) {
2558           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2559         }
2560 }
2561
2562 extern "C"
2563 int CkMessageToEpIdx(void *msg) {
2564         envelope *env=UsrToEnv(msg);
2565         int ep=env->getEpIdx();
2566         if (ep==CkIndex_CkArray::recvBroadcast(0))
2567                 return env->getsetArrayBcastEp();
2568         else
2569                 return ep;
2570 }
2571
2572 extern "C"
2573 int getCharmEnvelopeSize() {
2574   return sizeof(envelope);
2575 }
2576
2577
2578 #include "CkMarshall.def.h"
2579