6a1a6888a900f2394677d720894e96250e49732f
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #include "pathHistory.h"
14
15 #if CMK_LBDB_ON
16 #include "LBDatabase.h"
17 #endif // CMK_LBDB_ON
18
19 #ifndef CMK_CHARE_USE_PTR
20 #include <map>
21 CkpvDeclare(std::vector<void *>, chare_objs);
22 CkpvDeclare(std::vector<int>, chare_types);
23 CkpvDeclare(std::vector<VidBlock *>, vidblocks);
24
25 typedef std::map<int, CkChareID>  Vidblockmap;
26 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
27 CkpvDeclare(int, currentChareIdx);
28 #endif
29
30 // Map of array IDs to array elements for fast message delivery
31 CkpvDeclare(ArrayObjMap, array_objs);
32
33 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
34
35 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
36
37 int CMessage_CkMessage::__idx=-1;
38 int CMessage_CkArgMsg::__idx=0;
39 int CkIndex_Chare::__idx;
40 int CkIndex_Group::__idx;
41 int CkIndex_ArrayBase::__idx=-1;
42
43 extern int _defaultObjectQ;
44
45 void _initChareTables()
46 {
47 #ifndef CMK_CHARE_USE_PTR
48           /* chare and vidblock table */
49   CkpvInitialize(std::vector<void *>, chare_objs);
50   CkpvInitialize(std::vector<int>, chare_types);
51   CkpvInitialize(std::vector<VidBlock *>, vidblocks);
52   CkpvInitialize(Vidblockmap, vmap);
53   CkpvInitialize(int, currentChareIdx);
54   CkpvAccess(currentChareIdx) = -1;
55 #endif
56
57   CkpvInitialize(ArrayObjMap, array_objs);
58 }
59
60 //Charm++ virtual functions: declaring these here results in a smaller executable
61 Chare::Chare(void) {
62   thishandle.onPE=CkMyPe();
63   thishandle.objPtr=this;
64 #if CMK_ERROR_CHECKING
65   magic = CHARE_MAGIC;
66 #endif
67 #ifndef CMK_CHARE_USE_PTR
68      // for plain chare, objPtr is actually the index to chare obj table
69   if (CkpvAccess(currentChareIdx) >= 0) {
70     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
71   }
72   chareIdx = CkpvAccess(currentChareIdx);
73 #endif
74 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
75   mlogData = new ChareMlogData();
76   mlogData->objID.type = TypeChare;
77   mlogData->objID.data.chare.id = thishandle;
78 #endif
79 #if CMK_OBJECT_QUEUE_AVAILABLE
80   if (_defaultObjectQ)  CkEnableObjQ();
81 #endif
82 }
83
84 Chare::Chare(CkMigrateMessage* m) {
85   thishandle.onPE=CkMyPe();
86   thishandle.objPtr=this;
87 #if CMK_ERROR_CHECKING
88   magic = 0;
89 #endif
90
91 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
92         mlogData = NULL;
93 #endif
94
95 #if CMK_OBJECT_QUEUE_AVAILABLE
96   if (_defaultObjectQ)  CkEnableObjQ();
97 #endif
98 }
99
100 void Chare::CkEnableObjQ()
101 {
102 #if CMK_OBJECT_QUEUE_AVAILABLE
103   objQ.create();
104 #endif
105 }
106
107 Chare::~Chare() {
108 #ifndef CMK_CHARE_USE_PTR
109 /*
110   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
111 */
112   if (chareIdx != -1)
113   {
114     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
115     CkpvAccess(chare_objs)[chareIdx] = NULL;
116     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
117     if (iter != CkpvAccess(vmap).end()) {
118       CkChareID *pCid = (CkChareID *)
119         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
120       int srcPe = iter->second.onPE;
121       *pCid = iter->second;
122       envelope *ret = UsrToEnv(pCid);
123       ret->setVidPtr(iter->second.objPtr);
124       ret->setSrcPe(CkMyPe());
125       CmiSetHandler(ret, _charmHandlerIdx);
126       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
127       CpvAccess(_qd)->create();
128       CkpvAccess(vmap).erase(iter);
129     }
130   }
131 #endif
132 }
133
134 void Chare::pup(PUP::er &p)
135 {
136   p(thishandle.onPE);
137   thishandle.objPtr=(void *)this;
138 #ifndef CMK_CHARE_USE_PTR
139   p(chareIdx);
140   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
141 #endif
142 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
143         if(p.isUnpacking()){
144                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
145                 mlogData = new ChareMlogData();
146         }
147         mlogData->pup(p);
148 #endif
149 #if CMK_ERROR_CHECKING
150   p(magic);
151 #endif
152 }
153
154 int Chare::ckGetChareType() const {
155   return -3;
156 }
157 char *Chare::ckDebugChareName(void) {
158   char buf[100];
159   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),(void*)this);
160   return strdup(buf);
161 }
162 int Chare::ckDebugChareID(char *str, int limit) {
163   // pure chares for now do not have a valid ID
164   str[0] = 0;
165   return 1;
166 }
167 void Chare::ckDebugPup(PUP::er &p) {
168   pup(p);
169 }
170
171 /// This method is called before starting a [threaded] entry method.
172 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
173   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
174   traceAddThreadListeners(th, UsrToEnv(msg));
175 }
176
177 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
178   p.comment("Bytes");
179   int ts=UsrToEnv(msg)->getTotalsize();
180   int msgLen=ts-sizeof(envelope);
181   if (msgLen>0)
182     p((char*)msg,msgLen);
183 }
184
185 IrrGroup::IrrGroup(void) {
186   thisgroup = CkpvAccess(_currentGroup);
187 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
188         mlogData->objID.type = TypeGroup;
189         mlogData->objID.data.group.id = thisgroup;
190         mlogData->objID.data.group.onPE = CkMyPe();
191 #endif
192 }
193
194 IrrGroup::~IrrGroup() {
195   // remove the object pointer
196   if (CkpvAccess(_destroyingNodeGroup)) {
197     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
198     CksvAccess(_nodeGroupTable)->find(thisgroup).setObj(NULL);
199     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
200     CkpvAccess(_destroyingNodeGroup) = false;
201   } else {
202     CmiImmediateLock(CkpvAccess(_groupTableImmLock));
203     CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
204     CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
205   }
206 }
207
208 void IrrGroup::pup(PUP::er &p)
209 {
210   Chare::pup(p);
211   p|thisgroup;
212 }
213
214 int IrrGroup::ckGetChareType() const {
215   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
216 }
217
218 int IrrGroup::ckDebugChareID(char *str, int limit) {
219   if (limit<5) return -1;
220   str[0] = 1;
221   *((int*)&str[1]) = thisgroup.idx;
222   return 5;
223 }
224
225 char *IrrGroup::ckDebugChareName() {
226   return strdup(_chareTable[ckGetChareType()]->name);
227 }
228
229 void IrrGroup::ckJustMigrated(void)
230 {
231 }
232
233 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
234   /* FIXME: **CW** not entirely sure what we should do here yet */
235 }
236
237 void Group::CkAddThreadListeners(CthThread th, void *msg) {
238   Chare::CkAddThreadListeners(th, msg);
239   CthSetThreadID(th, thisgroup.idx, 0, 0);
240 }
241
242 void Group::pup(PUP::er &p)
243 {
244   CkReductionMgr::pup(p);
245   p|reductionInfo;
246 }
247
248 /**** Delegation Manager Group */
249 CkDelegateMgr::~CkDelegateMgr() { }
250
251 //Default delegator implementation: do not delegate-- send directly
252 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
253   { CkSendMsg(ep,m,c); }
254 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
255   { CkSendMsgBranch(ep,m,onPE,g); }
256 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
257   { CkBroadcastMsgBranch(ep,m,g); }
258 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
259   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
260 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
261   { CkSendMsgNodeBranch(ep,m,onNode,g); }
262 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
263   { CkBroadcastMsgNodeBranch(ep,m,g); }
264 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
265   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
266 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
267 {
268         CProxyElement_ArrayBase ap(a,idx);
269         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
270 }
271 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
272 {
273         CProxyElement_ArrayBase ap(a,idx);
274         ap.ckSend((CkArrayMessage *)m,ep);
275 }
276 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
277 {
278         CProxy_ArrayBase ap(a);
279         ap.ckBroadcast((CkArrayMessage *)m,ep);
280 }
281
282 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
283 {
284         CmiAbort("ArraySectionSend is not implemented!\n");
285 /*
286         CProxyElement_ArrayBase ap(a,idx);
287         ap.ckSend((CkArrayMessage *)m,ep);
288 */
289 }
290
291 /*** Proxy <-> delegator communication */
292 CkDelegateData::~CkDelegateData() {}
293
294 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
295   return pd; // default implementation ignores pup call
296 }
297
298 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
299    this tricky manual reference counting business... */
300
301 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
302         if (dPtr) dPtr->ref();
303         ckUndelegate();
304         delegatedMgr = dTo;
305         delegatedPtr = dPtr;
306         delegatedGroupId = delegatedMgr->CkGetGroupID();
307         isNodeGroup = delegatedMgr->isNodeGroup();
308 }
309 void CProxy::ckUndelegate(void) {
310         delegatedMgr=NULL;
311         delegatedGroupId.setZero();
312         if (delegatedPtr) delegatedPtr->unref();
313         delegatedPtr=NULL;
314 }
315
316 /// Copy constructor
317 CProxy::CProxy(const CProxy &src)
318   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
319    isNodeGroup(src.isNodeGroup) {
320     delegatedPtr = NULL;
321     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
322         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
323     }
324 }
325
326 /// Assignment operator
327 CProxy& CProxy::operator=(const CProxy &src) {
328         CkDelegateData *oldPtr=delegatedPtr;
329         ckUndelegate();
330         delegatedMgr=src.delegatedMgr;
331         delegatedGroupId = src.delegatedGroupId; 
332         isNodeGroup = src.isNodeGroup;
333
334         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
335             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
336         else
337             delegatedPtr = NULL;
338
339         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
340         if (oldPtr) oldPtr->unref();
341         return *this;
342 }
343
344 void CProxy::pup(PUP::er &p) {
345   if (!p.isUnpacking()) {
346     if (ckDelegatedTo() != NULL) {
347       delegatedGroupId = delegatedMgr->CkGetGroupID();
348       isNodeGroup = delegatedMgr->isNodeGroup();
349     }
350   }
351   p|delegatedGroupId;
352   if (!delegatedGroupId.isZero()) {
353     p|isNodeGroup;
354     if (p.isUnpacking()) {
355       delegatedMgr = ckDelegatedTo(); 
356     }
357
358     int migCtor = 0, cIdx; 
359     if (!p.isUnpacking()) {
360       if (isNodeGroup) {
361         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
362         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
363         migCtor = _chareTable[cIdx]->migCtor; 
364         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
365       }
366       else  {
367         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
368         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
369         migCtor = _chareTable[cIdx]->migCtor; 
370         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
371       }         
372     }
373
374     p|migCtor;
375
376     // if delegated manager has not been created, construct a dummy
377     // object on which to call DelegatePointerPup
378     if (delegatedMgr == NULL) {
379
380       // create a dummy object for calling DelegatePointerPup
381       int objId = _entryTable[migCtor]->chareIdx; 
382       size_t objSize = _chareTable[objId]->size;
383       void *obj = malloc(objSize); 
384       _entryTable[migCtor]->call(NULL, obj); 
385       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
386         ->DelegatePointerPup(p, delegatedPtr);           
387       free(obj);
388
389     }
390     else {
391
392       // delegated manager has been created, so we can use it
393       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
394
395     }
396
397     if (p.isUnpacking() && delegatedPtr) {
398       delegatedPtr->ref();
399     }
400   }
401 }
402
403 /**** Array sections */
404 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
405 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems, int factor): bfactor(factor) { \
406   _elems.assign(elems, elems+nElems);  \
407   _cookie.get_aid() = aid;      \
408   _cookie.get_pe() = CkMyPe();  \
409 } \
410 CkSectionID::CkSectionID(const CkArrayID &aid, const std::vector<CkArrayIndex##index> &elems, int factor): bfactor(factor) { \
411   _elems.resize(elems.size()); \
412   for (int i=0; i<_elems.size(); ++i) { \
413     _elems[i] = static_cast<CkArrayIndex>(elems[i]); \
414   } \
415   _cookie.get_aid() = aid;      \
416   _cookie.get_pe() = CkMyPe();  \
417 } \
418
419 CKSECTIONID_CONSTRUCTOR_DEF(1D)
420 CKSECTIONID_CONSTRUCTOR_DEF(2D)
421 CKSECTIONID_CONSTRUCTOR_DEF(3D)
422 CKSECTIONID_CONSTRUCTOR_DEF(4D)
423 CKSECTIONID_CONSTRUCTOR_DEF(5D)
424 CKSECTIONID_CONSTRUCTOR_DEF(6D)
425 CKSECTIONID_CONSTRUCTOR_DEF(Max)
426
427 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes, int factor): bfactor(factor) {
428   _cookie.get_aid() = gid;
429   pelist.assign(_pelist, _pelist+_npes);
430 }
431
432 CkSectionID::CkSectionID(const CkGroupID &gid, const std::vector<int>& _pelist, int factor): pelist(_pelist), bfactor(factor) {
433   _cookie.get_aid() = gid;
434 }
435
436 CkSectionID::CkSectionID(const CkSectionID &sid) {
437   _cookie = sid._cookie;
438   pelist = sid.pelist;
439   _elems = sid._elems;
440   bfactor = sid.bfactor;
441 }
442
443 void CkSectionID::operator=(const CkSectionID &sid) {
444   _cookie = sid._cookie;
445   pelist = sid.pelist;
446   _elems = sid._elems;
447   bfactor = sid.bfactor;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451   p | _cookie;
452   p | pelist;
453   p | _elems;
454   p | bfactor;
455 }
456
457 /**** Tiny random API routines */
458
459 #if CMK_CUDA
460 void CUDACallbackManager(void *fn) {
461   if (fn != NULL) {
462     CkCallback *cb = (CkCallback*) fn;
463     cb->send();
464   }
465 }
466
467 #endif
468
469 void QdCreate(int n) {
470   CpvAccess(_qd)->create(n);
471 }
472
473 void QdProcess(int n) {
474   CpvAccess(_qd)->process(n);
475 }
476
477 extern "C"
478 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
479 {
480   UsrToEnv(msg)->setRef(ref);
481 }
482
483 extern "C"
484 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
485 {
486   return UsrToEnv(msg)->getRef();
487 }
488
489 extern "C"
490 int CkGetSrcPe(void *msg)
491 {
492   return UsrToEnv(msg)->getSrcPe();
493 }
494
495 extern "C"
496 int CkGetSrcNode(void *msg)
497 {
498   return CmiNodeOf(CkGetSrcPe(msg));
499 }
500
501 extern "C"
502 void *CkLocalBranch(CkGroupID gID) {
503   return _localBranch(gID);
504 }
505
506 static
507 void *_ckLocalNodeBranch(CkGroupID groupID) {
508   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
509   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
510   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
511   return retval;
512 }
513
514 extern "C"
515 void *CkLocalNodeBranch(CkGroupID groupID)
516 {
517   void *retval;
518   // we are called in a constructor
519   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
520     return CkpvAccess(_currentNodeGroupObj);
521   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
522   { // Nodegroup hasn't finished being created yet-- schedule...
523     CsdScheduler(0);
524   }
525   return retval;
526 }
527
528 extern "C"
529 void *CkLocalChare(const CkChareID *pCid)
530 {
531         int pe=pCid->onPE;
532         if (pe<0) { //A virtual chare ID
533                 if (pe!=(-(CkMyPe()+1)))
534                         return NULL;//VID block not on this PE
535 #ifdef CMK_CHARE_USE_PTR
536                 VidBlock *v=(VidBlock *)pCid->objPtr;
537 #else
538                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
539 #endif
540                 return v->getLocalChareObj();
541         }
542         else
543         { //An ordinary chare ID
544                 if (pe!=CkMyPe())
545                         return NULL;//Chare not on this PE
546 #ifdef CMK_CHARE_USE_PTR
547                 return pCid->objPtr;
548 #else
549                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
550 #endif
551         }
552 }
553
554 CkpvDeclare(char**,Ck_argv);
555
556 extern "C" char **CkGetArgv(void) {
557         return CkpvAccess(Ck_argv);
558 }
559 extern "C" int CkGetArgc(void) {
560         return CmiGetArgc(CkpvAccess(Ck_argv));
561 }
562
563 /******************** Basic support *****************/
564 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
565 {
566 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
567         CpvAccess(_currentObj) = (Chare *)obj;
568 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
569 #endif
570   //BIGSIM_OOC DEBUGGING
571   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
572   //fflush(stdout);
573 #if CMK_CHARMDEBUG
574   CpdBeforeEp(epIdx, obj, msg);
575 #endif    
576   _entryTable[epIdx]->call(msg, obj);
577 #if CMK_CHARMDEBUG
578   CpdAfterEp(epIdx);
579 #endif
580   if (_entryTable[epIdx]->noKeep)
581   { /* Method doesn't keep/delete the message, so we have to: */
582     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
583   }
584 }
585 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
586 {
587   //BIGSIM_OOC DEBUGGING
588   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
589   //fflush(stdout);
590
591   void *deliverMsg;
592 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
593         CpvAccess(_currentObj) = (Chare *)obj;
594 #endif
595   if (_entryTable[epIdx]->noKeep)
596   { /* Deliver a read-only copy of the message */
597     deliverMsg=(void *)msg;
598   } else
599   { /* Method needs a copy of the message to keep/delete */
600     void *oldMsg=(void *)msg;
601     deliverMsg=CkCopyMsg(&oldMsg);
602 #if CMK_ERROR_CHECKING
603     if (oldMsg!=msg)
604       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
605 #endif
606   }
607 #if CMK_CHARMDEBUG
608   CpdBeforeEp(epIdx, obj, (void*)msg);
609 #endif
610   _entryTable[epIdx]->call(deliverMsg, obj);
611 #if CMK_CHARMDEBUG
612   CpdAfterEp(epIdx);
613 #endif
614 }
615
616 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
617 {
618   void *msg = EnvToUsr(env);
619   _SET_USED(env, 0);
620   CkDeliverMessageFree(epIdx,msg,obj);
621 }
622
623 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
624 {
625
626 #if CMK_TRACE_ENABLED 
627   if (_entryTable[epIdx]->traceEnabled) {
628     _TRACE_BEGIN_EXECUTE(env, obj);
629     if(_entryTable[epIdx]->appWork)
630         _TRACE_BEGIN_APPWORK();
631     _invokeEntryNoTrace(epIdx,env,obj);
632     if(_entryTable[epIdx]->appWork)
633         _TRACE_END_APPWORK();
634     _TRACE_END_EXECUTE();
635   }
636   else
637 #endif
638     _invokeEntryNoTrace(epIdx,env,obj);
639
640 }
641
642 /********************* Creation ********************/
643
644 extern "C"
645 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
646 {
647   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
648   envelope *env = UsrToEnv(msg);
649   _CHECK_USED(env);
650   if(pCid == 0) {
651     env->setMsgtype(NewChareMsg);
652   } else {
653     pCid->onPE = (-(CkMyPe()+1));
654     //  pCid->magic = _GETIDX(cIdx);
655     pCid->objPtr = (void *) new VidBlock();
656     _MEMCHECK(pCid->objPtr);
657     env->setMsgtype(NewVChareMsg);
658     env->setVidPtr(pCid->objPtr);
659 #ifndef CMK_CHARE_USE_PTR
660     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
661     int idx = CkpvAccess(vidblocks).size()-1;
662     pCid->objPtr = (void *)(CmiIntPtr)idx;
663     env->setVidPtr((void *)(CmiIntPtr)idx);
664 #endif
665   }
666   env->setEpIdx(eIdx);
667   env->setByPe(CkMyPe());
668   env->setSrcPe(CkMyPe());
669   CmiSetHandler(env, _charmHandlerIdx);
670   _TRACE_CREATION_1(env);
671   CpvAccess(_qd)->create();
672   _STATS_RECORD_CREATE_CHARE_1();
673   _SET_USED(env, 1);
674   if(destPE == CK_PE_ANY)
675     env->setForAnyPE(1);
676   else
677     env->setForAnyPE(0);
678   _CldEnqueue(destPE, env, _infoIdx);
679   _TRACE_CREATION_DONE(1);
680 }
681
682 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
683 {
684   int gIdx = _entryTable[epIdx]->chareIdx;
685   void *obj = malloc(_chareTable[gIdx]->size);
686   _MEMCHECK(obj);
687   setMemoryTypeChare(obj);
688   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
689   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
690   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
691   CkpvAccess(_groupIDTable)->push_back(groupID);
692   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
693   if(ptrq) {
694     void *pending;
695     while((pending=ptrq->deq())!=0) {
696 #if CMK_BIGSIM_CHARM
697       //In BigSim, CpvAccess(CsdSchedQueue) is not used. _CldEnqueue resets the
698       //handler to converse-level BigSim handler.
699       _CldEnqueue(CkMyPe(), pending, _infoIdx);
700 #else
701       CsdEnqueueGeneral(pending, CQS_QUEUEING_FIFO, 0, 0);
702 #endif
703     }
704     CkpvAccess(_groupTable)->find(groupID).clearPending();
705   }
706   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
707
708   CkpvAccess(_currentGroup) = groupID;
709   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
710
711 #ifndef CMK_CHARE_USE_PTR
712   int callingChareIdx = CkpvAccess(currentChareIdx);
713   CkpvAccess(currentChareIdx) = -1;
714 #endif
715
716   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
717
718 #ifndef CMK_CHARE_USE_PTR
719   CkpvAccess(currentChareIdx) = callingChareIdx;
720 #endif
721
722   _STATS_RECORD_PROCESS_GROUP_1();
723 }
724
725 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
726 {
727   int gIdx = _entryTable[epIdx]->chareIdx;
728   size_t objSize=_chareTable[gIdx]->size;
729   void *obj = malloc(objSize);
730   _MEMCHECK(obj);
731   setMemoryTypeChare(obj);
732   CkpvAccess(_currentGroup) = groupID;
733
734 // Now that the NodeGroup is created, add it to the table.
735 //  NodeGroups can be accessed by multiple processors, so
736 //  this is in the opposite order from groups - invoking the constructor
737 //  before registering it.
738 // User may call CkLocalNodeBranch() inside the nodegroup constructor
739 //  store nodegroup into _currentNodeGroupObj
740   CkpvAccess(_currentNodeGroupObj) = obj;
741
742 #ifndef CMK_CHARE_USE_PTR
743   int callingChareIdx = CkpvAccess(currentChareIdx);
744   CkpvAccess(currentChareIdx) = -1;
745 #endif
746
747   _invokeEntryNoTrace(epIdx,env,obj);
748
749 #ifndef CMK_CHARE_USE_PTR
750   CkpvAccess(currentChareIdx) = callingChareIdx;
751 #endif
752
753   CkpvAccess(_currentNodeGroupObj) = NULL;
754   _STATS_RECORD_PROCESS_NODE_GROUP_1();
755
756   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
757   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
758   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
759   CksvAccess(_nodeGroupIDTable).push_back(groupID);
760
761   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
762   if(ptrq) {
763     void *pending;
764     while((pending=ptrq->deq())!=0) {
765       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
766     }
767     CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
768   }
769   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
770 }
771
772 void _createGroup(CkGroupID groupID, envelope *env)
773 {
774   _CHECK_USED(env);
775   _SET_USED(env, 1);
776   int epIdx = env->getEpIdx();
777   int gIdx = _entryTable[epIdx]->chareIdx;
778   env->setGroupNum(groupID);
779   env->setSrcPe(CkMyPe());
780   env->setGroupEpoch(CkpvAccess(_charmEpoch));
781
782   if(CkNumPes()>1) {
783     CkPackMessage(&env);
784     CmiSetHandler(env, _bocHandlerIdx);
785     _numInitMsgs++;
786     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
787     CpvAccess(_qd)->create(CkNumPes()-1);
788     CkUnpackMessage(&env);
789   }
790   _STATS_RECORD_CREATE_GROUP_1();
791   CkCreateLocalGroup(groupID, epIdx, env);
792 }
793
794 void _createNodeGroup(CkGroupID groupID, envelope *env)
795 {
796   _CHECK_USED(env);
797   _SET_USED(env, 1);
798   int epIdx = env->getEpIdx();
799   env->setGroupNum(groupID);
800   env->setSrcPe(CkMyPe());
801   env->setGroupEpoch(CkpvAccess(_charmEpoch));
802   if(CkNumNodes()>1) {
803     CkPackMessage(&env);
804     CmiSetHandler(env, _bocHandlerIdx);
805     _numInitMsgs++;
806     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
807     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
808     CpvAccess(_qd)->create(CkNumNodes()-1);
809     CkUnpackMessage(&env);
810   }
811   _STATS_RECORD_CREATE_NODE_GROUP_1();
812   CkCreateLocalNodeGroup(groupID, epIdx, env);
813 }
814
815 // new _groupCreate
816
817 static CkGroupID _groupCreate(envelope *env)
818 {
819   CkGroupID groupNum;
820
821   // check CkMyPe(). if it is 0 then idx is _numGroups++
822   // if not, then something else...
823   if(CkMyPe() == 0)
824      groupNum.idx = CkpvAccess(_numGroups)++;
825   else
826      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
827   _createGroup(groupNum, env);
828   return groupNum;
829 }
830
831 // new _nodeGroupCreate
832 static CkGroupID _nodeGroupCreate(envelope *env)
833 {
834   CkGroupID groupNum;
835   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
836   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
837           groupNum.idx = CksvAccess(_numNodeGroups)++;
838    else
839           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
840   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
841   _createNodeGroup(groupNum, env);
842   return groupNum;
843 }
844
845 /**** generate the group idx when group is creator pe is not pe0
846  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
847  **** remaining bits contain the group creator processor number and
848  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
849
850 int _getGroupIdx(int numNodes,int myNode,int numGroups)
851 {
852         int idx;
853         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
854         int n = 32 - (x+1);                                     // number of bits remaining for the index
855         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
856                                                                 // then add the next available index
857         // of course this won't work when int is 8 bytes long on T3E
858         //idx |= 0x80000000;                                      // set the most significant bit to 1
859         idx = - idx;
860                                                                 // if int is not 32 bits, wouldn't this be wrong?
861         return idx;
862 }
863
864 extern "C"
865 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
866 {
867   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
868   envelope *env = UsrToEnv(msg);
869   env->setMsgtype(BocInitMsg);
870   env->setEpIdx(eIdx);
871   env->setSrcPe(CkMyPe());
872   _TRACE_CREATION_N(env, CkNumPes());
873   CkGroupID gid = _groupCreate(env);
874   _TRACE_CREATION_DONE(1);
875   return gid;
876 }
877
878 extern "C"
879 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
880 {
881   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
882   envelope *env = UsrToEnv(msg);
883   env->setMsgtype(NodeBocInitMsg);
884   env->setEpIdx(eIdx);
885   env->setSrcPe(CkMyPe());
886   _TRACE_CREATION_N(env, CkNumNodes());
887   CkGroupID gid = _nodeGroupCreate(env);
888   _TRACE_CREATION_DONE(1);
889   return gid;
890 }
891
892 static inline void *_allocNewChare(envelope *env, int &idx)
893 {
894   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
895   void *tmp=malloc(_chareTable[chareIdx]->size);
896   _MEMCHECK(tmp);
897 #ifndef CMK_CHARE_USE_PTR
898   CkpvAccess(chare_objs).push_back(tmp);
899   CkpvAccess(chare_types).push_back(chareIdx);
900   idx = CkpvAccess(chare_objs).size()-1;
901 #endif
902   setMemoryTypeChare(tmp);
903   return tmp;
904 }
905
906 // Method returns true if one or more group dependencies are unsatisfied
907 inline bool isGroupDepUnsatisfied(const CkCoreState *ck, const envelope *env) {
908   int groupDepNum = env->getGroupDepNum();
909   if(groupDepNum != 0) {
910     CkGroupID *groupDepPtr = (CkGroupID *)(env->getGroupDepPtr());
911     for(int i=0;i<groupDepNum;i++) {
912       CkGroupID depID = groupDepPtr[i];
913       if (!depID.isZero() && !_lookupGroupAndBufferIfNotThere(ck, env, depID)) {
914         return true;
915       }
916     }
917   }
918   return false;
919 }
920
921 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
922 {
923   if(isGroupDepUnsatisfied(ck, env))
924     return;
925   if(ck)
926     ck->process(); // ck->process() updates mProcessed count used in QD
927   int idx;
928   void *obj = _allocNewChare(env, idx);
929 #ifndef CMK_CHARE_USE_PTR
930   CkpvAccess(currentChareIdx) = idx;
931 #endif
932   _invokeEntry(env->getEpIdx(),env,obj);
933   if(ck)
934     _STATS_RECORD_PROCESS_CHARE_1();
935 }
936
937 void CkCreateLocalChare(int epIdx, envelope *env)
938 {
939   env->setEpIdx(epIdx);
940   _processNewChareMsg(NULL, env);
941 }
942
943 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
944 {
945   if(isGroupDepUnsatisfied(ck, env))
946     return;
947   ck->process(); // ck->process() updates mProcessed count used in QD
948   int idx;
949   void *obj = _allocNewChare(env, idx);
950   CkChareID *pCid = (CkChareID *)
951       _allocMsg(FillVidMsg, sizeof(CkChareID));
952   pCid->onPE = CkMyPe();
953 #ifndef CMK_CHARE_USE_PTR
954   pCid->objPtr = (void*)(CmiIntPtr)idx;
955 #else
956   pCid->objPtr = obj;
957 #endif
958   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
959   envelope *ret = UsrToEnv(pCid);
960   ret->setVidPtr(env->getVidPtr());
961   int srcPe = env->getByPe();
962   ret->setSrcPe(CkMyPe());
963   CmiSetHandler(ret, _charmHandlerIdx);
964   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
965 #ifndef CMK_CHARE_USE_PTR
966   // register the remote vidblock for deletion when chare is deleted
967   CkChareID vid;
968   vid.onPE = srcPe;
969   vid.objPtr = env->getVidPtr();
970   CkpvAccess(vmap)[idx] = vid;    
971 #endif
972   CpvAccess(_qd)->create();
973 #ifndef CMK_CHARE_USE_PTR
974   CkpvAccess(currentChareIdx) = idx;
975 #endif
976   _invokeEntry(env->getEpIdx(),env,obj);
977   _STATS_RECORD_PROCESS_CHARE_1();
978 }
979
980 /************** Receive: Chares *************/
981
982 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
983 {
984   if(isGroupDepUnsatisfied(ck, env))
985     return;
986   ck->process(); // ck->process() updates mProcessed count used in QD
987   int epIdx = env->getEpIdx();
988   int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
989   void *obj;
990   if (mainIdx != -1)  {           // mainchare
991     CmiAssert(CkMyPe()==0);
992     obj = _mainTable[mainIdx]->getObj();
993   }
994   else {
995 #ifndef CMK_CHARE_USE_PTR
996     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
997       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
998     else
999       obj = env->getObjPtr();
1000 #else
1001     obj = env->getObjPtr();
1002 #endif
1003   }
1004   _invokeEntry(epIdx,env,obj);
1005   _STATS_RECORD_PROCESS_MSG_1();
1006 }
1007
1008 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
1009 {
1010   ck->process(); // ck->process() updates mProcessed count used in QD
1011 #ifndef CMK_CHARE_USE_PTR
1012   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1013 #else
1014   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1015   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
1016 #endif
1017   CkChareID *pcid = (CkChareID *) EnvToUsr(env);
1018   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
1019   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
1020   CmiFree(env);
1021 }
1022
1023 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
1024 {
1025   ck->process(); // ck->process() updates mProcessed count used in QD
1026 #ifndef CMK_CHARE_USE_PTR
1027   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1028 #else
1029   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1030   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
1031 #endif
1032   _SET_USED(env, 1);
1033   vptr->send(env);
1034 }
1035
1036 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1037 {
1038   ck->process(); // ck->process() updates mProcessed count used in QD
1039 #ifndef CMK_CHARE_USE_PTR
1040   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1041   delete vptr;
1042   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1043 #endif
1044   CmiFree(env);
1045 }
1046
1047 /************** Receive: Groups ****************/
1048
1049 /**
1050  Return a pointer to the local BOC of "groupID".
1051  The message "env" passed in has some known dependency on this groupID
1052  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1053  Therefore, if the return value is NULL, this function buffers the message so that
1054  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1055  The message passed in must have its handlers correctly set so that it can be
1056  scheduled again.
1057 */
1058 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(const CkCoreState *ck, const envelope *env, const CkGroupID &groupID)
1059 {
1060
1061         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1062         IrrGroup *obj = ck->localBranch(groupID);
1063         if (obj==NULL) { /* groupmember not yet created: stash message */
1064                 ck->getGroupTable()->find(groupID).enqMsg((envelope *)env);
1065         }
1066         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1067         return obj;
1068 }
1069
1070 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1071 {
1072   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1073 }
1074
1075 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1076 {
1077 #if CMK_LBDB_ON
1078   // if there is a running obj being measured, stop it temporarily
1079   LDObjHandle objHandle;
1080   int objstopped = 0;
1081   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1082   if (the_lbdb->RunningObject(&objHandle)) {
1083     objstopped = 1;
1084     the_lbdb->ObjectStop(objHandle);
1085   }
1086 #endif
1087   _invokeEntry(epIdx,env,obj);
1088 #if CMK_LBDB_ON
1089   if (objstopped) the_lbdb->ObjectStart(objHandle);
1090 #endif
1091   _STATS_RECORD_PROCESS_BRANCH_1();
1092 }
1093
1094 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1095 {
1096   if(isGroupDepUnsatisfied(ck, env))
1097     return;
1098   CkGroupID groupID =  env->getGroupNum();
1099   IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1100   if(obj) {
1101     ck->process(); // ck->process() updates mProcessed count used in QD
1102     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1103   }
1104 }
1105
1106 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1107 {
1108   env->setEpIdx(epIdx);
1109   _invokeEntry(epIdx,env,obj);
1110   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1111 }
1112
1113 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1114 {
1115   if(isGroupDepUnsatisfied(ck, env))
1116     return;
1117   CkGroupID groupID = env->getGroupNum();
1118   void *obj;
1119
1120   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1121   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1122   if(!obj) { // groupmember not yet created
1123 #if CMK_IMMEDIATE_MSG
1124     if (CmiIsImmediate(env)) {
1125       //CmiDelayImmediate();        /* buffer immediate message */
1126       CmiResetImmediate(env);        // note: this may not be SIG IO safe !
1127     }
1128 #endif
1129     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1130     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1131     return;
1132   }
1133   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1134   ck->process(); // ck->process() updates mProcessed count used in QD
1135   _invokeEntry(env->getEpIdx(),env,obj);
1136   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1137 }
1138
1139 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1140 {
1141   if(isGroupDepUnsatisfied(ck, env))
1142     return;
1143   CkGroupID groupID = env->getGroupNum();
1144   int epIdx = env->getEpIdx();
1145   ck->process(); // ck->process() updates mProcessed count used in QD
1146   CkCreateLocalGroup(groupID, epIdx, env);
1147 }
1148
1149 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1150 {
1151   if(isGroupDepUnsatisfied(ck, env))
1152     return;
1153   ck->process(); // ck->process() updates mProcessed count used in QD
1154   CkGroupID groupID = env->getGroupNum();
1155   int epIdx = env->getEpIdx();
1156   CkCreateLocalNodeGroup(groupID, epIdx, env);
1157 }
1158
1159 /************** Receive: Arrays *************/
1160 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1161   ArrayObjMap& object_map = CkpvAccess(array_objs);
1162   auto iter = object_map.find(env->getRecipientID());
1163   if (iter != object_map.end()) {
1164     // First see if we already have a direct pointer to the object
1165     _SET_USED(env, 0);
1166     ck->process(); // ck->process() updates mProcessed count used in QD
1167     int opts = 0;
1168     CkArrayMessage* msg = (CkArrayMessage*)EnvToUsr(env);
1169     if (msg->array_hops()>1) {
1170       CProxy_ArrayBase(env->getArrayMgr()).ckLocMgr()->multiHop(msg);
1171     }
1172     iter->second->ckInvokeEntry(env->getEpIdx(), msg, !(opts & CK_MSG_KEEP));
1173   } else {
1174     // Otherwise fallback to delivery through the array manager
1175     CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1176     if (mgr) {
1177       _SET_USED(env, 0);
1178       ck->process(); // ck->process() updates mProcessed count used in QD
1179       mgr->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_inline);
1180     }
1181   }
1182 }
1183
1184 //BIGSIM_OOC DEBUGGING
1185 #define TELLMSGTYPE(x) //x
1186
1187 /**
1188  * This is the main converse-level handler used by all of Charm++.
1189  *
1190  * \addtogroup CriticalPathFramework
1191  */
1192 void _processHandler(void *converseMsg,CkCoreState *ck)
1193 {
1194   envelope *env = (envelope *) converseMsg;
1195
1196   MESSAGE_PHASE_CHECK(env);
1197
1198 #if CMK_ONESIDED_IMPL
1199   if(CMI_ZC_MSGTYPE(env) == CMK_ZC_P2P_SEND_MSG || CMI_ZC_MSGTYPE(env) == CMK_ZC_BCAST_SEND_MSG){
1200     envelope *prevEnv = env;
1201
1202     // Determine mode depending on the message
1203     ncpyEmApiMode mode = (CMI_ZC_MSGTYPE(env) == CMK_ZC_BCAST_SEND_MSG) ? ncpyEmApiMode::BCAST : ncpyEmApiMode::P2P;
1204
1205     env = CkRdmaIssueRgets(env, mode, prevEnv);
1206
1207     if(env) {
1208       // memcpyGet or cmaGet completed, env contains the payload and will be enqueued
1209
1210       // Free prevEnv
1211       CkFreeMsg(EnvToUsr(prevEnv));
1212     } else {
1213       // async rdma call in place, asynchronous return and ack handling
1214       return;
1215     }
1216   }
1217 #endif
1218
1219 //#if CMK_RECORD_REPLAY
1220   if (ck->watcher!=NULL) {
1221     if (!ck->watcher->processMessage(&env,ck)) return;
1222   }
1223 //#endif
1224 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1225         Chare *obj=NULL;
1226         CkObjID sender;
1227         MCount SN;
1228         MlogEntry *entry=NULL;
1229         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
1230            || env->getMsgtype() == ForArrayEltMsg
1231            || env->getMsgtype() == ForChareMsg) {
1232                 sender = env->sender;
1233                 SN = env->SN;
1234                 int result = preProcessReceivedMessage(env,&obj,&entry);
1235                 if(result == 0){
1236                         return;
1237                 }
1238         }
1239 #endif
1240 #if USE_CRITICAL_PATH_HEADER_ARRAY
1241   CK_CRITICALPATH_START(env)
1242 #endif
1243
1244   switch(env->getMsgtype()) {
1245 // Group support
1246     case BocInitMsg : // Group creation message
1247       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1248       // QD processing moved inside _processBocInitMsg because it is conditional
1249       //ck->process(); 
1250       if(env->isPacked()) CkUnpackMessage(&env);
1251       _processBocInitMsg(ck,env);
1252       break;
1253     case NodeBocInitMsg : // Nodegroup creation message
1254       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1255       if(env->isPacked()) CkUnpackMessage(&env);
1256       _processNodeBocInitMsg(ck,env);
1257       break;
1258     case ForBocMsg : // Group entry method message (non creation)
1259       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1260       // QD processing moved inside _processForBocMsg because it is conditional
1261       if(env->isPacked()) CkUnpackMessage(&env);
1262       _processForBocMsg(ck,env);
1263       // stats record moved inside _processForBocMsg because it is conditional
1264       break;
1265     case ForNodeBocMsg : // Nodegroup entry method message (non creation)
1266       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1267       // QD processing moved to _processForNodeBocMsg because it is conditional
1268       if(env->isPacked()) CkUnpackMessage(&env);
1269       _processForNodeBocMsg(ck,env);
1270       // stats record moved to _processForNodeBocMsg because it is conditional
1271       break;
1272
1273 // Array support
1274     case ForArrayEltMsg: // Array element entry method message
1275       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1276       if(env->isPacked()) CkUnpackMessage(&env);
1277       _processArrayEltMsg(ck,env);
1278       break;
1279
1280 // Chare support
1281     case NewChareMsg : // Singleton chare creation message
1282       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1283       if(env->isPacked()) CkUnpackMessage(&env);
1284       _processNewChareMsg(ck,env);
1285       break;
1286     case NewVChareMsg : // Singleton virtual chare creation message
1287       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1288       if(env->isPacked()) CkUnpackMessage(&env);
1289       _processNewVChareMsg(ck,env);
1290       break;
1291     case ForChareMsg : // Singeton chare entry method message (non creation)
1292       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1293       if(env->isPacked()) CkUnpackMessage(&env);
1294       _processForPlainChareMsg(ck,env);
1295       break;
1296     case ForVidMsg   : // Singleton virtual chare entry method message (non creation)
1297       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1298       _processForVidMsg(ck,env);
1299       break;
1300     case FillVidMsg  : // Message sent back from the real chare PE to the virtual chare PE to
1301                        // fill the VidBlock (called when the real chare is constructed)
1302       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1303       _processFillVidMsg(ck,env);
1304       break;
1305     case DeleteVidMsg  : // Message sent back from the real chare PE to the virtual chare PE to
1306                          // delete the Vidblock (called when the real chare is deleted by the destructor)
1307       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1308       _processDeleteVidMsg(ck,env);
1309       break;
1310
1311     default:
1312       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1313   }
1314 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1315         if(obj != NULL){
1316                 postProcessReceivedMessage(obj,sender,SN,entry);
1317         }
1318 #endif
1319
1320
1321 #if USE_CRITICAL_PATH_HEADER_ARRAY
1322   CK_CRITICALPATH_END()
1323 #endif
1324
1325 }
1326
1327
1328 /******************** Message Send **********************/
1329
1330 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1331              int *queueing, int *priobits, unsigned int **prioptr)
1332 {
1333   envelope *env = (envelope *)converseMsg;
1334   *pfn = (CldPackFn)CkPackMessage;
1335   *len = env->getTotalsize();
1336   *queueing = env->getQueueing();
1337   *priobits = env->getPriobits();
1338   *prioptr = (unsigned int *) env->getPrioPtr();
1339 }
1340
1341 void CkPackMessage(envelope **pEnv)
1342 {
1343   envelope *env = *pEnv;
1344   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1345     void *msg = EnvToUsr(env);
1346     _TRACE_BEGIN_PACK();
1347     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1348     _TRACE_END_PACK();
1349     env=UsrToEnv(msg);
1350     env->setPacked(1);
1351     *pEnv = env;
1352   }
1353 }
1354
1355 void CkUnpackMessage(envelope **pEnv)
1356 {
1357   envelope *env = *pEnv;
1358   int msgIdx = env->getMsgIdx();
1359   if(env->isPacked()) {
1360     void *msg = EnvToUsr(env);
1361     _TRACE_BEGIN_UNPACK();
1362     msg = _msgTable[msgIdx]->unpack(msg);
1363     _TRACE_END_UNPACK();
1364     env=UsrToEnv(msg);
1365     env->setPacked(0);
1366     *pEnv = env;
1367   }
1368 }
1369
1370 //There's no reason for most messages to go through the Cld--
1371 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1372 // Thus these accellerated versions of the Cld calls.
1373 #if CMK_OBJECT_QUEUE_AVAILABLE
1374 static int index_objectQHandler;
1375 #endif
1376 int index_tokenHandler;
1377 int index_skipCldHandler;
1378
1379 void _skipCldHandler(void *converseMsg)
1380 {
1381   envelope *env = (envelope *)(converseMsg);
1382   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1383 #if CMK_GRID_QUEUE_AVAILABLE
1384   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1385     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1386                        env, env->getQueueing (), env->getPriobits (),
1387                        (unsigned int *) env->getPrioPtr ());
1388   } else {
1389     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1390                        env, env->getQueueing (), env->getPriobits (),
1391                        (unsigned int *) env->getPrioPtr ());
1392   }
1393 #else
1394   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1395         env, env->getQueueing(),env->getPriobits(),
1396         (unsigned int *)env->getPrioPtr());
1397 #endif
1398 }
1399
1400
1401 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1402 // Made non-static to be used by ckmessagelogging
1403 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1404 {
1405 #if CMK_CHARMDEBUG
1406   if (!ConverseDeliver(pe)) {
1407     CmiFree(env);
1408     return;
1409   }
1410 #endif
1411
1412 #if CMK_ONESIDED_IMPL
1413   // Store source information to handle acknowledgements on completion
1414   if(CMI_IS_ZC_BCAST(env))
1415     CkRdmaPrepareBcastMsg(env);
1416 #endif
1417
1418 #if CMK_FAULT_EVAC
1419   if(pe == CkMyPe() ){
1420     if(!CmiNodeAlive(CkMyPe())){
1421         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1422 //      return;
1423     }
1424   }
1425 #endif
1426   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1427 #if CMK_OBJECT_QUEUE_AVAILABLE
1428     Chare *obj = CkFindObjectPtr(env);
1429     if (obj && obj->CkGetObjQueue().queue()) {
1430       _enqObjQueue(obj, env);
1431     }
1432     else
1433 #endif
1434     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1435         env, env->getQueueing(),env->getPriobits(),
1436         (unsigned int *)env->getPrioPtr());
1437 #if CMK_PERSISTENT_COMM
1438     CmiPersistentOneSend();
1439 #endif
1440   } else {
1441     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1442       CkPackMessage(&env);
1443     int len=env->getTotalsize();
1444     CmiSetXHandler(env,CmiGetHandler(env));
1445 #if CMK_OBJECT_QUEUE_AVAILABLE
1446     CmiSetHandler(env,index_objectQHandler);
1447 #else
1448     CmiSetHandler(env,index_skipCldHandler);
1449 #endif
1450     CmiSetInfo(env,infoFn);
1451     if (pe==CLD_BROADCAST) {
1452 #if CMK_MESSAGE_LOGGING
1453         if(env->flags & CK_FREE_MSG_MLOG)
1454                 CmiSyncBroadcastAndFree(len, (char *)env); 
1455         else
1456                 CmiSyncBroadcast(len, (char *)env);
1457 #else
1458                         CmiSyncBroadcastAndFree(len, (char *)env); 
1459 #endif
1460
1461 }
1462     else if (pe==CLD_BROADCAST_ALL) { 
1463 #if CMK_MESSAGE_LOGGING
1464         if(env->flags & CK_FREE_MSG_MLOG)
1465                 CmiSyncBroadcastAllAndFree(len, (char *)env);
1466         else
1467                 CmiSyncBroadcastAll(len, (char *)env);
1468 #else
1469                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1470 #endif
1471
1472 }
1473     else{
1474 #if CMK_MESSAGE_LOGGING
1475         if(env->flags & CK_FREE_MSG_MLOG)
1476                 CmiSyncSendAndFree(pe, len, (char *)env);
1477         else
1478                 CmiSyncSend(pe, len, (char *)env);
1479 #else
1480                         CmiSyncSendAndFree(pe, len, (char *)env);
1481 #endif
1482
1483                 }
1484   }
1485 }
1486
1487 #if CMK_BIGSIM_CHARM
1488 #   define  _skipCldEnqueue   _CldEnqueue
1489 #endif
1490
1491 // by pass Charm++ priority queue, send as Converse message
1492 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1493 {
1494 #if CMK_CHARMDEBUG
1495   if (!ConverseDeliver(-1)) {
1496     CmiFree(env);
1497     return;
1498   }
1499 #endif
1500   CkPackMessage(&env);
1501   int len=env->getTotalsize();
1502   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1503 }
1504
1505 static void _noCldEnqueue(int pe, envelope *env)
1506 {
1507 /*
1508   if (pe == CkMyPe()) {
1509     CmiHandleMessage(env);
1510   } else
1511 */
1512 #if CMK_CHARMDEBUG
1513   if (!ConverseDeliver(pe)) {
1514     CmiFree(env);
1515     return;
1516   }
1517 #endif
1518
1519 #if CMK_ONESIDED_IMPL
1520   // Store source information to handle acknowledgements on completion
1521   if(CMI_IS_ZC_BCAST(env))
1522     CkRdmaPrepareBcastMsg(env);
1523 #endif
1524
1525   CkPackMessage(&env);
1526   int len=env->getTotalsize();
1527   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1528   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1529   else CmiSyncSendAndFree(pe, len, (char *)env);
1530 }
1531
1532 //static void _noCldNodeEnqueue(int node, envelope *env)
1533 //Made non-static to be used by ckmessagelogging
1534 void _noCldNodeEnqueue(int node, envelope *env)
1535 {
1536 /*
1537   if (node == CkMyNode()) {
1538     CmiHandleMessage(env);
1539   } else {
1540 */
1541 #if CMK_CHARMDEBUG
1542   if (!ConverseDeliver(node)) {
1543     CmiFree(env);
1544     return;
1545   }
1546 #endif
1547
1548 #if CMK_ONESIDED_IMPL
1549   // Store source information to handle acknowledgements on completion
1550   if(CMI_IS_ZC_BCAST(env))
1551     CkRdmaPrepareBcastMsg(env);
1552 #endif
1553
1554   CkPackMessage(&env);
1555   int len=env->getTotalsize();
1556   if (node==CLD_BROADCAST) { 
1557 #if CMK_MESSAGE_LOGGING
1558         if(env->flags & CK_FREE_MSG_MLOG)
1559                 CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1560         else
1561                 CmiSyncNodeBroadcast(len, (char *)env);
1562 #else
1563         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1564 #endif
1565 }
1566   else if (node==CLD_BROADCAST_ALL) { 
1567 #if CMK_MESSAGE_LOGGING
1568         if(env->flags & CK_FREE_MSG_MLOG)
1569                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1570         else
1571                 CmiSyncNodeBroadcastAll(len, (char *)env);
1572 #else
1573                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1574 #endif
1575
1576 }
1577   else {
1578 #if CMK_MESSAGE_LOGGING
1579         if(env->flags & CK_FREE_MSG_MLOG)
1580                 CmiSyncNodeSendAndFree(node, len, (char *)env);
1581         else
1582                 CmiSyncNodeSend(node, len, (char *)env);
1583 #else
1584         CmiSyncNodeSendAndFree(node, len, (char *)env);
1585 #endif
1586   }
1587 }
1588
1589 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1590 {
1591   envelope *env = UsrToEnv(msg);
1592   _CHECK_USED(env);
1593   _SET_USED(env, 1);
1594 #if CMK_REPLAYSYSTEM
1595   setEventID(env);
1596 #endif
1597   env->setMsgtype(ForChareMsg);
1598   env->setEpIdx(eIdx);
1599   env->setSrcPe(CkMyPe());
1600
1601 #if USE_CRITICAL_PATH_HEADER_ARRAY
1602   CK_CRITICALPATH_SEND(env)
1603   //CK_AUTOMATE_PRIORITY(env)
1604 #endif
1605 #if CMK_CHARMDEBUG
1606   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1607 #endif
1608 #if CMK_OBJECT_QUEUE_AVAILABLE
1609   CmiSetHandler(env, index_objectQHandler);
1610 #else
1611   CmiSetHandler(env, _charmHandlerIdx);
1612 #endif
1613   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1614     int pe = -(pCid->onPE+1);
1615     if(pe==CkMyPe()) {
1616 #ifndef CMK_CHARE_USE_PTR
1617       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1618 #else
1619       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1620 #endif
1621       void *objPtr;
1622       if (NULL!=(objPtr=vblk->getLocalChare()))
1623       { //A ready local chare
1624         env->setObjPtr(objPtr);
1625         return pe;
1626       }
1627       else { //The vidblock is not ready-- forget it
1628         vblk->send(env);
1629         return -1;
1630       }
1631     } else { //Valid vidblock for another PE:
1632       env->setMsgtype(ForVidMsg);
1633       env->setVidPtr(pCid->objPtr);
1634       return pe;
1635     }
1636   }
1637   else {
1638     env->setObjPtr(pCid->objPtr);
1639     return pCid->onPE;
1640   }
1641 }
1642
1643 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1644 {
1645   int destPE = _prepareMsg(eIdx, msg, pCid);
1646   if (destPE != -1) {
1647     envelope *env = UsrToEnv(msg);
1648     //criticalPath_send(env);
1649 #if USE_CRITICAL_PATH_HEADER_ARRAY
1650     CK_CRITICALPATH_SEND(env)
1651     //CK_AUTOMATE_PRIORITY(env)
1652 #endif
1653     CmiBecomeImmediate(env);
1654   }
1655   return destPE;
1656 }
1657
1658 extern "C"
1659 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1660 {
1661   if (opts & CK_MSG_INLINE) {
1662     CkSendMsgInline(entryIdx, msg, pCid, opts);
1663     return;
1664   }
1665   envelope *env = UsrToEnv(msg);
1666 #if CMK_ERROR_CHECKING
1667   //Allow rdma metadata messages marked as immediate to go through
1668   if (opts & CK_MSG_IMMEDIATE && (CMI_ZC_MSGTYPE(env) == CMK_REG_NO_ZC_MSG)) {
1669     CmiAbort("Immediate message is not allowed in Chare!");
1670   }
1671 #endif
1672   int destPE=_prepareMsg(entryIdx,msg,pCid);
1673   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1674   // VidBlock was not yet filled). The problem is that the creation was never
1675   // traced later when the VidBlock was filled. One solution is to trace the
1676   // creation here, the other to trace it in VidBlock->msgDeliver().
1677 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1678   if (destPE!=-1) {
1679     CpvAccess(_qd)->create();
1680   }
1681         sendChareMsg(env,destPE,_infoIdx,pCid);
1682 #else
1683   _TRACE_CREATION_1(env);
1684   if (destPE!=-1) {
1685     CpvAccess(_qd)->create();
1686     if (opts & CK_MSG_SKIP_OR_IMM)
1687       _noCldEnqueue(destPE, env);
1688     else
1689       _CldEnqueue(destPE, env, _infoIdx);
1690   }
1691   _TRACE_CREATION_DONE(1);
1692 #endif
1693 }
1694
1695 extern "C"
1696 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1697 {
1698   if (pCid->onPE==CkMyPe())
1699   {
1700 #if CMK_FAULT_EVAC
1701     if(!CmiNodeAlive(CkMyPe())){
1702         return;
1703     }
1704 #endif
1705 #if CMK_CHARMDEBUG
1706     //Just in case we need to breakpoint or use the envelope in some way
1707     _prepareMsg(entryIndex,msg,pCid);
1708 #endif
1709                 //Just directly call the chare (skip QD handling & scheduler)
1710     envelope *env = UsrToEnv(msg);
1711     if (env->isPacked()) CkUnpackMessage(&env);
1712     _STATS_RECORD_PROCESS_MSG_1();
1713     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1714   }
1715   else {
1716     //No way to inline a cross-processor message:
1717     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1718   }
1719 }
1720
1721 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1722 {
1723   envelope *env = UsrToEnv(msg);
1724   /*#if CMK_ERROR_CHECKING
1725   CkNodeGroupID nodeRedMgr;
1726 #endif
1727   */
1728   _CHECK_USED(env);
1729   _SET_USED(env, 1);
1730 #if CMK_REPLAYSYSTEM
1731   setEventID(env);
1732 #endif
1733   env->setMsgtype(type);
1734   env->setEpIdx(eIdx);
1735   env->setGroupNum(gID);
1736   env->setSrcPe(CkMyPe());
1737   /*
1738 #if CMK_ERROR_CHECKING
1739   nodeRedMgr.setZero();
1740   env->setRednMgr(nodeRedMgr);
1741 #endif
1742 */
1743   //criticalPath_send(env);
1744 #if USE_CRITICAL_PATH_HEADER_ARRAY
1745   CK_CRITICALPATH_SEND(env)
1746   //CK_AUTOMATE_PRIORITY(env)
1747 #endif
1748 #if CMK_CHARMDEBUG
1749   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1750 #endif
1751   CmiSetHandler(env, _charmHandlerIdx);
1752   return env;
1753 }
1754
1755 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1756 {
1757   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1758 #if USE_CRITICAL_PATH_HEADER_ARRAY
1759   CK_CRITICALPATH_SEND(env)
1760   //CK_AUTOMATE_PRIORITY(env)
1761 #endif
1762   CmiBecomeImmediate(env);
1763   return env;
1764 }
1765
1766 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1767                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1768 {
1769   int numPes;
1770   envelope *env;
1771     if (opts & CK_MSG_IMMEDIATE) {
1772         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1773     }else
1774     {
1775         env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1776     }
1777
1778 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1779         sendGroupMsg(env,pe,_infoIdx);
1780 #else
1781   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1782   _TRACE_CREATION_N(env, numPes);
1783   if (opts & CK_MSG_SKIP_OR_IMM)
1784     _noCldEnqueue(pe, env);
1785   else
1786     _skipCldEnqueue(pe, env, _infoIdx);
1787   _TRACE_CREATION_DONE(1);
1788 #endif
1789 }
1790
1791 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1792                            int npes, int *pes)
1793 {
1794   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1795   _TRACE_CREATION_MULTICAST(env, npes, pes);
1796   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1797   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1798 }
1799
1800 extern "C"
1801 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1802 {
1803 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1804   if (destPE==CkMyPe())
1805   {
1806     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1807     return;
1808   }
1809   //Can't inline-- send the usual way
1810   envelope *env = UsrToEnv(msg);
1811   int numPes;
1812   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1813   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1814   _TRACE_CREATION_N(env, numPes);
1815   _noCldEnqueue(destPE, env);
1816   _STATS_RECORD_SEND_BRANCH_1();
1817   CkpvAccess(_coreState)->create();
1818   _TRACE_CREATION_DONE(1);
1819 #else
1820   // no support for immediate message, send inline
1821   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1822 #endif
1823 }
1824
1825 extern "C"
1826 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1827 {
1828   if (destPE==CkMyPe())
1829   {
1830 #if CMK_FAULT_EVAC
1831     if(!CmiNodeAlive(CkMyPe())){
1832         return;
1833     }
1834 #endif
1835     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1836     if (obj!=NULL)
1837     { //Just directly call the group:
1838 #if CMK_ERROR_CHECKING
1839       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1840 #else
1841       envelope *env=UsrToEnv(msg);
1842 #endif
1843       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1844       return;
1845     }
1846   }
1847   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1848   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1849 }
1850
1851 extern "C"
1852 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1853 {
1854   if (opts & CK_MSG_INLINE) {
1855     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1856     return;
1857   }
1858   envelope *env=UsrToEnv(msg);
1859   //Allow rdma metadata messages marked as immediate to go through
1860   if (opts & CK_MSG_IMMEDIATE && (CMI_ZC_MSGTYPE(env) == CMK_REG_NO_ZC_MSG)) {
1861     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1862     return;
1863   }
1864   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1865   _STATS_RECORD_SEND_BRANCH_1();
1866   CkpvAccess(_coreState)->create();
1867 }
1868
1869 extern "C"
1870 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1871 {
1872 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1873   envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1874   _TRACE_CREATION_MULTICAST(env, npes, pes);
1875   _noCldEnqueueMulti(npes, pes, env);
1876   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1877 #else
1878   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1879   CpvAccess(_qd)->create(-npes);
1880 #endif
1881   _STATS_RECORD_SEND_BRANCH_N(npes);
1882   CpvAccess(_qd)->create(npes);
1883 }
1884
1885 extern "C"
1886 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1887 {
1888   if (opts & CK_MSG_IMMEDIATE) {
1889     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1890     return;
1891   }
1892     // normal mesg
1893   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1894   _STATS_RECORD_SEND_BRANCH_N(npes);
1895   CpvAccess(_qd)->create(npes);
1896 }
1897
1898 extern "C"
1899 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1900 {
1901   int npes;
1902   int *pes;
1903   if (opts & CK_MSG_IMMEDIATE) {
1904     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1905     return;
1906   }
1907     // normal mesg
1908   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1909   CmiLookupGroup(grp, &npes, &pes);
1910   _TRACE_CREATION_MULTICAST(env, npes, pes);
1911   _CldEnqueueGroup(grp, env, _infoIdx);
1912   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1913   _STATS_RECORD_SEND_BRANCH_N(npes);
1914   CpvAccess(_qd)->create(npes);
1915 }
1916
1917 extern "C"
1918 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1919 {
1920   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1921   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1922   CpvAccess(_qd)->create(CkNumPes());
1923 }
1924
1925 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1926                 int node=CLD_BROADCAST_ALL, int opts=0)
1927 {
1928     int numPes;
1929     envelope *env;
1930     if (opts & CK_MSG_IMMEDIATE) {
1931         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1932     }else
1933     {
1934         env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1935     }
1936 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1937     sendNodeGroupMsg(env,node,_infoIdx);
1938 #else
1939   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1940   _TRACE_CREATION_N(env, numPes);
1941   if (opts & CK_MSG_SKIP_OR_IMM) {
1942     _noCldNodeEnqueue(node, env);
1943   }
1944   else
1945     _CldNodeEnqueue(node, env, _infoIdx);
1946   _TRACE_CREATION_DONE(1);
1947 #endif
1948 }
1949
1950 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1951                            int npes, int *nodes)
1952 {
1953   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1954   _TRACE_CREATION_N(env, npes);
1955   for (int i=0; i<npes; i++) {
1956     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1957   }
1958   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1959 }
1960
1961 extern "C"
1962 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1963 {
1964 #if CMK_IMMEDIATE_MSG
1965   if (node==CkMyNode())
1966   {
1967     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1968     return;
1969   }
1970   //Can't inline-- send the usual way
1971   envelope *env = UsrToEnv(msg);
1972   int numPes;
1973   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1974   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1975   _TRACE_CREATION_N(env, numPes);
1976   _noCldNodeEnqueue(node, env);
1977   _STATS_RECORD_SEND_BRANCH_1();
1978   CkpvAccess(_coreState)->create();
1979   _TRACE_CREATION_DONE(1);
1980 #else
1981   // no support for immediate message, send inline
1982   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1983 #endif
1984 }
1985
1986 extern "C"
1987 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1988 {
1989   if (node==CkMyNode() && CMI_ZC_MSGTYPE((envelope *)(UsrToEnv(msg))) == CMK_REG_NO_ZC_MSG)
1990   {
1991     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1992     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1993     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1994     if (obj!=NULL)
1995     { //Just directly call the group:
1996 #if CMK_ERROR_CHECKING
1997       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1998 #else
1999       envelope *env=UsrToEnv(msg);
2000 #endif
2001       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
2002       return;
2003     }
2004   }
2005   //Can't inline-- send the usual way
2006   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
2007 }
2008
2009 extern "C"
2010 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
2011 {
2012   if (opts & CK_MSG_INLINE) {
2013     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
2014     return;
2015   }
2016   if (opts & CK_MSG_IMMEDIATE) {
2017     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
2018     return;
2019   }
2020   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
2021   _STATS_RECORD_SEND_NODE_BRANCH_1();
2022   CkpvAccess(_coreState)->create();
2023 }
2024
2025 extern "C"
2026 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
2027 {
2028 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
2029   envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
2030   _noCldEnqueueMulti(npes, nodes, env);
2031 #else
2032   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2033   CpvAccess(_qd)->create(-npes);
2034 #endif
2035   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2036   CpvAccess(_qd)->create(npes);
2037 }
2038
2039 extern "C"
2040 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
2041 {
2042   if (opts & CK_MSG_IMMEDIATE) {
2043     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
2044     return;
2045   }
2046     // normal mesg
2047   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2048   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2049   CpvAccess(_qd)->create(npes);
2050 }
2051
2052 extern "C"
2053 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
2054 {
2055   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
2056   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
2057   CpvAccess(_qd)->create(CkNumNodes());
2058 }
2059
2060 //Needed by delegation manager:
2061 extern "C"
2062 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
2063 { return _prepareMsg(eIdx,msg,pCid); }
2064 extern "C"
2065 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2066 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
2067 extern "C"
2068 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2069 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
2070
2071 void _ckModuleInit(void) {
2072         CmiAssignOnce(&index_skipCldHandler, CkRegisterHandler(_skipCldHandler));
2073 #if CMK_OBJECT_QUEUE_AVAILABLE
2074         CmiAssignOnce(&index_objectQHandler, CkRegisterHandler(_ObjectQHandler));
2075 #endif
2076         CmiAssignOnce(&index_tokenHandler, CkRegisterHandler(_TokenHandler));
2077         CkpvInitialize(TokenPool*, _tokenPool);
2078         CkpvAccess(_tokenPool) = new TokenPool;
2079 }
2080
2081
2082 /************** Send: Arrays *************/
2083
2084 static void _prepareOutgoingArrayMsg(envelope *env,int type)
2085 {
2086   _CHECK_USED(env);
2087   _SET_USED(env, 1);
2088   env->setMsgtype(type);
2089 #if CMK_CHARMDEBUG
2090   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
2091 #endif
2092   CmiSetHandler(env, _charmHandlerIdx);
2093   CpvAccess(_qd)->create();
2094 }
2095
2096 extern "C"
2097 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2098   envelope *env = UsrToEnv(msg);
2099   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2100 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2101         sendArrayMsg(env,pe,_infoIdx);
2102 #else
2103   if (opts & CK_MSG_IMMEDIATE)
2104     CmiBecomeImmediate(env);
2105   if (opts & CK_MSG_SKIP_OR_IMM)
2106     _noCldEnqueue(pe, env);
2107   else
2108     _skipCldEnqueue(pe, env, _infoIdx);
2109 #endif
2110 }
2111
2112 class ElementDestroyer : public CkLocIterator {
2113 private:
2114         CkLocMgr *locMgr;
2115 public:
2116         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2117         void addLocation(CkLocation &loc) {
2118           loc.destroyAll();
2119         }
2120 };
2121
2122 void CkDeleteChares() {
2123   int i;
2124   int numGroups = CkpvAccess(_groupIDTable)->size();
2125
2126   // delete all plain chares
2127 #ifndef CMK_CHARE_USE_PTR
2128   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2129         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2130         delete obj;
2131         CkpvAccess(chare_objs)[i] = NULL;
2132   }
2133   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2134         VidBlock *obj = CkpvAccess(vidblocks)[i];
2135         delete obj;
2136         CkpvAccess(vidblocks)[i] = NULL;
2137   }
2138 #endif
2139
2140   // delete all array elements
2141   for(i=0;i<numGroups;i++) {
2142     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2143     if(obj && obj->isLocMgr())  {
2144       CkLocMgr *mgr = (CkLocMgr*)obj;
2145       ElementDestroyer destroyer(mgr);
2146       mgr->iterate(destroyer);
2147     }
2148   }
2149
2150   // delete all groups
2151   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2152   for(i=0;i<numGroups;i++) {
2153     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2154     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2155     if (obj) delete obj;
2156   }
2157   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2158
2159   // delete all node groups
2160   if (CkMyRank() == 0) {
2161     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2162     for(i=0;i<numNodeGroups;i++) {
2163       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2164       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2165       if (obj) delete obj;
2166     }
2167   }
2168 }
2169
2170 #if CMK_BIGSIM_CHARM
2171 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2172                                    int pb,unsigned int *prio);
2173 #endif
2174
2175 //------------------- External client support (e.g. Charm4py) ----------------
2176
2177 static std::vector< std::vector<char> > ext_args;
2178 static std::vector<char*> ext_argv;
2179
2180 // This is just a wrapper for ConverseInit that copies command-line args into a private
2181 // buffer.
2182 // To be called from external clients like charm4py. This wrapper avoids issues with
2183 // ctypes and cffi.
2184 extern "C" void StartCharmExt(int argc, char **argv) {
2185 #if !defined(_WIN32) && !NODE_0_IS_CONVHOST
2186   // only do this in net layers if using charmrun, so that output of process 0
2187   // doesn't get duplicated
2188   char *ns = getenv("NETSTART");
2189   if (ns != 0) {
2190     int fd;
2191     if (-1 != (fd = open("/dev/null", O_RDWR))) {
2192       dup2(fd, 0);
2193       dup2(fd, 1);
2194       dup2(fd, 2);
2195     }
2196   }
2197 #endif
2198   ext_args.resize(argc);
2199   ext_argv.resize(argc + 1, NULL);
2200   for (int i=0; i < argc; i++) {
2201     ext_args[i].resize(strlen(argv[i]) + 1);
2202     strcpy(ext_args[i].data(), argv[i]);
2203     ext_argv[i] = ext_args[i].data();
2204   }
2205   ConverseInit(argc, ext_argv.data(), _initCharm, 0, 0);
2206 }
2207
2208 void (*CkRegisterMainModuleCallback)() = NULL;
2209 extern "C" void registerCkRegisterMainModuleCallback(void (*cb)()) {
2210   CkRegisterMainModuleCallback = cb;
2211 }
2212
2213 void (*MainchareCtorExtCallback)(int, void*, int, int, char **) = NULL;
2214 extern "C" void registerMainchareCtorExtCallback(void (*cb)(int, void*, int, int, char **)) {
2215   MainchareCtorExtCallback = cb;
2216 }
2217
2218 void (*ReadOnlyRecvExtCallback)(int, char*) = NULL;
2219 extern "C" void registerReadOnlyRecvExtCallback(void (*cb)(int, char*)) {
2220   ReadOnlyRecvExtCallback = cb;
2221 }
2222
2223 void* ReadOnlyExt::ro_data = NULL;
2224 size_t ReadOnlyExt::data_size = 0;
2225
2226 void (*ChareMsgRecvExtCallback)(int, void*, int, int, char *, int) = NULL;
2227 extern "C" void registerChareMsgRecvExtCallback(void (*cb)(int, void*, int, int, char *, int)) {
2228   ChareMsgRecvExtCallback = cb;
2229 }
2230
2231 void (*GroupMsgRecvExtCallback)(int, int, int, char *, int) = NULL;
2232 extern "C" void registerGroupMsgRecvExtCallback(void (*cb)(int, int, int, char *, int)) {
2233   GroupMsgRecvExtCallback = cb;
2234 }
2235
2236 void (*ArrayMsgRecvExtCallback)(int, int, int *, int, int, char *, int) = NULL;
2237 extern "C" void registerArrayMsgRecvExtCallback(void (*cb)(int, int, int *, int, int, char *, int)) {
2238   ArrayMsgRecvExtCallback = cb;
2239 }
2240
2241 int (*ArrayElemLeaveExt)(int, int, int *, char**, int) = NULL;
2242 extern "C" void registerArrayElemLeaveExtCallback(int (*cb)(int, int, int *, char**, int)) {
2243   ArrayElemLeaveExt = cb;
2244 }
2245
2246 void (*ArrayElemJoinExt)(int, int, int *, int, char*, int) = NULL;
2247 extern "C" void registerArrayElemJoinExtCallback(void (*cb)(int, int, int *, int, char*, int)) {
2248   ArrayElemJoinExt = cb;
2249 }
2250
2251 void (*ArrayResumeFromSyncExtCallback)(int, int, int *) = NULL;
2252 extern "C" void registerArrayResumeFromSyncExtCallback(void (*cb)(int, int, int *)) {
2253   ArrayResumeFromSyncExtCallback = cb;
2254 }
2255
2256 void (*CreateReductionTargetMsgExt)(void*, int, int, int, char**, int*) = NULL;
2257 extern "C" void registerCreateReductionTargetMsgExtCallback(void (*cb)(void*, int, int, int, char**, int*)) {
2258   CreateReductionTargetMsgExt = cb;
2259 }
2260
2261 int (*PyReductionExt)(char**, int*, int, char**) = NULL;
2262 extern "C" void registerPyReductionExtCallback(int (*cb)(char**, int*, int, char**)) {
2263     PyReductionExt = cb;
2264 }
2265
2266 int (*ArrayMapProcNumExtCallback)(int, int, const int *) = NULL;
2267 extern "C" void registerArrayMapProcNumExtCallback(int (*cb)(int, int, const int *)) {
2268   ArrayMapProcNumExtCallback = cb;
2269 }
2270
2271 extern "C" int CkMyPeHook() { return CkMyPe(); }
2272 extern "C" int CkNumPesHook() { return CkNumPes(); }
2273
2274 void ReadOnlyExt::setData(void *msg, size_t msgSize) {
2275   ro_data = malloc(msgSize);
2276   memcpy(ro_data, msg, msgSize);
2277   data_size = msgSize;
2278 }
2279
2280 void ReadOnlyExt::_roPup(void *pup_er) {
2281   PUP::er &p=*(PUP::er *)pup_er;
2282   if (!p.isUnpacking()) {
2283     //printf("[%d] Sizing/packing data, ro_data=%p, data_size=%d\n", CkMyPe(), ro_data, int(data_size));
2284     p | data_size;
2285     p((char*)ro_data, data_size);
2286   } else {
2287     CkAssert(CkMyPe() != 0);
2288     CkAssert(ro_data == NULL);
2289     PUP::fromMem &p_mem = *(PUP::fromMem *)pup_er;
2290     p_mem | data_size;
2291     //printf("[%d] Unpacking ro, size of data to unpack is %d\n", CkMyPe(), int(data_size));
2292     ReadOnlyRecvExtCallback(int(data_size), p_mem.get_current_pointer());
2293     p_mem.advance(data_size);
2294   }
2295 }
2296
2297 CkpvExtern(int, _currentChareType);
2298
2299 MainchareExt::MainchareExt(CkArgMsg *m) {
2300   int cIdx = CkpvAccess(_currentChareType);
2301   //printf("Constructor of MainchareExt, chareId=(%d,%p), chareIdx=%d\n", thishandle.onPE, thishandle.objPtr, cIdx);
2302   int ctorEpIdx =  _mainTable[_chareTable[cIdx]->mainChareType()]->entryIdx;
2303   MainchareCtorExtCallback(thishandle.onPE, thishandle.objPtr, ctorEpIdx, m->argc, m->argv);
2304   delete m;
2305 }
2306
2307 GroupExt::GroupExt(void *impl_msg) {
2308   //printf("Constructor of GroupExt, gid=%d\n", thisgroup.idx);
2309   //int chareIdx = CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
2310   int chareIdx = ckGetChareType();
2311   int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2312   CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2313   char *impl_buf = impl_msg_typed->msgBuf;
2314   PUP::fromMem implP(impl_buf);
2315   int msgSize; implP|msgSize;
2316   int dcopy_start; implP|dcopy_start;
2317   GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
2318                           dcopy_start);
2319 }
2320
2321 ArrayMapExt::ArrayMapExt(void *impl_msg) {
2322   //printf("Constructor of ArrayMapExt, gid=%d\n", thisgroup.idx);
2323   int chareIdx = ckGetChareType();
2324   int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2325   CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2326   char *impl_buf = impl_msg_typed->msgBuf;
2327   PUP::fromMem implP(impl_buf);
2328   int msgSize; implP|msgSize;
2329   int dcopy_start; implP|dcopy_start;
2330   GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
2331                           dcopy_start);
2332 }
2333
2334 // TODO options
2335 extern "C"
2336 int CkCreateGroupExt(int cIdx, int eIdx, int num_bufs, char **bufs, int *buf_sizes) {
2337   //static_cast<void>(impl_e_opts);
2338   CkAssert(num_bufs >= 1);
2339   int totalSize = 0;
2340   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2341   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2);
2342   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2343   PUP::toMem implP((void *)impl_msg->msgBuf);
2344   implP|totalSize;
2345   implP|buf_sizes[0];
2346   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2347   UsrToEnv(impl_msg)->setMsgtype(BocInitMsg);
2348   //if (impl_e_opts)
2349   //  UsrToEnv(impl_msg)->setGroupDep(impl_e_opts->getGroupDepID());
2350   CkGroupID gId = CkCreateGroup(cIdx, eIdx, impl_msg);
2351   return gId.idx;
2352 }
2353
2354 // TODO options
2355 extern "C"
2356 int CkCreateArrayExt(int cIdx, int ndims, int *dims, int eIdx, int num_bufs,
2357                      char **bufs, int *buf_sizes, int map_gid, char useAtSync) {
2358   //static_cast<void>(impl_e_opts);
2359   CkAssert(num_bufs >= 1);
2360   int totalSize = 0;
2361   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2362   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2363   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2364   PUP::toMem implP((void *)impl_msg->msgBuf);
2365   implP|useAtSync;
2366   implP|totalSize;
2367   implP|buf_sizes[0];
2368   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2369   CkArrayOptions opts;
2370   if (ndims != -1)
2371     opts = CkArrayOptions(ndims, dims);
2372   if (map_gid >= 0) {
2373     CkGroupID map_gId;
2374     map_gId.idx = map_gid;
2375     opts.setMap(CProxy_Group(map_gId));
2376   }
2377   UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2378   //CkArrayID gId = ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
2379   CkGroupID gId = CProxyElement_ArrayElement::ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
2380   return gId.idx;
2381 }
2382
2383 // TODO options
2384 extern "C"
2385 void CkInsertArrayExt(int aid, int ndims, int *index, int epIdx, int onPE, int num_bufs,
2386                       char **bufs, int *buf_sizes, char useAtSync) {
2387   CkAssert(num_bufs >= 1);
2388   int totalSize = 0;
2389   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2390   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2391   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2392   PUP::toMem implP((void *)impl_msg->msgBuf);
2393   implP|useAtSync;
2394   implP|totalSize;
2395   implP|buf_sizes[0];
2396   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2397
2398   UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2399   CkArrayIndex newIdx(ndims, index);
2400   CkGroupID gId;
2401   gId.idx = aid;
2402   CProxy_ArrayBase(gId).ckInsertIdx((CkArrayMessage *)impl_msg, epIdx, onPE, newIdx);
2403 }
2404
2405 extern "C"
2406 void CkMigrateExt(int aid, int ndims, int *index, int toPe) {
2407   //printf("[charm] CkMigrateMeExt called with aid: %d, ndims: %d, index: %d, toPe: %d\n",
2408         //aid, ndims, *index, toPe);
2409   CkGroupID gId;
2410   gId.idx = aid;
2411   CkArrayIndex arrayIndex(ndims, index);
2412   CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2413   ArrayElement* arrayElement = arrayProxy.ckLocal();
2414   CkAssert(arrayElement != NULL);
2415   arrayElement->migrateMe(toPe);
2416 }
2417
2418 extern "C"
2419 void CkArrayDoneInsertingExt(int aid) {
2420   CkGroupID gId;
2421   gId.idx = aid;
2422   CProxy_ArrayBase(gId).doneInserting();
2423 }
2424
2425 extern "C"
2426 int CkGroupGetReductionNumber(int g_id) {
2427   CkGroupID gId;
2428   gId.idx = g_id;
2429   return ((Group*)CkLocalBranch(gId))->getRedNo();
2430 }
2431
2432 extern "C"
2433 int CkArrayGetReductionNumber(int aid, int ndims, int *index) {
2434   CkGroupID gId;
2435   gId.idx = aid;
2436   CkArrayIndex arrayIndex(ndims, index);
2437   CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2438   ArrayElement* arrayElement = arrayProxy.ckLocal();
2439   CkAssert(arrayElement != NULL);
2440   return arrayElement->getRedNo();
2441 }
2442
2443 extern "C"
2444 void CkChareExtSend(int onPE, void *objPtr, int epIdx, char *msg, int msgSize) {
2445   //ckCheck();    // checks that gid is not zero
2446   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2447   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2448   PUP::toMem implP((void *)impl_msg->msgBuf);
2449   implP|msgSize;
2450   implP|epIdx;
2451   int d=0; implP|d;
2452   implP(msg, msgSize);
2453   CkChareID chareID;
2454   chareID.onPE = onPE;
2455   chareID.objPtr = objPtr;
2456
2457   CkSendMsg(epIdx, impl_msg, &chareID);
2458 }
2459
2460 extern "C"
2461 void CkChareExtSend_multi(int onPE, void *objPtr, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2462   CkAssert(num_bufs >= 1);
2463   //ckCheck();    // checks that gid is not zero
2464   int totalSize = 0;
2465   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2466   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2467   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2468   PUP::toMem implP((void *)impl_msg->msgBuf);
2469   implP | totalSize;
2470   implP | epIdx;
2471   implP | buf_sizes[0];
2472   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2473   CkChareID chareID;
2474   chareID.onPE = onPE;
2475   chareID.objPtr = objPtr;
2476
2477   CkSendMsg(epIdx, impl_msg, &chareID);
2478 }
2479
2480 extern "C"
2481 void CkGroupExtSend(int gid, int pe, int epIdx, char *msg, int msgSize) {
2482   //ckCheck();    // checks that gid is not zero
2483   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2484   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2485   PUP::toMem implP((void *)impl_msg->msgBuf);
2486   implP|msgSize;
2487   implP|epIdx;
2488   int d=0; implP|d;
2489   implP(msg, msgSize);
2490   CkGroupID gId;
2491   gId.idx = gid;
2492
2493   if (pe == -1)
2494     CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2495   else
2496     CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
2497 }
2498
2499 extern "C"
2500 void CkGroupExtSend_multi(int gid, int pe, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2501   CkAssert(num_bufs >= 1);
2502   //ckCheck();    // checks that gid is not zero
2503   int totalSize = 0;
2504   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2505   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2506   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2507   PUP::toMem implP((void *)impl_msg->msgBuf);
2508   implP | totalSize;
2509   implP | epIdx;
2510   implP | buf_sizes[0];
2511   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2512   CkGroupID gId;
2513   gId.idx = gid;
2514
2515   if (pe == -1)
2516     CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2517   else
2518     CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
2519 }
2520
2521 extern "C"
2522 void CkArrayExtSend(int aid, int *idx, int ndims, int epIdx, char *msg, int msgSize) {
2523   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2524   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2525   PUP::toMem implP((void *)impl_msg->msgBuf);
2526   implP|msgSize;
2527   implP|epIdx;
2528   int d=0; implP|d;
2529   implP(msg, msgSize);
2530   UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2531   CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2532   impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2533   CkGroupID gId;
2534   gId.idx = aid;
2535   if (ndims > 0) {
2536     CkArrayIndex arrIndex(ndims, idx);
2537     // TODO is there a better function for this?
2538     CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2539   } else { // broadcast
2540     CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
2541   }
2542 }
2543
2544 extern "C"
2545 void CkArrayExtSend_multi(int aid, int *idx, int ndims, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2546   CkAssert(num_bufs >= 1);
2547   int totalSize = 0;
2548   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2549   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2550   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2551   PUP::toMem implP((void *)impl_msg->msgBuf);
2552   implP | totalSize;
2553   implP | epIdx;
2554   implP | buf_sizes[0];
2555   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2556   UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2557   CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2558   impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2559   CkGroupID gId;
2560   gId.idx = aid;
2561   if (ndims > 0) {
2562     CkArrayIndex arrIndex(ndims, idx);
2563     // TODO is there a better function for this?
2564     CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2565   } else { // broadcast
2566     CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
2567   }
2568 }
2569
2570 //------------------- Message Watcher (record/replay) ----------------
2571
2572 #include "crc32.h"
2573
2574 CkpvDeclare(int, envelopeEventID);
2575 int _recplay_crc = 0;
2576 int _recplay_checksum = 0;
2577 int _recplay_logsize = 1024*1024;
2578
2579 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2580 #define REPLAYDEBUG(args) /* empty */
2581
2582 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2583
2584 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2585 #include "BaseLB.h" /* For LBMigrateMsg message */
2586
2587 #if CMK_REPLAYSYSTEM
2588 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2589   std::string fName = CkpvAccess(traceRoot);
2590   fName += prefix;
2591   fName += std::to_string(CkMyPe());
2592   fName += suffix;
2593   FILE *f = fopen(fName.c_str(), permissions);
2594   REPLAYDEBUG("openReplayfile " << fName.c_str());
2595   if (f==NULL) {
2596     CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2597              CkMyPe(), fName.c_str(), permissions);
2598     CkAbort("openReplayFile> Could not open replay file");
2599   }
2600   return f;
2601 }
2602
2603 class CkMessageRecorder : public CkMessageWatcher {
2604   unsigned int curpos;
2605   bool firstOpen;
2606   std::vector<char> buffer;
2607 public:
2608   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true), buffer(_recplay_logsize) { f=f_; }
2609   ~CkMessageRecorder() {
2610     flushLog(0);
2611     fprintf(f,"-1 -1 -1 ");
2612     fclose(f);
2613 #if 0
2614     FILE *stsfp = fopen("sts", "w");
2615     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2616     traceWriteSTS(stsfp, 0);
2617     fclose(stsfp);
2618 #endif
2619     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2620   }
2621
2622 private:
2623   void flushLog(int verbose=1) {
2624     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2625     fprintf(f, "%s", buffer.data());
2626     curpos=0;
2627   }
2628   virtual bool process(envelope **envptr,CkCoreState *ck) {
2629     if ((*envptr)->getEvent()) {
2630       bool wasPacked = (*envptr)->isPacked();
2631       if (!wasPacked) CkPackMessage(envptr);
2632       envelope *env = *envptr;
2633       unsigned int crc1=0, crc2=0;
2634       if (_recplay_crc) {
2635         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2636         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2637         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2638       } else if (_recplay_checksum) {
2639         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2640         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2641       }
2642       curpos+=sprintf(&buffer[curpos],"%d %d %d %d %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2643       if (curpos > _recplay_logsize-128) flushLog();
2644       if (!wasPacked) CkUnpackMessage(envptr);
2645     }
2646     return true;
2647   }
2648   virtual bool process(CthThreadToken *token,CkCoreState *ck) {
2649     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2650     if (curpos > _recplay_logsize-128) flushLog();
2651     return true;
2652   }
2653   
2654   virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2655     FILE *f;
2656     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2657     else f = openReplayFile("ckreplay_",".lb","a");
2658     firstOpen = false;
2659     if (f != NULL) {
2660       PUP::toDisk p(f);
2661       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2662       (*msg)->pup(p);
2663       fclose(f);
2664     }
2665     return true;
2666   }
2667 };
2668
2669 class CkMessageDetailRecorder : public CkMessageWatcher {
2670 public:
2671   CkMessageDetailRecorder(FILE *f_) {
2672     f=f_;
2673     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2674      * The value of 'x' is the pointer size.
2675      */
2676     CmiUInt2 little = sizeof(void*);
2677     fwrite(&little, 2, 1, f);
2678   }
2679   ~CkMessageDetailRecorder() {fclose(f);}
2680 private:
2681   virtual bool process(envelope **envptr, CkCoreState *ck) {
2682     bool wasPacked = (*envptr)->isPacked();
2683     if (!wasPacked) CkPackMessage(envptr);
2684     envelope *env = *envptr;
2685     CmiUInt4 size = env->getTotalsize();
2686     fwrite(&size, 4, 1, f);
2687     fwrite(env, env->getTotalsize(), 1, f);
2688     if (!wasPacked) CkUnpackMessage(envptr);
2689     return true;
2690   }
2691 };
2692
2693 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2694 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2695
2696 class CkMessageReplay : public CkMessageWatcher {
2697   int counter;
2698         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2699         int nextEP;
2700         unsigned int crc1, crc2;
2701         FILE *lbFile;
2702         /// Read the next message we need from the file:
2703         void getNext(void) {
2704           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2705           if (nextSize > 0) {
2706             // We are reading a regular message
2707             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2708               CkAbort("CkMessageReplay> Syntax error reading replay file");
2709             }
2710             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2711           } else if (nextSize == -2) {
2712             // We are reading a special message (right now only thread awaken)
2713             // Nothing to do since we have already read all info
2714             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2715           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2716             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2717             CkAbort("CkMessageReplay> Unrecognized input");
2718           }
2719             /*
2720                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2721                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2722                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2723                 }
2724                 */
2725                 counter++;
2726         }
2727         /// If this is the next message we need, advance and return true.
2728         bool isNext(envelope *env) {
2729                 if (nextPE!=env->getSrcPe()) return false;
2730                 if (nextEvent!=env->getEvent()) return false;
2731                 if (nextSize<0) return false; // not waiting for a regular message
2732 #if 1
2733                 if (nextEP != env->getEpIdx()) {
2734                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2735                         return false;
2736                 }
2737 #endif
2738 #if ! CMK_BIGSIM_CHARM
2739                 if (nextSize!=env->getTotalsize())
2740                 {
2741                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2742                         return false;
2743                 }
2744                 if (_recplay_crc || _recplay_checksum) {
2745                   bool wasPacked = env->isPacked();
2746                   if (!wasPacked) CkPackMessage(&env);
2747                   if (_recplay_crc) {
2748                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2749                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2750                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2751                     if (crcnew1 != crc1) {
2752                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2753                     }
2754                     if (crcnew2 != crc2) {
2755                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2756                     }
2757                   } else if (_recplay_checksum) {
2758             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2759             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2760             if (crcnew1 != crc1) {
2761               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2762             }
2763             if (crcnew2 != crc2) {
2764               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2765             }               
2766                   }
2767                   if (!wasPacked) CkUnpackMessage(&env);
2768                 }
2769 #endif
2770                 return true;
2771         }
2772         bool isNext(CthThreadToken *token) {
2773           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
2774           return false;
2775         }
2776
2777         /// This is a (short) list of messages we aren't yet ready for:
2778         CkQ<envelope *> delayedMessages;
2779         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2780         CkQ<CthThreadToken *> delayedTokens;
2781
2782         /// Try to flush out any delayed messages
2783         void flush(void) {
2784           if (nextSize>0) {
2785                 int len=delayedMessages.length();
2786                 for (int i=0;i<len;i++) {
2787                         envelope *env=delayedMessages.deq();
2788                         if (isNext(env)) { /* this is the next message: process it */
2789                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2790                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2791                                 return;
2792                         }
2793                         else /* Not ready yet-- put it back in the
2794                                 queue */
2795                           {
2796                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2797                                 delayedMessages.enq(env);
2798                           }
2799                 }
2800           } else if (nextSize==-2) {
2801             int len=delayedTokens.length();
2802             for (int i=0;i<len;++i) {
2803               CthThreadToken *token=delayedTokens.deq();
2804               if (isNext(token)) {
2805             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2806 #if ! CMK_BIGSIM_CHARM
2807                 CsdEnqueueLifo((void*)token);
2808 #else
2809                 CthEnqueueBigSimThread(token,0,0,NULL);
2810 #endif
2811                 return;
2812               } else {
2813             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2814                 delayedTokens.enq(token);
2815               }
2816             }
2817           }
2818         }
2819
2820 public:
2821         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2822           counter=0;
2823           f=f_;
2824           getNext();
2825           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2826 #if CMI_QD
2827           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2828 #endif
2829         }
2830         ~CkMessageReplay() {fclose(f);}
2831
2832 private:
2833         virtual bool process(envelope **envptr,CkCoreState *ck) {
2834           bool wasPacked = (*envptr)->isPacked();
2835           if (!wasPacked) CkPackMessage(envptr);
2836           envelope *env = *envptr;
2837           //CkAssert(*(int*)env == 0x34567890);
2838           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2839                 if (env->getEvent() == 0) return true;
2840                 if (isNext(env)) { /* This is the message we were expecting */
2841                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2842                         getNext(); /* Advance over this message */
2843                         flush(); /* try to process queued-up stuff */
2844                         if (!wasPacked) CkUnpackMessage(envptr);
2845                         return true;
2846                 }
2847 #if CMK_SMP
2848                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2849                          // try next rank, we can't just buffer the msg and left
2850                          // we need to keep unprocessed msg on the fly
2851                         int nextpe = CkMyPe()+1;
2852                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2853                         nextpe = CkNodeFirst(CkMyNode());
2854                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2855                         return false;
2856                 }
2857 #endif
2858                 else /*!isNext(env) */ {
2859                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2860                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2861                         delayedMessages.enq(env);
2862                         flush();
2863                         return false;
2864                 }
2865         }
2866         virtual bool process(CthThreadToken *token, CkCoreState *ck) {
2867       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2868           if (isNext(token)) {
2869         REPLAYDEBUG("Executing token: "<<token->serialNo)
2870             getNext();
2871             flush();
2872             return true;
2873           } else {
2874         REPLAYDEBUG("Queueing token: "<<token->serialNo
2875             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2876             delayedTokens.enq(token);
2877             return false;
2878           }
2879         }
2880
2881         virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2882           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2883           if (lbFile != NULL) {
2884             int num_moves = 0;
2885         PUP::fromDisk p(lbFile);
2886             p | num_moves;
2887             if (num_moves != (*msg)->n_moves) {
2888               delete *msg;
2889               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2890             }
2891             (*msg)->pup(p);
2892           }
2893           return true;
2894         }
2895 };
2896
2897 class CkMessageDetailReplay : public CkMessageWatcher {
2898   void *getNext() {
2899     CmiUInt4 size; size_t nread;
2900     if ((nread=fread(&size, 4, 1, f)) < 1) {
2901       if (feof(f)) return NULL;
2902       CkPrintf("Broken record file (metadata) got %d\n",nread);
2903       CkAbort("");
2904     }
2905     void *env = CmiAlloc(size);
2906     long tell = ftell(f);
2907     if ((nread=fread(env, size, 1, f)) < 1) {
2908       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2909       CkAbort("");
2910     }
2911     //*(int*)env = 0x34567890; // set first integer as magic
2912     return env;
2913   }
2914 public:
2915   double starttime;
2916   CkMessageDetailReplay(FILE *f_) {
2917     f=f_;
2918     starttime=CkWallTimer();
2919     /* This must match what CkMessageDetailRecorder did */
2920     CmiUInt2 little;
2921     fread(&little, 2, 1, f);
2922     if (little != sizeof(void*)) {
2923       CkAbort("Replaying on a different architecture from which recording was done!");
2924     }
2925
2926     CsdEnqueue(getNext());
2927
2928     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2929   }
2930   virtual bool process(envelope **env,CkCoreState *ck) {
2931     void *msg = getNext();
2932     if (msg != NULL) CsdEnqueue(msg);
2933     return true;
2934   }
2935 };
2936
2937 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2938 #if ! CMK_BIGSIM_CHARM
2939   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2940 #endif
2941   CkMessageReplay *replay = (CkMessageReplay*)rep;
2942   //CmiStartQD(CkMessageReplayQuiescence, replay);
2943 }
2944
2945 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2946   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2947   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2948   ConverseExit();
2949 }
2950 #endif
2951
2952 static bool CpdExecuteThreadResume(CthThreadToken *token) {
2953   CkCoreState *ck = CkpvAccess(_coreState);
2954   if (ck->watcher!=NULL) {
2955     return ck->watcher->processThread(token,ck);
2956   }
2957   return true;
2958 }
2959
2960 CpvExtern(int, CthResumeNormalThreadIdx);
2961 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2962 {
2963   CthThread t = token->thread;
2964
2965   if(t == NULL){
2966     free(token);
2967     return;
2968   }
2969 #if CMK_TRACE_ENABLED
2970 #if ! CMK_TRACE_IN_CHARM
2971   if(CpvAccess(traceOn))
2972     CthTraceResume(t);
2973 /*    if(CpvAccess(_traceCoreOn)) 
2974             resumeTraceCore();*/
2975 #endif
2976 #endif
2977 #if CMK_OMP
2978   CthSetPrev(t, CthSelf());
2979 #endif
2980   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2981   if (CpdExecuteThreadResume(token)) {
2982     CthResume(t);
2983   }
2984 #if CMK_OMP
2985   CthScheduledDecrement();
2986   CthSetPrev(CthSelf(), 0);
2987 #endif
2988 }
2989
2990 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2991   CkCoreState *ck = CkpvAccess(_coreState);
2992   if (ck->watcher!=NULL) {
2993     ck->watcher->processLBMessage(msg, ck);
2994   }
2995 }
2996
2997 #if CMK_BIGSIM_CHARM
2998 CpvExtern(int      , CthResumeBigSimThreadIdx);
2999 #endif
3000
3001 #include "ckliststring.h"
3002 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
3003     CmiArgGroup("Charm++","Record/Replay");
3004     bool forceReplay = false;
3005     char *procs = NULL;
3006     _replaySystem = 0;
3007     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
3008       if(CmiMyRank() == 0) _recplay_crc = 1;
3009     }
3010     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
3011       if(CmiMyRank() == 0) _recplay_checksum = 1;
3012     }
3013     int tmplogsize;
3014     if(CmiGetArgIntDesc(argv,"+recplay-logsize",&tmplogsize,"Specify the size of the buffer used by the message recorder"))
3015       {
3016         if(CmiMyRank() == 0) _recplay_logsize = tmplogsize;
3017       }
3018     REPLAYDEBUG("CkMessageWatcherInit ");
3019     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
3020 #if CMK_REPLAYSYSTEM
3021         CkListString list(procs);
3022         if (list.includes(CkMyPe())) {
3023           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
3024           CpdSetInitializeMemory(1);
3025           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
3026         }
3027 #else
3028         CkAbort("Option `+record-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3029 #endif
3030     }
3031     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
3032 #if CMK_REPLAYSYSTEM
3033       if (CkMyPe() == 0) {
3034         CmiPrintf("Charm++> record mode.\n");
3035         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3036           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
3037           _recplay_crc = _recplay_checksum = 0;
3038         }
3039       }
3040       CpdSetInitializeMemory(1);
3041       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3042       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
3043 #else
3044       CkAbort("Option `+record' requires that record-replay support be enabled at configure time (--enable-replay)");
3045 #endif
3046     }
3047         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
3048 #if CMK_REPLAYSYSTEM
3049             forceReplay = true;
3050             CpdSetInitializeMemory(1);
3051             // Set the parameters of the processor
3052 #if CMK_SHARED_VARS_UNAVAILABLE
3053             _Cmi_mype = atoi(procs);
3054             while (procs[0]!='/') procs++;
3055             procs++;
3056             _Cmi_numpes = atoi(procs);
3057 #else
3058             CkAbort("+replay-detail available only for non-SMP build");
3059 #endif
3060             _replaySystem = 1;
3061             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
3062 #else
3063           CkAbort("Option `+replay-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3064 #endif
3065         }
3066         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
3067 #if CMK_REPLAYSYSTEM
3068           if (CkMyPe() == 0)  {
3069             CmiPrintf("Charm++> replay mode.\n");
3070             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3071               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
3072               _recplay_crc = _recplay_checksum = 0;
3073             }
3074           }
3075           CpdSetInitializeMemory(1);
3076 #if ! CMK_BIGSIM_CHARM
3077           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3078 #else
3079           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3080 #endif
3081           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
3082 #else
3083           CkAbort("Option `+replay' requires that record-replay support be enabled at configure time (--enable-replay)");
3084 #endif
3085         }
3086         if (_recplay_crc && _recplay_checksum) {
3087           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
3088         }
3089 }
3090
3091 extern "C"
3092 int CkMessageToEpIdx(void *msg) {
3093         envelope *env=UsrToEnv(msg);
3094         int ep=env->getEpIdx();
3095         if (ep==CkIndex_CkArray::recvBroadcast(0))
3096                 return env->getsetArrayBcastEp();
3097         else
3098                 return ep;
3099 }
3100
3101 extern "C"
3102 int getCharmEnvelopeSize() {
3103   return sizeof(envelope);
3104 }
3105
3106 /// Best-effort guess at whether @arg msg points at a charm envelope
3107 extern "C"
3108 int isCharmEnvelope(void *msg) {
3109     envelope *e = (envelope *)msg;
3110     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
3111     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
3112     if (e->getTotalsize() < sizeof(envelope)) return 0;
3113     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
3114 #if CMK_SMP
3115     if (e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
3116 #else
3117     if (e->getSrcPe()>=CkNumPes()) return 0;
3118 #endif
3119     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
3120     return 1;
3121 }
3122
3123 #include "CkMarshall.def.h"