af82a3d93f53d0afd7cac6d159e936f360b5d1e1
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
14 #include "pathHistory.h"
15 void automaticallySetMessagePriority(envelope *env); // in control point framework.
16 #endif
17
18 #if CMK_LBDB_ON
19 #include "LBDatabase.h"
20 #endif // CMK_LBDB_ON
21
22 #ifndef CMK_CHARE_USE_PTR
23 #include <map>
24 CkpvDeclare(CkVec<void *>, chare_objs);
25 CkpvDeclare(CkVec<int>, chare_types);
26 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
27
28 typedef std::map<int, CkChareID>  Vidblockmap;
29 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
30 CkpvDeclare(int, currentChareIdx);
31 #endif
32
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 void _initChareTables()
47 {
48 #ifndef CMK_CHARE_USE_PTR
49           /* chare and vidblock table */
50   CkpvInitialize(CkVec<void *>, chare_objs);
51   CkpvInitialize(CkVec<int>, chare_types);
52   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
53   CkpvInitialize(Vidblockmap, vmap);
54   CkpvInitialize(int, currentChareIdx);
55   CkpvAccess(currentChareIdx) = -1;
56 #endif
57 }
58
59 //Charm++ virtual functions: declaring these here results in a smaller executable
60 Chare::Chare(void) {
61   thishandle.onPE=CkMyPe();
62   thishandle.objPtr=this;
63 #if CMK_ERROR_CHECKING
64   magic = CHARE_MAGIC;
65 #endif
66 #ifndef CMK_CHARE_USE_PTR
67      // for plain chare, objPtr is actually the index to chare obj table
68   if (CkpvAccess(currentChareIdx) >= 0) {
69     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
70   }
71   chareIdx = CkpvAccess(currentChareIdx);
72 #endif
73 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
74   mlogData = new ChareMlogData();
75   mlogData->objID.type = TypeChare;
76   mlogData->objID.data.chare.id = thishandle;
77 #endif
78 #if CMK_OBJECT_QUEUE_AVAILABLE
79   if (_defaultObjectQ)  CkEnableObjQ();
80 #endif
81 }
82
83 Chare::Chare(CkMigrateMessage* m) {
84   thishandle.onPE=CkMyPe();
85   thishandle.objPtr=this;
86 #if CMK_ERROR_CHECKING
87   magic = 0;
88 #endif
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 #if CMK_ERROR_CHECKING
149   p(magic);
150 #endif
151 }
152
153 int Chare::ckGetChareType() const {
154   return -3;
155 }
156 char *Chare::ckDebugChareName(void) {
157   char buf[100];
158   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
159   return strdup(buf);
160 }
161 int Chare::ckDebugChareID(char *str, int limit) {
162   // pure chares for now do not have a valid ID
163   str[0] = 0;
164   return 1;
165 }
166 void Chare::ckDebugPup(PUP::er &p) {
167   pup(p);
168 }
169
170 /// This method is called before starting a [threaded] entry method.
171 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
172   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
173   traceAddThreadListeners(th, UsrToEnv(msg));
174 }
175
176 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
177   p.comment("Bytes");
178   int ts=UsrToEnv(msg)->getTotalsize();
179   int msgLen=ts-sizeof(envelope);
180   if (msgLen>0)
181     p((char*)msg,msgLen);
182 }
183
184 IrrGroup::IrrGroup(void) {
185   thisgroup = CkpvAccess(_currentGroup);
186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
187         mlogData->objID.type = TypeGroup;
188         mlogData->objID.data.group.id = thisgroup;
189         mlogData->objID.data.group.onPE = CkMyPe();
190 #endif
191 }
192
193 IrrGroup::~IrrGroup() {
194   // remove the object pointer
195   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
196   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
197   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
198 }
199
200 void IrrGroup::pup(PUP::er &p)
201 {
202   Chare::pup(p);
203   p|thisgroup;
204 }
205
206 int IrrGroup::ckGetChareType() const {
207   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
208 }
209
210 int IrrGroup::ckDebugChareID(char *str, int limit) {
211   if (limit<5) return -1;
212   str[0] = 1;
213   *((int*)&str[1]) = thisgroup.idx;
214   return 5;
215 }
216
217 char *IrrGroup::ckDebugChareName() {
218   return strdup(_chareTable[ckGetChareType()]->name);
219 }
220
221 void IrrGroup::ckJustMigrated(void)
222 {
223 }
224
225 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
226   /* FIXME: **CW** not entirely sure what we should do here yet */
227 }
228
229 void Group::CkAddThreadListeners(CthThread th, void *msg) {
230   Chare::CkAddThreadListeners(th, msg);
231   CthSetThreadID(th, thisgroup.idx, 0, 0);
232 }
233
234 void Group::pup(PUP::er &p)
235 {
236   CkReductionMgr::pup(p);
237   reductionInfo.pup(p);
238 }
239
240 /**** Delegation Manager Group */
241 CkDelegateMgr::~CkDelegateMgr() { }
242
243 //Default delegator implementation: do not delegate-- send directly
244 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
245   { CkSendMsg(ep,m,c); }
246 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
247   { CkSendMsgBranch(ep,m,onPE,g); }
248 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
249   { CkBroadcastMsgBranch(ep,m,g); }
250 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
251   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
252 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
253   { CkSendMsgNodeBranch(ep,m,onNode,g); }
254 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
255   { CkBroadcastMsgNodeBranch(ep,m,g); }
256 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
257   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
258 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
259 {
260         CProxyElement_ArrayBase ap(a,idx);
261         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
262 }
263 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
264 {
265         CProxyElement_ArrayBase ap(a,idx);
266         ap.ckSend((CkArrayMessage *)m,ep);
267 }
268 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
269 {
270         CProxy_ArrayBase ap(a);
271         ap.ckBroadcast((CkArrayMessage *)m,ep);
272 }
273
274 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
275 {
276         CmiAbort("ArraySectionSend is not implemented!\n");
277 /*
278         CProxyElement_ArrayBase ap(a,idx);
279         ap.ckSend((CkArrayMessage *)m,ep);
280 */
281 }
282
283 /*** Proxy <-> delegator communication */
284 CkDelegateData::~CkDelegateData() {}
285
286 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
287   return pd; // default implementation ignores pup call
288 }
289
290 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
291    this tricky manual reference counting business... */
292
293 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
294         if (dPtr) dPtr->ref();
295         ckUndelegate();
296         delegatedMgr = dTo;
297         delegatedPtr = dPtr;
298         delegatedGroupId = delegatedMgr->CkGetGroupID();
299         isNodeGroup = delegatedMgr->isNodeGroup();
300 }
301 void CProxy::ckUndelegate(void) {
302         delegatedMgr=NULL;
303         delegatedGroupId.setZero();
304         if (delegatedPtr) delegatedPtr->unref();
305         delegatedPtr=NULL;
306 }
307
308 /// Copy constructor
309 CProxy::CProxy(const CProxy &src)
310   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
311    isNodeGroup(src.isNodeGroup) {
312     delegatedPtr = NULL;
313     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
314         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
315     }
316 }
317
318 /// Assignment operator
319 CProxy& CProxy::operator=(const CProxy &src) {
320         CkDelegateData *oldPtr=delegatedPtr;
321         ckUndelegate();
322         delegatedMgr=src.delegatedMgr;
323         delegatedGroupId = src.delegatedGroupId; 
324         isNodeGroup = src.isNodeGroup;
325
326         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
327             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
328         else
329             delegatedPtr = NULL;
330
331         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
332         if (oldPtr) oldPtr->unref();
333         return *this;
334 }
335
336 void CProxy::pup(PUP::er &p) {
337   if (!p.isUnpacking()) {
338     if (ckDelegatedTo() != NULL) {
339       delegatedGroupId = delegatedMgr->CkGetGroupID();
340       isNodeGroup = delegatedMgr->isNodeGroup();
341     }
342   }
343   p|delegatedGroupId;
344   if (!delegatedGroupId.isZero()) {
345     p|isNodeGroup;
346     if (p.isUnpacking()) {
347       delegatedMgr = ckDelegatedTo(); 
348     }
349
350     int migCtor, cIdx; 
351     if (!p.isUnpacking()) {
352       if (isNodeGroup) {
353         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
354         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
355         migCtor = _chareTable[cIdx]->migCtor; 
356         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
357       }
358       else  {
359         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
360         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
361         migCtor = _chareTable[cIdx]->migCtor; 
362         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
363       }         
364     }
365
366     p|migCtor;
367
368     // if delegated manager has not been created, construct a dummy
369     // object on which to call DelegatePointerPup
370     if (delegatedMgr == NULL) {
371
372       // create a dummy object for calling DelegatePointerPup
373       int objId = _entryTable[migCtor]->chareIdx; 
374       int objSize = _chareTable[objId]->size; 
375       void *obj = malloc(objSize); 
376       _entryTable[migCtor]->call(NULL, obj); 
377       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
378         ->DelegatePointerPup(p, delegatedPtr);           
379       free(obj);
380
381     }
382     else {
383
384       // delegated manager has been created, so we can use it
385       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
386
387     }
388
389     if (p.isUnpacking() && delegatedPtr) {
390       delegatedPtr->ref();
391     }
392   }
393 }
394
395 /**** Array sections */
396 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
397 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
398   _cookie.get_aid() = aid;      \
399   _cookie.get_pe() = CkMyPe();  \
400   _elems = new CkArrayIndex[nElems];    \
401   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
402   pelist = NULL;        \
403   npes  = 0;    \
404 }
405
406 CKSECTIONID_CONSTRUCTOR_DEF(1D)
407 CKSECTIONID_CONSTRUCTOR_DEF(2D)
408 CKSECTIONID_CONSTRUCTOR_DEF(3D)
409 CKSECTIONID_CONSTRUCTOR_DEF(4D)
410 CKSECTIONID_CONSTRUCTOR_DEF(5D)
411 CKSECTIONID_CONSTRUCTOR_DEF(6D)
412 CKSECTIONID_CONSTRUCTOR_DEF(Max)
413
414 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
415   pelist = new int[npes];
416   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
417   _cookie.get_aid() = gid;
418 }
419
420 CkSectionID::CkSectionID(const CkSectionID &sid) {
421   int i;
422   _cookie = sid._cookie;
423   _nElems = sid._nElems;
424   if (_nElems > 0) {
425     _elems = new CkArrayIndex[_nElems];
426     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
427   } else _elems = NULL;
428   npes = sid.npes;
429   if (npes > 0) {
430     pelist = new int[npes];
431     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
432   } else pelist = NULL;
433 }
434
435 void CkSectionID::operator=(const CkSectionID &sid) {
436   int i;
437   _cookie = sid._cookie;
438   _nElems = sid._nElems;
439   if (_nElems > 0) {
440     _elems = new CkArrayIndex[_nElems];
441     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
442   } else _elems = NULL;
443   npes = sid.npes;
444   if (npes > 0) {
445     pelist = new int[npes];
446     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
447   } else pelist = NULL;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451     p | _cookie;
452     p(_nElems);
453     if (_nElems > 0) {
454       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
455       for (int i=0; i< _nElems; i++) p | _elems[i];
456       npes = 0;
457       pelist = NULL;
458     } else {
459       // If _nElems is zero, than this section describes processors instead of array elements
460       _elems = NULL;
461       p(npes);
462       if (p.isUnpacking()) pelist = new int[npes];
463       p(pelist, npes);
464     }
465 }
466
467 /**** Tiny random API routines */
468
469 #ifdef CMK_CUDA
470 void CUDACallbackManager(void *fn) {
471   if (fn != NULL) {
472     CkCallback *cb = (CkCallback*) fn;
473     cb->send();
474   }
475 }
476
477 #endif
478
479 extern "C"
480 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
481 {
482   UsrToEnv(msg)->setRef(ref);
483 }
484
485 extern "C"
486 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
487 {
488   return UsrToEnv(msg)->getRef();
489 }
490
491 extern "C"
492 int CkGetSrcPe(void *msg)
493 {
494   return UsrToEnv(msg)->getSrcPe();
495 }
496
497 extern "C"
498 int CkGetSrcNode(void *msg)
499 {
500   return CmiNodeOf(CkGetSrcPe(msg));
501 }
502
503 extern "C"
504 void *CkLocalBranch(CkGroupID gID) {
505   return _localBranch(gID);
506 }
507
508 static
509 void *_ckLocalNodeBranch(CkGroupID groupID) {
510   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
511   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
512   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
513   return retval;
514 }
515
516 extern "C"
517 void *CkLocalNodeBranch(CkGroupID groupID)
518 {
519   void *retval;
520   // we are called in a constructor
521   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
522     return CkpvAccess(_currentNodeGroupObj);
523   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
524   { // Nodegroup hasn't finished being created yet-- schedule...
525     CsdScheduler(0);
526   }
527   return retval;
528 }
529
530 extern "C"
531 void *CkLocalChare(const CkChareID *pCid)
532 {
533         int pe=pCid->onPE;
534         if (pe<0) { //A virtual chare ID
535                 if (pe!=(-(CkMyPe()+1)))
536                         return NULL;//VID block not on this PE
537 #ifdef CMK_CHARE_USE_PTR
538                 VidBlock *v=(VidBlock *)pCid->objPtr;
539 #else
540                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
541 #endif
542                 return v->getLocalChareObj();
543         }
544         else
545         { //An ordinary chare ID
546                 if (pe!=CkMyPe())
547                         return NULL;//Chare not on this PE
548 #ifdef CMK_CHARE_USE_PTR
549                 return pCid->objPtr;
550 #else
551                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
552 #endif
553         }
554 }
555
556 CkpvDeclare(char**,Ck_argv);
557
558 extern "C" char **CkGetArgv(void) {
559         return CkpvAccess(Ck_argv);
560 }
561 extern "C" int CkGetArgc(void) {
562         return CmiGetArgc(CkpvAccess(Ck_argv));
563 }
564
565 /******************** Basic support *****************/
566 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
567 {
568 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
569         CpvAccess(_currentObj) = (Chare *)obj;
570 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
571 #endif
572   //BIGSIM_OOC DEBUGGING
573   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
574   //fflush(stdout);
575 #if CMK_CHARMDEBUG
576   CpdBeforeEp(epIdx, obj, msg);
577 #endif    
578   _entryTable[epIdx]->call(msg, obj);
579 #if CMK_CHARMDEBUG
580   CpdAfterEp(epIdx);
581 #endif
582   if (_entryTable[epIdx]->noKeep)
583   { /* Method doesn't keep/delete the message, so we have to: */
584     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
585   }
586 }
587 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
588 {
589   //BIGSIM_OOC DEBUGGING
590   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
591   //fflush(stdout);
592
593   void *deliverMsg;
594 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
595         CpvAccess(_currentObj) = (Chare *)obj;
596 #endif
597   if (_entryTable[epIdx]->noKeep)
598   { /* Deliver a read-only copy of the message */
599     deliverMsg=(void *)msg;
600   } else
601   { /* Method needs a copy of the message to keep/delete */
602     void *oldMsg=(void *)msg;
603     deliverMsg=CkCopyMsg(&oldMsg);
604 #if CMK_ERROR_CHECKING
605     if (oldMsg!=msg)
606       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
607 #endif
608   }
609 #if CMK_CHARMDEBUG
610   CpdBeforeEp(epIdx, obj, (void*)msg);
611 #endif
612   _entryTable[epIdx]->call(deliverMsg, obj);
613 #if CMK_CHARMDEBUG
614   CpdAfterEp(epIdx);
615 #endif
616 }
617
618 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
619 {
620   register void *msg = EnvToUsr(env);
621   _SET_USED(env, 0);
622   CkDeliverMessageFree(epIdx,msg,obj);
623 }
624
625 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
626 {
627
628 #if CMK_TRACE_ENABLED 
629   if (_entryTable[epIdx]->traceEnabled) {
630     _TRACE_BEGIN_EXECUTE(env);
631     _invokeEntryNoTrace(epIdx,env,obj);
632     _TRACE_END_EXECUTE();
633   }
634   else
635 #endif
636     _invokeEntryNoTrace(epIdx,env,obj);
637
638 }
639
640 /********************* Creation ********************/
641
642 extern "C"
643 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
644 {
645   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
646   envelope *env = UsrToEnv(msg);
647   _CHECK_USED(env);
648   if(pCid == 0) {
649     env->setMsgtype(NewChareMsg);
650   } else {
651     pCid->onPE = (-(CkMyPe()+1));
652     //  pCid->magic = _GETIDX(cIdx);
653     pCid->objPtr = (void *) new VidBlock();
654     _MEMCHECK(pCid->objPtr);
655     env->setMsgtype(NewVChareMsg);
656     env->setVidPtr(pCid->objPtr);
657 #ifndef CMK_CHARE_USE_PTR
658     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
659     int idx = CkpvAccess(vidblocks).size()-1;
660     pCid->objPtr = (void *)(CmiIntPtr)idx;
661     env->setVidPtr((void *)(CmiIntPtr)idx);
662 #endif
663   }
664   env->setEpIdx(eIdx);
665   env->setSrcPe(CkMyPe());
666   CmiSetHandler(env, _charmHandlerIdx);
667   _TRACE_CREATION_1(env);
668   CpvAccess(_qd)->create();
669   _STATS_RECORD_CREATE_CHARE_1();
670   _SET_USED(env, 1);
671   if(destPE == CK_PE_ANY)
672     env->setForAnyPE(1);
673   else
674     env->setForAnyPE(0);
675   _CldEnqueue(destPE, env, _infoIdx);
676   _TRACE_CREATION_DONE(1);
677 }
678
679 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
680 {
681   register int gIdx = _entryTable[epIdx]->chareIdx;
682   register void *obj = malloc(_chareTable[gIdx]->size);
683   _MEMCHECK(obj);
684   setMemoryTypeChare(obj);
685   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
686   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
687   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
688   CkpvAccess(_groupIDTable)->push_back(groupID);
689   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
690   if(ptrq) {
691     void *pending;
692     while((pending=ptrq->deq())!=0)
693       _CldEnqueue(CkMyPe(), pending, _infoIdx);
694 //    delete ptrq;
695       CkpvAccess(_groupTable)->find(groupID).clearPending();
696   }
697   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
698
699   CkpvAccess(_currentGroup) = groupID;
700   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
701 #ifndef CMK_CHARE_USE_PTR
702   //((Chare *)obj)->chareIdx = -1;
703   CkpvAccess(currentChareIdx) = -1;
704 #endif
705   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
706   _STATS_RECORD_PROCESS_GROUP_1();
707 }
708
709 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
710 {
711   register int gIdx = _entryTable[epIdx]->chareIdx;
712   int objSize=_chareTable[gIdx]->size;
713   register void *obj = malloc(objSize);
714   _MEMCHECK(obj);
715   setMemoryTypeChare(obj);
716   CkpvAccess(_currentGroup) = groupID;
717
718 // Now that the NodeGroup is created, add it to the table.
719 //  NodeGroups can be accessed by multiple processors, so
720 //  this is in the opposite order from groups - invoking the constructor
721 //  before registering it.
722 // User may call CkLocalNodeBranch() inside the nodegroup constructor
723 //  store nodegroup into _currentNodeGroupObj
724   CkpvAccess(_currentNodeGroupObj) = obj;
725 #ifndef CMK_CHARE_USE_PTR
726   //((Chare *)obj)->chareIdx = -1;
727   CkpvAccess(currentChareIdx) = -1;
728 #endif
729   _invokeEntryNoTrace(epIdx,env,obj);
730   CkpvAccess(_currentNodeGroupObj) = NULL;
731   _STATS_RECORD_PROCESS_NODE_GROUP_1();
732
733   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
734   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
735   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
736   CksvAccess(_nodeGroupIDTable).push_back(groupID);
737
738   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
739   if(ptrq) {
740     void *pending;
741     while((pending=ptrq->deq())!=0)
742       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
743 //    delete ptrq;
744       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
745   }
746   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
747 }
748
749 void _createGroup(CkGroupID groupID, envelope *env)
750 {
751   _CHECK_USED(env);
752   _SET_USED(env, 1);
753   register int epIdx = env->getEpIdx();
754   int gIdx = _entryTable[epIdx]->chareIdx;
755   CkNodeGroupID rednMgr;
756   if(_chareTable[gIdx]->isIrr == 0){
757                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
758                 rednMgr = rednMgrProxy;
759 //              rednMgrProxy.setAttachedGroup(groupID);
760   }else{
761         rednMgr.setZero();
762   }
763   env->setGroupNum(groupID);
764   env->setSrcPe(CkMyPe());
765   env->setRednMgr(rednMgr);
766   env->setGroupEpoch(CkpvAccess(_charmEpoch));
767
768   if(CkNumPes()>1) {
769     CkPackMessage(&env);
770     CmiSetHandler(env, _bocHandlerIdx);
771     _numInitMsgs++;
772     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
773     CpvAccess(_qd)->create(CkNumPes()-1);
774     CkUnpackMessage(&env);
775   }
776   _STATS_RECORD_CREATE_GROUP_1();
777   CkCreateLocalGroup(groupID, epIdx, env);
778 }
779
780 void _createNodeGroup(CkGroupID groupID, envelope *env)
781 {
782   _CHECK_USED(env);
783   _SET_USED(env, 1);
784   register int epIdx = env->getEpIdx();
785   env->setGroupNum(groupID);
786   env->setSrcPe(CkMyPe());
787   env->setGroupEpoch(CkpvAccess(_charmEpoch));
788   if(CkNumNodes()>1) {
789     CkPackMessage(&env);
790     CmiSetHandler(env, _bocHandlerIdx);
791     _numInitMsgs++;
792     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
793     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
794     CpvAccess(_qd)->create(CkNumNodes()-1);
795     CkUnpackMessage(&env);
796   }
797   _STATS_RECORD_CREATE_NODE_GROUP_1();
798   CkCreateLocalNodeGroup(groupID, epIdx, env);
799 }
800
801 // new _groupCreate
802
803 static CkGroupID _groupCreate(envelope *env)
804 {
805   register CkGroupID groupNum;
806
807   // check CkMyPe(). if it is 0 then idx is _numGroups++
808   // if not, then something else...
809   if(CkMyPe() == 0)
810      groupNum.idx = CkpvAccess(_numGroups)++;
811   else
812      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
813   _createGroup(groupNum, env);
814   return groupNum;
815 }
816
817 // new _nodeGroupCreate
818 static CkGroupID _nodeGroupCreate(envelope *env)
819 {
820   register CkGroupID groupNum;
821   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
822   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
823           groupNum.idx = CksvAccess(_numNodeGroups)++;
824    else
825           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
826   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
827   _createNodeGroup(groupNum, env);
828   return groupNum;
829 }
830
831 /**** generate the group idx when group is creator pe is not pe0
832  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
833  **** remaining bits contain the group creator processor number and
834  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
835
836 int _getGroupIdx(int numNodes,int myNode,int numGroups)
837 {
838         int idx;
839         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
840         int n = 32 - (x+1);                                     // number of bits remaining for the index
841         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
842                                                                 // then add the next available index
843         // of course this won't work when int is 8 bytes long on T3E
844         //idx |= 0x80000000;                                      // set the most significant bit to 1
845         idx = - idx;
846                                                                 // if int is not 32 bits, wouldn't this be wrong?
847         return idx;
848 }
849
850 extern "C"
851 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
852 {
853   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
854   register envelope *env = UsrToEnv(msg);
855   env->setMsgtype(BocInitMsg);
856   env->setEpIdx(eIdx);
857   env->setSrcPe(CkMyPe());
858   _TRACE_CREATION_N(env, CkNumPes());
859   CkGroupID gid = _groupCreate(env);
860   _TRACE_CREATION_DONE(1);
861   return gid;
862 }
863
864 extern "C"
865 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
866 {
867   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
868   register envelope *env = UsrToEnv(msg);
869   env->setMsgtype(NodeBocInitMsg);
870   env->setEpIdx(eIdx);
871   env->setSrcPe(CkMyPe());
872   _TRACE_CREATION_N(env, CkNumNodes());
873   CkGroupID gid = _nodeGroupCreate(env);
874   _TRACE_CREATION_DONE(1);
875   return gid;
876 }
877
878 static inline void *_allocNewChare(envelope *env, int &idx)
879 {
880   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
881   void *tmp=malloc(_chareTable[chareIdx]->size);
882   _MEMCHECK(tmp);
883 #ifndef CMK_CHARE_USE_PTR
884   CkpvAccess(chare_objs).push_back(tmp);
885   CkpvAccess(chare_types).push_back(chareIdx);
886   idx = CkpvAccess(chare_objs).size()-1;
887 #endif
888   setMemoryTypeChare(tmp);
889   return tmp;
890 }
891
892 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
893 {
894   int idx;
895   register void *obj = _allocNewChare(env, idx);
896 #ifndef CMK_CHARE_USE_PTR
897   //((Chare *)obj)->chareIdx = idx;
898   CkpvAccess(currentChareIdx) = idx;
899 #endif
900   _invokeEntry(env->getEpIdx(),env,obj);
901 }
902
903 void CkCreateLocalChare(int epIdx, envelope *env)
904 {
905   env->setEpIdx(epIdx);
906   _processNewChareMsg(NULL, env);
907 }
908
909 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
910 {
911   int idx;
912   register void *obj = _allocNewChare(env, idx);
913   register CkChareID *pCid = (CkChareID *)
914       _allocMsg(FillVidMsg, sizeof(CkChareID));
915   pCid->onPE = CkMyPe();
916 #ifndef CMK_CHARE_USE_PTR
917   pCid->objPtr = (void*)(CmiIntPtr)idx;
918 #else
919   pCid->objPtr = obj;
920 #endif
921   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
922   register envelope *ret = UsrToEnv(pCid);
923   ret->setVidPtr(env->getVidPtr());
924   register int srcPe = env->getSrcPe();
925   ret->setSrcPe(CkMyPe());
926   CmiSetHandler(ret, _charmHandlerIdx);
927   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
928 #ifndef CMK_CHARE_USE_PTR
929   // register the remote vidblock for deletion when chare is deleted
930   CkChareID vid;
931   vid.onPE = srcPe;
932   vid.objPtr = env->getVidPtr();
933   CkpvAccess(vmap)[idx] = vid;    
934 #endif
935   CpvAccess(_qd)->create();
936 #ifndef CMK_CHARE_USE_PTR
937   //((Chare *)obj)->chareIdx = idx;
938   CkpvAccess(currentChareIdx) = idx;
939 #endif
940   _invokeEntry(env->getEpIdx(),env,obj);
941 }
942
943 /************** Receive: Chares *************/
944
945 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
946 {
947   register int epIdx = env->getEpIdx();
948   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
949   register void *obj;
950   if (mainIdx != -1)  {           // mainchare
951     CmiAssert(CkMyPe()==0);
952     obj = _mainTable[mainIdx]->getObj();
953   }
954   else {
955 #ifndef CMK_CHARE_USE_PTR
956     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
957       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
958     else
959       obj = env->getObjPtr();
960 #else
961     obj = env->getObjPtr();
962 #endif
963   }
964   _invokeEntry(epIdx,env,obj);
965 }
966
967 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
968 {
969   register int epIdx = env->getEpIdx();
970   register void *obj = env->getObjPtr();
971   _invokeEntry(epIdx,env,obj);
972 }
973
974 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
975 {
976 #ifndef CMK_CHARE_USE_PTR
977   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
978 #else
979   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
980   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
981 #endif
982   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
983   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
984   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
985   CmiFree(env);
986 }
987
988 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
989 {
990 #ifndef CMK_CHARE_USE_PTR
991   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
992 #else
993   VidBlock *vptr = (VidBlock *) env->getVidPtr();
994   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
995 #endif
996   _SET_USED(env, 1);
997   vptr->send(env);
998 }
999
1000 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1001 {
1002 #ifndef CMK_CHARE_USE_PTR
1003   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1004   delete vptr;
1005   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1006 #endif
1007   CmiFree(env);
1008 }
1009
1010 /************** Receive: Groups ****************/
1011
1012 /**
1013  Return a pointer to the local BOC of "groupID".
1014  The message "env" passed in has some known dependency on this groupID
1015  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1016  Therefore, if the return value is NULL, this function buffers the massage so that
1017  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1018  The message passed in must have its handlers correctly set so that it can be
1019  scheduled again.
1020 */
1021 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1022 {
1023
1024         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1025         IrrGroup *obj = ck->localBranch(groupID);
1026         if (obj==NULL) { /* groupmember not yet created: stash message */
1027                 ck->getGroupTable()->find(groupID).enqMsg(env);
1028         }
1029         else { /* will be able to process message */
1030                 ck->process();
1031         }
1032         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1033         return obj;
1034 }
1035
1036 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1037 {
1038 #if CMK_LBDB_ON
1039   // if there is a running obj being measured, stop it temporarily
1040   LDObjHandle objHandle;
1041   int objstopped = 0;
1042   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1043   if (the_lbdb->RunningObject(&objHandle)) {
1044     objstopped = 1;
1045     the_lbdb->ObjectStop(objHandle);
1046   }
1047 #endif
1048   _invokeEntry(epIdx,env,obj);
1049 #if CMK_LBDB_ON
1050   if (objstopped) the_lbdb->ObjectStart(objHandle);
1051 #endif
1052   _STATS_RECORD_PROCESS_BRANCH_1();
1053 }
1054
1055 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1056 {
1057   register CkGroupID groupID =  env->getGroupNum();
1058   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1059   if(obj) {
1060     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1061   }
1062 }
1063
1064 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1065 {
1066   env->setMsgtype(ForChareMsg);
1067   env->setObjPtr(obj);
1068   _processForChareMsg(ck,env);
1069   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1070 }
1071
1072 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1073 {
1074   env->setEpIdx(epIdx);
1075   _deliverForNodeBocMsg(ck,env, obj);
1076 }
1077
1078 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1079 {
1080   register CkGroupID groupID = env->getGroupNum();
1081   register void *obj;
1082
1083   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1084   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1085   if(!obj) { // groupmember not yet created
1086 #if CMK_IMMEDIATE_MSG
1087     if (CmiIsImmediate(env))     // buffer immediate message
1088       CmiDelayImmediate();
1089     else
1090 #endif
1091     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1092     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1093     return;
1094   }
1095   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1096 #if CMK_IMMEDIATE_MSG
1097   if (!CmiIsImmediate(env))
1098 #endif
1099   ck->process();
1100   env->setMsgtype(ForChareMsg);
1101   env->setObjPtr(obj);
1102   _processForChareMsg(ck,env);
1103   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1104 }
1105
1106 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1107 {
1108   register CkGroupID groupID = env->getGroupNum();
1109   register int epIdx = env->getEpIdx();
1110   if (!env->getGroupDep().isZero()) {      // dependence
1111     CkGroupID dep = env->getGroupDep();
1112     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1113     if (obj == NULL) return;
1114   }
1115   else
1116     ck->process();
1117   CkCreateLocalGroup(groupID, epIdx, env);
1118 }
1119
1120 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1121 {
1122   register CkGroupID groupID = env->getGroupNum();
1123   register int epIdx = env->getEpIdx();
1124   CkCreateLocalNodeGroup(groupID, epIdx, env);
1125 }
1126
1127 /************** Receive: Arrays *************/
1128
1129 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1130   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1131   if (mgr) {
1132     _SET_USED(env, 0);
1133     mgr->insertElement((CkMessage *)EnvToUsr(env));
1134   }
1135 }
1136 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1137   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1138   if (mgr) {
1139     _SET_USED(env, 0);
1140     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1141   }
1142 }
1143
1144 //BIGSIM_OOC DEBUGGING
1145 #define TELLMSGTYPE(x) //x
1146
1147 /**
1148  * This is the main converse-level handler used by all of Charm++.
1149  *
1150  * \addtogroup CriticalPathFramework
1151  */
1152 void _processHandler(void *converseMsg,CkCoreState *ck)
1153 {
1154   register envelope *env = (envelope *) converseMsg;
1155
1156   MESSAGE_PHASE_CHECK(env);
1157
1158 //#if CMK_RECORD_REPLAY
1159   if (ck->watcher!=NULL) {
1160     if (!ck->watcher->processMessage(&env,ck)) return;
1161   }
1162 //#endif
1163 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1164         Chare *obj=NULL;
1165         CkObjID sender;
1166         MCount SN;
1167         MlogEntry *entry=NULL;
1168         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1169         env->getMsgtype() == ForArrayEltMsg){
1170                 sender = env->sender;
1171                 SN = env->SN;
1172                 int result = preProcessReceivedMessage(env,&obj,&entry);
1173                 if(result == 0){
1174                         return;
1175                 }
1176         }
1177 #endif
1178
1179 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1180   //  CkPrintf("START\n");
1181   criticalPath_start(env);
1182 #endif
1183
1184
1185   switch(env->getMsgtype()) {
1186 // Group support
1187     case BocInitMsg :
1188       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1189       // QD processing moved inside _processBocInitMsg because it is conditional
1190       //ck->process(); 
1191       if(env->isPacked()) CkUnpackMessage(&env);
1192       _processBocInitMsg(ck,env);
1193       break;
1194     case NodeBocInitMsg :
1195       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1196       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1197       _processNodeBocInitMsg(ck,env);
1198       break;
1199     case ForBocMsg :
1200       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1201       // QD processing moved inside _processForBocMsg because it is conditional
1202       if(env->isPacked()) CkUnpackMessage(&env);
1203       _processForBocMsg(ck,env);
1204       // stats record moved inside _processForBocMsg because it is conditional
1205       break;
1206     case ForNodeBocMsg :
1207       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1208       // QD processing moved to _processForNodeBocMsg because it is conditional
1209       if(env->isPacked()) CkUnpackMessage(&env);
1210       _processForNodeBocMsg(ck,env);
1211       // stats record moved to _processForNodeBocMsg because it is conditional
1212       break;
1213
1214 // Array support
1215     case ArrayEltInitMsg:
1216       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1217       if(env->isPacked()) CkUnpackMessage(&env);
1218       _processArrayEltInitMsg(ck,env);
1219       break;
1220     case ForArrayEltMsg:
1221       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1222       if(env->isPacked()) CkUnpackMessage(&env);
1223       _processArrayEltMsg(ck,env);
1224       break;
1225
1226 // Chare support
1227     case NewChareMsg :
1228       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1229       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1230       _processNewChareMsg(ck,env);
1231       _STATS_RECORD_PROCESS_CHARE_1();
1232       break;
1233     case NewVChareMsg :
1234       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1235       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1236       _processNewVChareMsg(ck,env);
1237       _STATS_RECORD_PROCESS_CHARE_1();
1238       break;
1239     case ForChareMsg :
1240       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1241       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1242       _processForPlainChareMsg(ck,env);
1243       _STATS_RECORD_PROCESS_MSG_1();
1244       break;
1245     case ForVidMsg   :
1246       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1247       ck->process();
1248       _processForVidMsg(ck,env);
1249       break;
1250     case FillVidMsg  :
1251       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1252       ck->process();
1253       _processFillVidMsg(ck,env);
1254       break;
1255     case DeleteVidMsg  :
1256       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1257       ck->process();
1258       _processDeleteVidMsg(ck,env);
1259       break;
1260
1261     default:
1262       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1263   }
1264 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1265         if(obj != NULL){
1266                 postProcessReceivedMessage(obj,sender,SN,entry);
1267         }
1268 #endif
1269
1270
1271 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1272   criticalPath_end();
1273   //  CkPrintf("STOP\n");
1274 #endif
1275
1276
1277 }
1278
1279
1280 /******************** Message Send **********************/
1281
1282 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1283              int *queueing, int *priobits, unsigned int **prioptr)
1284 {
1285   register envelope *env = (envelope *)converseMsg;
1286   *pfn = (CldPackFn)CkPackMessage;
1287   *len = env->getTotalsize();
1288   *queueing = env->getQueueing();
1289   *priobits = env->getPriobits();
1290   *prioptr = (unsigned int *) env->getPrioPtr();
1291 }
1292
1293 void CkPackMessage(envelope **pEnv)
1294 {
1295   register envelope *env = *pEnv;
1296   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1297     register void *msg = EnvToUsr(env);
1298     _TRACE_BEGIN_PACK();
1299     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1300     _TRACE_END_PACK();
1301     env=UsrToEnv(msg);
1302     env->setPacked(1);
1303     *pEnv = env;
1304   }
1305 }
1306
1307 void CkUnpackMessage(envelope **pEnv)
1308 {
1309   register envelope *env = *pEnv;
1310   register int msgIdx = env->getMsgIdx();
1311   if(env->isPacked()) {
1312     register void *msg = EnvToUsr(env);
1313     _TRACE_BEGIN_UNPACK();
1314     msg = _msgTable[msgIdx]->unpack(msg);
1315     _TRACE_END_UNPACK();
1316     env=UsrToEnv(msg);
1317     env->setPacked(0);
1318     *pEnv = env;
1319   }
1320 }
1321
1322 //There's no reason for most messages to go through the Cld--
1323 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1324 // Thus these accellerated versions of the Cld calls.
1325 #if CMK_OBJECT_QUEUE_AVAILABLE
1326 static int index_objectQHandler;
1327 #endif
1328 int index_tokenHandler;
1329 int index_skipCldHandler;
1330
1331 void _skipCldHandler(void *converseMsg)
1332 {
1333   register envelope *env = (envelope *)(converseMsg);
1334   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1335 #if CMK_GRID_QUEUE_AVAILABLE
1336   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1337     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1338                        env, env->getQueueing (), env->getPriobits (),
1339                        (unsigned int *) env->getPrioPtr ());
1340   } else {
1341     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1342                        env, env->getQueueing (), env->getPriobits (),
1343                        (unsigned int *) env->getPrioPtr ());
1344   }
1345 #else
1346   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1347         env, env->getQueueing(),env->getPriobits(),
1348         (unsigned int *)env->getPrioPtr());
1349 #endif
1350 }
1351
1352
1353 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1354 // Made non-static to be used by ckmessagelogging
1355 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1356 {
1357 #if CMK_CHARMDEBUG
1358   if (!ConverseDeliver(pe)) {
1359     CmiFree(env);
1360     return;
1361   }
1362 #endif
1363   if(pe == CkMyPe() ){
1364     if(!CmiNodeAlive(CkMyPe())){
1365         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1366 //      return;
1367     }
1368   }
1369   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1370 #if CMK_OBJECT_QUEUE_AVAILABLE
1371     Chare *obj = CkFindObjectPtr(env);
1372     if (obj && obj->CkGetObjQueue().queue()) {
1373       _enqObjQueue(obj, env);
1374     }
1375     else
1376 #endif
1377     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1378         env, env->getQueueing(),env->getPriobits(),
1379         (unsigned int *)env->getPrioPtr());
1380   } else {
1381     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1382       CkPackMessage(&env);
1383     int len=env->getTotalsize();
1384     CmiSetXHandler(env,CmiGetHandler(env));
1385 #if CMK_OBJECT_QUEUE_AVAILABLE
1386     CmiSetHandler(env,index_objectQHandler);
1387 #else
1388     CmiSetHandler(env,index_skipCldHandler);
1389 #endif
1390     CmiSetInfo(env,infoFn);
1391     if (pe==CLD_BROADCAST) {
1392 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1393                         CmiSyncBroadcast(len, (char *)env);
1394 #else
1395                         CmiSyncBroadcastAndFree(len, (char *)env); 
1396 #endif
1397
1398 }
1399     else if (pe==CLD_BROADCAST_ALL) { 
1400 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1401                         CmiSyncBroadcastAll(len, (char *)env);
1402 #else
1403                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1404 #endif
1405
1406 }
1407     else{
1408 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1409                         CmiSyncSend(pe, len, (char *)env);
1410 #else
1411                         CmiSyncSendAndFree(pe, len, (char *)env);
1412 #endif
1413
1414                 }
1415   }
1416 }
1417
1418 #if CMK_BIGSIM_CHARM
1419 #   define  _skipCldEnqueue   _CldEnqueue
1420 #endif
1421
1422 // by pass Charm++ priority queue, send as Converse message
1423 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1424 {
1425 #if CMK_CHARMDEBUG
1426   if (!ConverseDeliver(-1)) {
1427     CmiFree(env);
1428     return;
1429   }
1430 #endif
1431   CkPackMessage(&env);
1432   int len=env->getTotalsize();
1433   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1434 }
1435
1436 static void _noCldEnqueue(int pe, envelope *env)
1437 {
1438 /*
1439   if (pe == CkMyPe()) {
1440     CmiHandleMessage(env);
1441   } else
1442 */
1443 #if CMK_CHARMDEBUG
1444   if (!ConverseDeliver(pe)) {
1445     CmiFree(env);
1446     return;
1447   }
1448 #endif
1449   CkPackMessage(&env);
1450   int len=env->getTotalsize();
1451   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1452   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1453   else CmiSyncSendAndFree(pe, len, (char *)env);
1454 }
1455
1456 //static void _noCldNodeEnqueue(int node, envelope *env)
1457 //Made non-static to be used by ckmessagelogging
1458 void _noCldNodeEnqueue(int node, envelope *env)
1459 {
1460 /*
1461   if (node == CkMyNode()) {
1462     CmiHandleMessage(env);
1463   } else {
1464 */
1465 #if CMK_CHARMDEBUG
1466   if (!ConverseDeliver(node)) {
1467     CmiFree(env);
1468     return;
1469   }
1470 #endif
1471   CkPackMessage(&env);
1472   int len=env->getTotalsize();
1473   if (node==CLD_BROADCAST) { 
1474 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1475         CmiSyncNodeBroadcast(len, (char *)env);
1476 #else
1477         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1478 #endif
1479 }
1480   else if (node==CLD_BROADCAST_ALL) { 
1481 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1482                 CmiSyncNodeBroadcastAll(len, (char *)env);
1483 #else
1484                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1485 #endif
1486
1487 }
1488   else {
1489 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1490         CmiSyncNodeSend(node, len, (char *)env);
1491 #else
1492         CmiSyncNodeSendAndFree(node, len, (char *)env);
1493 #endif
1494   }
1495 }
1496
1497 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1498 {
1499   register envelope *env = UsrToEnv(msg);
1500   _CHECK_USED(env);
1501   _SET_USED(env, 1);
1502 #if CMK_REPLAYSYSTEM
1503   setEventID(env);
1504 #endif
1505   env->setMsgtype(ForChareMsg);
1506   env->setEpIdx(eIdx);
1507   env->setSrcPe(CkMyPe());
1508 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1509   criticalPath_send(env);
1510   automaticallySetMessagePriority(env);
1511 #endif
1512 #if CMK_CHARMDEBUG
1513   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1514 #endif
1515 #if CMK_OBJECT_QUEUE_AVAILABLE
1516   CmiSetHandler(env, index_objectQHandler);
1517 #else
1518   CmiSetHandler(env, _charmHandlerIdx);
1519 #endif
1520   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1521     register int pe = -(pCid->onPE+1);
1522     if(pe==CkMyPe()) {
1523 #ifndef CMK_CHARE_USE_PTR
1524       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1525 #else
1526       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1527 #endif
1528       void *objPtr;
1529       if (NULL!=(objPtr=vblk->getLocalChare()))
1530       { //A ready local chare
1531         env->setObjPtr(objPtr);
1532         return pe;
1533       }
1534       else { //The vidblock is not ready-- forget it
1535         vblk->send(env);
1536         return -1;
1537       }
1538     } else { //Valid vidblock for another PE:
1539       env->setMsgtype(ForVidMsg);
1540       env->setVidPtr(pCid->objPtr);
1541       return pe;
1542     }
1543   }
1544   else {
1545     env->setObjPtr(pCid->objPtr);
1546     return pCid->onPE;
1547   }
1548 }
1549
1550 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1551 {
1552   int destPE = _prepareMsg(eIdx, msg, pCid);
1553   if (destPE != -1) {
1554     register envelope *env = UsrToEnv(msg);
1555 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1556     criticalPath_send(env);
1557     automaticallySetMessagePriority(env);
1558 #endif
1559     CmiBecomeImmediate(env);
1560   }
1561   return destPE;
1562 }
1563
1564 extern "C"
1565 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1566 {
1567   if (opts & CK_MSG_INLINE) {
1568     CkSendMsgInline(entryIdx, msg, pCid, opts);
1569     return;
1570   }
1571 #if CMK_ERROR_CHECKING
1572   if (opts & CK_MSG_IMMEDIATE) {
1573     CmiAbort("Immediate message is not allowed in Chare!");
1574   }
1575 #endif
1576   register envelope *env = UsrToEnv(msg);
1577   int destPE=_prepareMsg(entryIdx,msg,pCid);
1578   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1579   // VidBlock was not yet filled). The problem is that the creation was never
1580   // traced later when the VidBlock was filled. One solution is to trace the
1581   // creation here, the other to trace it in VidBlock->msgDeliver().
1582   _TRACE_CREATION_1(env);
1583   if (destPE!=-1) {
1584     CpvAccess(_qd)->create();
1585     if (opts & CK_MSG_SKIP_OR_IMM)
1586       _noCldEnqueue(destPE, env);
1587     else
1588       _CldEnqueue(destPE, env, _infoIdx);
1589   }
1590   _TRACE_CREATION_DONE(1);
1591 }
1592
1593 extern "C"
1594 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1595 {
1596   if (pCid->onPE==CkMyPe())
1597   {
1598     if(!CmiNodeAlive(CkMyPe())){
1599         return;
1600     }
1601 #if CMK_CHARMDEBUG
1602     //Just in case we need to breakpoint or use the envelope in some way
1603     _prepareMsg(entryIndex,msg,pCid);
1604 #endif
1605                 //Just directly call the chare (skip QD handling & scheduler)
1606     register envelope *env = UsrToEnv(msg);
1607     if (env->isPacked()) CkUnpackMessage(&env);
1608     _STATS_RECORD_PROCESS_MSG_1();
1609     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1610   }
1611   else {
1612     //No way to inline a cross-processor message:
1613     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1614   }
1615 }
1616
1617 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1618 {
1619   register envelope *env = UsrToEnv(msg);
1620 #if CMK_ERROR_CHECKING
1621   CkNodeGroupID nodeRedMgr;
1622 #endif
1623   _CHECK_USED(env);
1624   _SET_USED(env, 1);
1625 #if CMK_REPLAYSYSTEM
1626   setEventID(env);
1627 #endif
1628   env->setMsgtype(type);
1629   env->setEpIdx(eIdx);
1630   env->setGroupNum(gID);
1631   env->setSrcPe(CkMyPe());
1632 #if CMK_ERROR_CHECKING
1633   nodeRedMgr.setZero();
1634   env->setRednMgr(nodeRedMgr);
1635 #endif
1636 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1637   criticalPath_send(env);
1638   automaticallySetMessagePriority(env);
1639 #endif
1640 #if CMK_CHARMDEBUG
1641   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1642 #endif
1643   CmiSetHandler(env, _charmHandlerIdx);
1644   return env;
1645 }
1646
1647 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1648 {
1649   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1650 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1651   criticalPath_send(env);
1652   automaticallySetMessagePriority(env);
1653 #endif
1654   CmiBecomeImmediate(env);
1655   return env;
1656 }
1657
1658 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1659                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1660 {
1661   int numPes;
1662   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1663 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1664   sendTicketGroupRequest(env,pe,_infoIdx);
1665 #else
1666   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1667   _TRACE_CREATION_N(env, numPes);
1668   if (opts & CK_MSG_SKIP_OR_IMM)
1669     _noCldEnqueue(pe, env);
1670   else
1671     _skipCldEnqueue(pe, env, _infoIdx);
1672   _TRACE_CREATION_DONE(1);
1673 #endif
1674 }
1675
1676 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1677                            int npes, int *pes)
1678 {
1679   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1680   _TRACE_CREATION_MULTICAST(env, npes, pes);
1681   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1682   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1683 }
1684
1685 extern "C"
1686 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1687 {
1688 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1689   if (destPE==CkMyPe())
1690   {
1691     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1692     return;
1693   }
1694   //Can't inline-- send the usual way
1695   register envelope *env = UsrToEnv(msg);
1696   int numPes;
1697   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1698   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1699   _TRACE_CREATION_N(env, numPes);
1700   _noCldEnqueue(destPE, env);
1701   _STATS_RECORD_SEND_BRANCH_1();
1702   CkpvAccess(_coreState)->create();
1703   _TRACE_CREATION_DONE(1);
1704 #else
1705   // no support for immediate message, send inline
1706   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1707 #endif
1708 }
1709
1710 extern "C"
1711 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1712 {
1713   if (destPE==CkMyPe())
1714   {
1715     if(!CmiNodeAlive(CkMyPe())){
1716         return;
1717     }
1718     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1719     if (obj!=NULL)
1720     { //Just directly call the group:
1721 #if CMK_ERROR_CHECKING
1722       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1723 #else
1724       envelope *env=UsrToEnv(msg);
1725 #endif
1726       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1727       return;
1728     }
1729   }
1730   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1731   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1732 }
1733
1734 extern "C"
1735 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1736 {
1737   if (opts & CK_MSG_INLINE) {
1738     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1739     return;
1740   }
1741   if (opts & CK_MSG_IMMEDIATE) {
1742     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1743     return;
1744   }
1745   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1746   _STATS_RECORD_SEND_BRANCH_1();
1747   CkpvAccess(_coreState)->create();
1748 }
1749
1750 extern "C"
1751 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1752 {
1753 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1754   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1755   _TRACE_CREATION_MULTICAST(env, npes, pes);
1756   _noCldEnqueueMulti(npes, pes, env);
1757   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1758 #else
1759   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1760   CpvAccess(_qd)->create(-npes);
1761 #endif
1762   _STATS_RECORD_SEND_BRANCH_N(npes);
1763   CpvAccess(_qd)->create(npes);
1764 }
1765
1766 extern "C"
1767 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1768 {
1769   if (opts & CK_MSG_IMMEDIATE) {
1770     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1771     return;
1772   }
1773     // normal mesg
1774   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1775   _STATS_RECORD_SEND_BRANCH_N(npes);
1776   CpvAccess(_qd)->create(npes);
1777 }
1778
1779 extern "C"
1780 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1781 {
1782   int npes;
1783   int *pes;
1784   if (opts & CK_MSG_IMMEDIATE) {
1785     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1786     return;
1787   }
1788     // normal mesg
1789   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1790   CmiLookupGroup(grp, &npes, &pes);
1791   _TRACE_CREATION_MULTICAST(env, npes, pes);
1792   _CldEnqueueGroup(grp, env, _infoIdx);
1793   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1794   _STATS_RECORD_SEND_BRANCH_N(npes);
1795   CpvAccess(_qd)->create(npes);
1796 }
1797
1798 extern "C"
1799 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1800 {
1801   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1802   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1803   CpvAccess(_qd)->create(CkNumPes());
1804 }
1805
1806 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1807                 int node=CLD_BROADCAST_ALL, int opts=0)
1808 {
1809   int numPes;
1810   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1811 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1812         sendTicketNodeGroupRequest(env,node,_infoIdx);
1813 #else
1814   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1815   _TRACE_CREATION_N(env, numPes);
1816   if (opts & CK_MSG_SKIP_OR_IMM) {
1817     _noCldNodeEnqueue(node, env);
1818     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1819       CkpvAccess(_coreState)->create(-numPes);
1820     }
1821   }
1822   else
1823     _CldNodeEnqueue(node, env, _infoIdx);
1824   _TRACE_CREATION_DONE(1);
1825 #endif
1826 }
1827
1828 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1829                            int npes, int *nodes)
1830 {
1831   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1832   _TRACE_CREATION_N(env, npes);
1833   for (int i=0; i<npes; i++) {
1834     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1835   }
1836   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1837 }
1838
1839 extern "C"
1840 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1841 {
1842 #if CMK_IMMEDIATE_MSG
1843   if (node==CkMyNode())
1844   {
1845     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1846     return;
1847   }
1848   //Can't inline-- send the usual way
1849   register envelope *env = UsrToEnv(msg);
1850   int numPes;
1851   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1852   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1853   _TRACE_CREATION_N(env, numPes);
1854   _noCldNodeEnqueue(node, env);
1855   _STATS_RECORD_SEND_BRANCH_1();
1856   /* immeidate message is invisible to QD */
1857 //  CkpvAccess(_coreState)->create();
1858   _TRACE_CREATION_DONE(1);
1859 #else
1860   // no support for immediate message, send inline
1861   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1862 #endif
1863 }
1864
1865 extern "C"
1866 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1867 {
1868   if (node==CkMyNode())
1869   {
1870     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1871     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1872     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1873     if (obj!=NULL)
1874     { //Just directly call the group:
1875 #if CMK_ERROR_CHECKING
1876       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1877 #else
1878       envelope *env=UsrToEnv(msg);
1879 #endif
1880       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1881       return;
1882     }
1883   }
1884   //Can't inline-- send the usual way
1885   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1886 }
1887
1888 extern "C"
1889 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1890 {
1891   if (opts & CK_MSG_INLINE) {
1892     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1893     return;
1894   }
1895   if (opts & CK_MSG_IMMEDIATE) {
1896     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1897     return;
1898   }
1899   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1900   _STATS_RECORD_SEND_NODE_BRANCH_1();
1901   CkpvAccess(_coreState)->create();
1902 }
1903
1904 extern "C"
1905 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1906 {
1907 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1908   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1909   _noCldEnqueueMulti(npes, nodes, env);
1910 #else
1911   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1912   CpvAccess(_qd)->create(-npes);
1913 #endif
1914   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1915   CpvAccess(_qd)->create(npes);
1916 }
1917
1918 extern "C"
1919 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1920 {
1921   if (opts & CK_MSG_IMMEDIATE) {
1922     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1923     return;
1924   }
1925     // normal mesg
1926   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1927   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1928   CpvAccess(_qd)->create(npes);
1929 }
1930
1931 extern "C"
1932 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1933 {
1934   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1935   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1936   CpvAccess(_qd)->create(CkNumNodes());
1937 }
1938
1939 //Needed by delegation manager:
1940 extern "C"
1941 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1942 { return _prepareMsg(eIdx,msg,pCid); }
1943 extern "C"
1944 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1945 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1946 extern "C"
1947 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1948 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1949
1950 void _ckModuleInit(void) {
1951         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1952 #if CMK_OBJECT_QUEUE_AVAILABLE
1953         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1954 #endif
1955         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1956         CkpvInitialize(TokenPool*, _tokenPool);
1957         CkpvAccess(_tokenPool) = new TokenPool;
1958 }
1959
1960
1961 /************** Send: Arrays *************/
1962
1963 extern void CkArrayManagerInsert(int onPe,void *msg);
1964 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1965
1966 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1967 {
1968   _CHECK_USED(env);
1969   _SET_USED(env, 1);
1970   env->setMsgtype(type);
1971 #if CMK_CHARMDEBUG
1972   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1973 #endif
1974   CmiSetHandler(env, _charmHandlerIdx);
1975   CpvAccess(_qd)->create();
1976 }
1977
1978 extern "C"
1979 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1980   register envelope *env = UsrToEnv(msg);
1981   env->getsetArrayMgr()=aID;
1982   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1983   _CldEnqueue(pe, env, _infoIdx);
1984 }
1985
1986 extern "C"
1987 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1988   register envelope *env = UsrToEnv(msg);
1989   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1990 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1991    sendTicketArrayRequest(env,pe,_infoIdx);
1992 #else
1993   if (opts & CK_MSG_IMMEDIATE)
1994     CmiBecomeImmediate(env);
1995   if (opts & CK_MSG_SKIP_OR_IMM)
1996     _noCldEnqueue(pe, env);
1997   else
1998     _skipCldEnqueue(pe, env, _infoIdx);
1999 #endif
2000 }
2001
2002 class ElementDestroyer : public CkLocIterator {
2003 private:
2004         CkLocMgr *locMgr;
2005 public:
2006         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2007         void addLocation(CkLocation &loc) {
2008           loc.destroyAll();
2009         }
2010 };
2011
2012 void CkDeleteChares() {
2013   int i;
2014   int numGroups = CkpvAccess(_groupIDTable)->size();
2015
2016   // delete all plain chares
2017 #ifndef CMK_CHARE_USE_PTR
2018   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2019         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2020         delete obj;
2021         CkpvAccess(chare_objs)[i] = NULL;
2022   }
2023   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2024         VidBlock *obj = CkpvAccess(vidblocks)[i];
2025         delete obj;
2026         CkpvAccess(vidblocks)[i] = NULL;
2027   }
2028 #endif
2029
2030   // delete all array elements
2031   for(i=0;i<numGroups;i++) {
2032     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2033     if(obj && obj->isLocMgr())  {
2034       CkLocMgr *mgr = (CkLocMgr*)obj;
2035       ElementDestroyer destroyer(mgr);
2036       mgr->iterate(destroyer);
2037     }
2038   }
2039
2040   // delete all groups
2041   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2042   for(i=0;i<numGroups;i++) {
2043     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2044     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2045     if (obj) delete obj;
2046   }
2047   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2048
2049   // delete all node groups
2050   if (CkMyRank() == 0) {
2051     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2052     for(i=0;i<numNodeGroups;i++) {
2053       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2054       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2055       if (obj) delete obj;
2056     }
2057   }
2058 }
2059
2060 //------------------- Message Watcher (record/replay) ----------------
2061
2062 #include "crc32.h"
2063
2064 CkpvDeclare(int, envelopeEventID);
2065 int _recplay_crc = 0;
2066 int _recplay_checksum = 0;
2067 int _recplay_logsize = 1024*1024;
2068
2069 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2070 #define REPLAYDEBUG(args) /* empty */
2071
2072 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2073
2074 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2075
2076 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2077
2078     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2079     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2080     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2081     FILE *f=fopen(fName,permissions);
2082     REPLAYDEBUG("openReplayfile "<<fName);
2083     if (f==NULL) {
2084         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2085             CkMyPe(),fName,permissions);
2086         CkAbort("openReplayFile> Could not open replay file");
2087     }
2088     return f;
2089 }
2090
2091 #include "BaseLB.h" /* For LBMigrateMsg message */
2092
2093 class CkMessageRecorder : public CkMessageWatcher {
2094   char *buffer;
2095   unsigned int curpos;
2096   bool firstOpen;
2097 public:
2098   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2099   ~CkMessageRecorder() {
2100     flushLog(0);
2101     fprintf(f,"-1 -1 -1 ");
2102     fclose(f);
2103     delete[] buffer;
2104 #if 0
2105     FILE *stsfp = fopen("sts", "w");
2106     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2107     traceWriteSTS(stsfp, 0);
2108     fclose(stsfp);
2109 #endif
2110     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2111   }
2112
2113 private:
2114   void flushLog(int verbose=1) {
2115     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2116     fprintf(f, "%s", buffer);
2117     curpos=0;
2118   }
2119   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2120     if ((*envptr)->getEvent()) {
2121       bool wasPacked = (*envptr)->isPacked();
2122       if (!wasPacked) CkPackMessage(envptr);
2123       envelope *env = *envptr;
2124       unsigned int crc1=0, crc2=0;
2125       if (_recplay_crc) {
2126         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2127         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2128         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2129       } else if (_recplay_checksum) {
2130         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2131         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2132       }
2133       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2134       if (curpos > _recplay_logsize-128) flushLog();
2135       if (!wasPacked) CkUnpackMessage(envptr);
2136     }
2137     return CmiTrue;
2138   }
2139   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2140     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2141     if (curpos > _recplay_logsize-128) flushLog();
2142     return CmiTrue;
2143   }
2144   
2145   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2146     FILE *f;
2147     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2148     else f = openReplayFile("ckreplay_",".lb","a");
2149     firstOpen = false;
2150     if (f != NULL) {
2151       PUP::toDisk p(f);
2152       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2153       (*msg)->pup(p);
2154       fclose(f);
2155     }
2156     return CmiTrue;
2157   }
2158 };
2159
2160 class CkMessageDetailRecorder : public CkMessageWatcher {
2161 public:
2162   CkMessageDetailRecorder(FILE *f_) {
2163     f=f_;
2164     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2165      * The value of 'x' is the pointer size.
2166      */
2167     CmiUInt2 little = sizeof(void*);
2168     fwrite(&little, 2, 1, f);
2169   }
2170   ~CkMessageDetailRecorder() {fclose(f);}
2171 private:
2172   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2173     bool wasPacked = (*envptr)->isPacked();
2174     if (!wasPacked) CkPackMessage(envptr);
2175     envelope *env = *envptr;
2176     CmiUInt4 size = env->getTotalsize();
2177     fwrite(&size, 4, 1, f);
2178     fwrite(env, env->getTotalsize(), 1, f);
2179     if (!wasPacked) CkUnpackMessage(envptr);
2180     return CmiTrue;
2181   }
2182 };
2183
2184 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2185 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2186
2187 #if CMK_BIGSIM_CHARM
2188 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2189                                    int pb,unsigned int *prio);
2190 #endif
2191
2192 class CkMessageReplay : public CkMessageWatcher {
2193   int counter;
2194         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2195         int nextEP;
2196         unsigned int crc1, crc2;
2197         FILE *lbFile;
2198         /// Read the next message we need from the file:
2199         void getNext(void) {
2200           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2201           if (nextSize > 0) {
2202             // We are reading a regular message
2203             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2204               CkAbort("CkMessageReplay> Syntax error reading replay file");
2205             }
2206             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2207           } else if (nextSize == -2) {
2208             // We are reading a special message (right now only thread awaken)
2209             // Nothing to do since we have already read all info
2210             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2211           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2212             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2213             CkAbort("CkMessageReplay> Unrecognized input");
2214           }
2215             /*
2216                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2217                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2218                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2219                 }
2220                 */
2221                 counter++;
2222         }
2223         /// If this is the next message we need, advance and return CmiTrue.
2224         CmiBool isNext(envelope *env) {
2225                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2226                 if (nextEvent!=env->getEvent()) return CmiFalse;
2227                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2228 #if 1
2229                 if (nextEP != env->getEpIdx()) {
2230                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2231                         return CmiFalse;
2232                 }
2233 #endif
2234 #if ! CMK_BIGSIM_CHARM
2235                 if (nextSize!=env->getTotalsize())
2236                 {
2237                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2238                         return CmiFalse;
2239                 }
2240                 if (_recplay_crc || _recplay_checksum) {
2241                   bool wasPacked = env->isPacked();
2242                   if (!wasPacked) CkPackMessage(&env);
2243                   if (_recplay_crc) {
2244                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2245                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2246                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2247                     if (crcnew1 != crc1) {
2248                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2249                     }
2250                     if (crcnew2 != crc2) {
2251                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2252                     }
2253                   } else if (_recplay_checksum) {
2254             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2255             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2256             if (crcnew1 != crc1) {
2257               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2258             }
2259             if (crcnew2 != crc2) {
2260               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2261             }               
2262                   }
2263                   if (!wasPacked) CkUnpackMessage(&env);
2264                 }
2265 #endif
2266                 return CmiTrue;
2267         }
2268         CmiBool isNext(CthThreadToken *token) {
2269           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2270           return CmiFalse;
2271         }
2272
2273         /// This is a (short) list of messages we aren't yet ready for:
2274         CkQ<envelope *> delayedMessages;
2275         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2276         CkQ<CthThreadToken *> delayedTokens;
2277
2278         /// Try to flush out any delayed messages
2279         void flush(void) {
2280           if (nextSize>0) {
2281                 int len=delayedMessages.length();
2282                 for (int i=0;i<len;i++) {
2283                         envelope *env=delayedMessages.deq();
2284                         if (isNext(env)) { /* this is the next message: process it */
2285                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2286                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2287                                 return;
2288                         }
2289                         else /* Not ready yet-- put it back in the
2290                                 queue */
2291                           {
2292                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2293                                 delayedMessages.enq(env);
2294                           }
2295                 }
2296           } else if (nextSize==-2) {
2297             int len=delayedTokens.length();
2298             for (int i=0;i<len;++i) {
2299               CthThreadToken *token=delayedTokens.deq();
2300               if (isNext(token)) {
2301             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2302 #if ! CMK_BIGSIM_CHARM
2303                 CsdEnqueueLifo((void*)token);
2304 #else
2305                 CthEnqueueBigSimThread(token,0,0,NULL);
2306 #endif
2307                 return;
2308               } else {
2309             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2310                 delayedTokens.enq(token);
2311               }
2312             }
2313           }
2314         }
2315
2316 public:
2317         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2318           counter=0;
2319           f=f_;
2320           getNext();
2321           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2322           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2323         }
2324         ~CkMessageReplay() {fclose(f);}
2325
2326 private:
2327         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2328           bool wasPacked = (*envptr)->isPacked();
2329           if (!wasPacked) CkPackMessage(envptr);
2330           envelope *env = *envptr;
2331           //CkAssert(*(int*)env == 0x34567890);
2332           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2333                 if (env->getEvent() == 0) return CmiTrue;
2334                 if (isNext(env)) { /* This is the message we were expecting */
2335                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2336                         getNext(); /* Advance over this message */
2337                         flush(); /* try to process queued-up stuff */
2338                         if (!wasPacked) CkUnpackMessage(envptr);
2339                         return CmiTrue;
2340                 }
2341 #if CMK_SMP
2342                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2343                          // try next rank, we can't just buffer the msg and left
2344                          // we need to keep unprocessed msg on the fly
2345                         int nextpe = CkMyPe()+1;
2346                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2347                         nextpe = CkNodeFirst(CkMyNode());
2348                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2349                         return CmiFalse;
2350                 }
2351 #endif
2352                 else /*!isNext(env) */ {
2353                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2354                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2355                         delayedMessages.enq(env);
2356                         flush();
2357                         return CmiFalse;
2358                 }
2359         }
2360         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2361       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2362           if (isNext(token)) {
2363         REPLAYDEBUG("Executing token: "<<token->serialNo)
2364             getNext();
2365             flush();
2366             return CmiTrue;
2367           } else {
2368         REPLAYDEBUG("Queueing token: "<<token->serialNo
2369             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2370             delayedTokens.enq(token);
2371             return CmiFalse;
2372           }
2373         }
2374
2375         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2376           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2377           if (lbFile != NULL) {
2378             int num_moves;
2379         PUP::fromDisk p(lbFile);
2380             p | num_moves;
2381             if (num_moves != (*msg)->n_moves) {
2382               delete *msg;
2383               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2384             }
2385             (*msg)->pup(p);
2386           }
2387           return CmiTrue;
2388         }
2389 };
2390
2391 class CkMessageDetailReplay : public CkMessageWatcher {
2392   void *getNext() {
2393     CmiUInt4 size; size_t nread;
2394     if ((nread=fread(&size, 4, 1, f)) < 1) {
2395       if (feof(f)) return NULL;
2396       CkPrintf("Broken record file (metadata) got %d\n",nread);
2397       CkAbort("");
2398     }
2399     void *env = CmiAlloc(size);
2400     long tell = ftell(f);
2401     if ((nread=fread(env, size, 1, f)) < 1) {
2402       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2403       CkAbort("");
2404     }
2405     //*(int*)env = 0x34567890; // set first integer as magic
2406     return env;
2407   }
2408 public:
2409   double starttime;
2410   CkMessageDetailReplay(FILE *f_) {
2411     f=f_;
2412     starttime=CkWallTimer();
2413     /* This must match what CkMessageDetailRecorder did */
2414     CmiUInt2 little;
2415     fread(&little, 2, 1, f);
2416     if (little != sizeof(void*)) {
2417       CkAbort("Replaying on a different architecture from which recording was done!");
2418     }
2419
2420     CsdEnqueue(getNext());
2421
2422     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2423   }
2424   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2425     void *msg = getNext();
2426     if (msg != NULL) CsdEnqueue(msg);
2427     return CmiTrue;
2428   }
2429 };
2430
2431 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2432 #if ! CMK_BIGSIM_CHARM
2433   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2434 #endif
2435   CkMessageReplay *replay = (CkMessageReplay*)rep;
2436   //CmiStartQD(CkMessageReplayQuiescence, replay);
2437 }
2438
2439 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2440   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2441   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2442   ConverseExit();
2443 }
2444
2445 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2446   CkCoreState *ck = CkpvAccess(_coreState);
2447   if (ck->watcher!=NULL) {
2448     return ck->watcher->processThread(token,ck);
2449   }
2450   return CmiTrue;
2451 }
2452
2453 CpvCExtern(int, CthResumeNormalThreadIdx);
2454 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2455 {
2456   CthThread t = token->thread;
2457
2458   if(t == NULL){
2459     free(token);
2460     return;
2461   }
2462 #if CMK_TRACE_ENABLED
2463 #if ! CMK_TRACE_IN_CHARM
2464   if(CpvAccess(traceOn))
2465     CthTraceResume(t);
2466 /*    if(CpvAccess(_traceCoreOn)) 
2467             resumeTraceCore();*/
2468 #endif
2469 #endif
2470   
2471   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2472   if (CpdExecuteThreadResume(token)) {
2473     CthResume(t);
2474   }
2475 }
2476
2477 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2478   CkCoreState *ck = CkpvAccess(_coreState);
2479   if (ck->watcher!=NULL) {
2480     ck->watcher->processLBMessage(msg, ck);
2481   }
2482 }
2483
2484 #if CMK_BIGSIM_CHARM
2485 CpvExtern(int      , CthResumeBigSimThreadIdx);
2486 #endif
2487
2488 #include "ckliststring.h"
2489 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2490     CmiArgGroup("Charm++","Record/Replay");
2491     CmiBool forceReplay = CmiFalse;
2492     char *procs = NULL;
2493     _replaySystem = 0;
2494     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2495       _recplay_crc = 1;
2496     }
2497     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2498       _recplay_checksum = 1;
2499     }
2500     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2501     REPLAYDEBUG("CkMessageWatcherInit ");
2502     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2503         CkListString list(procs);
2504         if (list.includes(CkMyPe())) {
2505           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2506           CpdSetInitializeMemory(1);
2507           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2508         }
2509     }
2510     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2511       if (CkMyPe() == 0) {
2512         CmiPrintf("Charm++> record mode.\n");
2513         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2514           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2515           _recplay_crc = _recplay_checksum = 0;
2516         }
2517       }
2518       CpdSetInitializeMemory(1);
2519       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2520       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2521     }
2522         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2523             forceReplay = CmiTrue;
2524             CpdSetInitializeMemory(1);
2525             // Set the parameters of the processor
2526 #if CMK_SHARED_VARS_UNAVAILABLE
2527             _Cmi_mype = atoi(procs);
2528             while (procs[0]!='/') procs++;
2529             procs++;
2530             _Cmi_numpes = atoi(procs);
2531 #else
2532             CkAbort("+replay-detail available only for non-SMP build");
2533 #endif
2534             _replaySystem = 1;
2535             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2536         }
2537         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2538           if (CkMyPe() == 0)  {
2539             CmiPrintf("Charm++> replay mode.\n");
2540             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2541               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2542               _recplay_crc = _recplay_checksum = 0;
2543             }
2544           }
2545           CpdSetInitializeMemory(1);
2546 #if ! CMK_BIGSIM_CHARM
2547           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2548 #else
2549           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2550 #endif
2551           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2552         }
2553         if (_recplay_crc && _recplay_checksum) {
2554           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2555         }
2556 }
2557
2558 extern "C"
2559 int CkMessageToEpIdx(void *msg) {
2560         envelope *env=UsrToEnv(msg);
2561         int ep=env->getEpIdx();
2562         if (ep==CkIndex_CkArray::recvBroadcast(0))
2563                 return env->getsetArrayBcastEp();
2564         else
2565                 return ep;
2566 }
2567
2568 extern "C"
2569 int getCharmEnvelopeSize() {
2570   return sizeof(envelope);
2571 }
2572
2573
2574 #include "CkMarshall.def.h"
2575