e4e4a959c2a74e8565d80670ef27196cc0ab63ac
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #include "pathHistory.h"
14
15 #if CMK_LBDB_ON
16 #include "LBDatabase.h"
17 #endif // CMK_LBDB_ON
18
19 #ifndef CMK_CHARE_USE_PTR
20 #include <map>
21 CkpvDeclare(std::vector<void *>, chare_objs);
22 CkpvDeclare(std::vector<int>, chare_types);
23 CkpvDeclare(std::vector<VidBlock *>, vidblocks);
24
25 typedef std::map<int, CkChareID>  Vidblockmap;
26 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
27 CkpvDeclare(int, currentChareIdx);
28 #endif
29
30 // Map of array IDs to array elements for fast message delivery
31 CkpvDeclare(ArrayObjMap, array_objs);
32
33 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
34
35 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
36
37 int CMessage_CkMessage::__idx=-1;
38 int CMessage_CkArgMsg::__idx=0;
39 int CkIndex_Chare::__idx;
40 int CkIndex_Group::__idx;
41 int CkIndex_ArrayBase::__idx=-1;
42
43 extern int _defaultObjectQ;
44
45 void _initChareTables()
46 {
47 #ifndef CMK_CHARE_USE_PTR
48           /* chare and vidblock table */
49   CkpvInitialize(std::vector<void *>, chare_objs);
50   CkpvInitialize(std::vector<int>, chare_types);
51   CkpvInitialize(std::vector<VidBlock *>, vidblocks);
52   CkpvInitialize(Vidblockmap, vmap);
53   CkpvInitialize(int, currentChareIdx);
54   CkpvAccess(currentChareIdx) = -1;
55 #endif
56
57   CkpvInitialize(ArrayObjMap, array_objs);
58 }
59
60 //Charm++ virtual functions: declaring these here results in a smaller executable
61 Chare::Chare(void) {
62   thishandle.onPE=CkMyPe();
63   thishandle.objPtr=this;
64 #if CMK_ERROR_CHECKING
65   magic = CHARE_MAGIC;
66 #endif
67 #ifndef CMK_CHARE_USE_PTR
68      // for plain chare, objPtr is actually the index to chare obj table
69   if (CkpvAccess(currentChareIdx) >= 0) {
70     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
71   }
72   chareIdx = CkpvAccess(currentChareIdx);
73 #endif
74 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
75   mlogData = new ChareMlogData();
76   mlogData->objID.type = TypeChare;
77   mlogData->objID.data.chare.id = thishandle;
78 #endif
79 #if CMK_OBJECT_QUEUE_AVAILABLE
80   if (_defaultObjectQ)  CkEnableObjQ();
81 #endif
82 }
83
84 Chare::Chare(CkMigrateMessage* m) {
85   thishandle.onPE=CkMyPe();
86   thishandle.objPtr=this;
87 #if CMK_ERROR_CHECKING
88   magic = 0;
89 #endif
90
91 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
92         mlogData = NULL;
93 #endif
94
95 #if CMK_OBJECT_QUEUE_AVAILABLE
96   if (_defaultObjectQ)  CkEnableObjQ();
97 #endif
98 }
99
100 void Chare::CkEnableObjQ()
101 {
102 #if CMK_OBJECT_QUEUE_AVAILABLE
103   objQ.create();
104 #endif
105 }
106
107 Chare::~Chare() {
108 #ifndef CMK_CHARE_USE_PTR
109 /*
110   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
111 */
112   if (chareIdx != -1)
113   {
114     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
115     CkpvAccess(chare_objs)[chareIdx] = NULL;
116     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
117     if (iter != CkpvAccess(vmap).end()) {
118       CkChareID *pCid = (CkChareID *)
119         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
120       int srcPe = iter->second.onPE;
121       *pCid = iter->second;
122       envelope *ret = UsrToEnv(pCid);
123       ret->setVidPtr(iter->second.objPtr);
124       ret->setSrcPe(CkMyPe());
125       CmiSetHandler(ret, _charmHandlerIdx);
126       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
127       CpvAccess(_qd)->create();
128       CkpvAccess(vmap).erase(iter);
129     }
130   }
131 #endif
132 }
133
134 void Chare::pup(PUP::er &p)
135 {
136   p(thishandle.onPE);
137   thishandle.objPtr=(void *)this;
138 #ifndef CMK_CHARE_USE_PTR
139   p(chareIdx);
140   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
141 #endif
142 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
143         if(p.isUnpacking()){
144                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
145                 mlogData = new ChareMlogData();
146         }
147         mlogData->pup(p);
148 #endif
149 #if CMK_ERROR_CHECKING
150   p(magic);
151 #endif
152 }
153
154 int Chare::ckGetChareType() const {
155   return -3;
156 }
157 char *Chare::ckDebugChareName(void) {
158   char buf[100];
159   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),(void*)this);
160   return strdup(buf);
161 }
162 int Chare::ckDebugChareID(char *str, int limit) {
163   // pure chares for now do not have a valid ID
164   str[0] = 0;
165   return 1;
166 }
167 void Chare::ckDebugPup(PUP::er &p) {
168   pup(p);
169 }
170
171 /// This method is called before starting a [threaded] entry method.
172 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
173   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
174   traceAddThreadListeners(th, UsrToEnv(msg));
175 }
176
177 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
178   p.comment("Bytes");
179   int ts=UsrToEnv(msg)->getTotalsize();
180   int msgLen=ts-sizeof(envelope);
181   if (msgLen>0)
182     p((char*)msg,msgLen);
183 }
184
185 IrrGroup::IrrGroup(void) {
186   thisgroup = CkpvAccess(_currentGroup);
187 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
188         mlogData->objID.type = TypeGroup;
189         mlogData->objID.data.group.id = thisgroup;
190         mlogData->objID.data.group.onPE = CkMyPe();
191 #endif
192 }
193
194 IrrGroup::~IrrGroup() {
195   // remove the object pointer
196   if (CkpvAccess(_destroyingNodeGroup)) {
197     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
198     CksvAccess(_nodeGroupTable)->find(thisgroup).setObj(NULL);
199     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
200     CkpvAccess(_destroyingNodeGroup) = false;
201   } else {
202     CmiImmediateLock(CkpvAccess(_groupTableImmLock));
203     CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
204     CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
205   }
206 }
207
208 void IrrGroup::pup(PUP::er &p)
209 {
210   Chare::pup(p);
211   p|thisgroup;
212 }
213
214 int IrrGroup::ckGetChareType() const {
215   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
216 }
217
218 int IrrGroup::ckDebugChareID(char *str, int limit) {
219   if (limit<5) return -1;
220   str[0] = 1;
221   *((int*)&str[1]) = thisgroup.idx;
222   return 5;
223 }
224
225 char *IrrGroup::ckDebugChareName() {
226   return strdup(_chareTable[ckGetChareType()]->name);
227 }
228
229 void IrrGroup::ckJustMigrated(void)
230 {
231 }
232
233 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
234   /* FIXME: **CW** not entirely sure what we should do here yet */
235 }
236
237 void Group::CkAddThreadListeners(CthThread th, void *msg) {
238   Chare::CkAddThreadListeners(th, msg);
239   CthSetThreadID(th, thisgroup.idx, 0, 0);
240 }
241
242 void Group::pup(PUP::er &p)
243 {
244   CkReductionMgr::pup(p);
245   p|reductionInfo;
246 }
247
248 /**** Delegation Manager Group */
249 CkDelegateMgr::~CkDelegateMgr() { }
250
251 //Default delegator implementation: do not delegate-- send directly
252 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
253   { CkSendMsg(ep,m,c); }
254 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
255   { CkSendMsgBranch(ep,m,onPE,g); }
256 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
257   { CkBroadcastMsgBranch(ep,m,g); }
258 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
259   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
260 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
261   { CkSendMsgNodeBranch(ep,m,onNode,g); }
262 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
263   { CkBroadcastMsgNodeBranch(ep,m,g); }
264 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
265   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
266 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
267 {
268         CProxyElement_ArrayBase ap(a,idx);
269         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
270 }
271 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
272 {
273         CProxyElement_ArrayBase ap(a,idx);
274         ap.ckSend((CkArrayMessage *)m,ep);
275 }
276 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
277 {
278         CProxy_ArrayBase ap(a);
279         ap.ckBroadcast((CkArrayMessage *)m,ep);
280 }
281
282 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
283 {
284         CmiAbort("ArraySectionSend is not implemented!\n");
285 /*
286         CProxyElement_ArrayBase ap(a,idx);
287         ap.ckSend((CkArrayMessage *)m,ep);
288 */
289 }
290
291 /*** Proxy <-> delegator communication */
292 CkDelegateData::~CkDelegateData() {}
293
294 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
295   return pd; // default implementation ignores pup call
296 }
297
298 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
299    this tricky manual reference counting business... */
300
301 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
302         if (dPtr) dPtr->ref();
303         ckUndelegate();
304         delegatedMgr = dTo;
305         delegatedPtr = dPtr;
306         delegatedGroupId = delegatedMgr->CkGetGroupID();
307         isNodeGroup = delegatedMgr->isNodeGroup();
308 }
309 void CProxy::ckUndelegate(void) {
310         delegatedMgr=NULL;
311         delegatedGroupId.setZero();
312         if (delegatedPtr) delegatedPtr->unref();
313         delegatedPtr=NULL;
314 }
315
316 /// Copy constructor
317 CProxy::CProxy(const CProxy &src)
318   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
319    isNodeGroup(src.isNodeGroup) {
320     delegatedPtr = NULL;
321     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
322         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
323     }
324 }
325
326 /// Assignment operator
327 CProxy& CProxy::operator=(const CProxy &src) {
328         CkDelegateData *oldPtr=delegatedPtr;
329         ckUndelegate();
330         delegatedMgr=src.delegatedMgr;
331         delegatedGroupId = src.delegatedGroupId; 
332         isNodeGroup = src.isNodeGroup;
333
334         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
335             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
336         else
337             delegatedPtr = NULL;
338
339         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
340         if (oldPtr) oldPtr->unref();
341         return *this;
342 }
343
344 void CProxy::pup(PUP::er &p) {
345   if (!p.isUnpacking()) {
346     if (ckDelegatedTo() != NULL) {
347       delegatedGroupId = delegatedMgr->CkGetGroupID();
348       isNodeGroup = delegatedMgr->isNodeGroup();
349     }
350   }
351   p|delegatedGroupId;
352   if (!delegatedGroupId.isZero()) {
353     p|isNodeGroup;
354     if (p.isUnpacking()) {
355       delegatedMgr = ckDelegatedTo(); 
356     }
357
358     int migCtor = 0, cIdx; 
359     if (!p.isUnpacking()) {
360       if (isNodeGroup) {
361         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
362         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
363         migCtor = _chareTable[cIdx]->migCtor; 
364         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
365       }
366       else  {
367         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
368         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
369         migCtor = _chareTable[cIdx]->migCtor; 
370         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
371       }         
372     }
373
374     p|migCtor;
375
376     // if delegated manager has not been created, construct a dummy
377     // object on which to call DelegatePointerPup
378     if (delegatedMgr == NULL) {
379
380       // create a dummy object for calling DelegatePointerPup
381       int objId = _entryTable[migCtor]->chareIdx; 
382       size_t objSize = _chareTable[objId]->size;
383       void *obj = malloc(objSize); 
384       _entryTable[migCtor]->call(NULL, obj); 
385       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
386         ->DelegatePointerPup(p, delegatedPtr);           
387       free(obj);
388
389     }
390     else {
391
392       // delegated manager has been created, so we can use it
393       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
394
395     }
396
397     if (p.isUnpacking() && delegatedPtr) {
398       delegatedPtr->ref();
399     }
400   }
401 }
402
403 /**** Array sections */
404 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
405 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems, int factor): bfactor(factor) { \
406   _elems.assign(elems, elems+nElems);  \
407   _cookie.get_aid() = aid;      \
408   _cookie.get_pe() = CkMyPe();  \
409 } \
410 CkSectionID::CkSectionID(const CkArrayID &aid, const std::vector<CkArrayIndex##index> &elems, int factor): bfactor(factor) { \
411   _elems.resize(elems.size()); \
412   for (int i=0; i<_elems.size(); ++i) { \
413     _elems[i] = static_cast<CkArrayIndex>(elems[i]); \
414   } \
415   _cookie.get_aid() = aid;      \
416   _cookie.get_pe() = CkMyPe();  \
417 } \
418
419 CKSECTIONID_CONSTRUCTOR_DEF(1D)
420 CKSECTIONID_CONSTRUCTOR_DEF(2D)
421 CKSECTIONID_CONSTRUCTOR_DEF(3D)
422 CKSECTIONID_CONSTRUCTOR_DEF(4D)
423 CKSECTIONID_CONSTRUCTOR_DEF(5D)
424 CKSECTIONID_CONSTRUCTOR_DEF(6D)
425 CKSECTIONID_CONSTRUCTOR_DEF(Max)
426
427 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes, int factor): bfactor(factor) {
428   _cookie.get_aid() = gid;
429   pelist.assign(_pelist, _pelist+_npes);
430 }
431
432 CkSectionID::CkSectionID(const CkGroupID &gid, const std::vector<int>& _pelist, int factor): pelist(_pelist), bfactor(factor) {
433   _cookie.get_aid() = gid;
434 }
435
436 CkSectionID::CkSectionID(const CkSectionID &sid) {
437   _cookie = sid._cookie;
438   pelist = sid.pelist;
439   _elems = sid._elems;
440   bfactor = sid.bfactor;
441 }
442
443 void CkSectionID::operator=(const CkSectionID &sid) {
444   _cookie = sid._cookie;
445   pelist = sid.pelist;
446   _elems = sid._elems;
447   bfactor = sid.bfactor;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451   p | _cookie;
452   p | pelist;
453   p | _elems;
454   p | bfactor;
455 }
456
457 /**** Tiny random API routines */
458
459 #if CMK_CUDA
460 void CUDACallbackManager(void *fn) {
461   if (fn != NULL) {
462     CkCallback *cb = (CkCallback*) fn;
463     cb->send();
464   }
465 }
466
467 #endif
468
469 void QdCreate(int n) {
470   CpvAccess(_qd)->create(n);
471 }
472
473 void QdProcess(int n) {
474   CpvAccess(_qd)->process(n);
475 }
476
477 extern "C"
478 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
479 {
480   UsrToEnv(msg)->setRef(ref);
481 }
482
483 extern "C"
484 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
485 {
486   return UsrToEnv(msg)->getRef();
487 }
488
489 extern "C"
490 int CkGetSrcPe(void *msg)
491 {
492   return UsrToEnv(msg)->getSrcPe();
493 }
494
495 extern "C"
496 int CkGetSrcNode(void *msg)
497 {
498   return CmiNodeOf(CkGetSrcPe(msg));
499 }
500
501 extern "C"
502 void *CkLocalBranch(CkGroupID gID) {
503   return _localBranch(gID);
504 }
505
506 static
507 void *_ckLocalNodeBranch(CkGroupID groupID) {
508   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
509   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
510   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
511   return retval;
512 }
513
514 extern "C"
515 void *CkLocalNodeBranch(CkGroupID groupID)
516 {
517   void *retval;
518   // we are called in a constructor
519   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
520     return CkpvAccess(_currentNodeGroupObj);
521   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
522   { // Nodegroup hasn't finished being created yet-- schedule...
523     CsdScheduler(0);
524   }
525   return retval;
526 }
527
528 extern "C"
529 void *CkLocalChare(const CkChareID *pCid)
530 {
531         int pe=pCid->onPE;
532         if (pe<0) { //A virtual chare ID
533                 if (pe!=(-(CkMyPe()+1)))
534                         return NULL;//VID block not on this PE
535 #ifdef CMK_CHARE_USE_PTR
536                 VidBlock *v=(VidBlock *)pCid->objPtr;
537 #else
538                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
539 #endif
540                 return v->getLocalChareObj();
541         }
542         else
543         { //An ordinary chare ID
544                 if (pe!=CkMyPe())
545                         return NULL;//Chare not on this PE
546 #ifdef CMK_CHARE_USE_PTR
547                 return pCid->objPtr;
548 #else
549                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
550 #endif
551         }
552 }
553
554 CkpvDeclare(char**,Ck_argv);
555
556 extern "C" char **CkGetArgv(void) {
557         return CkpvAccess(Ck_argv);
558 }
559 extern "C" int CkGetArgc(void) {
560         return CmiGetArgc(CkpvAccess(Ck_argv));
561 }
562
563 /******************** Basic support *****************/
564 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
565 {
566 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
567         CpvAccess(_currentObj) = (Chare *)obj;
568 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
569 #endif
570   //BIGSIM_OOC DEBUGGING
571   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
572   //fflush(stdout);
573 #if CMK_CHARMDEBUG
574   CpdBeforeEp(epIdx, obj, msg);
575 #endif    
576   _entryTable[epIdx]->call(msg, obj);
577 #if CMK_CHARMDEBUG
578   CpdAfterEp(epIdx);
579 #endif
580   if (_entryTable[epIdx]->noKeep)
581   { /* Method doesn't keep/delete the message, so we have to: */
582     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
583   }
584 }
585 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
586 {
587   //BIGSIM_OOC DEBUGGING
588   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
589   //fflush(stdout);
590
591   void *deliverMsg;
592 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
593         CpvAccess(_currentObj) = (Chare *)obj;
594 #endif
595   if (_entryTable[epIdx]->noKeep)
596   { /* Deliver a read-only copy of the message */
597     deliverMsg=(void *)msg;
598   } else
599   { /* Method needs a copy of the message to keep/delete */
600     void *oldMsg=(void *)msg;
601     deliverMsg=CkCopyMsg(&oldMsg);
602 #if CMK_ERROR_CHECKING
603     if (oldMsg!=msg)
604       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
605 #endif
606   }
607 #if CMK_CHARMDEBUG
608   CpdBeforeEp(epIdx, obj, (void*)msg);
609 #endif
610   _entryTable[epIdx]->call(deliverMsg, obj);
611 #if CMK_CHARMDEBUG
612   CpdAfterEp(epIdx);
613 #endif
614 }
615
616 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
617 {
618   void *msg = EnvToUsr(env);
619   _SET_USED(env, 0);
620   CkDeliverMessageFree(epIdx,msg,obj);
621 }
622
623 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
624 {
625
626 #if CMK_TRACE_ENABLED 
627   if (_entryTable[epIdx]->traceEnabled) {
628     _TRACE_BEGIN_EXECUTE(env, obj);
629     if(_entryTable[epIdx]->appWork)
630         _TRACE_BEGIN_APPWORK();
631     _invokeEntryNoTrace(epIdx,env,obj);
632     if(_entryTable[epIdx]->appWork)
633         _TRACE_END_APPWORK();
634     _TRACE_END_EXECUTE();
635   }
636   else
637 #endif
638     _invokeEntryNoTrace(epIdx,env,obj);
639
640 }
641
642 /********************* Creation ********************/
643
644 extern "C"
645 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
646 {
647   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
648   envelope *env = UsrToEnv(msg);
649   _CHECK_USED(env);
650   if(pCid == 0) {
651     env->setMsgtype(NewChareMsg);
652   } else {
653     pCid->onPE = (-(CkMyPe()+1));
654     //  pCid->magic = _GETIDX(cIdx);
655     pCid->objPtr = (void *) new VidBlock();
656     _MEMCHECK(pCid->objPtr);
657     env->setMsgtype(NewVChareMsg);
658     env->setVidPtr(pCid->objPtr);
659 #ifndef CMK_CHARE_USE_PTR
660     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
661     int idx = CkpvAccess(vidblocks).size()-1;
662     pCid->objPtr = (void *)(CmiIntPtr)idx;
663     env->setVidPtr((void *)(CmiIntPtr)idx);
664 #endif
665   }
666   env->setEpIdx(eIdx);
667   env->setByPe(CkMyPe());
668   env->setSrcPe(CkMyPe());
669   CmiSetHandler(env, _charmHandlerIdx);
670   _TRACE_CREATION_1(env);
671   CpvAccess(_qd)->create();
672   _STATS_RECORD_CREATE_CHARE_1();
673   _SET_USED(env, 1);
674   if(destPE == CK_PE_ANY)
675     env->setForAnyPE(1);
676   else
677     env->setForAnyPE(0);
678   _CldEnqueue(destPE, env, _infoIdx);
679   _TRACE_CREATION_DONE(1);
680 }
681
682 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
683 {
684   int gIdx = _entryTable[epIdx]->chareIdx;
685   void *obj = malloc(_chareTable[gIdx]->size);
686   _MEMCHECK(obj);
687   setMemoryTypeChare(obj);
688   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
689   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
690   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
691   CkpvAccess(_groupIDTable)->push_back(groupID);
692   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
693   if(ptrq) {
694     void *pending;
695     while((pending=ptrq->deq())!=0) {
696 #if CMK_BIGSIM_CHARM
697       //In BigSim, CpvAccess(CsdSchedQueue) is not used. _CldEnqueue resets the
698       //handler to converse-level BigSim handler.
699       _CldEnqueue(CkMyPe(), pending, _infoIdx);
700 #else
701       CsdEnqueueGeneral(pending, CQS_QUEUEING_FIFO, 0, 0);
702 #endif
703     }
704     CkpvAccess(_groupTable)->find(groupID).clearPending();
705   }
706   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
707
708   CkpvAccess(_currentGroup) = groupID;
709   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
710
711 #ifndef CMK_CHARE_USE_PTR
712   int callingChareIdx = CkpvAccess(currentChareIdx);
713   CkpvAccess(currentChareIdx) = -1;
714 #endif
715
716   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
717
718 #ifndef CMK_CHARE_USE_PTR
719   CkpvAccess(currentChareIdx) = callingChareIdx;
720 #endif
721
722   _STATS_RECORD_PROCESS_GROUP_1();
723 }
724
725 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
726 {
727   int gIdx = _entryTable[epIdx]->chareIdx;
728   size_t objSize=_chareTable[gIdx]->size;
729   void *obj = malloc(objSize);
730   _MEMCHECK(obj);
731   setMemoryTypeChare(obj);
732   CkpvAccess(_currentGroup) = groupID;
733
734 // Now that the NodeGroup is created, add it to the table.
735 //  NodeGroups can be accessed by multiple processors, so
736 //  this is in the opposite order from groups - invoking the constructor
737 //  before registering it.
738 // User may call CkLocalNodeBranch() inside the nodegroup constructor
739 //  store nodegroup into _currentNodeGroupObj
740   CkpvAccess(_currentNodeGroupObj) = obj;
741
742 #ifndef CMK_CHARE_USE_PTR
743   int callingChareIdx = CkpvAccess(currentChareIdx);
744   CkpvAccess(currentChareIdx) = -1;
745 #endif
746
747   _invokeEntryNoTrace(epIdx,env,obj);
748
749 #ifndef CMK_CHARE_USE_PTR
750   CkpvAccess(currentChareIdx) = callingChareIdx;
751 #endif
752
753   CkpvAccess(_currentNodeGroupObj) = NULL;
754   _STATS_RECORD_PROCESS_NODE_GROUP_1();
755
756   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
757   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
758   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
759   CksvAccess(_nodeGroupIDTable).push_back(groupID);
760
761   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
762   if(ptrq) {
763     void *pending;
764     while((pending=ptrq->deq())!=0) {
765       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
766     }
767     CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
768   }
769   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
770 }
771
772 void _createGroup(CkGroupID groupID, envelope *env)
773 {
774   _CHECK_USED(env);
775   _SET_USED(env, 1);
776   int epIdx = env->getEpIdx();
777   int gIdx = _entryTable[epIdx]->chareIdx;
778   env->setGroupNum(groupID);
779   env->setSrcPe(CkMyPe());
780   env->setGroupEpoch(CkpvAccess(_charmEpoch));
781
782   if(CkNumPes()>1) {
783     CkPackMessage(&env);
784     CmiSetHandler(env, _bocHandlerIdx);
785     _numInitMsgs++;
786     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
787     CpvAccess(_qd)->create(CkNumPes()-1);
788     CkUnpackMessage(&env);
789   }
790   _STATS_RECORD_CREATE_GROUP_1();
791   CkCreateLocalGroup(groupID, epIdx, env);
792 }
793
794 void _createNodeGroup(CkGroupID groupID, envelope *env)
795 {
796   _CHECK_USED(env);
797   _SET_USED(env, 1);
798   int epIdx = env->getEpIdx();
799   env->setGroupNum(groupID);
800   env->setSrcPe(CkMyPe());
801   env->setGroupEpoch(CkpvAccess(_charmEpoch));
802   if(CkNumNodes()>1) {
803     CkPackMessage(&env);
804     CmiSetHandler(env, _bocHandlerIdx);
805     _numInitMsgs++;
806     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
807     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
808     CpvAccess(_qd)->create(CkNumNodes()-1);
809     CkUnpackMessage(&env);
810   }
811   _STATS_RECORD_CREATE_NODE_GROUP_1();
812   CkCreateLocalNodeGroup(groupID, epIdx, env);
813 }
814
815 // new _groupCreate
816
817 static CkGroupID _groupCreate(envelope *env)
818 {
819   CkGroupID groupNum;
820
821   // check CkMyPe(). if it is 0 then idx is _numGroups++
822   // if not, then something else...
823   if(CkMyPe() == 0)
824      groupNum.idx = CkpvAccess(_numGroups)++;
825   else
826      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
827   _createGroup(groupNum, env);
828   return groupNum;
829 }
830
831 // new _nodeGroupCreate
832 static CkGroupID _nodeGroupCreate(envelope *env)
833 {
834   CkGroupID groupNum;
835   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
836   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
837           groupNum.idx = CksvAccess(_numNodeGroups)++;
838    else
839           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
840   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
841   _createNodeGroup(groupNum, env);
842   return groupNum;
843 }
844
845 /**** generate the group idx when group is creator pe is not pe0
846  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
847  **** remaining bits contain the group creator processor number and
848  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
849
850 int _getGroupIdx(int numNodes,int myNode,int numGroups)
851 {
852         int idx;
853         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
854         int n = 32 - (x+1);                                     // number of bits remaining for the index
855         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
856                                                                 // then add the next available index
857         // of course this won't work when int is 8 bytes long on T3E
858         //idx |= 0x80000000;                                      // set the most significant bit to 1
859         idx = - idx;
860                                                                 // if int is not 32 bits, wouldn't this be wrong?
861         return idx;
862 }
863
864 extern "C"
865 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
866 {
867   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
868   envelope *env = UsrToEnv(msg);
869   env->setMsgtype(BocInitMsg);
870   env->setEpIdx(eIdx);
871   env->setSrcPe(CkMyPe());
872   _TRACE_CREATION_N(env, CkNumPes());
873   CkGroupID gid = _groupCreate(env);
874   _TRACE_CREATION_DONE(1);
875   return gid;
876 }
877
878 extern "C"
879 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
880 {
881   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
882   envelope *env = UsrToEnv(msg);
883   env->setMsgtype(NodeBocInitMsg);
884   env->setEpIdx(eIdx);
885   env->setSrcPe(CkMyPe());
886   _TRACE_CREATION_N(env, CkNumNodes());
887   CkGroupID gid = _nodeGroupCreate(env);
888   _TRACE_CREATION_DONE(1);
889   return gid;
890 }
891
892 static inline void *_allocNewChare(envelope *env, int &idx)
893 {
894   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
895   void *tmp=malloc(_chareTable[chareIdx]->size);
896   _MEMCHECK(tmp);
897 #ifndef CMK_CHARE_USE_PTR
898   CkpvAccess(chare_objs).push_back(tmp);
899   CkpvAccess(chare_types).push_back(chareIdx);
900   idx = CkpvAccess(chare_objs).size()-1;
901 #endif
902   setMemoryTypeChare(tmp);
903   return tmp;
904 }
905
906 // Method returns true if one or more group dependencies are unsatisfied
907 inline bool isGroupDepUnsatisfied(const CkCoreState *ck, const envelope *env) {
908   int groupDepNum = env->getGroupDepNum();
909   if(groupDepNum != 0) {
910     CkGroupID *groupDepPtr = (CkGroupID *)(env->getGroupDepPtr());
911     for(int i=0;i<groupDepNum;i++) {
912       CkGroupID depID = groupDepPtr[i];
913       if (!depID.isZero() && !_lookupGroupAndBufferIfNotThere(ck, env, depID)) {
914         return true;
915       }
916     }
917   }
918   return false;
919 }
920
921 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
922 {
923   if(isGroupDepUnsatisfied(ck, env))
924     return;
925   if(ck)
926     ck->process(); // ck->process() updates mProcessed count used in QD
927   int idx;
928   void *obj = _allocNewChare(env, idx);
929 #ifndef CMK_CHARE_USE_PTR
930   CkpvAccess(currentChareIdx) = idx;
931 #endif
932   _invokeEntry(env->getEpIdx(),env,obj);
933   if(ck)
934     _STATS_RECORD_PROCESS_CHARE_1();
935 }
936
937 void CkCreateLocalChare(int epIdx, envelope *env)
938 {
939   env->setEpIdx(epIdx);
940   _processNewChareMsg(NULL, env);
941 }
942
943 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
944 {
945   if(isGroupDepUnsatisfied(ck, env))
946     return;
947   ck->process(); // ck->process() updates mProcessed count used in QD
948   int idx;
949   void *obj = _allocNewChare(env, idx);
950   CkChareID *pCid = (CkChareID *)
951       _allocMsg(FillVidMsg, sizeof(CkChareID));
952   pCid->onPE = CkMyPe();
953 #ifndef CMK_CHARE_USE_PTR
954   pCid->objPtr = (void*)(CmiIntPtr)idx;
955 #else
956   pCid->objPtr = obj;
957 #endif
958   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
959   envelope *ret = UsrToEnv(pCid);
960   ret->setVidPtr(env->getVidPtr());
961   int srcPe = env->getByPe();
962   ret->setSrcPe(CkMyPe());
963   CmiSetHandler(ret, _charmHandlerIdx);
964   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
965 #ifndef CMK_CHARE_USE_PTR
966   // register the remote vidblock for deletion when chare is deleted
967   CkChareID vid;
968   vid.onPE = srcPe;
969   vid.objPtr = env->getVidPtr();
970   CkpvAccess(vmap)[idx] = vid;    
971 #endif
972   CpvAccess(_qd)->create();
973 #ifndef CMK_CHARE_USE_PTR
974   CkpvAccess(currentChareIdx) = idx;
975 #endif
976   _invokeEntry(env->getEpIdx(),env,obj);
977   _STATS_RECORD_PROCESS_CHARE_1();
978 }
979
980 /************** Receive: Chares *************/
981
982 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
983 {
984   if(isGroupDepUnsatisfied(ck, env))
985     return;
986   ck->process(); // ck->process() updates mProcessed count used in QD
987   int epIdx = env->getEpIdx();
988   int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
989   void *obj;
990   if (mainIdx != -1)  {           // mainchare
991     CmiAssert(CkMyPe()==0);
992     obj = _mainTable[mainIdx]->getObj();
993   }
994   else {
995 #ifndef CMK_CHARE_USE_PTR
996     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
997       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
998     else
999       obj = env->getObjPtr();
1000 #else
1001     obj = env->getObjPtr();
1002 #endif
1003   }
1004   _invokeEntry(epIdx,env,obj);
1005   _STATS_RECORD_PROCESS_MSG_1();
1006 }
1007
1008 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
1009 {
1010   int epIdx = env->getEpIdx();
1011   void *obj = env->getObjPtr();
1012   _invokeEntry(epIdx,env,obj);
1013 }
1014
1015 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
1016 {
1017   ck->process(); // ck->process() updates mProcessed count used in QD
1018 #ifndef CMK_CHARE_USE_PTR
1019   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1020 #else
1021   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1022   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
1023 #endif
1024   CkChareID *pcid = (CkChareID *) EnvToUsr(env);
1025   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
1026   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
1027   CmiFree(env);
1028 }
1029
1030 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
1031 {
1032   ck->process(); // ck->process() updates mProcessed count used in QD
1033 #ifndef CMK_CHARE_USE_PTR
1034   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1035 #else
1036   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1037   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
1038 #endif
1039   _SET_USED(env, 1);
1040   vptr->send(env);
1041 }
1042
1043 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1044 {
1045   ck->process(); // ck->process() updates mProcessed count used in QD
1046 #ifndef CMK_CHARE_USE_PTR
1047   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1048   delete vptr;
1049   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1050 #endif
1051   CmiFree(env);
1052 }
1053
1054 /************** Receive: Groups ****************/
1055
1056 /**
1057  Return a pointer to the local BOC of "groupID".
1058  The message "env" passed in has some known dependency on this groupID
1059  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1060  Therefore, if the return value is NULL, this function buffers the message so that
1061  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1062  The message passed in must have its handlers correctly set so that it can be
1063  scheduled again.
1064 */
1065 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(const CkCoreState *ck, const envelope *env, const CkGroupID &groupID)
1066 {
1067
1068         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1069         IrrGroup *obj = ck->localBranch(groupID);
1070         if (obj==NULL) { /* groupmember not yet created: stash message */
1071                 ck->getGroupTable()->find(groupID).enqMsg((envelope *)env);
1072         }
1073         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1074         return obj;
1075 }
1076
1077 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1078 {
1079   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1080 }
1081
1082 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1083 {
1084 #if CMK_LBDB_ON
1085   // if there is a running obj being measured, stop it temporarily
1086   LDObjHandle objHandle;
1087   int objstopped = 0;
1088   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1089   if (the_lbdb->RunningObject(&objHandle)) {
1090     objstopped = 1;
1091     the_lbdb->ObjectStop(objHandle);
1092   }
1093 #endif
1094   _invokeEntry(epIdx,env,obj);
1095 #if CMK_LBDB_ON
1096   if (objstopped) the_lbdb->ObjectStart(objHandle);
1097 #endif
1098   _STATS_RECORD_PROCESS_BRANCH_1();
1099 }
1100
1101 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1102 {
1103   if(isGroupDepUnsatisfied(ck, env))
1104     return;
1105   CkGroupID groupID =  env->getGroupNum();
1106   IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1107   if(obj) {
1108     ck->process(); // ck->process() updates mProcessed count used in QD
1109     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1110   }
1111 }
1112
1113 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1114 {
1115   env->setMsgtype(ForChareMsg);
1116   env->setObjPtr(obj);
1117   _processForChareMsg(ck,env);
1118   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1119 }
1120
1121 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1122 {
1123   env->setEpIdx(epIdx);
1124   _deliverForNodeBocMsg(ck,env, obj);
1125 }
1126
1127 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1128 {
1129   if(isGroupDepUnsatisfied(ck, env))
1130     return;
1131   CkGroupID groupID = env->getGroupNum();
1132   void *obj;
1133
1134   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1135   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1136   if(!obj) { // groupmember not yet created
1137 #if CMK_IMMEDIATE_MSG
1138     if (CmiIsImmediate(env)) {
1139       //CmiDelayImmediate();        /* buffer immediate message */
1140       CmiResetImmediate(env);        // note: this may not be SIG IO safe !
1141     }
1142 #endif
1143     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1144     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1145     return;
1146   }
1147   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1148   ck->process(); // ck->process() updates mProcessed count used in QD
1149   env->setMsgtype(ForChareMsg);
1150   env->setObjPtr(obj);
1151   _processForChareMsg(ck,env);
1152   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1153 }
1154
1155 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1156 {
1157   if(isGroupDepUnsatisfied(ck, env))
1158     return;
1159   CkGroupID groupID = env->getGroupNum();
1160   int epIdx = env->getEpIdx();
1161   ck->process(); // ck->process() updates mProcessed count used in QD
1162   CkCreateLocalGroup(groupID, epIdx, env);
1163 }
1164
1165 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1166 {
1167   if(isGroupDepUnsatisfied(ck, env))
1168     return;
1169   ck->process(); // ck->process() updates mProcessed count used in QD
1170   CkGroupID groupID = env->getGroupNum();
1171   int epIdx = env->getEpIdx();
1172   CkCreateLocalNodeGroup(groupID, epIdx, env);
1173 }
1174
1175 /************** Receive: Arrays *************/
1176 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1177   ArrayObjMap& object_map = CkpvAccess(array_objs);
1178   auto iter = object_map.find(env->getRecipientID());
1179   if (iter != object_map.end()) {
1180     // First see if we already have a direct pointer to the object
1181     _SET_USED(env, 0);
1182     ck->process(); // ck->process() updates mProcessed count used in QD
1183     int opts = 0;
1184     CkArrayMessage* msg = (CkArrayMessage*)EnvToUsr(env);
1185     if (msg->array_hops()>1) {
1186       CProxy_ArrayBase(env->getArrayMgr()).ckLocMgr()->multiHop(msg);
1187     }
1188     iter->second->ckInvokeEntry(env->getEpIdx(), msg, !(opts & CK_MSG_KEEP));
1189   } else {
1190     // Otherwise fallback to delivery through the array manager
1191     CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1192     if (mgr) {
1193       _SET_USED(env, 0);
1194       ck->process(); // ck->process() updates mProcessed count used in QD
1195       mgr->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_inline);
1196     }
1197   }
1198 }
1199
1200 //BIGSIM_OOC DEBUGGING
1201 #define TELLMSGTYPE(x) //x
1202
1203 /**
1204  * This is the main converse-level handler used by all of Charm++.
1205  *
1206  * \addtogroup CriticalPathFramework
1207  */
1208 void _processHandler(void *converseMsg,CkCoreState *ck)
1209 {
1210   envelope *env = (envelope *) converseMsg;
1211
1212   MESSAGE_PHASE_CHECK(env);
1213
1214 #if CMK_ONESIDED_IMPL
1215   if(env->isRdma()){
1216     envelope *prevEnv = env;
1217     env = CkRdmaIssueRgets(prevEnv);
1218     if(env) {
1219       // Within pe or logical node, env points to new message with data
1220
1221       // Free prevEnv
1222       CkFreeMsg(EnvToUsr(prevEnv));
1223     } else{
1224       // async rdma call in place, asynchronous return and ack handling
1225       return;
1226     }
1227   }
1228 #endif
1229
1230 //#if CMK_RECORD_REPLAY
1231   if (ck->watcher!=NULL) {
1232     if (!ck->watcher->processMessage(&env,ck)) return;
1233   }
1234 //#endif
1235 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1236         Chare *obj=NULL;
1237         CkObjID sender;
1238         MCount SN;
1239         MlogEntry *entry=NULL;
1240         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
1241            || env->getMsgtype() == ForArrayEltMsg
1242            || env->getMsgtype() == ForChareMsg) {
1243                 sender = env->sender;
1244                 SN = env->SN;
1245                 int result = preProcessReceivedMessage(env,&obj,&entry);
1246                 if(result == 0){
1247                         return;
1248                 }
1249         }
1250 #endif
1251 #if USE_CRITICAL_PATH_HEADER_ARRAY
1252   CK_CRITICALPATH_START(env)
1253 #endif
1254
1255   switch(env->getMsgtype()) {
1256 // Group support
1257     case BocInitMsg :
1258       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1259       // QD processing moved inside _processBocInitMsg because it is conditional
1260       //ck->process(); 
1261       if(env->isPacked()) CkUnpackMessage(&env);
1262       _processBocInitMsg(ck,env);
1263       break;
1264     case NodeBocInitMsg :
1265       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1266       if(env->isPacked()) CkUnpackMessage(&env);
1267       _processNodeBocInitMsg(ck,env);
1268       break;
1269     case ForBocMsg :
1270       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1271       // QD processing moved inside _processForBocMsg because it is conditional
1272       if(env->isPacked()) CkUnpackMessage(&env);
1273       _processForBocMsg(ck,env);
1274       // stats record moved inside _processForBocMsg because it is conditional
1275       break;
1276     case ForNodeBocMsg :
1277       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1278       // QD processing moved to _processForNodeBocMsg because it is conditional
1279       if(env->isPacked()) CkUnpackMessage(&env);
1280       _processForNodeBocMsg(ck,env);
1281       // stats record moved to _processForNodeBocMsg because it is conditional
1282       break;
1283
1284 // Array support
1285     case ForArrayEltMsg:
1286       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1287       if(env->isPacked()) CkUnpackMessage(&env);
1288       _processArrayEltMsg(ck,env);
1289       break;
1290
1291 // Chare support
1292     case NewChareMsg :
1293       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1294       if(env->isPacked()) CkUnpackMessage(&env);
1295       _processNewChareMsg(ck,env);
1296       break;
1297     case NewVChareMsg :
1298       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1299       if(env->isPacked()) CkUnpackMessage(&env);
1300       _processNewVChareMsg(ck,env);
1301       break;
1302     case ForChareMsg :
1303       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1304       if(env->isPacked()) CkUnpackMessage(&env);
1305       _processForPlainChareMsg(ck,env);
1306       break;
1307     case ForVidMsg   :
1308       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1309       _processForVidMsg(ck,env);
1310       break;
1311     case FillVidMsg  :
1312       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1313       _processFillVidMsg(ck,env);
1314       break;
1315     case DeleteVidMsg  :
1316       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1317       _processDeleteVidMsg(ck,env);
1318       break;
1319
1320     default:
1321       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1322   }
1323 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1324         if(obj != NULL){
1325                 postProcessReceivedMessage(obj,sender,SN,entry);
1326         }
1327 #endif
1328
1329
1330 #if USE_CRITICAL_PATH_HEADER_ARRAY
1331   CK_CRITICALPATH_END()
1332 #endif
1333
1334 }
1335
1336
1337 /******************** Message Send **********************/
1338
1339 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1340              int *queueing, int *priobits, unsigned int **prioptr)
1341 {
1342   envelope *env = (envelope *)converseMsg;
1343   *pfn = (CldPackFn)CkPackMessage;
1344   *len = env->getTotalsize();
1345   *queueing = env->getQueueing();
1346   *priobits = env->getPriobits();
1347   *prioptr = (unsigned int *) env->getPrioPtr();
1348 }
1349
1350 void CkPackMessage(envelope **pEnv)
1351 {
1352   envelope *env = *pEnv;
1353   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1354     void *msg = EnvToUsr(env);
1355     _TRACE_BEGIN_PACK();
1356     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1357     _TRACE_END_PACK();
1358     env=UsrToEnv(msg);
1359     env->setPacked(1);
1360     *pEnv = env;
1361   }
1362 }
1363
1364 void CkUnpackMessage(envelope **pEnv)
1365 {
1366   envelope *env = *pEnv;
1367   int msgIdx = env->getMsgIdx();
1368   if(env->isPacked()) {
1369     void *msg = EnvToUsr(env);
1370     _TRACE_BEGIN_UNPACK();
1371     msg = _msgTable[msgIdx]->unpack(msg);
1372     _TRACE_END_UNPACK();
1373     env=UsrToEnv(msg);
1374     env->setPacked(0);
1375     *pEnv = env;
1376   }
1377 }
1378
1379 //There's no reason for most messages to go through the Cld--
1380 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1381 // Thus these accellerated versions of the Cld calls.
1382 #if CMK_OBJECT_QUEUE_AVAILABLE
1383 static int index_objectQHandler;
1384 #endif
1385 int index_tokenHandler;
1386 int index_skipCldHandler;
1387
1388 void _skipCldHandler(void *converseMsg)
1389 {
1390   envelope *env = (envelope *)(converseMsg);
1391   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1392 #if CMK_GRID_QUEUE_AVAILABLE
1393   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1394     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1395                        env, env->getQueueing (), env->getPriobits (),
1396                        (unsigned int *) env->getPrioPtr ());
1397   } else {
1398     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1399                        env, env->getQueueing (), env->getPriobits (),
1400                        (unsigned int *) env->getPrioPtr ());
1401   }
1402 #else
1403   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1404         env, env->getQueueing(),env->getPriobits(),
1405         (unsigned int *)env->getPrioPtr());
1406 #endif
1407 }
1408
1409
1410 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1411 // Made non-static to be used by ckmessagelogging
1412 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1413 {
1414 #if CMK_CHARMDEBUG
1415   if (!ConverseDeliver(pe)) {
1416     CmiFree(env);
1417     return;
1418   }
1419 #endif
1420
1421 #if CMK_FAULT_EVAC
1422   if(pe == CkMyPe() ){
1423     if(!CmiNodeAlive(CkMyPe())){
1424         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1425 //      return;
1426     }
1427   }
1428 #endif
1429   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1430 #if CMK_OBJECT_QUEUE_AVAILABLE
1431     Chare *obj = CkFindObjectPtr(env);
1432     if (obj && obj->CkGetObjQueue().queue()) {
1433       _enqObjQueue(obj, env);
1434     }
1435     else
1436 #endif
1437     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1438         env, env->getQueueing(),env->getPriobits(),
1439         (unsigned int *)env->getPrioPtr());
1440 #if CMK_PERSISTENT_COMM
1441     CmiPersistentOneSend();
1442 #endif
1443   } else {
1444     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1445       CkPackMessage(&env);
1446     int len=env->getTotalsize();
1447     CmiSetXHandler(env,CmiGetHandler(env));
1448 #if CMK_OBJECT_QUEUE_AVAILABLE
1449     CmiSetHandler(env,index_objectQHandler);
1450 #else
1451     CmiSetHandler(env,index_skipCldHandler);
1452 #endif
1453     CmiSetInfo(env,infoFn);
1454     if (pe==CLD_BROADCAST) {
1455 #if CMK_MESSAGE_LOGGING
1456         if(env->flags & CK_FREE_MSG_MLOG)
1457                 CmiSyncBroadcastAndFree(len, (char *)env); 
1458         else
1459                 CmiSyncBroadcast(len, (char *)env);
1460 #else
1461                         CmiSyncBroadcastAndFree(len, (char *)env); 
1462 #endif
1463
1464 }
1465     else if (pe==CLD_BROADCAST_ALL) { 
1466 #if CMK_MESSAGE_LOGGING
1467         if(env->flags & CK_FREE_MSG_MLOG)
1468                 CmiSyncBroadcastAllAndFree(len, (char *)env);
1469         else
1470                 CmiSyncBroadcastAll(len, (char *)env);
1471 #else
1472                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1473 #endif
1474
1475 }
1476     else{
1477 #if CMK_MESSAGE_LOGGING
1478         if(env->flags & CK_FREE_MSG_MLOG)
1479                 CmiSyncSendAndFree(pe, len, (char *)env);
1480         else
1481                 CmiSyncSend(pe, len, (char *)env);
1482 #else
1483                         CmiSyncSendAndFree(pe, len, (char *)env);
1484 #endif
1485
1486                 }
1487   }
1488 }
1489
1490 #if CMK_BIGSIM_CHARM
1491 #   define  _skipCldEnqueue   _CldEnqueue
1492 #endif
1493
1494 // by pass Charm++ priority queue, send as Converse message
1495 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1496 {
1497 #if CMK_CHARMDEBUG
1498   if (!ConverseDeliver(-1)) {
1499     CmiFree(env);
1500     return;
1501   }
1502 #endif
1503   CkPackMessage(&env);
1504   int len=env->getTotalsize();
1505   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1506 }
1507
1508 static void _noCldEnqueue(int pe, envelope *env)
1509 {
1510 /*
1511   if (pe == CkMyPe()) {
1512     CmiHandleMessage(env);
1513   } else
1514 */
1515 #if CMK_CHARMDEBUG
1516   if (!ConverseDeliver(pe)) {
1517     CmiFree(env);
1518     return;
1519   }
1520 #endif
1521
1522   CkPackMessage(&env);
1523   int len=env->getTotalsize();
1524   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1525   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1526   else CmiSyncSendAndFree(pe, len, (char *)env);
1527 }
1528
1529 //static void _noCldNodeEnqueue(int node, envelope *env)
1530 //Made non-static to be used by ckmessagelogging
1531 void _noCldNodeEnqueue(int node, envelope *env)
1532 {
1533 /*
1534   if (node == CkMyNode()) {
1535     CmiHandleMessage(env);
1536   } else {
1537 */
1538 #if CMK_CHARMDEBUG
1539   if (!ConverseDeliver(node)) {
1540     CmiFree(env);
1541     return;
1542   }
1543 #endif
1544
1545   CkPackMessage(&env);
1546   int len=env->getTotalsize();
1547   if (node==CLD_BROADCAST) { 
1548 #if CMK_MESSAGE_LOGGING
1549         if(env->flags & CK_FREE_MSG_MLOG)
1550                 CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1551         else
1552                 CmiSyncNodeBroadcast(len, (char *)env);
1553 #else
1554         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1555 #endif
1556 }
1557   else if (node==CLD_BROADCAST_ALL) { 
1558 #if CMK_MESSAGE_LOGGING
1559         if(env->flags & CK_FREE_MSG_MLOG)
1560                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1561         else
1562                 CmiSyncNodeBroadcastAll(len, (char *)env);
1563 #else
1564                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1565 #endif
1566
1567 }
1568   else {
1569 #if CMK_MESSAGE_LOGGING
1570         if(env->flags & CK_FREE_MSG_MLOG)
1571                 CmiSyncNodeSendAndFree(node, len, (char *)env);
1572         else
1573                 CmiSyncNodeSend(node, len, (char *)env);
1574 #else
1575         CmiSyncNodeSendAndFree(node, len, (char *)env);
1576 #endif
1577   }
1578 }
1579
1580 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1581 {
1582   envelope *env = UsrToEnv(msg);
1583   _CHECK_USED(env);
1584   _SET_USED(env, 1);
1585 #if CMK_REPLAYSYSTEM
1586   setEventID(env);
1587 #endif
1588   env->setMsgtype(ForChareMsg);
1589   env->setEpIdx(eIdx);
1590   env->setSrcPe(CkMyPe());
1591
1592 #if USE_CRITICAL_PATH_HEADER_ARRAY
1593   CK_CRITICALPATH_SEND(env)
1594   //CK_AUTOMATE_PRIORITY(env)
1595 #endif
1596 #if CMK_CHARMDEBUG
1597   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1598 #endif
1599 #if CMK_OBJECT_QUEUE_AVAILABLE
1600   CmiSetHandler(env, index_objectQHandler);
1601 #else
1602   CmiSetHandler(env, _charmHandlerIdx);
1603 #endif
1604   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1605     int pe = -(pCid->onPE+1);
1606     if(pe==CkMyPe()) {
1607 #ifndef CMK_CHARE_USE_PTR
1608       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1609 #else
1610       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1611 #endif
1612       void *objPtr;
1613       if (NULL!=(objPtr=vblk->getLocalChare()))
1614       { //A ready local chare
1615         env->setObjPtr(objPtr);
1616         return pe;
1617       }
1618       else { //The vidblock is not ready-- forget it
1619         vblk->send(env);
1620         return -1;
1621       }
1622     } else { //Valid vidblock for another PE:
1623       env->setMsgtype(ForVidMsg);
1624       env->setVidPtr(pCid->objPtr);
1625       return pe;
1626     }
1627   }
1628   else {
1629     env->setObjPtr(pCid->objPtr);
1630     return pCid->onPE;
1631   }
1632 }
1633
1634 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1635 {
1636   int destPE = _prepareMsg(eIdx, msg, pCid);
1637   if (destPE != -1) {
1638     envelope *env = UsrToEnv(msg);
1639     //criticalPath_send(env);
1640 #if USE_CRITICAL_PATH_HEADER_ARRAY
1641     CK_CRITICALPATH_SEND(env)
1642     //CK_AUTOMATE_PRIORITY(env)
1643 #endif
1644     CmiBecomeImmediate(env);
1645   }
1646   return destPE;
1647 }
1648
1649 extern "C"
1650 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1651 {
1652   if (opts & CK_MSG_INLINE) {
1653     CkSendMsgInline(entryIdx, msg, pCid, opts);
1654     return;
1655   }
1656   envelope *env = UsrToEnv(msg);
1657 #if CMK_ERROR_CHECKING
1658   //Allow rdma metadata messages marked as immediate to go through
1659   if (opts & CK_MSG_IMMEDIATE && !env->isRdma()) {
1660     CmiAbort("Immediate message is not allowed in Chare!");
1661   }
1662 #endif
1663   int destPE=_prepareMsg(entryIdx,msg,pCid);
1664   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1665   // VidBlock was not yet filled). The problem is that the creation was never
1666   // traced later when the VidBlock was filled. One solution is to trace the
1667   // creation here, the other to trace it in VidBlock->msgDeliver().
1668 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1669   if (destPE!=-1) {
1670     CpvAccess(_qd)->create();
1671   }
1672         sendChareMsg(env,destPE,_infoIdx,pCid);
1673 #else
1674   _TRACE_CREATION_1(env);
1675   if (destPE!=-1) {
1676     CpvAccess(_qd)->create();
1677     if (opts & CK_MSG_SKIP_OR_IMM)
1678       _noCldEnqueue(destPE, env);
1679     else
1680       _CldEnqueue(destPE, env, _infoIdx);
1681   }
1682   _TRACE_CREATION_DONE(1);
1683 #endif
1684 }
1685
1686 extern "C"
1687 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1688 {
1689   if (pCid->onPE==CkMyPe())
1690   {
1691 #if CMK_FAULT_EVAC
1692     if(!CmiNodeAlive(CkMyPe())){
1693         return;
1694     }
1695 #endif
1696 #if CMK_CHARMDEBUG
1697     //Just in case we need to breakpoint or use the envelope in some way
1698     _prepareMsg(entryIndex,msg,pCid);
1699 #endif
1700                 //Just directly call the chare (skip QD handling & scheduler)
1701     envelope *env = UsrToEnv(msg);
1702     if (env->isPacked()) CkUnpackMessage(&env);
1703     _STATS_RECORD_PROCESS_MSG_1();
1704     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1705   }
1706   else {
1707     //No way to inline a cross-processor message:
1708     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1709   }
1710 }
1711
1712 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1713 {
1714   envelope *env = UsrToEnv(msg);
1715   /*#if CMK_ERROR_CHECKING
1716   CkNodeGroupID nodeRedMgr;
1717 #endif
1718   */
1719   _CHECK_USED(env);
1720   _SET_USED(env, 1);
1721 #if CMK_REPLAYSYSTEM
1722   setEventID(env);
1723 #endif
1724   env->setMsgtype(type);
1725   env->setEpIdx(eIdx);
1726   env->setGroupNum(gID);
1727   env->setSrcPe(CkMyPe());
1728   /*
1729 #if CMK_ERROR_CHECKING
1730   nodeRedMgr.setZero();
1731   env->setRednMgr(nodeRedMgr);
1732 #endif
1733 */
1734   //criticalPath_send(env);
1735 #if USE_CRITICAL_PATH_HEADER_ARRAY
1736   CK_CRITICALPATH_SEND(env)
1737   //CK_AUTOMATE_PRIORITY(env)
1738 #endif
1739 #if CMK_CHARMDEBUG
1740   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1741 #endif
1742   CmiSetHandler(env, _charmHandlerIdx);
1743   return env;
1744 }
1745
1746 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1747 {
1748   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1749 #if USE_CRITICAL_PATH_HEADER_ARRAY
1750   CK_CRITICALPATH_SEND(env)
1751   //CK_AUTOMATE_PRIORITY(env)
1752 #endif
1753   CmiBecomeImmediate(env);
1754   return env;
1755 }
1756
1757 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1758                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1759 {
1760   int numPes;
1761   envelope *env;
1762     if (opts & CK_MSG_IMMEDIATE) {
1763         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1764     }else
1765     {
1766         env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1767     }
1768
1769 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1770         sendGroupMsg(env,pe,_infoIdx);
1771 #else
1772   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1773   _TRACE_CREATION_N(env, numPes);
1774   if (opts & CK_MSG_SKIP_OR_IMM)
1775     _noCldEnqueue(pe, env);
1776   else
1777     _skipCldEnqueue(pe, env, _infoIdx);
1778   _TRACE_CREATION_DONE(1);
1779 #endif
1780 }
1781
1782 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1783                            int npes, int *pes)
1784 {
1785   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1786   _TRACE_CREATION_MULTICAST(env, npes, pes);
1787   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1788   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1789 }
1790
1791 extern "C"
1792 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1793 {
1794 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1795   if (destPE==CkMyPe())
1796   {
1797     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1798     return;
1799   }
1800   //Can't inline-- send the usual way
1801   envelope *env = UsrToEnv(msg);
1802   int numPes;
1803   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1804   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1805   _TRACE_CREATION_N(env, numPes);
1806   _noCldEnqueue(destPE, env);
1807   _STATS_RECORD_SEND_BRANCH_1();
1808   CkpvAccess(_coreState)->create();
1809   _TRACE_CREATION_DONE(1);
1810 #else
1811   // no support for immediate message, send inline
1812   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1813 #endif
1814 }
1815
1816 extern "C"
1817 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1818 {
1819   if (destPE==CkMyPe())
1820   {
1821 #if CMK_FAULT_EVAC
1822     if(!CmiNodeAlive(CkMyPe())){
1823         return;
1824     }
1825 #endif
1826     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1827     if (obj!=NULL)
1828     { //Just directly call the group:
1829 #if CMK_ERROR_CHECKING
1830       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1831 #else
1832       envelope *env=UsrToEnv(msg);
1833 #endif
1834       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1835       return;
1836     }
1837   }
1838   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1839   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1840 }
1841
1842 extern "C"
1843 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1844 {
1845   if (opts & CK_MSG_INLINE) {
1846     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1847     return;
1848   }
1849   envelope *env=UsrToEnv(msg);
1850   //Allow rdma metadata messages marked as immediate to go through
1851   if (opts & CK_MSG_IMMEDIATE && !env->isRdma()) {
1852     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1853     return;
1854   }
1855   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1856   _STATS_RECORD_SEND_BRANCH_1();
1857   CkpvAccess(_coreState)->create();
1858 }
1859
1860 extern "C"
1861 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1862 {
1863 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1864   envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1865   _TRACE_CREATION_MULTICAST(env, npes, pes);
1866   _noCldEnqueueMulti(npes, pes, env);
1867   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1868 #else
1869   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1870   CpvAccess(_qd)->create(-npes);
1871 #endif
1872   _STATS_RECORD_SEND_BRANCH_N(npes);
1873   CpvAccess(_qd)->create(npes);
1874 }
1875
1876 extern "C"
1877 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1878 {
1879   if (opts & CK_MSG_IMMEDIATE) {
1880     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1881     return;
1882   }
1883     // normal mesg
1884   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1885   _STATS_RECORD_SEND_BRANCH_N(npes);
1886   CpvAccess(_qd)->create(npes);
1887 }
1888
1889 extern "C"
1890 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1891 {
1892   int npes;
1893   int *pes;
1894   if (opts & CK_MSG_IMMEDIATE) {
1895     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1896     return;
1897   }
1898     // normal mesg
1899   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1900   CmiLookupGroup(grp, &npes, &pes);
1901   _TRACE_CREATION_MULTICAST(env, npes, pes);
1902   _CldEnqueueGroup(grp, env, _infoIdx);
1903   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1904   _STATS_RECORD_SEND_BRANCH_N(npes);
1905   CpvAccess(_qd)->create(npes);
1906 }
1907
1908 extern "C"
1909 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1910 {
1911   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1912   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1913   CpvAccess(_qd)->create(CkNumPes());
1914 }
1915
1916 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1917                 int node=CLD_BROADCAST_ALL, int opts=0)
1918 {
1919     int numPes;
1920     envelope *env;
1921     if (opts & CK_MSG_IMMEDIATE) {
1922         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1923     }else
1924     {
1925         env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1926     }
1927 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1928     sendNodeGroupMsg(env,node,_infoIdx);
1929 #else
1930   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1931   _TRACE_CREATION_N(env, numPes);
1932   if (opts & CK_MSG_SKIP_OR_IMM) {
1933     _noCldNodeEnqueue(node, env);
1934   }
1935   else
1936     _CldNodeEnqueue(node, env, _infoIdx);
1937   _TRACE_CREATION_DONE(1);
1938 #endif
1939 }
1940
1941 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1942                            int npes, int *nodes)
1943 {
1944   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1945   _TRACE_CREATION_N(env, npes);
1946   for (int i=0; i<npes; i++) {
1947     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1948   }
1949   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1950 }
1951
1952 extern "C"
1953 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1954 {
1955 #if CMK_IMMEDIATE_MSG
1956   if (node==CkMyNode())
1957   {
1958     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1959     return;
1960   }
1961   //Can't inline-- send the usual way
1962   envelope *env = UsrToEnv(msg);
1963   int numPes;
1964   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1965   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1966   _TRACE_CREATION_N(env, numPes);
1967   _noCldNodeEnqueue(node, env);
1968   _STATS_RECORD_SEND_BRANCH_1();
1969   CkpvAccess(_coreState)->create();
1970   _TRACE_CREATION_DONE(1);
1971 #else
1972   // no support for immediate message, send inline
1973   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1974 #endif
1975 }
1976
1977 extern "C"
1978 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1979 {
1980   if (node==CkMyNode() && ((envelope *)(UsrToEnv(msg)))->isRdma() == false)
1981   {
1982     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1983     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1984     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1985     if (obj!=NULL)
1986     { //Just directly call the group:
1987 #if CMK_ERROR_CHECKING
1988       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1989 #else
1990       envelope *env=UsrToEnv(msg);
1991 #endif
1992       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1993       return;
1994     }
1995   }
1996   //Can't inline-- send the usual way
1997   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1998 }
1999
2000 extern "C"
2001 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
2002 {
2003   if (opts & CK_MSG_INLINE) {
2004     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
2005     return;
2006   }
2007   if (opts & CK_MSG_IMMEDIATE) {
2008     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
2009     return;
2010   }
2011   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
2012   _STATS_RECORD_SEND_NODE_BRANCH_1();
2013   CkpvAccess(_coreState)->create();
2014 }
2015
2016 extern "C"
2017 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
2018 {
2019 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
2020   envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
2021   _noCldEnqueueMulti(npes, nodes, env);
2022 #else
2023   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2024   CpvAccess(_qd)->create(-npes);
2025 #endif
2026   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2027   CpvAccess(_qd)->create(npes);
2028 }
2029
2030 extern "C"
2031 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
2032 {
2033   if (opts & CK_MSG_IMMEDIATE) {
2034     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
2035     return;
2036   }
2037     // normal mesg
2038   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2039   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2040   CpvAccess(_qd)->create(npes);
2041 }
2042
2043 extern "C"
2044 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
2045 {
2046   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
2047   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
2048   CpvAccess(_qd)->create(CkNumNodes());
2049 }
2050
2051 //Needed by delegation manager:
2052 extern "C"
2053 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
2054 { return _prepareMsg(eIdx,msg,pCid); }
2055 extern "C"
2056 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2057 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
2058 extern "C"
2059 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2060 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
2061
2062 void _ckModuleInit(void) {
2063         CmiAssignOnce(&index_skipCldHandler, CkRegisterHandler(_skipCldHandler));
2064 #if CMK_OBJECT_QUEUE_AVAILABLE
2065         CmiAssignOnce(&index_objectQHandler, CkRegisterHandler(_ObjectQHandler));
2066 #endif
2067         CmiAssignOnce(&index_tokenHandler, CkRegisterHandler(_TokenHandler));
2068         CkpvInitialize(TokenPool*, _tokenPool);
2069         CkpvAccess(_tokenPool) = new TokenPool;
2070 }
2071
2072
2073 /************** Send: Arrays *************/
2074
2075 static void _prepareOutgoingArrayMsg(envelope *env,int type)
2076 {
2077   _CHECK_USED(env);
2078   _SET_USED(env, 1);
2079   env->setMsgtype(type);
2080 #if CMK_CHARMDEBUG
2081   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
2082 #endif
2083   CmiSetHandler(env, _charmHandlerIdx);
2084   CpvAccess(_qd)->create();
2085 }
2086
2087 extern "C"
2088 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2089   envelope *env = UsrToEnv(msg);
2090   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2091 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2092         sendArrayMsg(env,pe,_infoIdx);
2093 #else
2094   if (opts & CK_MSG_IMMEDIATE)
2095     CmiBecomeImmediate(env);
2096   if (opts & CK_MSG_SKIP_OR_IMM)
2097     _noCldEnqueue(pe, env);
2098   else
2099     _skipCldEnqueue(pe, env, _infoIdx);
2100 #endif
2101 }
2102
2103 class ElementDestroyer : public CkLocIterator {
2104 private:
2105         CkLocMgr *locMgr;
2106 public:
2107         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2108         void addLocation(CkLocation &loc) {
2109           loc.destroyAll();
2110         }
2111 };
2112
2113 void CkDeleteChares() {
2114   int i;
2115   int numGroups = CkpvAccess(_groupIDTable)->size();
2116
2117   // delete all plain chares
2118 #ifndef CMK_CHARE_USE_PTR
2119   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2120         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2121         delete obj;
2122         CkpvAccess(chare_objs)[i] = NULL;
2123   }
2124   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2125         VidBlock *obj = CkpvAccess(vidblocks)[i];
2126         delete obj;
2127         CkpvAccess(vidblocks)[i] = NULL;
2128   }
2129 #endif
2130
2131   // delete all array elements
2132   for(i=0;i<numGroups;i++) {
2133     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2134     if(obj && obj->isLocMgr())  {
2135       CkLocMgr *mgr = (CkLocMgr*)obj;
2136       ElementDestroyer destroyer(mgr);
2137       mgr->iterate(destroyer);
2138     }
2139   }
2140
2141   // delete all groups
2142   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2143   for(i=0;i<numGroups;i++) {
2144     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2145     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2146     if (obj) delete obj;
2147   }
2148   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2149
2150   // delete all node groups
2151   if (CkMyRank() == 0) {
2152     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2153     for(i=0;i<numNodeGroups;i++) {
2154       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2155       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2156       if (obj) delete obj;
2157     }
2158   }
2159 }
2160
2161 #if CMK_BIGSIM_CHARM
2162 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2163                                    int pb,unsigned int *prio);
2164 #endif
2165
2166 //------------------- External client support (e.g. Charm4py) ----------------
2167
2168 static std::vector< std::vector<char> > ext_args;
2169 static std::vector<char*> ext_argv;
2170
2171 // This is just a wrapper for ConverseInit that copies command-line args into a private
2172 // buffer.
2173 // To be called from external clients like charm4py. This wrapper avoids issues with
2174 // ctypes and cffi.
2175 extern "C" void StartCharmExt(int argc, char **argv) {
2176 #if !defined(_WIN32) && !NODE_0_IS_CONVHOST
2177   // only do this in net layers if using charmrun, so that output of process 0
2178   // doesn't get duplicated
2179   char *ns = getenv("NETSTART");
2180   if (ns != 0) {
2181     int fd;
2182     if (-1 != (fd = open("/dev/null", O_RDWR))) {
2183       dup2(fd, 0);
2184       dup2(fd, 1);
2185       dup2(fd, 2);
2186     }
2187   }
2188 #endif
2189   ext_args.resize(argc);
2190   ext_argv.resize(argc + 1, NULL);
2191   for (int i=0; i < argc; i++) {
2192     ext_args[i].resize(strlen(argv[i]) + 1);
2193     strcpy(ext_args[i].data(), argv[i]);
2194     ext_argv[i] = ext_args[i].data();
2195   }
2196   ConverseInit(argc, ext_argv.data(), _initCharm, 0, 0);
2197 }
2198
2199 void (*CkRegisterMainModuleCallback)() = NULL;
2200 extern "C" void registerCkRegisterMainModuleCallback(void (*cb)()) {
2201   CkRegisterMainModuleCallback = cb;
2202 }
2203
2204 void (*MainchareCtorExtCallback)(int, void*, int, int, char **) = NULL;
2205 extern "C" void registerMainchareCtorExtCallback(void (*cb)(int, void*, int, int, char **)) {
2206   MainchareCtorExtCallback = cb;
2207 }
2208
2209 void (*ReadOnlyRecvExtCallback)(int, char*) = NULL;
2210 extern "C" void registerReadOnlyRecvExtCallback(void (*cb)(int, char*)) {
2211   ReadOnlyRecvExtCallback = cb;
2212 }
2213
2214 void* ReadOnlyExt::ro_data = NULL;
2215 size_t ReadOnlyExt::data_size = 0;
2216
2217 void (*ChareMsgRecvExtCallback)(int, void*, int, int, char *, int) = NULL;
2218 extern "C" void registerChareMsgRecvExtCallback(void (*cb)(int, void*, int, int, char *, int)) {
2219   ChareMsgRecvExtCallback = cb;
2220 }
2221
2222 void (*GroupMsgRecvExtCallback)(int, int, int, char *, int) = NULL;
2223 extern "C" void registerGroupMsgRecvExtCallback(void (*cb)(int, int, int, char *, int)) {
2224   GroupMsgRecvExtCallback = cb;
2225 }
2226
2227 void (*ArrayMsgRecvExtCallback)(int, int, int *, int, int, char *, int) = NULL;
2228 extern "C" void registerArrayMsgRecvExtCallback(void (*cb)(int, int, int *, int, int, char *, int)) {
2229   ArrayMsgRecvExtCallback = cb;
2230 }
2231
2232 int (*ArrayElemLeaveExt)(int, int, int *, char**, int) = NULL;
2233 extern "C" void registerArrayElemLeaveExtCallback(int (*cb)(int, int, int *, char**, int)) {
2234   ArrayElemLeaveExt = cb;
2235 }
2236
2237 void (*ArrayElemJoinExt)(int, int, int *, int, char*, int) = NULL;
2238 extern "C" void registerArrayElemJoinExtCallback(void (*cb)(int, int, int *, int, char*, int)) {
2239   ArrayElemJoinExt = cb;
2240 }
2241
2242 void (*ArrayResumeFromSyncExtCallback)(int, int, int *) = NULL;
2243 extern "C" void registerArrayResumeFromSyncExtCallback(void (*cb)(int, int, int *)) {
2244   ArrayResumeFromSyncExtCallback = cb;
2245 }
2246
2247 void (*CreateReductionTargetMsgExt)(void*, int, int, int, char**, int*) = NULL;
2248 extern "C" void registerCreateReductionTargetMsgExtCallback(void (*cb)(void*, int, int, int, char**, int*)) {
2249   CreateReductionTargetMsgExt = cb;
2250 }
2251
2252 int (*PyReductionExt)(char**, int*, int, char**) = NULL;
2253 extern "C" void registerPyReductionExtCallback(int (*cb)(char**, int*, int, char**)) {
2254     PyReductionExt = cb;
2255 }
2256
2257 int (*ArrayMapProcNumExtCallback)(int, int, const int *) = NULL;
2258 extern "C" void registerArrayMapProcNumExtCallback(int (*cb)(int, int, const int *)) {
2259   ArrayMapProcNumExtCallback = cb;
2260 }
2261
2262 extern "C" int CkMyPeHook() { return CkMyPe(); }
2263 extern "C" int CkNumPesHook() { return CkNumPes(); }
2264
2265 void ReadOnlyExt::setData(void *msg, size_t msgSize) {
2266   ro_data = malloc(msgSize);
2267   memcpy(ro_data, msg, msgSize);
2268   data_size = msgSize;
2269 }
2270
2271 void ReadOnlyExt::_roPup(void *pup_er) {
2272   PUP::er &p=*(PUP::er *)pup_er;
2273   if (!p.isUnpacking()) {
2274     //printf("[%d] Sizing/packing data, ro_data=%p, data_size=%d\n", CkMyPe(), ro_data, int(data_size));
2275     p | data_size;
2276     p((char*)ro_data, data_size);
2277   } else {
2278     CkAssert(CkMyPe() != 0);
2279     CkAssert(ro_data == NULL);
2280     PUP::fromMem &p_mem = *(PUP::fromMem *)pup_er;
2281     p_mem | data_size;
2282     //printf("[%d] Unpacking ro, size of data to unpack is %d\n", CkMyPe(), int(data_size));
2283     ReadOnlyRecvExtCallback(int(data_size), p_mem.get_current_pointer());
2284     p_mem.advance(data_size);
2285   }
2286 }
2287
2288 CkpvExtern(int, _currentChareType);
2289
2290 MainchareExt::MainchareExt(CkArgMsg *m) {
2291   int cIdx = CkpvAccess(_currentChareType);
2292   //printf("Constructor of MainchareExt, chareId=(%d,%p), chareIdx=%d\n", thishandle.onPE, thishandle.objPtr, cIdx);
2293   int ctorEpIdx =  _mainTable[_chareTable[cIdx]->mainChareType()]->entryIdx;
2294   MainchareCtorExtCallback(thishandle.onPE, thishandle.objPtr, ctorEpIdx, m->argc, m->argv);
2295   delete m;
2296 }
2297
2298 GroupExt::GroupExt(void *impl_msg) {
2299   //printf("Constructor of GroupExt, gid=%d\n", thisgroup.idx);
2300   //int chareIdx = CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
2301   int chareIdx = ckGetChareType();
2302   int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2303   CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2304   char *impl_buf = impl_msg_typed->msgBuf;
2305   PUP::fromMem implP(impl_buf);
2306   int msgSize; implP|msgSize;
2307   int dcopy_start; implP|dcopy_start;
2308   GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
2309                           dcopy_start);
2310 }
2311
2312 ArrayMapExt::ArrayMapExt(void *impl_msg) {
2313   //printf("Constructor of ArrayMapExt, gid=%d\n", thisgroup.idx);
2314   int chareIdx = ckGetChareType();
2315   int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2316   CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2317   char *impl_buf = impl_msg_typed->msgBuf;
2318   PUP::fromMem implP(impl_buf);
2319   int msgSize; implP|msgSize;
2320   int dcopy_start; implP|dcopy_start;
2321   GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
2322                           dcopy_start);
2323 }
2324
2325 // TODO options
2326 extern "C"
2327 int CkCreateGroupExt(int cIdx, int eIdx, int num_bufs, char **bufs, int *buf_sizes) {
2328   //static_cast<void>(impl_e_opts);
2329   CkAssert(num_bufs >= 1);
2330   int totalSize = 0;
2331   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2332   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2);
2333   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2334   PUP::toMem implP((void *)impl_msg->msgBuf);
2335   implP|totalSize;
2336   implP|buf_sizes[0];
2337   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2338   UsrToEnv(impl_msg)->setMsgtype(BocInitMsg);
2339   //if (impl_e_opts)
2340   //  UsrToEnv(impl_msg)->setGroupDep(impl_e_opts->getGroupDepID());
2341   CkGroupID gId = CkCreateGroup(cIdx, eIdx, impl_msg);
2342   return gId.idx;
2343 }
2344
2345 // TODO options
2346 extern "C"
2347 int CkCreateArrayExt(int cIdx, int ndims, int *dims, int eIdx, int num_bufs,
2348                      char **bufs, int *buf_sizes, int map_gid, char useAtSync) {
2349   //static_cast<void>(impl_e_opts);
2350   CkAssert(num_bufs >= 1);
2351   int totalSize = 0;
2352   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2353   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2354   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2355   PUP::toMem implP((void *)impl_msg->msgBuf);
2356   implP|useAtSync;
2357   implP|totalSize;
2358   implP|buf_sizes[0];
2359   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2360   CkArrayOptions opts;
2361   if (ndims != -1)
2362     opts = CkArrayOptions(ndims, dims);
2363   if (map_gid >= 0) {
2364     CkGroupID map_gId;
2365     map_gId.idx = map_gid;
2366     opts.setMap(CProxy_Group(map_gId));
2367   }
2368   UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2369   //CkArrayID gId = ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
2370   CkGroupID gId = CProxyElement_ArrayElement::ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
2371   return gId.idx;
2372 }
2373
2374 // TODO options
2375 extern "C"
2376 void CkInsertArrayExt(int aid, int ndims, int *index, int epIdx, int onPE, int num_bufs,
2377                       char **bufs, int *buf_sizes, char useAtSync) {
2378   CkAssert(num_bufs >= 1);
2379   int totalSize = 0;
2380   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2381   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2382   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2383   PUP::toMem implP((void *)impl_msg->msgBuf);
2384   implP|useAtSync;
2385   implP|totalSize;
2386   implP|buf_sizes[0];
2387   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2388
2389   UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2390   CkArrayIndex newIdx(ndims, index);
2391   CkGroupID gId;
2392   gId.idx = aid;
2393   CProxy_ArrayBase(gId).ckInsertIdx((CkArrayMessage *)impl_msg, epIdx, onPE, newIdx);
2394 }
2395
2396 extern "C"
2397 void CkMigrateExt(int aid, int ndims, int *index, int toPe) {
2398   //printf("[charm] CkMigrateMeExt called with aid: %d, ndims: %d, index: %d, toPe: %d\n",
2399         //aid, ndims, *index, toPe);
2400   CkGroupID gId;
2401   gId.idx = aid;
2402   CkArrayIndex arrayIndex(ndims, index);
2403   CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2404   ArrayElement* arrayElement = arrayProxy.ckLocal();
2405   CkAssert(arrayElement != NULL);
2406   arrayElement->migrateMe(toPe);
2407 }
2408
2409 extern "C"
2410 void CkArrayDoneInsertingExt(int aid) {
2411   CkGroupID gId;
2412   gId.idx = aid;
2413   CProxy_ArrayBase(gId).doneInserting();
2414 }
2415
2416 extern "C"
2417 int CkGroupGetReductionNumber(int g_id) {
2418   CkGroupID gId;
2419   gId.idx = g_id;
2420   return ((Group*)CkLocalBranch(gId))->getRedNo();
2421 }
2422
2423 extern "C"
2424 int CkArrayGetReductionNumber(int aid, int ndims, int *index) {
2425   CkGroupID gId;
2426   gId.idx = aid;
2427   CkArrayIndex arrayIndex(ndims, index);
2428   CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2429   ArrayElement* arrayElement = arrayProxy.ckLocal();
2430   CkAssert(arrayElement != NULL);
2431   return arrayElement->getRedNo();
2432 }
2433
2434 extern "C"
2435 void CkChareExtSend(int onPE, void *objPtr, int epIdx, char *msg, int msgSize) {
2436   //ckCheck();    // checks that gid is not zero
2437   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2438   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2439   PUP::toMem implP((void *)impl_msg->msgBuf);
2440   implP|msgSize;
2441   implP|epIdx;
2442   int d=0; implP|d;
2443   implP(msg, msgSize);
2444   CkChareID chareID;
2445   chareID.onPE = onPE;
2446   chareID.objPtr = objPtr;
2447
2448   CkSendMsg(epIdx, impl_msg, &chareID);
2449 }
2450
2451 extern "C"
2452 void CkChareExtSend_multi(int onPE, void *objPtr, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2453   CkAssert(num_bufs >= 1);
2454   //ckCheck();    // checks that gid is not zero
2455   int totalSize = 0;
2456   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2457   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2458   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2459   PUP::toMem implP((void *)impl_msg->msgBuf);
2460   implP | totalSize;
2461   implP | epIdx;
2462   implP | buf_sizes[0];
2463   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2464   CkChareID chareID;
2465   chareID.onPE = onPE;
2466   chareID.objPtr = objPtr;
2467
2468   CkSendMsg(epIdx, impl_msg, &chareID);
2469 }
2470
2471 extern "C"
2472 void CkGroupExtSend(int gid, int pe, int epIdx, char *msg, int msgSize) {
2473   //ckCheck();    // checks that gid is not zero
2474   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2475   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2476   PUP::toMem implP((void *)impl_msg->msgBuf);
2477   implP|msgSize;
2478   implP|epIdx;
2479   int d=0; implP|d;
2480   implP(msg, msgSize);
2481   CkGroupID gId;
2482   gId.idx = gid;
2483
2484   if (pe == -1)
2485     CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2486   else
2487     CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
2488 }
2489
2490 extern "C"
2491 void CkGroupExtSend_multi(int gid, int pe, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2492   CkAssert(num_bufs >= 1);
2493   //ckCheck();    // checks that gid is not zero
2494   int totalSize = 0;
2495   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2496   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2497   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2498   PUP::toMem implP((void *)impl_msg->msgBuf);
2499   implP | totalSize;
2500   implP | epIdx;
2501   implP | buf_sizes[0];
2502   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2503   CkGroupID gId;
2504   gId.idx = gid;
2505
2506   if (pe == -1)
2507     CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2508   else
2509     CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
2510 }
2511
2512 extern "C"
2513 void CkArrayExtSend(int aid, int *idx, int ndims, int epIdx, char *msg, int msgSize) {
2514   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2515   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2516   PUP::toMem implP((void *)impl_msg->msgBuf);
2517   implP|msgSize;
2518   implP|epIdx;
2519   int d=0; implP|d;
2520   implP(msg, msgSize);
2521   UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2522   CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2523   impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2524   CkGroupID gId;
2525   gId.idx = aid;
2526   if (ndims > 0) {
2527     CkArrayIndex arrIndex(ndims, idx);
2528     // TODO is there a better function for this?
2529     CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2530   } else { // broadcast
2531     CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
2532   }
2533 }
2534
2535 extern "C"
2536 void CkArrayExtSend_multi(int aid, int *idx, int ndims, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2537   CkAssert(num_bufs >= 1);
2538   int totalSize = 0;
2539   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2540   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2541   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2542   PUP::toMem implP((void *)impl_msg->msgBuf);
2543   implP | totalSize;
2544   implP | epIdx;
2545   implP | buf_sizes[0];
2546   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2547   UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2548   CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2549   impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2550   CkGroupID gId;
2551   gId.idx = aid;
2552   if (ndims > 0) {
2553     CkArrayIndex arrIndex(ndims, idx);
2554     // TODO is there a better function for this?
2555     CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2556   } else { // broadcast
2557     CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
2558   }
2559 }
2560
2561 //------------------- Message Watcher (record/replay) ----------------
2562
2563 #include "crc32.h"
2564
2565 CkpvDeclare(int, envelopeEventID);
2566 int _recplay_crc = 0;
2567 int _recplay_checksum = 0;
2568 int _recplay_logsize = 1024*1024;
2569
2570 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2571 #define REPLAYDEBUG(args) /* empty */
2572
2573 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2574
2575 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2576 #include "BaseLB.h" /* For LBMigrateMsg message */
2577
2578 #if CMK_REPLAYSYSTEM
2579 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2580   std::string fName = CkpvAccess(traceRoot);
2581   fName += prefix;
2582   fName += std::to_string(CkMyPe());
2583   fName += suffix;
2584   FILE *f = fopen(fName.c_str(), permissions);
2585   REPLAYDEBUG("openReplayfile " << fName.c_str());
2586   if (f==NULL) {
2587     CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2588              CkMyPe(), fName.c_str(), permissions);
2589     CkAbort("openReplayFile> Could not open replay file");
2590   }
2591   return f;
2592 }
2593
2594 class CkMessageRecorder : public CkMessageWatcher {
2595   unsigned int curpos;
2596   bool firstOpen;
2597   std::vector<char> buffer;
2598 public:
2599   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true), buffer(_recplay_logsize) { f=f_; }
2600   ~CkMessageRecorder() {
2601     flushLog(0);
2602     fprintf(f,"-1 -1 -1 ");
2603     fclose(f);
2604 #if 0
2605     FILE *stsfp = fopen("sts", "w");
2606     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2607     traceWriteSTS(stsfp, 0);
2608     fclose(stsfp);
2609 #endif
2610     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2611   }
2612
2613 private:
2614   void flushLog(int verbose=1) {
2615     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2616     fprintf(f, "%s", buffer.data());
2617     curpos=0;
2618   }
2619   virtual bool process(envelope **envptr,CkCoreState *ck) {
2620     if ((*envptr)->getEvent()) {
2621       bool wasPacked = (*envptr)->isPacked();
2622       if (!wasPacked) CkPackMessage(envptr);
2623       envelope *env = *envptr;
2624       unsigned int crc1=0, crc2=0;
2625       if (_recplay_crc) {
2626         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2627         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2628         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2629       } else if (_recplay_checksum) {
2630         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2631         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2632       }
2633       curpos+=sprintf(&buffer[curpos],"%d %d %d %d %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2634       if (curpos > _recplay_logsize-128) flushLog();
2635       if (!wasPacked) CkUnpackMessage(envptr);
2636     }
2637     return true;
2638   }
2639   virtual bool process(CthThreadToken *token,CkCoreState *ck) {
2640     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2641     if (curpos > _recplay_logsize-128) flushLog();
2642     return true;
2643   }
2644   
2645   virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2646     FILE *f;
2647     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2648     else f = openReplayFile("ckreplay_",".lb","a");
2649     firstOpen = false;
2650     if (f != NULL) {
2651       PUP::toDisk p(f);
2652       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2653       (*msg)->pup(p);
2654       fclose(f);
2655     }
2656     return true;
2657   }
2658 };
2659
2660 class CkMessageDetailRecorder : public CkMessageWatcher {
2661 public:
2662   CkMessageDetailRecorder(FILE *f_) {
2663     f=f_;
2664     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2665      * The value of 'x' is the pointer size.
2666      */
2667     CmiUInt2 little = sizeof(void*);
2668     fwrite(&little, 2, 1, f);
2669   }
2670   ~CkMessageDetailRecorder() {fclose(f);}
2671 private:
2672   virtual bool process(envelope **envptr, CkCoreState *ck) {
2673     bool wasPacked = (*envptr)->isPacked();
2674     if (!wasPacked) CkPackMessage(envptr);
2675     envelope *env = *envptr;
2676     CmiUInt4 size = env->getTotalsize();
2677     fwrite(&size, 4, 1, f);
2678     fwrite(env, env->getTotalsize(), 1, f);
2679     if (!wasPacked) CkUnpackMessage(envptr);
2680     return true;
2681   }
2682 };
2683
2684 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2685 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2686
2687 class CkMessageReplay : public CkMessageWatcher {
2688   int counter;
2689         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2690         int nextEP;
2691         unsigned int crc1, crc2;
2692         FILE *lbFile;
2693         /// Read the next message we need from the file:
2694         void getNext(void) {
2695           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2696           if (nextSize > 0) {
2697             // We are reading a regular message
2698             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2699               CkAbort("CkMessageReplay> Syntax error reading replay file");
2700             }
2701             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2702           } else if (nextSize == -2) {
2703             // We are reading a special message (right now only thread awaken)
2704             // Nothing to do since we have already read all info
2705             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2706           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2707             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2708             CkAbort("CkMessageReplay> Unrecognized input");
2709           }
2710             /*
2711                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2712                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2713                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2714                 }
2715                 */
2716                 counter++;
2717         }
2718         /// If this is the next message we need, advance and return true.
2719         bool isNext(envelope *env) {
2720                 if (nextPE!=env->getSrcPe()) return false;
2721                 if (nextEvent!=env->getEvent()) return false;
2722                 if (nextSize<0) return false; // not waiting for a regular message
2723 #if 1
2724                 if (nextEP != env->getEpIdx()) {
2725                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2726                         return false;
2727                 }
2728 #endif
2729 #if ! CMK_BIGSIM_CHARM
2730                 if (nextSize!=env->getTotalsize())
2731                 {
2732                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2733                         return false;
2734                 }
2735                 if (_recplay_crc || _recplay_checksum) {
2736                   bool wasPacked = env->isPacked();
2737                   if (!wasPacked) CkPackMessage(&env);
2738                   if (_recplay_crc) {
2739                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2740                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2741                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2742                     if (crcnew1 != crc1) {
2743                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2744                     }
2745                     if (crcnew2 != crc2) {
2746                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2747                     }
2748                   } else if (_recplay_checksum) {
2749             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2750             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2751             if (crcnew1 != crc1) {
2752               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2753             }
2754             if (crcnew2 != crc2) {
2755               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2756             }               
2757                   }
2758                   if (!wasPacked) CkUnpackMessage(&env);
2759                 }
2760 #endif
2761                 return true;
2762         }
2763         bool isNext(CthThreadToken *token) {
2764           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
2765           return false;
2766         }
2767
2768         /// This is a (short) list of messages we aren't yet ready for:
2769         CkQ<envelope *> delayedMessages;
2770         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2771         CkQ<CthThreadToken *> delayedTokens;
2772
2773         /// Try to flush out any delayed messages
2774         void flush(void) {
2775           if (nextSize>0) {
2776                 int len=delayedMessages.length();
2777                 for (int i=0;i<len;i++) {
2778                         envelope *env=delayedMessages.deq();
2779                         if (isNext(env)) { /* this is the next message: process it */
2780                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2781                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2782                                 return;
2783                         }
2784                         else /* Not ready yet-- put it back in the
2785                                 queue */
2786                           {
2787                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2788                                 delayedMessages.enq(env);
2789                           }
2790                 }
2791           } else if (nextSize==-2) {
2792             int len=delayedTokens.length();
2793             for (int i=0;i<len;++i) {
2794               CthThreadToken *token=delayedTokens.deq();
2795               if (isNext(token)) {
2796             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2797 #if ! CMK_BIGSIM_CHARM
2798                 CsdEnqueueLifo((void*)token);
2799 #else
2800                 CthEnqueueBigSimThread(token,0,0,NULL);
2801 #endif
2802                 return;
2803               } else {
2804             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2805                 delayedTokens.enq(token);
2806               }
2807             }
2808           }
2809         }
2810
2811 public:
2812         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2813           counter=0;
2814           f=f_;
2815           getNext();
2816           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2817 #if CMI_QD
2818           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2819 #endif
2820         }
2821         ~CkMessageReplay() {fclose(f);}
2822
2823 private:
2824         virtual bool process(envelope **envptr,CkCoreState *ck) {
2825           bool wasPacked = (*envptr)->isPacked();
2826           if (!wasPacked) CkPackMessage(envptr);
2827           envelope *env = *envptr;
2828           //CkAssert(*(int*)env == 0x34567890);
2829           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2830                 if (env->getEvent() == 0) return true;
2831                 if (isNext(env)) { /* This is the message we were expecting */
2832                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2833                         getNext(); /* Advance over this message */
2834                         flush(); /* try to process queued-up stuff */
2835                         if (!wasPacked) CkUnpackMessage(envptr);
2836                         return true;
2837                 }
2838 #if CMK_SMP
2839                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2840                          // try next rank, we can't just buffer the msg and left
2841                          // we need to keep unprocessed msg on the fly
2842                         int nextpe = CkMyPe()+1;
2843                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2844                         nextpe = CkNodeFirst(CkMyNode());
2845                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2846                         return false;
2847                 }
2848 #endif
2849                 else /*!isNext(env) */ {
2850                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2851                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2852                         delayedMessages.enq(env);
2853                         flush();
2854                         return false;
2855                 }
2856         }
2857         virtual bool process(CthThreadToken *token, CkCoreState *ck) {
2858       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2859           if (isNext(token)) {
2860         REPLAYDEBUG("Executing token: "<<token->serialNo)
2861             getNext();
2862             flush();
2863             return true;
2864           } else {
2865         REPLAYDEBUG("Queueing token: "<<token->serialNo
2866             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2867             delayedTokens.enq(token);
2868             return false;
2869           }
2870         }
2871
2872         virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2873           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2874           if (lbFile != NULL) {
2875             int num_moves = 0;
2876         PUP::fromDisk p(lbFile);
2877             p | num_moves;
2878             if (num_moves != (*msg)->n_moves) {
2879               delete *msg;
2880               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2881             }
2882             (*msg)->pup(p);
2883           }
2884           return true;
2885         }
2886 };
2887
2888 class CkMessageDetailReplay : public CkMessageWatcher {
2889   void *getNext() {
2890     CmiUInt4 size; size_t nread;
2891     if ((nread=fread(&size, 4, 1, f)) < 1) {
2892       if (feof(f)) return NULL;
2893       CkPrintf("Broken record file (metadata) got %d\n",nread);
2894       CkAbort("");
2895     }
2896     void *env = CmiAlloc(size);
2897     long tell = ftell(f);
2898     if ((nread=fread(env, size, 1, f)) < 1) {
2899       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2900       CkAbort("");
2901     }
2902     //*(int*)env = 0x34567890; // set first integer as magic
2903     return env;
2904   }
2905 public:
2906   double starttime;
2907   CkMessageDetailReplay(FILE *f_) {
2908     f=f_;
2909     starttime=CkWallTimer();
2910     /* This must match what CkMessageDetailRecorder did */
2911     CmiUInt2 little;
2912     fread(&little, 2, 1, f);
2913     if (little != sizeof(void*)) {
2914       CkAbort("Replaying on a different architecture from which recording was done!");
2915     }
2916
2917     CsdEnqueue(getNext());
2918
2919     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2920   }
2921   virtual bool process(envelope **env,CkCoreState *ck) {
2922     void *msg = getNext();
2923     if (msg != NULL) CsdEnqueue(msg);
2924     return true;
2925   }
2926 };
2927
2928 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2929 #if ! CMK_BIGSIM_CHARM
2930   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2931 #endif
2932   CkMessageReplay *replay = (CkMessageReplay*)rep;
2933   //CmiStartQD(CkMessageReplayQuiescence, replay);
2934 }
2935
2936 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2937   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2938   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2939   ConverseExit();
2940 }
2941 #endif
2942
2943 static bool CpdExecuteThreadResume(CthThreadToken *token) {
2944   CkCoreState *ck = CkpvAccess(_coreState);
2945   if (ck->watcher!=NULL) {
2946     return ck->watcher->processThread(token,ck);
2947   }
2948   return true;
2949 }
2950
2951 CpvExtern(int, CthResumeNormalThreadIdx);
2952 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2953 {
2954   CthThread t = token->thread;
2955
2956   if(t == NULL){
2957     free(token);
2958     return;
2959   }
2960 #if CMK_TRACE_ENABLED
2961 #if ! CMK_TRACE_IN_CHARM
2962   if(CpvAccess(traceOn))
2963     CthTraceResume(t);
2964 /*    if(CpvAccess(_traceCoreOn)) 
2965             resumeTraceCore();*/
2966 #endif
2967 #endif
2968 #if CMK_OMP
2969   CthSetPrev(t, CthSelf());
2970 #endif
2971   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2972   if (CpdExecuteThreadResume(token)) {
2973     CthResume(t);
2974   }
2975 #if CMK_OMP
2976   CthScheduledDecrement();
2977   CthSetPrev(CthSelf(), 0);
2978 #endif
2979 }
2980
2981 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2982   CkCoreState *ck = CkpvAccess(_coreState);
2983   if (ck->watcher!=NULL) {
2984     ck->watcher->processLBMessage(msg, ck);
2985   }
2986 }
2987
2988 #if CMK_BIGSIM_CHARM
2989 CpvExtern(int      , CthResumeBigSimThreadIdx);
2990 #endif
2991
2992 #include "ckliststring.h"
2993 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2994     CmiArgGroup("Charm++","Record/Replay");
2995     bool forceReplay = false;
2996     char *procs = NULL;
2997     _replaySystem = 0;
2998     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2999       if(CmiMyRank() == 0) _recplay_crc = 1;
3000     }
3001     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
3002       if(CmiMyRank() == 0) _recplay_checksum = 1;
3003     }
3004     int tmplogsize;
3005     if(CmiGetArgIntDesc(argv,"+recplay-logsize",&tmplogsize,"Specify the size of the buffer used by the message recorder"))
3006       {
3007         if(CmiMyRank() == 0) _recplay_logsize = tmplogsize;
3008       }
3009     REPLAYDEBUG("CkMessageWatcherInit ");
3010     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
3011 #if CMK_REPLAYSYSTEM
3012         CkListString list(procs);
3013         if (list.includes(CkMyPe())) {
3014           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
3015           CpdSetInitializeMemory(1);
3016           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
3017         }
3018 #else
3019         CkAbort("Option `+record-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3020 #endif
3021     }
3022     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
3023 #if CMK_REPLAYSYSTEM
3024       if (CkMyPe() == 0) {
3025         CmiPrintf("Charm++> record mode.\n");
3026         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3027           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
3028           _recplay_crc = _recplay_checksum = 0;
3029         }
3030       }
3031       CpdSetInitializeMemory(1);
3032       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3033       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
3034 #else
3035       CkAbort("Option `+record' requires that record-replay support be enabled at configure time (--enable-replay)");
3036 #endif
3037     }
3038         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
3039 #if CMK_REPLAYSYSTEM
3040             forceReplay = true;
3041             CpdSetInitializeMemory(1);
3042             // Set the parameters of the processor
3043 #if CMK_SHARED_VARS_UNAVAILABLE
3044             _Cmi_mype = atoi(procs);
3045             while (procs[0]!='/') procs++;
3046             procs++;
3047             _Cmi_numpes = atoi(procs);
3048 #else
3049             CkAbort("+replay-detail available only for non-SMP build");
3050 #endif
3051             _replaySystem = 1;
3052             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
3053 #else
3054           CkAbort("Option `+replay-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3055 #endif
3056         }
3057         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
3058 #if CMK_REPLAYSYSTEM
3059           if (CkMyPe() == 0)  {
3060             CmiPrintf("Charm++> replay mode.\n");
3061             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3062               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
3063               _recplay_crc = _recplay_checksum = 0;
3064             }
3065           }
3066           CpdSetInitializeMemory(1);
3067 #if ! CMK_BIGSIM_CHARM
3068           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3069 #else
3070           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3071 #endif
3072           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
3073 #else
3074           CkAbort("Option `+replay' requires that record-replay support be enabled at configure time (--enable-replay)");
3075 #endif
3076         }
3077         if (_recplay_crc && _recplay_checksum) {
3078           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
3079         }
3080 }
3081
3082 extern "C"
3083 int CkMessageToEpIdx(void *msg) {
3084         envelope *env=UsrToEnv(msg);
3085         int ep=env->getEpIdx();
3086         if (ep==CkIndex_CkArray::recvBroadcast(0))
3087                 return env->getsetArrayBcastEp();
3088         else
3089                 return ep;
3090 }
3091
3092 extern "C"
3093 int getCharmEnvelopeSize() {
3094   return sizeof(envelope);
3095 }
3096
3097 /// Best-effort guess at whether @arg msg points at a charm envelope
3098 extern "C"
3099 int isCharmEnvelope(void *msg) {
3100     envelope *e = (envelope *)msg;
3101     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
3102     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
3103     if (e->getTotalsize() < sizeof(envelope)) return 0;
3104     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
3105 #if CMK_SMP
3106     if (e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
3107 #else
3108     if (e->getSrcPe()>=CkNumPes()) return 0;
3109 #endif
3110     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
3111     return 1;
3112 }
3113
3114 #include "CkMarshall.def.h"