Documentation #1777: Document message types in Charm++ message handler code
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #include "pathHistory.h"
14
15 #if CMK_LBDB_ON
16 #include "LBDatabase.h"
17 #endif // CMK_LBDB_ON
18
19 #ifndef CMK_CHARE_USE_PTR
20 #include <map>
21 CkpvDeclare(std::vector<void *>, chare_objs);
22 CkpvDeclare(std::vector<int>, chare_types);
23 CkpvDeclare(std::vector<VidBlock *>, vidblocks);
24
25 typedef std::map<int, CkChareID>  Vidblockmap;
26 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
27 CkpvDeclare(int, currentChareIdx);
28 #endif
29
30 // Map of array IDs to array elements for fast message delivery
31 CkpvDeclare(ArrayObjMap, array_objs);
32
33 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
34
35 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
36
37 int CMessage_CkMessage::__idx=-1;
38 int CMessage_CkArgMsg::__idx=0;
39 int CkIndex_Chare::__idx;
40 int CkIndex_Group::__idx;
41 int CkIndex_ArrayBase::__idx=-1;
42
43 extern int _defaultObjectQ;
44
45 void _initChareTables()
46 {
47 #ifndef CMK_CHARE_USE_PTR
48           /* chare and vidblock table */
49   CkpvInitialize(std::vector<void *>, chare_objs);
50   CkpvInitialize(std::vector<int>, chare_types);
51   CkpvInitialize(std::vector<VidBlock *>, vidblocks);
52   CkpvInitialize(Vidblockmap, vmap);
53   CkpvInitialize(int, currentChareIdx);
54   CkpvAccess(currentChareIdx) = -1;
55 #endif
56
57   CkpvInitialize(ArrayObjMap, array_objs);
58 }
59
60 //Charm++ virtual functions: declaring these here results in a smaller executable
61 Chare::Chare(void) {
62   thishandle.onPE=CkMyPe();
63   thishandle.objPtr=this;
64 #if CMK_ERROR_CHECKING
65   magic = CHARE_MAGIC;
66 #endif
67 #ifndef CMK_CHARE_USE_PTR
68      // for plain chare, objPtr is actually the index to chare obj table
69   if (CkpvAccess(currentChareIdx) >= 0) {
70     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
71   }
72   chareIdx = CkpvAccess(currentChareIdx);
73 #endif
74 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
75   mlogData = new ChareMlogData();
76   mlogData->objID.type = TypeChare;
77   mlogData->objID.data.chare.id = thishandle;
78 #endif
79 #if CMK_OBJECT_QUEUE_AVAILABLE
80   if (_defaultObjectQ)  CkEnableObjQ();
81 #endif
82 }
83
84 Chare::Chare(CkMigrateMessage* m) {
85   thishandle.onPE=CkMyPe();
86   thishandle.objPtr=this;
87 #if CMK_ERROR_CHECKING
88   magic = 0;
89 #endif
90
91 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
92         mlogData = NULL;
93 #endif
94
95 #if CMK_OBJECT_QUEUE_AVAILABLE
96   if (_defaultObjectQ)  CkEnableObjQ();
97 #endif
98 }
99
100 void Chare::CkEnableObjQ()
101 {
102 #if CMK_OBJECT_QUEUE_AVAILABLE
103   objQ.create();
104 #endif
105 }
106
107 Chare::~Chare() {
108 #ifndef CMK_CHARE_USE_PTR
109 /*
110   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
111 */
112   if (chareIdx != -1)
113   {
114     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
115     CkpvAccess(chare_objs)[chareIdx] = NULL;
116     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
117     if (iter != CkpvAccess(vmap).end()) {
118       CkChareID *pCid = (CkChareID *)
119         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
120       int srcPe = iter->second.onPE;
121       *pCid = iter->second;
122       envelope *ret = UsrToEnv(pCid);
123       ret->setVidPtr(iter->second.objPtr);
124       ret->setSrcPe(CkMyPe());
125       CmiSetHandler(ret, _charmHandlerIdx);
126       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
127       CpvAccess(_qd)->create();
128       CkpvAccess(vmap).erase(iter);
129     }
130   }
131 #endif
132 }
133
134 void Chare::pup(PUP::er &p)
135 {
136   p(thishandle.onPE);
137   thishandle.objPtr=(void *)this;
138 #ifndef CMK_CHARE_USE_PTR
139   p(chareIdx);
140   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
141 #endif
142 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
143         if(p.isUnpacking()){
144                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
145                 mlogData = new ChareMlogData();
146         }
147         mlogData->pup(p);
148 #endif
149 #if CMK_ERROR_CHECKING
150   p(magic);
151 #endif
152 }
153
154 int Chare::ckGetChareType() const {
155   return -3;
156 }
157 char *Chare::ckDebugChareName(void) {
158   char buf[100];
159   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),(void*)this);
160   return strdup(buf);
161 }
162 int Chare::ckDebugChareID(char *str, int limit) {
163   // pure chares for now do not have a valid ID
164   str[0] = 0;
165   return 1;
166 }
167 void Chare::ckDebugPup(PUP::er &p) {
168   pup(p);
169 }
170
171 /// This method is called before starting a [threaded] entry method.
172 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
173   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
174   traceAddThreadListeners(th, UsrToEnv(msg));
175 }
176
177 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
178   p.comment("Bytes");
179   int ts=UsrToEnv(msg)->getTotalsize();
180   int msgLen=ts-sizeof(envelope);
181   if (msgLen>0)
182     p((char*)msg,msgLen);
183 }
184
185 IrrGroup::IrrGroup(void) {
186   thisgroup = CkpvAccess(_currentGroup);
187 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
188         mlogData->objID.type = TypeGroup;
189         mlogData->objID.data.group.id = thisgroup;
190         mlogData->objID.data.group.onPE = CkMyPe();
191 #endif
192 }
193
194 IrrGroup::~IrrGroup() {
195   // remove the object pointer
196   if (CkpvAccess(_destroyingNodeGroup)) {
197     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
198     CksvAccess(_nodeGroupTable)->find(thisgroup).setObj(NULL);
199     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
200     CkpvAccess(_destroyingNodeGroup) = false;
201   } else {
202     CmiImmediateLock(CkpvAccess(_groupTableImmLock));
203     CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
204     CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
205   }
206 }
207
208 void IrrGroup::pup(PUP::er &p)
209 {
210   Chare::pup(p);
211   p|thisgroup;
212 }
213
214 int IrrGroup::ckGetChareType() const {
215   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
216 }
217
218 int IrrGroup::ckDebugChareID(char *str, int limit) {
219   if (limit<5) return -1;
220   str[0] = 1;
221   *((int*)&str[1]) = thisgroup.idx;
222   return 5;
223 }
224
225 char *IrrGroup::ckDebugChareName() {
226   return strdup(_chareTable[ckGetChareType()]->name);
227 }
228
229 void IrrGroup::ckJustMigrated(void)
230 {
231 }
232
233 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
234   /* FIXME: **CW** not entirely sure what we should do here yet */
235 }
236
237 void Group::CkAddThreadListeners(CthThread th, void *msg) {
238   Chare::CkAddThreadListeners(th, msg);
239   CthSetThreadID(th, thisgroup.idx, 0, 0);
240 }
241
242 void Group::pup(PUP::er &p)
243 {
244   CkReductionMgr::pup(p);
245   p|reductionInfo;
246 }
247
248 /**** Delegation Manager Group */
249 CkDelegateMgr::~CkDelegateMgr() { }
250
251 //Default delegator implementation: do not delegate-- send directly
252 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
253   { CkSendMsg(ep,m,c); }
254 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
255   { CkSendMsgBranch(ep,m,onPE,g); }
256 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
257   { CkBroadcastMsgBranch(ep,m,g); }
258 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
259   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
260 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
261   { CkSendMsgNodeBranch(ep,m,onNode,g); }
262 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
263   { CkBroadcastMsgNodeBranch(ep,m,g); }
264 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
265   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
266 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
267 {
268         CProxyElement_ArrayBase ap(a,idx);
269         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
270 }
271 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
272 {
273         CProxyElement_ArrayBase ap(a,idx);
274         ap.ckSend((CkArrayMessage *)m,ep);
275 }
276 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
277 {
278         CProxy_ArrayBase ap(a);
279         ap.ckBroadcast((CkArrayMessage *)m,ep);
280 }
281
282 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
283 {
284         CmiAbort("ArraySectionSend is not implemented!\n");
285 /*
286         CProxyElement_ArrayBase ap(a,idx);
287         ap.ckSend((CkArrayMessage *)m,ep);
288 */
289 }
290
291 /*** Proxy <-> delegator communication */
292 CkDelegateData::~CkDelegateData() {}
293
294 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
295   return pd; // default implementation ignores pup call
296 }
297
298 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
299    this tricky manual reference counting business... */
300
301 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
302         if (dPtr) dPtr->ref();
303         ckUndelegate();
304         delegatedMgr = dTo;
305         delegatedPtr = dPtr;
306         delegatedGroupId = delegatedMgr->CkGetGroupID();
307         isNodeGroup = delegatedMgr->isNodeGroup();
308 }
309 void CProxy::ckUndelegate(void) {
310         delegatedMgr=NULL;
311         delegatedGroupId.setZero();
312         if (delegatedPtr) delegatedPtr->unref();
313         delegatedPtr=NULL;
314 }
315
316 /// Copy constructor
317 CProxy::CProxy(const CProxy &src)
318   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
319    isNodeGroup(src.isNodeGroup) {
320     delegatedPtr = NULL;
321     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
322         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
323     }
324 }
325
326 /// Assignment operator
327 CProxy& CProxy::operator=(const CProxy &src) {
328         CkDelegateData *oldPtr=delegatedPtr;
329         ckUndelegate();
330         delegatedMgr=src.delegatedMgr;
331         delegatedGroupId = src.delegatedGroupId; 
332         isNodeGroup = src.isNodeGroup;
333
334         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
335             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
336         else
337             delegatedPtr = NULL;
338
339         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
340         if (oldPtr) oldPtr->unref();
341         return *this;
342 }
343
344 void CProxy::pup(PUP::er &p) {
345   if (!p.isUnpacking()) {
346     if (ckDelegatedTo() != NULL) {
347       delegatedGroupId = delegatedMgr->CkGetGroupID();
348       isNodeGroup = delegatedMgr->isNodeGroup();
349     }
350   }
351   p|delegatedGroupId;
352   if (!delegatedGroupId.isZero()) {
353     p|isNodeGroup;
354     if (p.isUnpacking()) {
355       delegatedMgr = ckDelegatedTo(); 
356     }
357
358     int migCtor = 0, cIdx; 
359     if (!p.isUnpacking()) {
360       if (isNodeGroup) {
361         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
362         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
363         migCtor = _chareTable[cIdx]->migCtor; 
364         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
365       }
366       else  {
367         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
368         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
369         migCtor = _chareTable[cIdx]->migCtor; 
370         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
371       }         
372     }
373
374     p|migCtor;
375
376     // if delegated manager has not been created, construct a dummy
377     // object on which to call DelegatePointerPup
378     if (delegatedMgr == NULL) {
379
380       // create a dummy object for calling DelegatePointerPup
381       int objId = _entryTable[migCtor]->chareIdx; 
382       size_t objSize = _chareTable[objId]->size;
383       void *obj = malloc(objSize); 
384       _entryTable[migCtor]->call(NULL, obj); 
385       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
386         ->DelegatePointerPup(p, delegatedPtr);           
387       free(obj);
388
389     }
390     else {
391
392       // delegated manager has been created, so we can use it
393       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
394
395     }
396
397     if (p.isUnpacking() && delegatedPtr) {
398       delegatedPtr->ref();
399     }
400   }
401 }
402
403 /**** Array sections */
404 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
405 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems, int factor): bfactor(factor) { \
406   _elems.assign(elems, elems+nElems);  \
407   _cookie.get_aid() = aid;      \
408   _cookie.get_pe() = CkMyPe();  \
409 } \
410 CkSectionID::CkSectionID(const CkArrayID &aid, const std::vector<CkArrayIndex##index> &elems, int factor): bfactor(factor) { \
411   _elems.resize(elems.size()); \
412   for (int i=0; i<_elems.size(); ++i) { \
413     _elems[i] = static_cast<CkArrayIndex>(elems[i]); \
414   } \
415   _cookie.get_aid() = aid;      \
416   _cookie.get_pe() = CkMyPe();  \
417 } \
418
419 CKSECTIONID_CONSTRUCTOR_DEF(1D)
420 CKSECTIONID_CONSTRUCTOR_DEF(2D)
421 CKSECTIONID_CONSTRUCTOR_DEF(3D)
422 CKSECTIONID_CONSTRUCTOR_DEF(4D)
423 CKSECTIONID_CONSTRUCTOR_DEF(5D)
424 CKSECTIONID_CONSTRUCTOR_DEF(6D)
425 CKSECTIONID_CONSTRUCTOR_DEF(Max)
426
427 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes, int factor): bfactor(factor) {
428   _cookie.get_aid() = gid;
429   pelist.assign(_pelist, _pelist+_npes);
430 }
431
432 CkSectionID::CkSectionID(const CkGroupID &gid, const std::vector<int>& _pelist, int factor): pelist(_pelist), bfactor(factor) {
433   _cookie.get_aid() = gid;
434 }
435
436 CkSectionID::CkSectionID(const CkSectionID &sid) {
437   _cookie = sid._cookie;
438   pelist = sid.pelist;
439   _elems = sid._elems;
440   bfactor = sid.bfactor;
441 }
442
443 void CkSectionID::operator=(const CkSectionID &sid) {
444   _cookie = sid._cookie;
445   pelist = sid.pelist;
446   _elems = sid._elems;
447   bfactor = sid.bfactor;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451   p | _cookie;
452   p | pelist;
453   p | _elems;
454   p | bfactor;
455 }
456
457 /**** Tiny random API routines */
458
459 #if CMK_CUDA
460 void CUDACallbackManager(void *fn) {
461   if (fn != NULL) {
462     CkCallback *cb = (CkCallback*) fn;
463     cb->send();
464   }
465 }
466
467 #endif
468
469 void QdCreate(int n) {
470   CpvAccess(_qd)->create(n);
471 }
472
473 void QdProcess(int n) {
474   CpvAccess(_qd)->process(n);
475 }
476
477 extern "C"
478 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
479 {
480   UsrToEnv(msg)->setRef(ref);
481 }
482
483 extern "C"
484 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
485 {
486   return UsrToEnv(msg)->getRef();
487 }
488
489 extern "C"
490 int CkGetSrcPe(void *msg)
491 {
492   return UsrToEnv(msg)->getSrcPe();
493 }
494
495 extern "C"
496 int CkGetSrcNode(void *msg)
497 {
498   return CmiNodeOf(CkGetSrcPe(msg));
499 }
500
501 extern "C"
502 void *CkLocalBranch(CkGroupID gID) {
503   return _localBranch(gID);
504 }
505
506 static
507 void *_ckLocalNodeBranch(CkGroupID groupID) {
508   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
509   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
510   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
511   return retval;
512 }
513
514 extern "C"
515 void *CkLocalNodeBranch(CkGroupID groupID)
516 {
517   void *retval;
518   // we are called in a constructor
519   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
520     return CkpvAccess(_currentNodeGroupObj);
521   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
522   { // Nodegroup hasn't finished being created yet-- schedule...
523     CsdScheduler(0);
524   }
525   return retval;
526 }
527
528 extern "C"
529 void *CkLocalChare(const CkChareID *pCid)
530 {
531         int pe=pCid->onPE;
532         if (pe<0) { //A virtual chare ID
533                 if (pe!=(-(CkMyPe()+1)))
534                         return NULL;//VID block not on this PE
535 #ifdef CMK_CHARE_USE_PTR
536                 VidBlock *v=(VidBlock *)pCid->objPtr;
537 #else
538                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
539 #endif
540                 return v->getLocalChareObj();
541         }
542         else
543         { //An ordinary chare ID
544                 if (pe!=CkMyPe())
545                         return NULL;//Chare not on this PE
546 #ifdef CMK_CHARE_USE_PTR
547                 return pCid->objPtr;
548 #else
549                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
550 #endif
551         }
552 }
553
554 CkpvDeclare(char**,Ck_argv);
555
556 extern "C" char **CkGetArgv(void) {
557         return CkpvAccess(Ck_argv);
558 }
559 extern "C" int CkGetArgc(void) {
560         return CmiGetArgc(CkpvAccess(Ck_argv));
561 }
562
563 /******************** Basic support *****************/
564 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
565 {
566 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
567         CpvAccess(_currentObj) = (Chare *)obj;
568 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
569 #endif
570   //BIGSIM_OOC DEBUGGING
571   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
572   //fflush(stdout);
573 #if CMK_CHARMDEBUG
574   CpdBeforeEp(epIdx, obj, msg);
575 #endif    
576   _entryTable[epIdx]->call(msg, obj);
577 #if CMK_CHARMDEBUG
578   CpdAfterEp(epIdx);
579 #endif
580   if (_entryTable[epIdx]->noKeep)
581   { /* Method doesn't keep/delete the message, so we have to: */
582     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
583   }
584 }
585 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
586 {
587   //BIGSIM_OOC DEBUGGING
588   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
589   //fflush(stdout);
590
591   void *deliverMsg;
592 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
593         CpvAccess(_currentObj) = (Chare *)obj;
594 #endif
595   if (_entryTable[epIdx]->noKeep)
596   { /* Deliver a read-only copy of the message */
597     deliverMsg=(void *)msg;
598   } else
599   { /* Method needs a copy of the message to keep/delete */
600     void *oldMsg=(void *)msg;
601     deliverMsg=CkCopyMsg(&oldMsg);
602 #if CMK_ERROR_CHECKING
603     if (oldMsg!=msg)
604       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
605 #endif
606   }
607 #if CMK_CHARMDEBUG
608   CpdBeforeEp(epIdx, obj, (void*)msg);
609 #endif
610   _entryTable[epIdx]->call(deliverMsg, obj);
611 #if CMK_CHARMDEBUG
612   CpdAfterEp(epIdx);
613 #endif
614 }
615
616 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
617 {
618   void *msg = EnvToUsr(env);
619   _SET_USED(env, 0);
620   CkDeliverMessageFree(epIdx,msg,obj);
621 }
622
623 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
624 {
625
626 #if CMK_TRACE_ENABLED 
627   if (_entryTable[epIdx]->traceEnabled) {
628     _TRACE_BEGIN_EXECUTE(env, obj);
629     if(_entryTable[epIdx]->appWork)
630         _TRACE_BEGIN_APPWORK();
631     _invokeEntryNoTrace(epIdx,env,obj);
632     if(_entryTable[epIdx]->appWork)
633         _TRACE_END_APPWORK();
634     _TRACE_END_EXECUTE();
635   }
636   else
637 #endif
638     _invokeEntryNoTrace(epIdx,env,obj);
639
640 }
641
642 /********************* Creation ********************/
643
644 extern "C"
645 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
646 {
647   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
648   envelope *env = UsrToEnv(msg);
649   _CHECK_USED(env);
650   if(pCid == 0) {
651     env->setMsgtype(NewChareMsg);
652   } else {
653     pCid->onPE = (-(CkMyPe()+1));
654     //  pCid->magic = _GETIDX(cIdx);
655     pCid->objPtr = (void *) new VidBlock();
656     _MEMCHECK(pCid->objPtr);
657     env->setMsgtype(NewVChareMsg);
658     env->setVidPtr(pCid->objPtr);
659 #ifndef CMK_CHARE_USE_PTR
660     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
661     int idx = CkpvAccess(vidblocks).size()-1;
662     pCid->objPtr = (void *)(CmiIntPtr)idx;
663     env->setVidPtr((void *)(CmiIntPtr)idx);
664 #endif
665   }
666   env->setEpIdx(eIdx);
667   env->setByPe(CkMyPe());
668   env->setSrcPe(CkMyPe());
669   CmiSetHandler(env, _charmHandlerIdx);
670   _TRACE_CREATION_1(env);
671   CpvAccess(_qd)->create();
672   _STATS_RECORD_CREATE_CHARE_1();
673   _SET_USED(env, 1);
674   if(destPE == CK_PE_ANY)
675     env->setForAnyPE(1);
676   else
677     env->setForAnyPE(0);
678   _CldEnqueue(destPE, env, _infoIdx);
679   _TRACE_CREATION_DONE(1);
680 }
681
682 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
683 {
684   int gIdx = _entryTable[epIdx]->chareIdx;
685   void *obj = malloc(_chareTable[gIdx]->size);
686   _MEMCHECK(obj);
687   setMemoryTypeChare(obj);
688   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
689   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
690   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
691   CkpvAccess(_groupIDTable)->push_back(groupID);
692   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
693   if(ptrq) {
694     void *pending;
695     while((pending=ptrq->deq())!=0) {
696 #if CMK_BIGSIM_CHARM
697       //In BigSim, CpvAccess(CsdSchedQueue) is not used. _CldEnqueue resets the
698       //handler to converse-level BigSim handler.
699       _CldEnqueue(CkMyPe(), pending, _infoIdx);
700 #else
701       CsdEnqueueGeneral(pending, CQS_QUEUEING_FIFO, 0, 0);
702 #endif
703     }
704     CkpvAccess(_groupTable)->find(groupID).clearPending();
705   }
706   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
707
708   CkpvAccess(_currentGroup) = groupID;
709   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
710
711 #ifndef CMK_CHARE_USE_PTR
712   int callingChareIdx = CkpvAccess(currentChareIdx);
713   CkpvAccess(currentChareIdx) = -1;
714 #endif
715
716   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
717
718 #ifndef CMK_CHARE_USE_PTR
719   CkpvAccess(currentChareIdx) = callingChareIdx;
720 #endif
721
722   _STATS_RECORD_PROCESS_GROUP_1();
723 }
724
725 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
726 {
727   int gIdx = _entryTable[epIdx]->chareIdx;
728   size_t objSize=_chareTable[gIdx]->size;
729   void *obj = malloc(objSize);
730   _MEMCHECK(obj);
731   setMemoryTypeChare(obj);
732   CkpvAccess(_currentGroup) = groupID;
733
734 // Now that the NodeGroup is created, add it to the table.
735 //  NodeGroups can be accessed by multiple processors, so
736 //  this is in the opposite order from groups - invoking the constructor
737 //  before registering it.
738 // User may call CkLocalNodeBranch() inside the nodegroup constructor
739 //  store nodegroup into _currentNodeGroupObj
740   CkpvAccess(_currentNodeGroupObj) = obj;
741
742 #ifndef CMK_CHARE_USE_PTR
743   int callingChareIdx = CkpvAccess(currentChareIdx);
744   CkpvAccess(currentChareIdx) = -1;
745 #endif
746
747   _invokeEntryNoTrace(epIdx,env,obj);
748
749 #ifndef CMK_CHARE_USE_PTR
750   CkpvAccess(currentChareIdx) = callingChareIdx;
751 #endif
752
753   CkpvAccess(_currentNodeGroupObj) = NULL;
754   _STATS_RECORD_PROCESS_NODE_GROUP_1();
755
756   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
757   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
758   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
759   CksvAccess(_nodeGroupIDTable).push_back(groupID);
760
761   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
762   if(ptrq) {
763     void *pending;
764     while((pending=ptrq->deq())!=0) {
765       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
766     }
767     CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
768   }
769   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
770 }
771
772 void _createGroup(CkGroupID groupID, envelope *env)
773 {
774   _CHECK_USED(env);
775   _SET_USED(env, 1);
776   int epIdx = env->getEpIdx();
777   int gIdx = _entryTable[epIdx]->chareIdx;
778   env->setGroupNum(groupID);
779   env->setSrcPe(CkMyPe());
780   env->setGroupEpoch(CkpvAccess(_charmEpoch));
781
782   if(CkNumPes()>1) {
783     CkPackMessage(&env);
784     CmiSetHandler(env, _bocHandlerIdx);
785     _numInitMsgs++;
786     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
787     CpvAccess(_qd)->create(CkNumPes()-1);
788     CkUnpackMessage(&env);
789   }
790   _STATS_RECORD_CREATE_GROUP_1();
791   CkCreateLocalGroup(groupID, epIdx, env);
792 }
793
794 void _createNodeGroup(CkGroupID groupID, envelope *env)
795 {
796   _CHECK_USED(env);
797   _SET_USED(env, 1);
798   int epIdx = env->getEpIdx();
799   env->setGroupNum(groupID);
800   env->setSrcPe(CkMyPe());
801   env->setGroupEpoch(CkpvAccess(_charmEpoch));
802   if(CkNumNodes()>1) {
803     CkPackMessage(&env);
804     CmiSetHandler(env, _bocHandlerIdx);
805     _numInitMsgs++;
806     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
807     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
808     CpvAccess(_qd)->create(CkNumNodes()-1);
809     CkUnpackMessage(&env);
810   }
811   _STATS_RECORD_CREATE_NODE_GROUP_1();
812   CkCreateLocalNodeGroup(groupID, epIdx, env);
813 }
814
815 // new _groupCreate
816
817 static CkGroupID _groupCreate(envelope *env)
818 {
819   CkGroupID groupNum;
820
821   // check CkMyPe(). if it is 0 then idx is _numGroups++
822   // if not, then something else...
823   if(CkMyPe() == 0)
824      groupNum.idx = CkpvAccess(_numGroups)++;
825   else
826      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
827   _createGroup(groupNum, env);
828   return groupNum;
829 }
830
831 // new _nodeGroupCreate
832 static CkGroupID _nodeGroupCreate(envelope *env)
833 {
834   CkGroupID groupNum;
835   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
836   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
837           groupNum.idx = CksvAccess(_numNodeGroups)++;
838    else
839           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
840   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
841   _createNodeGroup(groupNum, env);
842   return groupNum;
843 }
844
845 /**** generate the group idx when group is creator pe is not pe0
846  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
847  **** remaining bits contain the group creator processor number and
848  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
849
850 int _getGroupIdx(int numNodes,int myNode,int numGroups)
851 {
852         int idx;
853         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
854         int n = 32 - (x+1);                                     // number of bits remaining for the index
855         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
856                                                                 // then add the next available index
857         // of course this won't work when int is 8 bytes long on T3E
858         //idx |= 0x80000000;                                      // set the most significant bit to 1
859         idx = - idx;
860                                                                 // if int is not 32 bits, wouldn't this be wrong?
861         return idx;
862 }
863
864 extern "C"
865 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
866 {
867   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
868   envelope *env = UsrToEnv(msg);
869   env->setMsgtype(BocInitMsg);
870   env->setEpIdx(eIdx);
871   env->setSrcPe(CkMyPe());
872   _TRACE_CREATION_N(env, CkNumPes());
873   CkGroupID gid = _groupCreate(env);
874   _TRACE_CREATION_DONE(1);
875   return gid;
876 }
877
878 extern "C"
879 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
880 {
881   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
882   envelope *env = UsrToEnv(msg);
883   env->setMsgtype(NodeBocInitMsg);
884   env->setEpIdx(eIdx);
885   env->setSrcPe(CkMyPe());
886   _TRACE_CREATION_N(env, CkNumNodes());
887   CkGroupID gid = _nodeGroupCreate(env);
888   _TRACE_CREATION_DONE(1);
889   return gid;
890 }
891
892 static inline void *_allocNewChare(envelope *env, int &idx)
893 {
894   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
895   void *tmp=malloc(_chareTable[chareIdx]->size);
896   _MEMCHECK(tmp);
897 #ifndef CMK_CHARE_USE_PTR
898   CkpvAccess(chare_objs).push_back(tmp);
899   CkpvAccess(chare_types).push_back(chareIdx);
900   idx = CkpvAccess(chare_objs).size()-1;
901 #endif
902   setMemoryTypeChare(tmp);
903   return tmp;
904 }
905
906 // Method returns true if one or more group dependencies are unsatisfied
907 inline bool isGroupDepUnsatisfied(const CkCoreState *ck, const envelope *env) {
908   int groupDepNum = env->getGroupDepNum();
909   if(groupDepNum != 0) {
910     CkGroupID *groupDepPtr = (CkGroupID *)(env->getGroupDepPtr());
911     for(int i=0;i<groupDepNum;i++) {
912       CkGroupID depID = groupDepPtr[i];
913       if (!depID.isZero() && !_lookupGroupAndBufferIfNotThere(ck, env, depID)) {
914         return true;
915       }
916     }
917   }
918   return false;
919 }
920
921 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
922 {
923   if(isGroupDepUnsatisfied(ck, env))
924     return;
925   if(ck)
926     ck->process(); // ck->process() updates mProcessed count used in QD
927   int idx;
928   void *obj = _allocNewChare(env, idx);
929 #ifndef CMK_CHARE_USE_PTR
930   CkpvAccess(currentChareIdx) = idx;
931 #endif
932   _invokeEntry(env->getEpIdx(),env,obj);
933   if(ck)
934     _STATS_RECORD_PROCESS_CHARE_1();
935 }
936
937 void CkCreateLocalChare(int epIdx, envelope *env)
938 {
939   env->setEpIdx(epIdx);
940   _processNewChareMsg(NULL, env);
941 }
942
943 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
944 {
945   if(isGroupDepUnsatisfied(ck, env))
946     return;
947   ck->process(); // ck->process() updates mProcessed count used in QD
948   int idx;
949   void *obj = _allocNewChare(env, idx);
950   CkChareID *pCid = (CkChareID *)
951       _allocMsg(FillVidMsg, sizeof(CkChareID));
952   pCid->onPE = CkMyPe();
953 #ifndef CMK_CHARE_USE_PTR
954   pCid->objPtr = (void*)(CmiIntPtr)idx;
955 #else
956   pCid->objPtr = obj;
957 #endif
958   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
959   envelope *ret = UsrToEnv(pCid);
960   ret->setVidPtr(env->getVidPtr());
961   int srcPe = env->getByPe();
962   ret->setSrcPe(CkMyPe());
963   CmiSetHandler(ret, _charmHandlerIdx);
964   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
965 #ifndef CMK_CHARE_USE_PTR
966   // register the remote vidblock for deletion when chare is deleted
967   CkChareID vid;
968   vid.onPE = srcPe;
969   vid.objPtr = env->getVidPtr();
970   CkpvAccess(vmap)[idx] = vid;    
971 #endif
972   CpvAccess(_qd)->create();
973 #ifndef CMK_CHARE_USE_PTR
974   CkpvAccess(currentChareIdx) = idx;
975 #endif
976   _invokeEntry(env->getEpIdx(),env,obj);
977   _STATS_RECORD_PROCESS_CHARE_1();
978 }
979
980 /************** Receive: Chares *************/
981
982 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
983 {
984   if(isGroupDepUnsatisfied(ck, env))
985     return;
986   ck->process(); // ck->process() updates mProcessed count used in QD
987   int epIdx = env->getEpIdx();
988   int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
989   void *obj;
990   if (mainIdx != -1)  {           // mainchare
991     CmiAssert(CkMyPe()==0);
992     obj = _mainTable[mainIdx]->getObj();
993   }
994   else {
995 #ifndef CMK_CHARE_USE_PTR
996     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
997       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
998     else
999       obj = env->getObjPtr();
1000 #else
1001     obj = env->getObjPtr();
1002 #endif
1003   }
1004   _invokeEntry(epIdx,env,obj);
1005   _STATS_RECORD_PROCESS_MSG_1();
1006 }
1007
1008 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
1009 {
1010   int epIdx = env->getEpIdx();
1011   void *obj = env->getObjPtr();
1012   _invokeEntry(epIdx,env,obj);
1013 }
1014
1015 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
1016 {
1017   ck->process(); // ck->process() updates mProcessed count used in QD
1018 #ifndef CMK_CHARE_USE_PTR
1019   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1020 #else
1021   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1022   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
1023 #endif
1024   CkChareID *pcid = (CkChareID *) EnvToUsr(env);
1025   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
1026   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
1027   CmiFree(env);
1028 }
1029
1030 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
1031 {
1032   ck->process(); // ck->process() updates mProcessed count used in QD
1033 #ifndef CMK_CHARE_USE_PTR
1034   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1035 #else
1036   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1037   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
1038 #endif
1039   _SET_USED(env, 1);
1040   vptr->send(env);
1041 }
1042
1043 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1044 {
1045   ck->process(); // ck->process() updates mProcessed count used in QD
1046 #ifndef CMK_CHARE_USE_PTR
1047   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1048   delete vptr;
1049   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1050 #endif
1051   CmiFree(env);
1052 }
1053
1054 /************** Receive: Groups ****************/
1055
1056 /**
1057  Return a pointer to the local BOC of "groupID".
1058  The message "env" passed in has some known dependency on this groupID
1059  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1060  Therefore, if the return value is NULL, this function buffers the message so that
1061  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1062  The message passed in must have its handlers correctly set so that it can be
1063  scheduled again.
1064 */
1065 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(const CkCoreState *ck, const envelope *env, const CkGroupID &groupID)
1066 {
1067
1068         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1069         IrrGroup *obj = ck->localBranch(groupID);
1070         if (obj==NULL) { /* groupmember not yet created: stash message */
1071                 ck->getGroupTable()->find(groupID).enqMsg((envelope *)env);
1072         }
1073         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1074         return obj;
1075 }
1076
1077 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1078 {
1079   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1080 }
1081
1082 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1083 {
1084 #if CMK_LBDB_ON
1085   // if there is a running obj being measured, stop it temporarily
1086   LDObjHandle objHandle;
1087   int objstopped = 0;
1088   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1089   if (the_lbdb->RunningObject(&objHandle)) {
1090     objstopped = 1;
1091     the_lbdb->ObjectStop(objHandle);
1092   }
1093 #endif
1094   _invokeEntry(epIdx,env,obj);
1095 #if CMK_LBDB_ON
1096   if (objstopped) the_lbdb->ObjectStart(objHandle);
1097 #endif
1098   _STATS_RECORD_PROCESS_BRANCH_1();
1099 }
1100
1101 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1102 {
1103   if(isGroupDepUnsatisfied(ck, env))
1104     return;
1105   CkGroupID groupID =  env->getGroupNum();
1106   IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1107   if(obj) {
1108     ck->process(); // ck->process() updates mProcessed count used in QD
1109     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1110   }
1111 }
1112
1113 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1114 {
1115   env->setMsgtype(ForChareMsg);
1116   env->setObjPtr(obj);
1117   _processForChareMsg(ck,env);
1118   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1119 }
1120
1121 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1122 {
1123   env->setEpIdx(epIdx);
1124   _deliverForNodeBocMsg(ck,env, obj);
1125 }
1126
1127 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1128 {
1129   if(isGroupDepUnsatisfied(ck, env))
1130     return;
1131   CkGroupID groupID = env->getGroupNum();
1132   void *obj;
1133
1134   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1135   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1136   if(!obj) { // groupmember not yet created
1137 #if CMK_IMMEDIATE_MSG
1138     if (CmiIsImmediate(env)) {
1139       //CmiDelayImmediate();        /* buffer immediate message */
1140       CmiResetImmediate(env);        // note: this may not be SIG IO safe !
1141     }
1142 #endif
1143     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1144     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1145     return;
1146   }
1147   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1148   ck->process(); // ck->process() updates mProcessed count used in QD
1149   env->setMsgtype(ForChareMsg);
1150   env->setObjPtr(obj);
1151   _processForChareMsg(ck,env);
1152   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1153 }
1154
1155 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1156 {
1157   if(isGroupDepUnsatisfied(ck, env))
1158     return;
1159   CkGroupID groupID = env->getGroupNum();
1160   int epIdx = env->getEpIdx();
1161   ck->process(); // ck->process() updates mProcessed count used in QD
1162   CkCreateLocalGroup(groupID, epIdx, env);
1163 }
1164
1165 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1166 {
1167   if(isGroupDepUnsatisfied(ck, env))
1168     return;
1169   ck->process(); // ck->process() updates mProcessed count used in QD
1170   CkGroupID groupID = env->getGroupNum();
1171   int epIdx = env->getEpIdx();
1172   CkCreateLocalNodeGroup(groupID, epIdx, env);
1173 }
1174
1175 /************** Receive: Arrays *************/
1176 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1177   ArrayObjMap& object_map = CkpvAccess(array_objs);
1178   auto iter = object_map.find(env->getRecipientID());
1179   if (iter != object_map.end()) {
1180     // First see if we already have a direct pointer to the object
1181     _SET_USED(env, 0);
1182     ck->process(); // ck->process() updates mProcessed count used in QD
1183     int opts = 0;
1184     CkArrayMessage* msg = (CkArrayMessage*)EnvToUsr(env);
1185     if (msg->array_hops()>1) {
1186       CProxy_ArrayBase(env->getArrayMgr()).ckLocMgr()->multiHop(msg);
1187     }
1188     iter->second->ckInvokeEntry(env->getEpIdx(), msg, !(opts & CK_MSG_KEEP));
1189   } else {
1190     // Otherwise fallback to delivery through the array manager
1191     CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1192     if (mgr) {
1193       _SET_USED(env, 0);
1194       ck->process(); // ck->process() updates mProcessed count used in QD
1195       mgr->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_inline);
1196     }
1197   }
1198 }
1199
1200 //BIGSIM_OOC DEBUGGING
1201 #define TELLMSGTYPE(x) //x
1202
1203 /**
1204  * This is the main converse-level handler used by all of Charm++.
1205  *
1206  * \addtogroup CriticalPathFramework
1207  */
1208 void _processHandler(void *converseMsg,CkCoreState *ck)
1209 {
1210   envelope *env = (envelope *) converseMsg;
1211
1212   MESSAGE_PHASE_CHECK(env);
1213
1214 #if CMK_ONESIDED_IMPL
1215   if(env->isRdma()){
1216     envelope *prevEnv = env;
1217     env = CkRdmaIssueRgets(prevEnv);
1218     if(env) {
1219       // Within pe or logical node, env points to new message with data
1220
1221       // Free prevEnv
1222       CkFreeMsg(EnvToUsr(prevEnv));
1223     } else{
1224       // async rdma call in place, asynchronous return and ack handling
1225       return;
1226     }
1227   }
1228 #endif
1229
1230 //#if CMK_RECORD_REPLAY
1231   if (ck->watcher!=NULL) {
1232     if (!ck->watcher->processMessage(&env,ck)) return;
1233   }
1234 //#endif
1235 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1236         Chare *obj=NULL;
1237         CkObjID sender;
1238         MCount SN;
1239         MlogEntry *entry=NULL;
1240         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
1241            || env->getMsgtype() == ForArrayEltMsg
1242            || env->getMsgtype() == ForChareMsg) {
1243                 sender = env->sender;
1244                 SN = env->SN;
1245                 int result = preProcessReceivedMessage(env,&obj,&entry);
1246                 if(result == 0){
1247                         return;
1248                 }
1249         }
1250 #endif
1251 #if USE_CRITICAL_PATH_HEADER_ARRAY
1252   CK_CRITICALPATH_START(env)
1253 #endif
1254
1255   switch(env->getMsgtype()) {
1256 // Group support
1257     case BocInitMsg : // Group creation message
1258       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1259       // QD processing moved inside _processBocInitMsg because it is conditional
1260       //ck->process(); 
1261       if(env->isPacked()) CkUnpackMessage(&env);
1262       _processBocInitMsg(ck,env);
1263       break;
1264     case NodeBocInitMsg : // Nodegroup creation message
1265       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1266       if(env->isPacked()) CkUnpackMessage(&env);
1267       _processNodeBocInitMsg(ck,env);
1268       break;
1269     case ForBocMsg : // Group entry method message (non creation)
1270       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1271       // QD processing moved inside _processForBocMsg because it is conditional
1272       if(env->isPacked()) CkUnpackMessage(&env);
1273       _processForBocMsg(ck,env);
1274       // stats record moved inside _processForBocMsg because it is conditional
1275       break;
1276     case ForNodeBocMsg : // Nodegroup entry method message (non creation)
1277       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1278       // QD processing moved to _processForNodeBocMsg because it is conditional
1279       if(env->isPacked()) CkUnpackMessage(&env);
1280       _processForNodeBocMsg(ck,env);
1281       // stats record moved to _processForNodeBocMsg because it is conditional
1282       break;
1283
1284 // Array support
1285     case ForArrayEltMsg: // Array element entry method message
1286       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1287       if(env->isPacked()) CkUnpackMessage(&env);
1288       _processArrayEltMsg(ck,env);
1289       break;
1290
1291 // Chare support
1292     case NewChareMsg : // Singleton chare creation message
1293       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1294       if(env->isPacked()) CkUnpackMessage(&env);
1295       _processNewChareMsg(ck,env);
1296       break;
1297     case NewVChareMsg : // Singleton virtual chare creation message
1298       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1299       if(env->isPacked()) CkUnpackMessage(&env);
1300       _processNewVChareMsg(ck,env);
1301       break;
1302     case ForChareMsg : // Singeton chare entry method message (non creation)
1303       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1304       if(env->isPacked()) CkUnpackMessage(&env);
1305       _processForPlainChareMsg(ck,env);
1306       break;
1307     case ForVidMsg   : // Singleton virtual chare entry method message (non creation)
1308       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1309       _processForVidMsg(ck,env);
1310       break;
1311     case FillVidMsg  : // Message sent back from the real chare PE to the virtual chare PE to
1312                        // fill the VidBlock (called when the real chare is constructed)
1313       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1314       _processFillVidMsg(ck,env);
1315       break;
1316     case DeleteVidMsg  : // Message sent back from the real chare PE to the virtual chare PE to
1317                          // delete the Vidblock (called when the real chare is deleted by the destructor)
1318       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1319       _processDeleteVidMsg(ck,env);
1320       break;
1321
1322     default:
1323       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1324   }
1325 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1326         if(obj != NULL){
1327                 postProcessReceivedMessage(obj,sender,SN,entry);
1328         }
1329 #endif
1330
1331
1332 #if USE_CRITICAL_PATH_HEADER_ARRAY
1333   CK_CRITICALPATH_END()
1334 #endif
1335
1336 }
1337
1338
1339 /******************** Message Send **********************/
1340
1341 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1342              int *queueing, int *priobits, unsigned int **prioptr)
1343 {
1344   envelope *env = (envelope *)converseMsg;
1345   *pfn = (CldPackFn)CkPackMessage;
1346   *len = env->getTotalsize();
1347   *queueing = env->getQueueing();
1348   *priobits = env->getPriobits();
1349   *prioptr = (unsigned int *) env->getPrioPtr();
1350 }
1351
1352 void CkPackMessage(envelope **pEnv)
1353 {
1354   envelope *env = *pEnv;
1355   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1356     void *msg = EnvToUsr(env);
1357     _TRACE_BEGIN_PACK();
1358     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1359     _TRACE_END_PACK();
1360     env=UsrToEnv(msg);
1361     env->setPacked(1);
1362     *pEnv = env;
1363   }
1364 }
1365
1366 void CkUnpackMessage(envelope **pEnv)
1367 {
1368   envelope *env = *pEnv;
1369   int msgIdx = env->getMsgIdx();
1370   if(env->isPacked()) {
1371     void *msg = EnvToUsr(env);
1372     _TRACE_BEGIN_UNPACK();
1373     msg = _msgTable[msgIdx]->unpack(msg);
1374     _TRACE_END_UNPACK();
1375     env=UsrToEnv(msg);
1376     env->setPacked(0);
1377     *pEnv = env;
1378   }
1379 }
1380
1381 //There's no reason for most messages to go through the Cld--
1382 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1383 // Thus these accellerated versions of the Cld calls.
1384 #if CMK_OBJECT_QUEUE_AVAILABLE
1385 static int index_objectQHandler;
1386 #endif
1387 int index_tokenHandler;
1388 int index_skipCldHandler;
1389
1390 void _skipCldHandler(void *converseMsg)
1391 {
1392   envelope *env = (envelope *)(converseMsg);
1393   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1394 #if CMK_GRID_QUEUE_AVAILABLE
1395   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1396     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1397                        env, env->getQueueing (), env->getPriobits (),
1398                        (unsigned int *) env->getPrioPtr ());
1399   } else {
1400     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1401                        env, env->getQueueing (), env->getPriobits (),
1402                        (unsigned int *) env->getPrioPtr ());
1403   }
1404 #else
1405   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1406         env, env->getQueueing(),env->getPriobits(),
1407         (unsigned int *)env->getPrioPtr());
1408 #endif
1409 }
1410
1411
1412 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1413 // Made non-static to be used by ckmessagelogging
1414 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1415 {
1416 #if CMK_CHARMDEBUG
1417   if (!ConverseDeliver(pe)) {
1418     CmiFree(env);
1419     return;
1420   }
1421 #endif
1422
1423 #if CMK_FAULT_EVAC
1424   if(pe == CkMyPe() ){
1425     if(!CmiNodeAlive(CkMyPe())){
1426         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1427 //      return;
1428     }
1429   }
1430 #endif
1431   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1432 #if CMK_OBJECT_QUEUE_AVAILABLE
1433     Chare *obj = CkFindObjectPtr(env);
1434     if (obj && obj->CkGetObjQueue().queue()) {
1435       _enqObjQueue(obj, env);
1436     }
1437     else
1438 #endif
1439     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1440         env, env->getQueueing(),env->getPriobits(),
1441         (unsigned int *)env->getPrioPtr());
1442 #if CMK_PERSISTENT_COMM
1443     CmiPersistentOneSend();
1444 #endif
1445   } else {
1446     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1447       CkPackMessage(&env);
1448     int len=env->getTotalsize();
1449     CmiSetXHandler(env,CmiGetHandler(env));
1450 #if CMK_OBJECT_QUEUE_AVAILABLE
1451     CmiSetHandler(env,index_objectQHandler);
1452 #else
1453     CmiSetHandler(env,index_skipCldHandler);
1454 #endif
1455     CmiSetInfo(env,infoFn);
1456     if (pe==CLD_BROADCAST) {
1457 #if CMK_MESSAGE_LOGGING
1458         if(env->flags & CK_FREE_MSG_MLOG)
1459                 CmiSyncBroadcastAndFree(len, (char *)env); 
1460         else
1461                 CmiSyncBroadcast(len, (char *)env);
1462 #else
1463                         CmiSyncBroadcastAndFree(len, (char *)env); 
1464 #endif
1465
1466 }
1467     else if (pe==CLD_BROADCAST_ALL) { 
1468 #if CMK_MESSAGE_LOGGING
1469         if(env->flags & CK_FREE_MSG_MLOG)
1470                 CmiSyncBroadcastAllAndFree(len, (char *)env);
1471         else
1472                 CmiSyncBroadcastAll(len, (char *)env);
1473 #else
1474                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1475 #endif
1476
1477 }
1478     else{
1479 #if CMK_MESSAGE_LOGGING
1480         if(env->flags & CK_FREE_MSG_MLOG)
1481                 CmiSyncSendAndFree(pe, len, (char *)env);
1482         else
1483                 CmiSyncSend(pe, len, (char *)env);
1484 #else
1485                         CmiSyncSendAndFree(pe, len, (char *)env);
1486 #endif
1487
1488                 }
1489   }
1490 }
1491
1492 #if CMK_BIGSIM_CHARM
1493 #   define  _skipCldEnqueue   _CldEnqueue
1494 #endif
1495
1496 // by pass Charm++ priority queue, send as Converse message
1497 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1498 {
1499 #if CMK_CHARMDEBUG
1500   if (!ConverseDeliver(-1)) {
1501     CmiFree(env);
1502     return;
1503   }
1504 #endif
1505   CkPackMessage(&env);
1506   int len=env->getTotalsize();
1507   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1508 }
1509
1510 static void _noCldEnqueue(int pe, envelope *env)
1511 {
1512 /*
1513   if (pe == CkMyPe()) {
1514     CmiHandleMessage(env);
1515   } else
1516 */
1517 #if CMK_CHARMDEBUG
1518   if (!ConverseDeliver(pe)) {
1519     CmiFree(env);
1520     return;
1521   }
1522 #endif
1523
1524   CkPackMessage(&env);
1525   int len=env->getTotalsize();
1526   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1527   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1528   else CmiSyncSendAndFree(pe, len, (char *)env);
1529 }
1530
1531 //static void _noCldNodeEnqueue(int node, envelope *env)
1532 //Made non-static to be used by ckmessagelogging
1533 void _noCldNodeEnqueue(int node, envelope *env)
1534 {
1535 /*
1536   if (node == CkMyNode()) {
1537     CmiHandleMessage(env);
1538   } else {
1539 */
1540 #if CMK_CHARMDEBUG
1541   if (!ConverseDeliver(node)) {
1542     CmiFree(env);
1543     return;
1544   }
1545 #endif
1546
1547   CkPackMessage(&env);
1548   int len=env->getTotalsize();
1549   if (node==CLD_BROADCAST) { 
1550 #if CMK_MESSAGE_LOGGING
1551         if(env->flags & CK_FREE_MSG_MLOG)
1552                 CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1553         else
1554                 CmiSyncNodeBroadcast(len, (char *)env);
1555 #else
1556         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1557 #endif
1558 }
1559   else if (node==CLD_BROADCAST_ALL) { 
1560 #if CMK_MESSAGE_LOGGING
1561         if(env->flags & CK_FREE_MSG_MLOG)
1562                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1563         else
1564                 CmiSyncNodeBroadcastAll(len, (char *)env);
1565 #else
1566                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1567 #endif
1568
1569 }
1570   else {
1571 #if CMK_MESSAGE_LOGGING
1572         if(env->flags & CK_FREE_MSG_MLOG)
1573                 CmiSyncNodeSendAndFree(node, len, (char *)env);
1574         else
1575                 CmiSyncNodeSend(node, len, (char *)env);
1576 #else
1577         CmiSyncNodeSendAndFree(node, len, (char *)env);
1578 #endif
1579   }
1580 }
1581
1582 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1583 {
1584   envelope *env = UsrToEnv(msg);
1585   _CHECK_USED(env);
1586   _SET_USED(env, 1);
1587 #if CMK_REPLAYSYSTEM
1588   setEventID(env);
1589 #endif
1590   env->setMsgtype(ForChareMsg);
1591   env->setEpIdx(eIdx);
1592   env->setSrcPe(CkMyPe());
1593
1594 #if USE_CRITICAL_PATH_HEADER_ARRAY
1595   CK_CRITICALPATH_SEND(env)
1596   //CK_AUTOMATE_PRIORITY(env)
1597 #endif
1598 #if CMK_CHARMDEBUG
1599   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1600 #endif
1601 #if CMK_OBJECT_QUEUE_AVAILABLE
1602   CmiSetHandler(env, index_objectQHandler);
1603 #else
1604   CmiSetHandler(env, _charmHandlerIdx);
1605 #endif
1606   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1607     int pe = -(pCid->onPE+1);
1608     if(pe==CkMyPe()) {
1609 #ifndef CMK_CHARE_USE_PTR
1610       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1611 #else
1612       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1613 #endif
1614       void *objPtr;
1615       if (NULL!=(objPtr=vblk->getLocalChare()))
1616       { //A ready local chare
1617         env->setObjPtr(objPtr);
1618         return pe;
1619       }
1620       else { //The vidblock is not ready-- forget it
1621         vblk->send(env);
1622         return -1;
1623       }
1624     } else { //Valid vidblock for another PE:
1625       env->setMsgtype(ForVidMsg);
1626       env->setVidPtr(pCid->objPtr);
1627       return pe;
1628     }
1629   }
1630   else {
1631     env->setObjPtr(pCid->objPtr);
1632     return pCid->onPE;
1633   }
1634 }
1635
1636 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1637 {
1638   int destPE = _prepareMsg(eIdx, msg, pCid);
1639   if (destPE != -1) {
1640     envelope *env = UsrToEnv(msg);
1641     //criticalPath_send(env);
1642 #if USE_CRITICAL_PATH_HEADER_ARRAY
1643     CK_CRITICALPATH_SEND(env)
1644     //CK_AUTOMATE_PRIORITY(env)
1645 #endif
1646     CmiBecomeImmediate(env);
1647   }
1648   return destPE;
1649 }
1650
1651 extern "C"
1652 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1653 {
1654   if (opts & CK_MSG_INLINE) {
1655     CkSendMsgInline(entryIdx, msg, pCid, opts);
1656     return;
1657   }
1658   envelope *env = UsrToEnv(msg);
1659 #if CMK_ERROR_CHECKING
1660   //Allow rdma metadata messages marked as immediate to go through
1661   if (opts & CK_MSG_IMMEDIATE && !env->isRdma()) {
1662     CmiAbort("Immediate message is not allowed in Chare!");
1663   }
1664 #endif
1665   int destPE=_prepareMsg(entryIdx,msg,pCid);
1666   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1667   // VidBlock was not yet filled). The problem is that the creation was never
1668   // traced later when the VidBlock was filled. One solution is to trace the
1669   // creation here, the other to trace it in VidBlock->msgDeliver().
1670 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1671   if (destPE!=-1) {
1672     CpvAccess(_qd)->create();
1673   }
1674         sendChareMsg(env,destPE,_infoIdx,pCid);
1675 #else
1676   _TRACE_CREATION_1(env);
1677   if (destPE!=-1) {
1678     CpvAccess(_qd)->create();
1679     if (opts & CK_MSG_SKIP_OR_IMM)
1680       _noCldEnqueue(destPE, env);
1681     else
1682       _CldEnqueue(destPE, env, _infoIdx);
1683   }
1684   _TRACE_CREATION_DONE(1);
1685 #endif
1686 }
1687
1688 extern "C"
1689 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1690 {
1691   if (pCid->onPE==CkMyPe())
1692   {
1693 #if CMK_FAULT_EVAC
1694     if(!CmiNodeAlive(CkMyPe())){
1695         return;
1696     }
1697 #endif
1698 #if CMK_CHARMDEBUG
1699     //Just in case we need to breakpoint or use the envelope in some way
1700     _prepareMsg(entryIndex,msg,pCid);
1701 #endif
1702                 //Just directly call the chare (skip QD handling & scheduler)
1703     envelope *env = UsrToEnv(msg);
1704     if (env->isPacked()) CkUnpackMessage(&env);
1705     _STATS_RECORD_PROCESS_MSG_1();
1706     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1707   }
1708   else {
1709     //No way to inline a cross-processor message:
1710     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1711   }
1712 }
1713
1714 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1715 {
1716   envelope *env = UsrToEnv(msg);
1717   /*#if CMK_ERROR_CHECKING
1718   CkNodeGroupID nodeRedMgr;
1719 #endif
1720   */
1721   _CHECK_USED(env);
1722   _SET_USED(env, 1);
1723 #if CMK_REPLAYSYSTEM
1724   setEventID(env);
1725 #endif
1726   env->setMsgtype(type);
1727   env->setEpIdx(eIdx);
1728   env->setGroupNum(gID);
1729   env->setSrcPe(CkMyPe());
1730   /*
1731 #if CMK_ERROR_CHECKING
1732   nodeRedMgr.setZero();
1733   env->setRednMgr(nodeRedMgr);
1734 #endif
1735 */
1736   //criticalPath_send(env);
1737 #if USE_CRITICAL_PATH_HEADER_ARRAY
1738   CK_CRITICALPATH_SEND(env)
1739   //CK_AUTOMATE_PRIORITY(env)
1740 #endif
1741 #if CMK_CHARMDEBUG
1742   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1743 #endif
1744   CmiSetHandler(env, _charmHandlerIdx);
1745   return env;
1746 }
1747
1748 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1749 {
1750   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1751 #if USE_CRITICAL_PATH_HEADER_ARRAY
1752   CK_CRITICALPATH_SEND(env)
1753   //CK_AUTOMATE_PRIORITY(env)
1754 #endif
1755   CmiBecomeImmediate(env);
1756   return env;
1757 }
1758
1759 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1760                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1761 {
1762   int numPes;
1763   envelope *env;
1764     if (opts & CK_MSG_IMMEDIATE) {
1765         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1766     }else
1767     {
1768         env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1769     }
1770
1771 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1772         sendGroupMsg(env,pe,_infoIdx);
1773 #else
1774   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1775   _TRACE_CREATION_N(env, numPes);
1776   if (opts & CK_MSG_SKIP_OR_IMM)
1777     _noCldEnqueue(pe, env);
1778   else
1779     _skipCldEnqueue(pe, env, _infoIdx);
1780   _TRACE_CREATION_DONE(1);
1781 #endif
1782 }
1783
1784 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1785                            int npes, int *pes)
1786 {
1787   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1788   _TRACE_CREATION_MULTICAST(env, npes, pes);
1789   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1790   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1791 }
1792
1793 extern "C"
1794 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1795 {
1796 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1797   if (destPE==CkMyPe())
1798   {
1799     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1800     return;
1801   }
1802   //Can't inline-- send the usual way
1803   envelope *env = UsrToEnv(msg);
1804   int numPes;
1805   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1806   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1807   _TRACE_CREATION_N(env, numPes);
1808   _noCldEnqueue(destPE, env);
1809   _STATS_RECORD_SEND_BRANCH_1();
1810   CkpvAccess(_coreState)->create();
1811   _TRACE_CREATION_DONE(1);
1812 #else
1813   // no support for immediate message, send inline
1814   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1815 #endif
1816 }
1817
1818 extern "C"
1819 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1820 {
1821   if (destPE==CkMyPe())
1822   {
1823 #if CMK_FAULT_EVAC
1824     if(!CmiNodeAlive(CkMyPe())){
1825         return;
1826     }
1827 #endif
1828     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1829     if (obj!=NULL)
1830     { //Just directly call the group:
1831 #if CMK_ERROR_CHECKING
1832       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1833 #else
1834       envelope *env=UsrToEnv(msg);
1835 #endif
1836       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1837       return;
1838     }
1839   }
1840   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1841   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1842 }
1843
1844 extern "C"
1845 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1846 {
1847   if (opts & CK_MSG_INLINE) {
1848     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1849     return;
1850   }
1851   envelope *env=UsrToEnv(msg);
1852   //Allow rdma metadata messages marked as immediate to go through
1853   if (opts & CK_MSG_IMMEDIATE && !env->isRdma()) {
1854     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1855     return;
1856   }
1857   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1858   _STATS_RECORD_SEND_BRANCH_1();
1859   CkpvAccess(_coreState)->create();
1860 }
1861
1862 extern "C"
1863 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1864 {
1865 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1866   envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1867   _TRACE_CREATION_MULTICAST(env, npes, pes);
1868   _noCldEnqueueMulti(npes, pes, env);
1869   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1870 #else
1871   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1872   CpvAccess(_qd)->create(-npes);
1873 #endif
1874   _STATS_RECORD_SEND_BRANCH_N(npes);
1875   CpvAccess(_qd)->create(npes);
1876 }
1877
1878 extern "C"
1879 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1880 {
1881   if (opts & CK_MSG_IMMEDIATE) {
1882     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1883     return;
1884   }
1885     // normal mesg
1886   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1887   _STATS_RECORD_SEND_BRANCH_N(npes);
1888   CpvAccess(_qd)->create(npes);
1889 }
1890
1891 extern "C"
1892 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1893 {
1894   int npes;
1895   int *pes;
1896   if (opts & CK_MSG_IMMEDIATE) {
1897     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1898     return;
1899   }
1900     // normal mesg
1901   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1902   CmiLookupGroup(grp, &npes, &pes);
1903   _TRACE_CREATION_MULTICAST(env, npes, pes);
1904   _CldEnqueueGroup(grp, env, _infoIdx);
1905   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1906   _STATS_RECORD_SEND_BRANCH_N(npes);
1907   CpvAccess(_qd)->create(npes);
1908 }
1909
1910 extern "C"
1911 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1912 {
1913   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1914   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1915   CpvAccess(_qd)->create(CkNumPes());
1916 }
1917
1918 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1919                 int node=CLD_BROADCAST_ALL, int opts=0)
1920 {
1921     int numPes;
1922     envelope *env;
1923     if (opts & CK_MSG_IMMEDIATE) {
1924         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1925     }else
1926     {
1927         env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1928     }
1929 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1930     sendNodeGroupMsg(env,node,_infoIdx);
1931 #else
1932   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1933   _TRACE_CREATION_N(env, numPes);
1934   if (opts & CK_MSG_SKIP_OR_IMM) {
1935     _noCldNodeEnqueue(node, env);
1936   }
1937   else
1938     _CldNodeEnqueue(node, env, _infoIdx);
1939   _TRACE_CREATION_DONE(1);
1940 #endif
1941 }
1942
1943 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1944                            int npes, int *nodes)
1945 {
1946   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1947   _TRACE_CREATION_N(env, npes);
1948   for (int i=0; i<npes; i++) {
1949     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1950   }
1951   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1952 }
1953
1954 extern "C"
1955 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1956 {
1957 #if CMK_IMMEDIATE_MSG
1958   if (node==CkMyNode())
1959   {
1960     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1961     return;
1962   }
1963   //Can't inline-- send the usual way
1964   envelope *env = UsrToEnv(msg);
1965   int numPes;
1966   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1967   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1968   _TRACE_CREATION_N(env, numPes);
1969   _noCldNodeEnqueue(node, env);
1970   _STATS_RECORD_SEND_BRANCH_1();
1971   CkpvAccess(_coreState)->create();
1972   _TRACE_CREATION_DONE(1);
1973 #else
1974   // no support for immediate message, send inline
1975   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1976 #endif
1977 }
1978
1979 extern "C"
1980 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1981 {
1982   if (node==CkMyNode() && ((envelope *)(UsrToEnv(msg)))->isRdma() == false)
1983   {
1984     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1985     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1986     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1987     if (obj!=NULL)
1988     { //Just directly call the group:
1989 #if CMK_ERROR_CHECKING
1990       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1991 #else
1992       envelope *env=UsrToEnv(msg);
1993 #endif
1994       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1995       return;
1996     }
1997   }
1998   //Can't inline-- send the usual way
1999   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
2000 }
2001
2002 extern "C"
2003 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
2004 {
2005   if (opts & CK_MSG_INLINE) {
2006     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
2007     return;
2008   }
2009   if (opts & CK_MSG_IMMEDIATE) {
2010     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
2011     return;
2012   }
2013   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
2014   _STATS_RECORD_SEND_NODE_BRANCH_1();
2015   CkpvAccess(_coreState)->create();
2016 }
2017
2018 extern "C"
2019 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
2020 {
2021 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
2022   envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
2023   _noCldEnqueueMulti(npes, nodes, env);
2024 #else
2025   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2026   CpvAccess(_qd)->create(-npes);
2027 #endif
2028   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2029   CpvAccess(_qd)->create(npes);
2030 }
2031
2032 extern "C"
2033 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
2034 {
2035   if (opts & CK_MSG_IMMEDIATE) {
2036     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
2037     return;
2038   }
2039     // normal mesg
2040   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2041   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2042   CpvAccess(_qd)->create(npes);
2043 }
2044
2045 extern "C"
2046 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
2047 {
2048   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
2049   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
2050   CpvAccess(_qd)->create(CkNumNodes());
2051 }
2052
2053 //Needed by delegation manager:
2054 extern "C"
2055 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
2056 { return _prepareMsg(eIdx,msg,pCid); }
2057 extern "C"
2058 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2059 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
2060 extern "C"
2061 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2062 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
2063
2064 void _ckModuleInit(void) {
2065         CmiAssignOnce(&index_skipCldHandler, CkRegisterHandler(_skipCldHandler));
2066 #if CMK_OBJECT_QUEUE_AVAILABLE
2067         CmiAssignOnce(&index_objectQHandler, CkRegisterHandler(_ObjectQHandler));
2068 #endif
2069         CmiAssignOnce(&index_tokenHandler, CkRegisterHandler(_TokenHandler));
2070         CkpvInitialize(TokenPool*, _tokenPool);
2071         CkpvAccess(_tokenPool) = new TokenPool;
2072 }
2073
2074
2075 /************** Send: Arrays *************/
2076
2077 static void _prepareOutgoingArrayMsg(envelope *env,int type)
2078 {
2079   _CHECK_USED(env);
2080   _SET_USED(env, 1);
2081   env->setMsgtype(type);
2082 #if CMK_CHARMDEBUG
2083   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
2084 #endif
2085   CmiSetHandler(env, _charmHandlerIdx);
2086   CpvAccess(_qd)->create();
2087 }
2088
2089 extern "C"
2090 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2091   envelope *env = UsrToEnv(msg);
2092   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2093 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2094         sendArrayMsg(env,pe,_infoIdx);
2095 #else
2096   if (opts & CK_MSG_IMMEDIATE)
2097     CmiBecomeImmediate(env);
2098   if (opts & CK_MSG_SKIP_OR_IMM)
2099     _noCldEnqueue(pe, env);
2100   else
2101     _skipCldEnqueue(pe, env, _infoIdx);
2102 #endif
2103 }
2104
2105 class ElementDestroyer : public CkLocIterator {
2106 private:
2107         CkLocMgr *locMgr;
2108 public:
2109         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2110         void addLocation(CkLocation &loc) {
2111           loc.destroyAll();
2112         }
2113 };
2114
2115 void CkDeleteChares() {
2116   int i;
2117   int numGroups = CkpvAccess(_groupIDTable)->size();
2118
2119   // delete all plain chares
2120 #ifndef CMK_CHARE_USE_PTR
2121   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2122         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2123         delete obj;
2124         CkpvAccess(chare_objs)[i] = NULL;
2125   }
2126   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2127         VidBlock *obj = CkpvAccess(vidblocks)[i];
2128         delete obj;
2129         CkpvAccess(vidblocks)[i] = NULL;
2130   }
2131 #endif
2132
2133   // delete all array elements
2134   for(i=0;i<numGroups;i++) {
2135     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2136     if(obj && obj->isLocMgr())  {
2137       CkLocMgr *mgr = (CkLocMgr*)obj;
2138       ElementDestroyer destroyer(mgr);
2139       mgr->iterate(destroyer);
2140     }
2141   }
2142
2143   // delete all groups
2144   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2145   for(i=0;i<numGroups;i++) {
2146     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2147     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2148     if (obj) delete obj;
2149   }
2150   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2151
2152   // delete all node groups
2153   if (CkMyRank() == 0) {
2154     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2155     for(i=0;i<numNodeGroups;i++) {
2156       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2157       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2158       if (obj) delete obj;
2159     }
2160   }
2161 }
2162
2163 #if CMK_BIGSIM_CHARM
2164 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2165                                    int pb,unsigned int *prio);
2166 #endif
2167
2168 //------------------- External client support (e.g. Charm4py) ----------------
2169
2170 static std::vector< std::vector<char> > ext_args;
2171 static std::vector<char*> ext_argv;
2172
2173 // This is just a wrapper for ConverseInit that copies command-line args into a private
2174 // buffer.
2175 // To be called from external clients like charm4py. This wrapper avoids issues with
2176 // ctypes and cffi.
2177 extern "C" void StartCharmExt(int argc, char **argv) {
2178 #if !defined(_WIN32) && !NODE_0_IS_CONVHOST
2179   // only do this in net layers if using charmrun, so that output of process 0
2180   // doesn't get duplicated
2181   char *ns = getenv("NETSTART");
2182   if (ns != 0) {
2183     int fd;
2184     if (-1 != (fd = open("/dev/null", O_RDWR))) {
2185       dup2(fd, 0);
2186       dup2(fd, 1);
2187       dup2(fd, 2);
2188     }
2189   }
2190 #endif
2191   ext_args.resize(argc);
2192   ext_argv.resize(argc + 1, NULL);
2193   for (int i=0; i < argc; i++) {
2194     ext_args[i].resize(strlen(argv[i]) + 1);
2195     strcpy(ext_args[i].data(), argv[i]);
2196     ext_argv[i] = ext_args[i].data();
2197   }
2198   ConverseInit(argc, ext_argv.data(), _initCharm, 0, 0);
2199 }
2200
2201 void (*CkRegisterMainModuleCallback)() = NULL;
2202 extern "C" void registerCkRegisterMainModuleCallback(void (*cb)()) {
2203   CkRegisterMainModuleCallback = cb;
2204 }
2205
2206 void (*MainchareCtorExtCallback)(int, void*, int, int, char **) = NULL;
2207 extern "C" void registerMainchareCtorExtCallback(void (*cb)(int, void*, int, int, char **)) {
2208   MainchareCtorExtCallback = cb;
2209 }
2210
2211 void (*ReadOnlyRecvExtCallback)(int, char*) = NULL;
2212 extern "C" void registerReadOnlyRecvExtCallback(void (*cb)(int, char*)) {
2213   ReadOnlyRecvExtCallback = cb;
2214 }
2215
2216 void* ReadOnlyExt::ro_data = NULL;
2217 size_t ReadOnlyExt::data_size = 0;
2218
2219 void (*ChareMsgRecvExtCallback)(int, void*, int, int, char *, int) = NULL;
2220 extern "C" void registerChareMsgRecvExtCallback(void (*cb)(int, void*, int, int, char *, int)) {
2221   ChareMsgRecvExtCallback = cb;
2222 }
2223
2224 void (*GroupMsgRecvExtCallback)(int, int, int, char *, int) = NULL;
2225 extern "C" void registerGroupMsgRecvExtCallback(void (*cb)(int, int, int, char *, int)) {
2226   GroupMsgRecvExtCallback = cb;
2227 }
2228
2229 void (*ArrayMsgRecvExtCallback)(int, int, int *, int, int, char *, int) = NULL;
2230 extern "C" void registerArrayMsgRecvExtCallback(void (*cb)(int, int, int *, int, int, char *, int)) {
2231   ArrayMsgRecvExtCallback = cb;
2232 }
2233
2234 int (*ArrayElemLeaveExt)(int, int, int *, char**, int) = NULL;
2235 extern "C" void registerArrayElemLeaveExtCallback(int (*cb)(int, int, int *, char**, int)) {
2236   ArrayElemLeaveExt = cb;
2237 }
2238
2239 void (*ArrayElemJoinExt)(int, int, int *, int, char*, int) = NULL;
2240 extern "C" void registerArrayElemJoinExtCallback(void (*cb)(int, int, int *, int, char*, int)) {
2241   ArrayElemJoinExt = cb;
2242 }
2243
2244 void (*ArrayResumeFromSyncExtCallback)(int, int, int *) = NULL;
2245 extern "C" void registerArrayResumeFromSyncExtCallback(void (*cb)(int, int, int *)) {
2246   ArrayResumeFromSyncExtCallback = cb;
2247 }
2248
2249 void (*CreateReductionTargetMsgExt)(void*, int, int, int, char**, int*) = NULL;
2250 extern "C" void registerCreateReductionTargetMsgExtCallback(void (*cb)(void*, int, int, int, char**, int*)) {
2251   CreateReductionTargetMsgExt = cb;
2252 }
2253
2254 int (*PyReductionExt)(char**, int*, int, char**) = NULL;
2255 extern "C" void registerPyReductionExtCallback(int (*cb)(char**, int*, int, char**)) {
2256     PyReductionExt = cb;
2257 }
2258
2259 int (*ArrayMapProcNumExtCallback)(int, int, const int *) = NULL;
2260 extern "C" void registerArrayMapProcNumExtCallback(int (*cb)(int, int, const int *)) {
2261   ArrayMapProcNumExtCallback = cb;
2262 }
2263
2264 extern "C" int CkMyPeHook() { return CkMyPe(); }
2265 extern "C" int CkNumPesHook() { return CkNumPes(); }
2266
2267 void ReadOnlyExt::setData(void *msg, size_t msgSize) {
2268   ro_data = malloc(msgSize);
2269   memcpy(ro_data, msg, msgSize);
2270   data_size = msgSize;
2271 }
2272
2273 void ReadOnlyExt::_roPup(void *pup_er) {
2274   PUP::er &p=*(PUP::er *)pup_er;
2275   if (!p.isUnpacking()) {
2276     //printf("[%d] Sizing/packing data, ro_data=%p, data_size=%d\n", CkMyPe(), ro_data, int(data_size));
2277     p | data_size;
2278     p((char*)ro_data, data_size);
2279   } else {
2280     CkAssert(CkMyPe() != 0);
2281     CkAssert(ro_data == NULL);
2282     PUP::fromMem &p_mem = *(PUP::fromMem *)pup_er;
2283     p_mem | data_size;
2284     //printf("[%d] Unpacking ro, size of data to unpack is %d\n", CkMyPe(), int(data_size));
2285     ReadOnlyRecvExtCallback(int(data_size), p_mem.get_current_pointer());
2286     p_mem.advance(data_size);
2287   }
2288 }
2289
2290 CkpvExtern(int, _currentChareType);
2291
2292 MainchareExt::MainchareExt(CkArgMsg *m) {
2293   int cIdx = CkpvAccess(_currentChareType);
2294   //printf("Constructor of MainchareExt, chareId=(%d,%p), chareIdx=%d\n", thishandle.onPE, thishandle.objPtr, cIdx);
2295   int ctorEpIdx =  _mainTable[_chareTable[cIdx]->mainChareType()]->entryIdx;
2296   MainchareCtorExtCallback(thishandle.onPE, thishandle.objPtr, ctorEpIdx, m->argc, m->argv);
2297   delete m;
2298 }
2299
2300 GroupExt::GroupExt(void *impl_msg) {
2301   //printf("Constructor of GroupExt, gid=%d\n", thisgroup.idx);
2302   //int chareIdx = CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
2303   int chareIdx = ckGetChareType();
2304   int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2305   CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2306   char *impl_buf = impl_msg_typed->msgBuf;
2307   PUP::fromMem implP(impl_buf);
2308   int msgSize; implP|msgSize;
2309   int dcopy_start; implP|dcopy_start;
2310   GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
2311                           dcopy_start);
2312 }
2313
2314 ArrayMapExt::ArrayMapExt(void *impl_msg) {
2315   //printf("Constructor of ArrayMapExt, gid=%d\n", thisgroup.idx);
2316   int chareIdx = ckGetChareType();
2317   int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2318   CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2319   char *impl_buf = impl_msg_typed->msgBuf;
2320   PUP::fromMem implP(impl_buf);
2321   int msgSize; implP|msgSize;
2322   int dcopy_start; implP|dcopy_start;
2323   GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
2324                           dcopy_start);
2325 }
2326
2327 // TODO options
2328 extern "C"
2329 int CkCreateGroupExt(int cIdx, int eIdx, int num_bufs, char **bufs, int *buf_sizes) {
2330   //static_cast<void>(impl_e_opts);
2331   CkAssert(num_bufs >= 1);
2332   int totalSize = 0;
2333   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2334   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2);
2335   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2336   PUP::toMem implP((void *)impl_msg->msgBuf);
2337   implP|totalSize;
2338   implP|buf_sizes[0];
2339   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2340   UsrToEnv(impl_msg)->setMsgtype(BocInitMsg);
2341   //if (impl_e_opts)
2342   //  UsrToEnv(impl_msg)->setGroupDep(impl_e_opts->getGroupDepID());
2343   CkGroupID gId = CkCreateGroup(cIdx, eIdx, impl_msg);
2344   return gId.idx;
2345 }
2346
2347 // TODO options
2348 extern "C"
2349 int CkCreateArrayExt(int cIdx, int ndims, int *dims, int eIdx, int num_bufs,
2350                      char **bufs, int *buf_sizes, int map_gid, char useAtSync) {
2351   //static_cast<void>(impl_e_opts);
2352   CkAssert(num_bufs >= 1);
2353   int totalSize = 0;
2354   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2355   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2356   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2357   PUP::toMem implP((void *)impl_msg->msgBuf);
2358   implP|useAtSync;
2359   implP|totalSize;
2360   implP|buf_sizes[0];
2361   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2362   CkArrayOptions opts;
2363   if (ndims != -1)
2364     opts = CkArrayOptions(ndims, dims);
2365   if (map_gid >= 0) {
2366     CkGroupID map_gId;
2367     map_gId.idx = map_gid;
2368     opts.setMap(CProxy_Group(map_gId));
2369   }
2370   UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2371   //CkArrayID gId = ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
2372   CkGroupID gId = CProxyElement_ArrayElement::ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
2373   return gId.idx;
2374 }
2375
2376 // TODO options
2377 extern "C"
2378 void CkInsertArrayExt(int aid, int ndims, int *index, int epIdx, int onPE, int num_bufs,
2379                       char **bufs, int *buf_sizes, char useAtSync) {
2380   CkAssert(num_bufs >= 1);
2381   int totalSize = 0;
2382   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2383   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2384   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2385   PUP::toMem implP((void *)impl_msg->msgBuf);
2386   implP|useAtSync;
2387   implP|totalSize;
2388   implP|buf_sizes[0];
2389   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2390
2391   UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2392   CkArrayIndex newIdx(ndims, index);
2393   CkGroupID gId;
2394   gId.idx = aid;
2395   CProxy_ArrayBase(gId).ckInsertIdx((CkArrayMessage *)impl_msg, epIdx, onPE, newIdx);
2396 }
2397
2398 extern "C"
2399 void CkMigrateExt(int aid, int ndims, int *index, int toPe) {
2400   //printf("[charm] CkMigrateMeExt called with aid: %d, ndims: %d, index: %d, toPe: %d\n",
2401         //aid, ndims, *index, toPe);
2402   CkGroupID gId;
2403   gId.idx = aid;
2404   CkArrayIndex arrayIndex(ndims, index);
2405   CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2406   ArrayElement* arrayElement = arrayProxy.ckLocal();
2407   CkAssert(arrayElement != NULL);
2408   arrayElement->migrateMe(toPe);
2409 }
2410
2411 extern "C"
2412 void CkArrayDoneInsertingExt(int aid) {
2413   CkGroupID gId;
2414   gId.idx = aid;
2415   CProxy_ArrayBase(gId).doneInserting();
2416 }
2417
2418 extern "C"
2419 int CkGroupGetReductionNumber(int g_id) {
2420   CkGroupID gId;
2421   gId.idx = g_id;
2422   return ((Group*)CkLocalBranch(gId))->getRedNo();
2423 }
2424
2425 extern "C"
2426 int CkArrayGetReductionNumber(int aid, int ndims, int *index) {
2427   CkGroupID gId;
2428   gId.idx = aid;
2429   CkArrayIndex arrayIndex(ndims, index);
2430   CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2431   ArrayElement* arrayElement = arrayProxy.ckLocal();
2432   CkAssert(arrayElement != NULL);
2433   return arrayElement->getRedNo();
2434 }
2435
2436 extern "C"
2437 void CkChareExtSend(int onPE, void *objPtr, int epIdx, char *msg, int msgSize) {
2438   //ckCheck();    // checks that gid is not zero
2439   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2440   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2441   PUP::toMem implP((void *)impl_msg->msgBuf);
2442   implP|msgSize;
2443   implP|epIdx;
2444   int d=0; implP|d;
2445   implP(msg, msgSize);
2446   CkChareID chareID;
2447   chareID.onPE = onPE;
2448   chareID.objPtr = objPtr;
2449
2450   CkSendMsg(epIdx, impl_msg, &chareID);
2451 }
2452
2453 extern "C"
2454 void CkChareExtSend_multi(int onPE, void *objPtr, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2455   CkAssert(num_bufs >= 1);
2456   //ckCheck();    // checks that gid is not zero
2457   int totalSize = 0;
2458   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2459   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2460   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2461   PUP::toMem implP((void *)impl_msg->msgBuf);
2462   implP | totalSize;
2463   implP | epIdx;
2464   implP | buf_sizes[0];
2465   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2466   CkChareID chareID;
2467   chareID.onPE = onPE;
2468   chareID.objPtr = objPtr;
2469
2470   CkSendMsg(epIdx, impl_msg, &chareID);
2471 }
2472
2473 extern "C"
2474 void CkGroupExtSend(int gid, int pe, int epIdx, char *msg, int msgSize) {
2475   //ckCheck();    // checks that gid is not zero
2476   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2477   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2478   PUP::toMem implP((void *)impl_msg->msgBuf);
2479   implP|msgSize;
2480   implP|epIdx;
2481   int d=0; implP|d;
2482   implP(msg, msgSize);
2483   CkGroupID gId;
2484   gId.idx = gid;
2485
2486   if (pe == -1)
2487     CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2488   else
2489     CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
2490 }
2491
2492 extern "C"
2493 void CkGroupExtSend_multi(int gid, int pe, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2494   CkAssert(num_bufs >= 1);
2495   //ckCheck();    // checks that gid is not zero
2496   int totalSize = 0;
2497   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2498   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2499   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2500   PUP::toMem implP((void *)impl_msg->msgBuf);
2501   implP | totalSize;
2502   implP | epIdx;
2503   implP | buf_sizes[0];
2504   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2505   CkGroupID gId;
2506   gId.idx = gid;
2507
2508   if (pe == -1)
2509     CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2510   else
2511     CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
2512 }
2513
2514 extern "C"
2515 void CkArrayExtSend(int aid, int *idx, int ndims, int epIdx, char *msg, int msgSize) {
2516   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2517   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2518   PUP::toMem implP((void *)impl_msg->msgBuf);
2519   implP|msgSize;
2520   implP|epIdx;
2521   int d=0; implP|d;
2522   implP(msg, msgSize);
2523   UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2524   CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2525   impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2526   CkGroupID gId;
2527   gId.idx = aid;
2528   if (ndims > 0) {
2529     CkArrayIndex arrIndex(ndims, idx);
2530     // TODO is there a better function for this?
2531     CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2532   } else { // broadcast
2533     CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
2534   }
2535 }
2536
2537 extern "C"
2538 void CkArrayExtSend_multi(int aid, int *idx, int ndims, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2539   CkAssert(num_bufs >= 1);
2540   int totalSize = 0;
2541   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2542   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2543   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2544   PUP::toMem implP((void *)impl_msg->msgBuf);
2545   implP | totalSize;
2546   implP | epIdx;
2547   implP | buf_sizes[0];
2548   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2549   UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2550   CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2551   impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2552   CkGroupID gId;
2553   gId.idx = aid;
2554   if (ndims > 0) {
2555     CkArrayIndex arrIndex(ndims, idx);
2556     // TODO is there a better function for this?
2557     CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2558   } else { // broadcast
2559     CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
2560   }
2561 }
2562
2563 //------------------- Message Watcher (record/replay) ----------------
2564
2565 #include "crc32.h"
2566
2567 CkpvDeclare(int, envelopeEventID);
2568 int _recplay_crc = 0;
2569 int _recplay_checksum = 0;
2570 int _recplay_logsize = 1024*1024;
2571
2572 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2573 #define REPLAYDEBUG(args) /* empty */
2574
2575 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2576
2577 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2578 #include "BaseLB.h" /* For LBMigrateMsg message */
2579
2580 #if CMK_REPLAYSYSTEM
2581 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2582   std::string fName = CkpvAccess(traceRoot);
2583   fName += prefix;
2584   fName += std::to_string(CkMyPe());
2585   fName += suffix;
2586   FILE *f = fopen(fName.c_str(), permissions);
2587   REPLAYDEBUG("openReplayfile " << fName.c_str());
2588   if (f==NULL) {
2589     CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2590              CkMyPe(), fName.c_str(), permissions);
2591     CkAbort("openReplayFile> Could not open replay file");
2592   }
2593   return f;
2594 }
2595
2596 class CkMessageRecorder : public CkMessageWatcher {
2597   unsigned int curpos;
2598   bool firstOpen;
2599   std::vector<char> buffer;
2600 public:
2601   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true), buffer(_recplay_logsize) { f=f_; }
2602   ~CkMessageRecorder() {
2603     flushLog(0);
2604     fprintf(f,"-1 -1 -1 ");
2605     fclose(f);
2606 #if 0
2607     FILE *stsfp = fopen("sts", "w");
2608     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2609     traceWriteSTS(stsfp, 0);
2610     fclose(stsfp);
2611 #endif
2612     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2613   }
2614
2615 private:
2616   void flushLog(int verbose=1) {
2617     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2618     fprintf(f, "%s", buffer.data());
2619     curpos=0;
2620   }
2621   virtual bool process(envelope **envptr,CkCoreState *ck) {
2622     if ((*envptr)->getEvent()) {
2623       bool wasPacked = (*envptr)->isPacked();
2624       if (!wasPacked) CkPackMessage(envptr);
2625       envelope *env = *envptr;
2626       unsigned int crc1=0, crc2=0;
2627       if (_recplay_crc) {
2628         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2629         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2630         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2631       } else if (_recplay_checksum) {
2632         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2633         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2634       }
2635       curpos+=sprintf(&buffer[curpos],"%d %d %d %d %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2636       if (curpos > _recplay_logsize-128) flushLog();
2637       if (!wasPacked) CkUnpackMessage(envptr);
2638     }
2639     return true;
2640   }
2641   virtual bool process(CthThreadToken *token,CkCoreState *ck) {
2642     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2643     if (curpos > _recplay_logsize-128) flushLog();
2644     return true;
2645   }
2646   
2647   virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2648     FILE *f;
2649     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2650     else f = openReplayFile("ckreplay_",".lb","a");
2651     firstOpen = false;
2652     if (f != NULL) {
2653       PUP::toDisk p(f);
2654       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2655       (*msg)->pup(p);
2656       fclose(f);
2657     }
2658     return true;
2659   }
2660 };
2661
2662 class CkMessageDetailRecorder : public CkMessageWatcher {
2663 public:
2664   CkMessageDetailRecorder(FILE *f_) {
2665     f=f_;
2666     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2667      * The value of 'x' is the pointer size.
2668      */
2669     CmiUInt2 little = sizeof(void*);
2670     fwrite(&little, 2, 1, f);
2671   }
2672   ~CkMessageDetailRecorder() {fclose(f);}
2673 private:
2674   virtual bool process(envelope **envptr, CkCoreState *ck) {
2675     bool wasPacked = (*envptr)->isPacked();
2676     if (!wasPacked) CkPackMessage(envptr);
2677     envelope *env = *envptr;
2678     CmiUInt4 size = env->getTotalsize();
2679     fwrite(&size, 4, 1, f);
2680     fwrite(env, env->getTotalsize(), 1, f);
2681     if (!wasPacked) CkUnpackMessage(envptr);
2682     return true;
2683   }
2684 };
2685
2686 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2687 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2688
2689 class CkMessageReplay : public CkMessageWatcher {
2690   int counter;
2691         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2692         int nextEP;
2693         unsigned int crc1, crc2;
2694         FILE *lbFile;
2695         /// Read the next message we need from the file:
2696         void getNext(void) {
2697           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2698           if (nextSize > 0) {
2699             // We are reading a regular message
2700             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2701               CkAbort("CkMessageReplay> Syntax error reading replay file");
2702             }
2703             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2704           } else if (nextSize == -2) {
2705             // We are reading a special message (right now only thread awaken)
2706             // Nothing to do since we have already read all info
2707             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2708           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2709             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2710             CkAbort("CkMessageReplay> Unrecognized input");
2711           }
2712             /*
2713                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2714                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2715                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2716                 }
2717                 */
2718                 counter++;
2719         }
2720         /// If this is the next message we need, advance and return true.
2721         bool isNext(envelope *env) {
2722                 if (nextPE!=env->getSrcPe()) return false;
2723                 if (nextEvent!=env->getEvent()) return false;
2724                 if (nextSize<0) return false; // not waiting for a regular message
2725 #if 1
2726                 if (nextEP != env->getEpIdx()) {
2727                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2728                         return false;
2729                 }
2730 #endif
2731 #if ! CMK_BIGSIM_CHARM
2732                 if (nextSize!=env->getTotalsize())
2733                 {
2734                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2735                         return false;
2736                 }
2737                 if (_recplay_crc || _recplay_checksum) {
2738                   bool wasPacked = env->isPacked();
2739                   if (!wasPacked) CkPackMessage(&env);
2740                   if (_recplay_crc) {
2741                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2742                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2743                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2744                     if (crcnew1 != crc1) {
2745                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2746                     }
2747                     if (crcnew2 != crc2) {
2748                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2749                     }
2750                   } else if (_recplay_checksum) {
2751             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2752             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2753             if (crcnew1 != crc1) {
2754               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2755             }
2756             if (crcnew2 != crc2) {
2757               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2758             }               
2759                   }
2760                   if (!wasPacked) CkUnpackMessage(&env);
2761                 }
2762 #endif
2763                 return true;
2764         }
2765         bool isNext(CthThreadToken *token) {
2766           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
2767           return false;
2768         }
2769
2770         /// This is a (short) list of messages we aren't yet ready for:
2771         CkQ<envelope *> delayedMessages;
2772         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2773         CkQ<CthThreadToken *> delayedTokens;
2774
2775         /// Try to flush out any delayed messages
2776         void flush(void) {
2777           if (nextSize>0) {
2778                 int len=delayedMessages.length();
2779                 for (int i=0;i<len;i++) {
2780                         envelope *env=delayedMessages.deq();
2781                         if (isNext(env)) { /* this is the next message: process it */
2782                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2783                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2784                                 return;
2785                         }
2786                         else /* Not ready yet-- put it back in the
2787                                 queue */
2788                           {
2789                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2790                                 delayedMessages.enq(env);
2791                           }
2792                 }
2793           } else if (nextSize==-2) {
2794             int len=delayedTokens.length();
2795             for (int i=0;i<len;++i) {
2796               CthThreadToken *token=delayedTokens.deq();
2797               if (isNext(token)) {
2798             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2799 #if ! CMK_BIGSIM_CHARM
2800                 CsdEnqueueLifo((void*)token);
2801 #else
2802                 CthEnqueueBigSimThread(token,0,0,NULL);
2803 #endif
2804                 return;
2805               } else {
2806             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2807                 delayedTokens.enq(token);
2808               }
2809             }
2810           }
2811         }
2812
2813 public:
2814         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2815           counter=0;
2816           f=f_;
2817           getNext();
2818           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2819 #if CMI_QD
2820           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2821 #endif
2822         }
2823         ~CkMessageReplay() {fclose(f);}
2824
2825 private:
2826         virtual bool process(envelope **envptr,CkCoreState *ck) {
2827           bool wasPacked = (*envptr)->isPacked();
2828           if (!wasPacked) CkPackMessage(envptr);
2829           envelope *env = *envptr;
2830           //CkAssert(*(int*)env == 0x34567890);
2831           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2832                 if (env->getEvent() == 0) return true;
2833                 if (isNext(env)) { /* This is the message we were expecting */
2834                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2835                         getNext(); /* Advance over this message */
2836                         flush(); /* try to process queued-up stuff */
2837                         if (!wasPacked) CkUnpackMessage(envptr);
2838                         return true;
2839                 }
2840 #if CMK_SMP
2841                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2842                          // try next rank, we can't just buffer the msg and left
2843                          // we need to keep unprocessed msg on the fly
2844                         int nextpe = CkMyPe()+1;
2845                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2846                         nextpe = CkNodeFirst(CkMyNode());
2847                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2848                         return false;
2849                 }
2850 #endif
2851                 else /*!isNext(env) */ {
2852                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2853                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2854                         delayedMessages.enq(env);
2855                         flush();
2856                         return false;
2857                 }
2858         }
2859         virtual bool process(CthThreadToken *token, CkCoreState *ck) {
2860       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2861           if (isNext(token)) {
2862         REPLAYDEBUG("Executing token: "<<token->serialNo)
2863             getNext();
2864             flush();
2865             return true;
2866           } else {
2867         REPLAYDEBUG("Queueing token: "<<token->serialNo
2868             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2869             delayedTokens.enq(token);
2870             return false;
2871           }
2872         }
2873
2874         virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2875           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2876           if (lbFile != NULL) {
2877             int num_moves = 0;
2878         PUP::fromDisk p(lbFile);
2879             p | num_moves;
2880             if (num_moves != (*msg)->n_moves) {
2881               delete *msg;
2882               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2883             }
2884             (*msg)->pup(p);
2885           }
2886           return true;
2887         }
2888 };
2889
2890 class CkMessageDetailReplay : public CkMessageWatcher {
2891   void *getNext() {
2892     CmiUInt4 size; size_t nread;
2893     if ((nread=fread(&size, 4, 1, f)) < 1) {
2894       if (feof(f)) return NULL;
2895       CkPrintf("Broken record file (metadata) got %d\n",nread);
2896       CkAbort("");
2897     }
2898     void *env = CmiAlloc(size);
2899     long tell = ftell(f);
2900     if ((nread=fread(env, size, 1, f)) < 1) {
2901       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2902       CkAbort("");
2903     }
2904     //*(int*)env = 0x34567890; // set first integer as magic
2905     return env;
2906   }
2907 public:
2908   double starttime;
2909   CkMessageDetailReplay(FILE *f_) {
2910     f=f_;
2911     starttime=CkWallTimer();
2912     /* This must match what CkMessageDetailRecorder did */
2913     CmiUInt2 little;
2914     fread(&little, 2, 1, f);
2915     if (little != sizeof(void*)) {
2916       CkAbort("Replaying on a different architecture from which recording was done!");
2917     }
2918
2919     CsdEnqueue(getNext());
2920
2921     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2922   }
2923   virtual bool process(envelope **env,CkCoreState *ck) {
2924     void *msg = getNext();
2925     if (msg != NULL) CsdEnqueue(msg);
2926     return true;
2927   }
2928 };
2929
2930 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2931 #if ! CMK_BIGSIM_CHARM
2932   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2933 #endif
2934   CkMessageReplay *replay = (CkMessageReplay*)rep;
2935   //CmiStartQD(CkMessageReplayQuiescence, replay);
2936 }
2937
2938 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2939   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2940   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2941   ConverseExit();
2942 }
2943 #endif
2944
2945 static bool CpdExecuteThreadResume(CthThreadToken *token) {
2946   CkCoreState *ck = CkpvAccess(_coreState);
2947   if (ck->watcher!=NULL) {
2948     return ck->watcher->processThread(token,ck);
2949   }
2950   return true;
2951 }
2952
2953 CpvExtern(int, CthResumeNormalThreadIdx);
2954 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2955 {
2956   CthThread t = token->thread;
2957
2958   if(t == NULL){
2959     free(token);
2960     return;
2961   }
2962 #if CMK_TRACE_ENABLED
2963 #if ! CMK_TRACE_IN_CHARM
2964   if(CpvAccess(traceOn))
2965     CthTraceResume(t);
2966 /*    if(CpvAccess(_traceCoreOn)) 
2967             resumeTraceCore();*/
2968 #endif
2969 #endif
2970 #if CMK_OMP
2971   CthSetPrev(t, CthSelf());
2972 #endif
2973   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2974   if (CpdExecuteThreadResume(token)) {
2975     CthResume(t);
2976   }
2977 #if CMK_OMP
2978   CthScheduledDecrement();
2979   CthSetPrev(CthSelf(), 0);
2980 #endif
2981 }
2982
2983 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2984   CkCoreState *ck = CkpvAccess(_coreState);
2985   if (ck->watcher!=NULL) {
2986     ck->watcher->processLBMessage(msg, ck);
2987   }
2988 }
2989
2990 #if CMK_BIGSIM_CHARM
2991 CpvExtern(int      , CthResumeBigSimThreadIdx);
2992 #endif
2993
2994 #include "ckliststring.h"
2995 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2996     CmiArgGroup("Charm++","Record/Replay");
2997     bool forceReplay = false;
2998     char *procs = NULL;
2999     _replaySystem = 0;
3000     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
3001       if(CmiMyRank() == 0) _recplay_crc = 1;
3002     }
3003     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
3004       if(CmiMyRank() == 0) _recplay_checksum = 1;
3005     }
3006     int tmplogsize;
3007     if(CmiGetArgIntDesc(argv,"+recplay-logsize",&tmplogsize,"Specify the size of the buffer used by the message recorder"))
3008       {
3009         if(CmiMyRank() == 0) _recplay_logsize = tmplogsize;
3010       }
3011     REPLAYDEBUG("CkMessageWatcherInit ");
3012     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
3013 #if CMK_REPLAYSYSTEM
3014         CkListString list(procs);
3015         if (list.includes(CkMyPe())) {
3016           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
3017           CpdSetInitializeMemory(1);
3018           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
3019         }
3020 #else
3021         CkAbort("Option `+record-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3022 #endif
3023     }
3024     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
3025 #if CMK_REPLAYSYSTEM
3026       if (CkMyPe() == 0) {
3027         CmiPrintf("Charm++> record mode.\n");
3028         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3029           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
3030           _recplay_crc = _recplay_checksum = 0;
3031         }
3032       }
3033       CpdSetInitializeMemory(1);
3034       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3035       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
3036 #else
3037       CkAbort("Option `+record' requires that record-replay support be enabled at configure time (--enable-replay)");
3038 #endif
3039     }
3040         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
3041 #if CMK_REPLAYSYSTEM
3042             forceReplay = true;
3043             CpdSetInitializeMemory(1);
3044             // Set the parameters of the processor
3045 #if CMK_SHARED_VARS_UNAVAILABLE
3046             _Cmi_mype = atoi(procs);
3047             while (procs[0]!='/') procs++;
3048             procs++;
3049             _Cmi_numpes = atoi(procs);
3050 #else
3051             CkAbort("+replay-detail available only for non-SMP build");
3052 #endif
3053             _replaySystem = 1;
3054             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
3055 #else
3056           CkAbort("Option `+replay-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3057 #endif
3058         }
3059         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
3060 #if CMK_REPLAYSYSTEM
3061           if (CkMyPe() == 0)  {
3062             CmiPrintf("Charm++> replay mode.\n");
3063             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3064               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
3065               _recplay_crc = _recplay_checksum = 0;
3066             }
3067           }
3068           CpdSetInitializeMemory(1);
3069 #if ! CMK_BIGSIM_CHARM
3070           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3071 #else
3072           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3073 #endif
3074           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
3075 #else
3076           CkAbort("Option `+replay' requires that record-replay support be enabled at configure time (--enable-replay)");
3077 #endif
3078         }
3079         if (_recplay_crc && _recplay_checksum) {
3080           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
3081         }
3082 }
3083
3084 extern "C"
3085 int CkMessageToEpIdx(void *msg) {
3086         envelope *env=UsrToEnv(msg);
3087         int ep=env->getEpIdx();
3088         if (ep==CkIndex_CkArray::recvBroadcast(0))
3089                 return env->getsetArrayBcastEp();
3090         else
3091                 return ep;
3092 }
3093
3094 extern "C"
3095 int getCharmEnvelopeSize() {
3096   return sizeof(envelope);
3097 }
3098
3099 /// Best-effort guess at whether @arg msg points at a charm envelope
3100 extern "C"
3101 int isCharmEnvelope(void *msg) {
3102     envelope *e = (envelope *)msg;
3103     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
3104     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
3105     if (e->getTotalsize() < sizeof(envelope)) return 0;
3106     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
3107 #if CMK_SMP
3108     if (e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
3109 #else
3110     if (e->getSrcPe()>=CkNumPes()) return 0;
3111 #endif
3112     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
3113     return 1;
3114 }
3115
3116 #include "CkMarshall.def.h"