Zcpy Bcast Send API: Guard ZC API macros for non-RDMA layers
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #include "pathHistory.h"
14
15 #if CMK_LBDB_ON
16 #include "LBDatabase.h"
17 #endif // CMK_LBDB_ON
18
19 #ifndef CMK_CHARE_USE_PTR
20 #include <map>
21 CkpvDeclare(std::vector<void *>, chare_objs);
22 CkpvDeclare(std::vector<int>, chare_types);
23 CkpvDeclare(std::vector<VidBlock *>, vidblocks);
24
25 typedef std::map<int, CkChareID>  Vidblockmap;
26 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
27 CkpvDeclare(int, currentChareIdx);
28 #endif
29
30 // Map of array IDs to array elements for fast message delivery
31 CkpvDeclare(ArrayObjMap, array_objs);
32
33 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
34
35 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
36
37 int CMessage_CkMessage::__idx=-1;
38 int CMessage_CkArgMsg::__idx=0;
39 int CkIndex_Chare::__idx;
40 int CkIndex_Group::__idx;
41 int CkIndex_ArrayBase::__idx=-1;
42
43 extern int _defaultObjectQ;
44
45 void _initChareTables()
46 {
47 #ifndef CMK_CHARE_USE_PTR
48           /* chare and vidblock table */
49   CkpvInitialize(std::vector<void *>, chare_objs);
50   CkpvInitialize(std::vector<int>, chare_types);
51   CkpvInitialize(std::vector<VidBlock *>, vidblocks);
52   CkpvInitialize(Vidblockmap, vmap);
53   CkpvInitialize(int, currentChareIdx);
54   CkpvAccess(currentChareIdx) = -1;
55 #endif
56
57   CkpvInitialize(ArrayObjMap, array_objs);
58 }
59
60 //Charm++ virtual functions: declaring these here results in a smaller executable
61 Chare::Chare(void) {
62   thishandle.onPE=CkMyPe();
63   thishandle.objPtr=this;
64 #if CMK_ERROR_CHECKING
65   magic = CHARE_MAGIC;
66 #endif
67 #ifndef CMK_CHARE_USE_PTR
68      // for plain chare, objPtr is actually the index to chare obj table
69   if (CkpvAccess(currentChareIdx) >= 0) {
70     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
71   }
72   chareIdx = CkpvAccess(currentChareIdx);
73 #endif
74 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
75   mlogData = new ChareMlogData();
76   mlogData->objID.type = TypeChare;
77   mlogData->objID.data.chare.id = thishandle;
78 #endif
79 #if CMK_OBJECT_QUEUE_AVAILABLE
80   if (_defaultObjectQ)  CkEnableObjQ();
81 #endif
82 }
83
84 Chare::Chare(CkMigrateMessage* m) {
85   thishandle.onPE=CkMyPe();
86   thishandle.objPtr=this;
87 #if CMK_ERROR_CHECKING
88   magic = 0;
89 #endif
90
91 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
92         mlogData = NULL;
93 #endif
94
95 #if CMK_OBJECT_QUEUE_AVAILABLE
96   if (_defaultObjectQ)  CkEnableObjQ();
97 #endif
98 }
99
100 void Chare::CkEnableObjQ()
101 {
102 #if CMK_OBJECT_QUEUE_AVAILABLE
103   objQ.create();
104 #endif
105 }
106
107 Chare::~Chare() {
108 #ifndef CMK_CHARE_USE_PTR
109 /*
110   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
111 */
112   if (chareIdx != -1)
113   {
114     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
115     CkpvAccess(chare_objs)[chareIdx] = NULL;
116     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
117     if (iter != CkpvAccess(vmap).end()) {
118       CkChareID *pCid = (CkChareID *)
119         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
120       int srcPe = iter->second.onPE;
121       *pCid = iter->second;
122       envelope *ret = UsrToEnv(pCid);
123       ret->setVidPtr(iter->second.objPtr);
124       ret->setSrcPe(CkMyPe());
125       CmiSetHandler(ret, _charmHandlerIdx);
126       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
127       CpvAccess(_qd)->create();
128       CkpvAccess(vmap).erase(iter);
129     }
130   }
131 #endif
132 }
133
134 void Chare::pup(PUP::er &p)
135 {
136   p(thishandle.onPE);
137   thishandle.objPtr=(void *)this;
138 #ifndef CMK_CHARE_USE_PTR
139   p(chareIdx);
140   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
141 #endif
142 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
143         if(p.isUnpacking()){
144                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
145                 mlogData = new ChareMlogData();
146         }
147         mlogData->pup(p);
148 #endif
149 #if CMK_ERROR_CHECKING
150   p(magic);
151 #endif
152 }
153
154 int Chare::ckGetChareType() const {
155   return -3;
156 }
157 char *Chare::ckDebugChareName(void) {
158   char buf[100];
159   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),(void*)this);
160   return strdup(buf);
161 }
162 int Chare::ckDebugChareID(char *str, int limit) {
163   // pure chares for now do not have a valid ID
164   str[0] = 0;
165   return 1;
166 }
167 void Chare::ckDebugPup(PUP::er &p) {
168   pup(p);
169 }
170
171 /// This method is called before starting a [threaded] entry method.
172 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
173   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
174   traceAddThreadListeners(th, UsrToEnv(msg));
175 }
176
177 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
178   p.comment("Bytes");
179   int ts=UsrToEnv(msg)->getTotalsize();
180   int msgLen=ts-sizeof(envelope);
181   if (msgLen>0)
182     p((char*)msg,msgLen);
183 }
184
185 IrrGroup::IrrGroup(void) {
186   thisgroup = CkpvAccess(_currentGroup);
187 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
188         mlogData->objID.type = TypeGroup;
189         mlogData->objID.data.group.id = thisgroup;
190         mlogData->objID.data.group.onPE = CkMyPe();
191 #endif
192 }
193
194 IrrGroup::~IrrGroup() {
195   // remove the object pointer
196   if (CkpvAccess(_destroyingNodeGroup)) {
197     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
198     CksvAccess(_nodeGroupTable)->find(thisgroup).setObj(NULL);
199     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
200     CkpvAccess(_destroyingNodeGroup) = false;
201   } else {
202     CmiImmediateLock(CkpvAccess(_groupTableImmLock));
203     CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
204     CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
205   }
206 }
207
208 void IrrGroup::pup(PUP::er &p)
209 {
210   Chare::pup(p);
211   p|thisgroup;
212 }
213
214 int IrrGroup::ckGetChareType() const {
215   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
216 }
217
218 int IrrGroup::ckDebugChareID(char *str, int limit) {
219   if (limit<5) return -1;
220   str[0] = 1;
221   *((int*)&str[1]) = thisgroup.idx;
222   return 5;
223 }
224
225 char *IrrGroup::ckDebugChareName() {
226   return strdup(_chareTable[ckGetChareType()]->name);
227 }
228
229 void IrrGroup::ckJustMigrated(void)
230 {
231 }
232
233 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
234   /* FIXME: **CW** not entirely sure what we should do here yet */
235 }
236
237 void Group::CkAddThreadListeners(CthThread th, void *msg) {
238   Chare::CkAddThreadListeners(th, msg);
239   CthSetThreadID(th, thisgroup.idx, 0, 0);
240 }
241
242 void Group::pup(PUP::er &p)
243 {
244   CkReductionMgr::pup(p);
245   p|reductionInfo;
246 }
247
248 /**** Delegation Manager Group */
249 CkDelegateMgr::~CkDelegateMgr() { }
250
251 //Default delegator implementation: do not delegate-- send directly
252 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
253   { CkSendMsg(ep,m,c); }
254 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
255   { CkSendMsgBranch(ep,m,onPE,g); }
256 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
257   { CkBroadcastMsgBranch(ep,m,g); }
258 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
259   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
260 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
261   { CkSendMsgNodeBranch(ep,m,onNode,g); }
262 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
263   { CkBroadcastMsgNodeBranch(ep,m,g); }
264 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
265   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
266 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
267 {
268         CProxyElement_ArrayBase ap(a,idx);
269         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
270 }
271 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
272 {
273         CProxyElement_ArrayBase ap(a,idx);
274         ap.ckSend((CkArrayMessage *)m,ep);
275 }
276 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
277 {
278         CProxy_ArrayBase ap(a);
279         ap.ckBroadcast((CkArrayMessage *)m,ep);
280 }
281
282 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
283 {
284         CmiAbort("ArraySectionSend is not implemented!\n");
285 /*
286         CProxyElement_ArrayBase ap(a,idx);
287         ap.ckSend((CkArrayMessage *)m,ep);
288 */
289 }
290
291 /*** Proxy <-> delegator communication */
292 CkDelegateData::~CkDelegateData() {}
293
294 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
295   return pd; // default implementation ignores pup call
296 }
297
298 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
299    this tricky manual reference counting business... */
300
301 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
302         if (dPtr) dPtr->ref();
303         ckUndelegate();
304         delegatedMgr = dTo;
305         delegatedPtr = dPtr;
306         delegatedGroupId = delegatedMgr->CkGetGroupID();
307         isNodeGroup = delegatedMgr->isNodeGroup();
308 }
309 void CProxy::ckUndelegate(void) {
310         delegatedMgr=NULL;
311         delegatedGroupId.setZero();
312         if (delegatedPtr) delegatedPtr->unref();
313         delegatedPtr=NULL;
314 }
315
316 /// Copy constructor
317 CProxy::CProxy(const CProxy &src)
318   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
319    isNodeGroup(src.isNodeGroup) {
320     delegatedPtr = NULL;
321     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
322         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
323     }
324 }
325
326 /// Assignment operator
327 CProxy& CProxy::operator=(const CProxy &src) {
328         CkDelegateData *oldPtr=delegatedPtr;
329         ckUndelegate();
330         delegatedMgr=src.delegatedMgr;
331         delegatedGroupId = src.delegatedGroupId; 
332         isNodeGroup = src.isNodeGroup;
333
334         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
335             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
336         else
337             delegatedPtr = NULL;
338
339         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
340         if (oldPtr) oldPtr->unref();
341         return *this;
342 }
343
344 void CProxy::pup(PUP::er &p) {
345   if (!p.isUnpacking()) {
346     if (ckDelegatedTo() != NULL) {
347       delegatedGroupId = delegatedMgr->CkGetGroupID();
348       isNodeGroup = delegatedMgr->isNodeGroup();
349     }
350   }
351   p|delegatedGroupId;
352   if (!delegatedGroupId.isZero()) {
353     p|isNodeGroup;
354     if (p.isUnpacking()) {
355       delegatedMgr = ckDelegatedTo(); 
356     }
357
358     int migCtor = 0, cIdx; 
359     if (!p.isUnpacking()) {
360       if (isNodeGroup) {
361         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
362         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
363         migCtor = _chareTable[cIdx]->migCtor; 
364         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
365       }
366       else  {
367         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
368         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
369         migCtor = _chareTable[cIdx]->migCtor; 
370         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
371       }         
372     }
373
374     p|migCtor;
375
376     // if delegated manager has not been created, construct a dummy
377     // object on which to call DelegatePointerPup
378     if (delegatedMgr == NULL) {
379
380       // create a dummy object for calling DelegatePointerPup
381       int objId = _entryTable[migCtor]->chareIdx; 
382       size_t objSize = _chareTable[objId]->size;
383       void *obj = malloc(objSize); 
384       _entryTable[migCtor]->call(NULL, obj); 
385       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
386         ->DelegatePointerPup(p, delegatedPtr);           
387       free(obj);
388
389     }
390     else {
391
392       // delegated manager has been created, so we can use it
393       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
394
395     }
396
397     if (p.isUnpacking() && delegatedPtr) {
398       delegatedPtr->ref();
399     }
400   }
401 }
402
403 /**** Array sections */
404 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
405 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems, int factor): bfactor(factor) { \
406   _elems.assign(elems, elems+nElems);  \
407   _cookie.get_aid() = aid;      \
408   _cookie.get_pe() = CkMyPe();  \
409 } \
410 CkSectionID::CkSectionID(const CkArrayID &aid, const std::vector<CkArrayIndex##index> &elems, int factor): bfactor(factor) { \
411   _elems.resize(elems.size()); \
412   for (int i=0; i<_elems.size(); ++i) { \
413     _elems[i] = static_cast<CkArrayIndex>(elems[i]); \
414   } \
415   _cookie.get_aid() = aid;      \
416   _cookie.get_pe() = CkMyPe();  \
417 } \
418
419 CKSECTIONID_CONSTRUCTOR_DEF(1D)
420 CKSECTIONID_CONSTRUCTOR_DEF(2D)
421 CKSECTIONID_CONSTRUCTOR_DEF(3D)
422 CKSECTIONID_CONSTRUCTOR_DEF(4D)
423 CKSECTIONID_CONSTRUCTOR_DEF(5D)
424 CKSECTIONID_CONSTRUCTOR_DEF(6D)
425 CKSECTIONID_CONSTRUCTOR_DEF(Max)
426
427 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes, int factor): bfactor(factor) {
428   _cookie.get_aid() = gid;
429   pelist.assign(_pelist, _pelist+_npes);
430 }
431
432 CkSectionID::CkSectionID(const CkGroupID &gid, const std::vector<int>& _pelist, int factor): pelist(_pelist), bfactor(factor) {
433   _cookie.get_aid() = gid;
434 }
435
436 CkSectionID::CkSectionID(const CkSectionID &sid) {
437   _cookie = sid._cookie;
438   pelist = sid.pelist;
439   _elems = sid._elems;
440   bfactor = sid.bfactor;
441 }
442
443 void CkSectionID::operator=(const CkSectionID &sid) {
444   _cookie = sid._cookie;
445   pelist = sid.pelist;
446   _elems = sid._elems;
447   bfactor = sid.bfactor;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451   p | _cookie;
452   p | pelist;
453   p | _elems;
454   p | bfactor;
455 }
456
457 /**** Tiny random API routines */
458
459 #if CMK_CUDA
460 void CUDACallbackManager(void *fn) {
461   if (fn != NULL) {
462     CkCallback *cb = (CkCallback*) fn;
463     cb->send();
464   }
465 }
466
467 #endif
468
469 void QdCreate(int n) {
470   CpvAccess(_qd)->create(n);
471 }
472
473 void QdProcess(int n) {
474   CpvAccess(_qd)->process(n);
475 }
476
477 extern "C"
478 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
479 {
480   UsrToEnv(msg)->setRef(ref);
481 }
482
483 extern "C"
484 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
485 {
486   return UsrToEnv(msg)->getRef();
487 }
488
489 extern "C"
490 int CkGetSrcPe(void *msg)
491 {
492   return UsrToEnv(msg)->getSrcPe();
493 }
494
495 extern "C"
496 int CkGetSrcNode(void *msg)
497 {
498   return CmiNodeOf(CkGetSrcPe(msg));
499 }
500
501 extern "C"
502 void *CkLocalBranch(CkGroupID gID) {
503   return _localBranch(gID);
504 }
505
506 static
507 void *_ckLocalNodeBranch(CkGroupID groupID) {
508   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
509   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
510   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
511   return retval;
512 }
513
514 extern "C"
515 void *CkLocalNodeBranch(CkGroupID groupID)
516 {
517   void *retval;
518   // we are called in a constructor
519   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
520     return CkpvAccess(_currentNodeGroupObj);
521   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
522   { // Nodegroup hasn't finished being created yet-- schedule...
523     CsdScheduler(0);
524   }
525   return retval;
526 }
527
528 extern "C"
529 void *CkLocalChare(const CkChareID *pCid)
530 {
531         int pe=pCid->onPE;
532         if (pe<0) { //A virtual chare ID
533                 if (pe!=(-(CkMyPe()+1)))
534                         return NULL;//VID block not on this PE
535 #ifdef CMK_CHARE_USE_PTR
536                 VidBlock *v=(VidBlock *)pCid->objPtr;
537 #else
538                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
539 #endif
540                 return v->getLocalChareObj();
541         }
542         else
543         { //An ordinary chare ID
544                 if (pe!=CkMyPe())
545                         return NULL;//Chare not on this PE
546 #ifdef CMK_CHARE_USE_PTR
547                 return pCid->objPtr;
548 #else
549                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
550 #endif
551         }
552 }
553
554 CkpvDeclare(char**,Ck_argv);
555
556 extern "C" char **CkGetArgv(void) {
557         return CkpvAccess(Ck_argv);
558 }
559 extern "C" int CkGetArgc(void) {
560         return CmiGetArgc(CkpvAccess(Ck_argv));
561 }
562
563 /******************** Basic support *****************/
564 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
565 {
566 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
567         CpvAccess(_currentObj) = (Chare *)obj;
568 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
569 #endif
570   //BIGSIM_OOC DEBUGGING
571   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
572   //fflush(stdout);
573 #if CMK_CHARMDEBUG
574   CpdBeforeEp(epIdx, obj, msg);
575 #endif    
576   _entryTable[epIdx]->call(msg, obj);
577 #if CMK_CHARMDEBUG
578   CpdAfterEp(epIdx);
579 #endif
580   if (_entryTable[epIdx]->noKeep)
581   { /* Method doesn't keep/delete the message, so we have to: */
582     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
583   }
584 }
585 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
586 {
587   //BIGSIM_OOC DEBUGGING
588   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
589   //fflush(stdout);
590
591   void *deliverMsg;
592 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
593         CpvAccess(_currentObj) = (Chare *)obj;
594 #endif
595   if (_entryTable[epIdx]->noKeep)
596   { /* Deliver a read-only copy of the message */
597     deliverMsg=(void *)msg;
598   } else
599   { /* Method needs a copy of the message to keep/delete */
600     void *oldMsg=(void *)msg;
601     deliverMsg=CkCopyMsg(&oldMsg);
602 #if CMK_ERROR_CHECKING
603     if (oldMsg!=msg)
604       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
605 #endif
606   }
607 #if CMK_CHARMDEBUG
608   CpdBeforeEp(epIdx, obj, (void*)msg);
609 #endif
610   _entryTable[epIdx]->call(deliverMsg, obj);
611 #if CMK_CHARMDEBUG
612   CpdAfterEp(epIdx);
613 #endif
614 }
615
616 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
617 {
618   void *msg = EnvToUsr(env);
619   _SET_USED(env, 0);
620   CkDeliverMessageFree(epIdx,msg,obj);
621 }
622
623 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
624 {
625
626 #if CMK_TRACE_ENABLED 
627   if (_entryTable[epIdx]->traceEnabled) {
628     _TRACE_BEGIN_EXECUTE(env, obj);
629     if(_entryTable[epIdx]->appWork)
630         _TRACE_BEGIN_APPWORK();
631     _invokeEntryNoTrace(epIdx,env,obj);
632     if(_entryTable[epIdx]->appWork)
633         _TRACE_END_APPWORK();
634     _TRACE_END_EXECUTE();
635   }
636   else
637 #endif
638     _invokeEntryNoTrace(epIdx,env,obj);
639
640 }
641
642 /********************* Creation ********************/
643
644 extern "C"
645 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
646 {
647   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
648   envelope *env = UsrToEnv(msg);
649   _CHECK_USED(env);
650   if(pCid == 0) {
651     env->setMsgtype(NewChareMsg);
652   } else {
653     pCid->onPE = (-(CkMyPe()+1));
654     //  pCid->magic = _GETIDX(cIdx);
655     pCid->objPtr = (void *) new VidBlock();
656     _MEMCHECK(pCid->objPtr);
657     env->setMsgtype(NewVChareMsg);
658     env->setVidPtr(pCid->objPtr);
659 #ifndef CMK_CHARE_USE_PTR
660     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
661     int idx = CkpvAccess(vidblocks).size()-1;
662     pCid->objPtr = (void *)(CmiIntPtr)idx;
663     env->setVidPtr((void *)(CmiIntPtr)idx);
664 #endif
665   }
666   env->setEpIdx(eIdx);
667   env->setByPe(CkMyPe());
668   env->setSrcPe(CkMyPe());
669   CmiSetHandler(env, _charmHandlerIdx);
670   _TRACE_CREATION_1(env);
671   CpvAccess(_qd)->create();
672   _STATS_RECORD_CREATE_CHARE_1();
673   _SET_USED(env, 1);
674   if(destPE == CK_PE_ANY)
675     env->setForAnyPE(1);
676   else
677     env->setForAnyPE(0);
678   _CldEnqueue(destPE, env, _infoIdx);
679   _TRACE_CREATION_DONE(1);
680 }
681
682 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
683 {
684   int gIdx = _entryTable[epIdx]->chareIdx;
685   void *obj = malloc(_chareTable[gIdx]->size);
686   _MEMCHECK(obj);
687   setMemoryTypeChare(obj);
688   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
689   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
690   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
691   CkpvAccess(_groupIDTable)->push_back(groupID);
692   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
693   if(ptrq) {
694     void *pending;
695     while((pending=ptrq->deq())!=0) {
696 #if CMK_BIGSIM_CHARM
697       //In BigSim, CpvAccess(CsdSchedQueue) is not used. _CldEnqueue resets the
698       //handler to converse-level BigSim handler.
699       _CldEnqueue(CkMyPe(), pending, _infoIdx);
700 #else
701       CsdEnqueueGeneral(pending, CQS_QUEUEING_FIFO, 0, 0);
702 #endif
703     }
704     CkpvAccess(_groupTable)->find(groupID).clearPending();
705   }
706   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
707
708   CkpvAccess(_currentGroup) = groupID;
709   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
710
711 #ifndef CMK_CHARE_USE_PTR
712   int callingChareIdx = CkpvAccess(currentChareIdx);
713   CkpvAccess(currentChareIdx) = -1;
714 #endif
715
716   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
717
718 #ifndef CMK_CHARE_USE_PTR
719   CkpvAccess(currentChareIdx) = callingChareIdx;
720 #endif
721
722   _STATS_RECORD_PROCESS_GROUP_1();
723 }
724
725 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
726 {
727   int gIdx = _entryTable[epIdx]->chareIdx;
728   size_t objSize=_chareTable[gIdx]->size;
729   void *obj = malloc(objSize);
730   _MEMCHECK(obj);
731   setMemoryTypeChare(obj);
732   CkpvAccess(_currentGroup) = groupID;
733
734 // Now that the NodeGroup is created, add it to the table.
735 //  NodeGroups can be accessed by multiple processors, so
736 //  this is in the opposite order from groups - invoking the constructor
737 //  before registering it.
738 // User may call CkLocalNodeBranch() inside the nodegroup constructor
739 //  store nodegroup into _currentNodeGroupObj
740   CkpvAccess(_currentNodeGroupObj) = obj;
741
742 #ifndef CMK_CHARE_USE_PTR
743   int callingChareIdx = CkpvAccess(currentChareIdx);
744   CkpvAccess(currentChareIdx) = -1;
745 #endif
746
747   _invokeEntryNoTrace(epIdx,env,obj);
748
749 #ifndef CMK_CHARE_USE_PTR
750   CkpvAccess(currentChareIdx) = callingChareIdx;
751 #endif
752
753   CkpvAccess(_currentNodeGroupObj) = NULL;
754   _STATS_RECORD_PROCESS_NODE_GROUP_1();
755
756   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
757   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
758   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
759   CksvAccess(_nodeGroupIDTable).push_back(groupID);
760
761   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
762   if(ptrq) {
763     void *pending;
764     while((pending=ptrq->deq())!=0) {
765       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
766     }
767     CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
768   }
769   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
770 }
771
772 void _createGroup(CkGroupID groupID, envelope *env)
773 {
774   _CHECK_USED(env);
775   _SET_USED(env, 1);
776   int epIdx = env->getEpIdx();
777   int gIdx = _entryTable[epIdx]->chareIdx;
778   env->setGroupNum(groupID);
779   env->setSrcPe(CkMyPe());
780   env->setGroupEpoch(CkpvAccess(_charmEpoch));
781
782   if(CkNumPes()>1) {
783     CkPackMessage(&env);
784     CmiSetHandler(env, _bocHandlerIdx);
785     _numInitMsgs++;
786     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
787     CpvAccess(_qd)->create(CkNumPes()-1);
788     CkUnpackMessage(&env);
789   }
790   _STATS_RECORD_CREATE_GROUP_1();
791   CkCreateLocalGroup(groupID, epIdx, env);
792 }
793
794 void _createNodeGroup(CkGroupID groupID, envelope *env)
795 {
796   _CHECK_USED(env);
797   _SET_USED(env, 1);
798   int epIdx = env->getEpIdx();
799   env->setGroupNum(groupID);
800   env->setSrcPe(CkMyPe());
801   env->setGroupEpoch(CkpvAccess(_charmEpoch));
802   if(CkNumNodes()>1) {
803     CkPackMessage(&env);
804     CmiSetHandler(env, _bocHandlerIdx);
805     _numInitMsgs++;
806     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
807     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
808     CpvAccess(_qd)->create(CkNumNodes()-1);
809     CkUnpackMessage(&env);
810   }
811   _STATS_RECORD_CREATE_NODE_GROUP_1();
812   CkCreateLocalNodeGroup(groupID, epIdx, env);
813 }
814
815 // new _groupCreate
816
817 static CkGroupID _groupCreate(envelope *env)
818 {
819   CkGroupID groupNum;
820
821   // check CkMyPe(). if it is 0 then idx is _numGroups++
822   // if not, then something else...
823   if(CkMyPe() == 0)
824      groupNum.idx = CkpvAccess(_numGroups)++;
825   else
826      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
827   _createGroup(groupNum, env);
828   return groupNum;
829 }
830
831 // new _nodeGroupCreate
832 static CkGroupID _nodeGroupCreate(envelope *env)
833 {
834   CkGroupID groupNum;
835   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
836   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
837           groupNum.idx = CksvAccess(_numNodeGroups)++;
838    else
839           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
840   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
841   _createNodeGroup(groupNum, env);
842   return groupNum;
843 }
844
845 /**** generate the group idx when group is creator pe is not pe0
846  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
847  **** remaining bits contain the group creator processor number and
848  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
849
850 int _getGroupIdx(int numNodes,int myNode,int numGroups)
851 {
852         int idx;
853         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
854         int n = 32 - (x+1);                                     // number of bits remaining for the index
855         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
856                                                                 // then add the next available index
857         // of course this won't work when int is 8 bytes long on T3E
858         //idx |= 0x80000000;                                      // set the most significant bit to 1
859         idx = - idx;
860                                                                 // if int is not 32 bits, wouldn't this be wrong?
861         return idx;
862 }
863
864 extern "C"
865 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
866 {
867   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
868   envelope *env = UsrToEnv(msg);
869   env->setMsgtype(BocInitMsg);
870   env->setEpIdx(eIdx);
871   env->setSrcPe(CkMyPe());
872   _TRACE_CREATION_N(env, CkNumPes());
873   CkGroupID gid = _groupCreate(env);
874   _TRACE_CREATION_DONE(1);
875   return gid;
876 }
877
878 extern "C"
879 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
880 {
881   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
882   envelope *env = UsrToEnv(msg);
883   env->setMsgtype(NodeBocInitMsg);
884   env->setEpIdx(eIdx);
885   env->setSrcPe(CkMyPe());
886   _TRACE_CREATION_N(env, CkNumNodes());
887   CkGroupID gid = _nodeGroupCreate(env);
888   _TRACE_CREATION_DONE(1);
889   return gid;
890 }
891
892 static inline void *_allocNewChare(envelope *env, int &idx)
893 {
894   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
895   void *tmp=malloc(_chareTable[chareIdx]->size);
896   _MEMCHECK(tmp);
897 #ifndef CMK_CHARE_USE_PTR
898   CkpvAccess(chare_objs).push_back(tmp);
899   CkpvAccess(chare_types).push_back(chareIdx);
900   idx = CkpvAccess(chare_objs).size()-1;
901 #endif
902   setMemoryTypeChare(tmp);
903   return tmp;
904 }
905
906 // Method returns true if one or more group dependencies are unsatisfied
907 inline bool isGroupDepUnsatisfied(const CkCoreState *ck, const envelope *env) {
908   int groupDepNum = env->getGroupDepNum();
909   if(groupDepNum != 0) {
910     CkGroupID *groupDepPtr = (CkGroupID *)(env->getGroupDepPtr());
911     for(int i=0;i<groupDepNum;i++) {
912       CkGroupID depID = groupDepPtr[i];
913       if (!depID.isZero() && !_lookupGroupAndBufferIfNotThere(ck, env, depID)) {
914         return true;
915       }
916     }
917   }
918   return false;
919 }
920
921 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
922 {
923   if(isGroupDepUnsatisfied(ck, env))
924     return;
925   if(ck)
926     ck->process(); // ck->process() updates mProcessed count used in QD
927   int idx;
928   void *obj = _allocNewChare(env, idx);
929 #ifndef CMK_CHARE_USE_PTR
930   CkpvAccess(currentChareIdx) = idx;
931 #endif
932   _invokeEntry(env->getEpIdx(),env,obj);
933   if(ck)
934     _STATS_RECORD_PROCESS_CHARE_1();
935 }
936
937 void CkCreateLocalChare(int epIdx, envelope *env)
938 {
939   env->setEpIdx(epIdx);
940   _processNewChareMsg(NULL, env);
941 }
942
943 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
944 {
945   if(isGroupDepUnsatisfied(ck, env))
946     return;
947   ck->process(); // ck->process() updates mProcessed count used in QD
948   int idx;
949   void *obj = _allocNewChare(env, idx);
950   CkChareID *pCid = (CkChareID *)
951       _allocMsg(FillVidMsg, sizeof(CkChareID));
952   pCid->onPE = CkMyPe();
953 #ifndef CMK_CHARE_USE_PTR
954   pCid->objPtr = (void*)(CmiIntPtr)idx;
955 #else
956   pCid->objPtr = obj;
957 #endif
958   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
959   envelope *ret = UsrToEnv(pCid);
960   ret->setVidPtr(env->getVidPtr());
961   int srcPe = env->getByPe();
962   ret->setSrcPe(CkMyPe());
963   CmiSetHandler(ret, _charmHandlerIdx);
964   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
965 #ifndef CMK_CHARE_USE_PTR
966   // register the remote vidblock for deletion when chare is deleted
967   CkChareID vid;
968   vid.onPE = srcPe;
969   vid.objPtr = env->getVidPtr();
970   CkpvAccess(vmap)[idx] = vid;    
971 #endif
972   CpvAccess(_qd)->create();
973 #ifndef CMK_CHARE_USE_PTR
974   CkpvAccess(currentChareIdx) = idx;
975 #endif
976   _invokeEntry(env->getEpIdx(),env,obj);
977   _STATS_RECORD_PROCESS_CHARE_1();
978 }
979
980 /************** Receive: Chares *************/
981
982 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
983 {
984   if(isGroupDepUnsatisfied(ck, env))
985     return;
986   ck->process(); // ck->process() updates mProcessed count used in QD
987   int epIdx = env->getEpIdx();
988   int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
989   void *obj;
990   if (mainIdx != -1)  {           // mainchare
991     CmiAssert(CkMyPe()==0);
992     obj = _mainTable[mainIdx]->getObj();
993   }
994   else {
995 #ifndef CMK_CHARE_USE_PTR
996     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
997       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
998     else
999       obj = env->getObjPtr();
1000 #else
1001     obj = env->getObjPtr();
1002 #endif
1003   }
1004   _invokeEntry(epIdx,env,obj);
1005   _STATS_RECORD_PROCESS_MSG_1();
1006 }
1007
1008 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
1009 {
1010   ck->process(); // ck->process() updates mProcessed count used in QD
1011 #ifndef CMK_CHARE_USE_PTR
1012   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1013 #else
1014   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1015   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
1016 #endif
1017   CkChareID *pcid = (CkChareID *) EnvToUsr(env);
1018   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
1019   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
1020   CmiFree(env);
1021 }
1022
1023 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
1024 {
1025   ck->process(); // ck->process() updates mProcessed count used in QD
1026 #ifndef CMK_CHARE_USE_PTR
1027   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1028 #else
1029   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1030   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
1031 #endif
1032   _SET_USED(env, 1);
1033   vptr->send(env);
1034 }
1035
1036 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1037 {
1038   ck->process(); // ck->process() updates mProcessed count used in QD
1039 #ifndef CMK_CHARE_USE_PTR
1040   VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1041   delete vptr;
1042   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1043 #endif
1044   CmiFree(env);
1045 }
1046
1047 /************** Receive: Groups ****************/
1048
1049 /**
1050  Return a pointer to the local BOC of "groupID".
1051  The message "env" passed in has some known dependency on this groupID
1052  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1053  Therefore, if the return value is NULL, this function buffers the message so that
1054  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1055  The message passed in must have its handlers correctly set so that it can be
1056  scheduled again.
1057 */
1058 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(const CkCoreState *ck, const envelope *env, const CkGroupID &groupID)
1059 {
1060
1061         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1062         IrrGroup *obj = ck->localBranch(groupID);
1063         if (obj==NULL) { /* groupmember not yet created: stash message */
1064                 ck->getGroupTable()->find(groupID).enqMsg((envelope *)env);
1065         }
1066         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1067         return obj;
1068 }
1069
1070 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1071 {
1072   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1073 }
1074
1075 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1076 {
1077 #if CMK_LBDB_ON
1078   // if there is a running obj being measured, stop it temporarily
1079   LDObjHandle objHandle;
1080   int objstopped = 0;
1081   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1082   if (the_lbdb->RunningObject(&objHandle)) {
1083     objstopped = 1;
1084     the_lbdb->ObjectStop(objHandle);
1085   }
1086 #endif
1087   _invokeEntry(epIdx,env,obj);
1088 #if CMK_LBDB_ON
1089   if (objstopped) the_lbdb->ObjectStart(objHandle);
1090 #endif
1091   _STATS_RECORD_PROCESS_BRANCH_1();
1092 }
1093
1094 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1095 {
1096   if(isGroupDepUnsatisfied(ck, env))
1097     return;
1098   CkGroupID groupID =  env->getGroupNum();
1099   IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1100   if(obj) {
1101     ck->process(); // ck->process() updates mProcessed count used in QD
1102     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1103   }
1104 }
1105
1106 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1107 {
1108   env->setEpIdx(epIdx);
1109   _invokeEntry(epIdx,env,obj);
1110   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1111 }
1112
1113 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1114 {
1115   if(isGroupDepUnsatisfied(ck, env))
1116     return;
1117   CkGroupID groupID = env->getGroupNum();
1118   void *obj;
1119
1120   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1121   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1122   if(!obj) { // groupmember not yet created
1123 #if CMK_IMMEDIATE_MSG
1124     if (CmiIsImmediate(env)) {
1125       //CmiDelayImmediate();        /* buffer immediate message */
1126       CmiResetImmediate(env);        // note: this may not be SIG IO safe !
1127     }
1128 #endif
1129     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1130     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1131     return;
1132   }
1133   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1134   ck->process(); // ck->process() updates mProcessed count used in QD
1135   _invokeEntry(env->getEpIdx(),env,obj);
1136   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1137 }
1138
1139 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1140 {
1141   if(isGroupDepUnsatisfied(ck, env))
1142     return;
1143   CkGroupID groupID = env->getGroupNum();
1144   int epIdx = env->getEpIdx();
1145   ck->process(); // ck->process() updates mProcessed count used in QD
1146   CkCreateLocalGroup(groupID, epIdx, env);
1147 }
1148
1149 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1150 {
1151   if(isGroupDepUnsatisfied(ck, env))
1152     return;
1153   ck->process(); // ck->process() updates mProcessed count used in QD
1154   CkGroupID groupID = env->getGroupNum();
1155   int epIdx = env->getEpIdx();
1156   CkCreateLocalNodeGroup(groupID, epIdx, env);
1157 }
1158
1159 /************** Receive: Arrays *************/
1160 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1161   ArrayObjMap& object_map = CkpvAccess(array_objs);
1162   auto iter = object_map.find(env->getRecipientID());
1163   if (iter != object_map.end()) {
1164     // First see if we already have a direct pointer to the object
1165     _SET_USED(env, 0);
1166     ck->process(); // ck->process() updates mProcessed count used in QD
1167     int opts = 0;
1168     CkArrayMessage* msg = (CkArrayMessage*)EnvToUsr(env);
1169     if (msg->array_hops()>1) {
1170       CProxy_ArrayBase(env->getArrayMgr()).ckLocMgr()->multiHop(msg);
1171     }
1172     iter->second->ckInvokeEntry(env->getEpIdx(), msg, !(opts & CK_MSG_KEEP));
1173   } else {
1174     // Otherwise fallback to delivery through the array manager
1175     CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1176     if (mgr) {
1177       _SET_USED(env, 0);
1178       ck->process(); // ck->process() updates mProcessed count used in QD
1179       mgr->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_inline);
1180     }
1181   }
1182 }
1183
1184 //BIGSIM_OOC DEBUGGING
1185 #define TELLMSGTYPE(x) //x
1186
1187 /**
1188  * This is the main converse-level handler used by all of Charm++.
1189  *
1190  * \addtogroup CriticalPathFramework
1191  */
1192 void _processHandler(void *converseMsg,CkCoreState *ck)
1193 {
1194   envelope *env = (envelope *) converseMsg;
1195
1196   MESSAGE_PHASE_CHECK(env);
1197
1198 #if CMK_ONESIDED_IMPL
1199   if(CMI_ZC_MSGTYPE(env) == CMK_ZC_P2P_SEND_MSG || CMI_ZC_MSGTYPE(env) == CMK_ZC_BCAST_SEND_MSG){
1200     envelope *prevEnv = env;
1201
1202     // Determine mode depending on the message
1203     ncpyEmApiMode mode = (CMI_ZC_MSGTYPE(env) == CMK_ZC_BCAST_SEND_MSG) ? ncpyEmApiMode::BCAST : ncpyEmApiMode::P2P;
1204
1205     env = CkRdmaIssueRgets(env, mode, prevEnv);
1206
1207     if(env) {
1208       // memcpyGet or cmaGet completed, env contains the payload and will be enqueued
1209
1210       // Free prevEnv
1211       CkFreeMsg(EnvToUsr(prevEnv));
1212     } else {
1213       // async rdma call in place, asynchronous return and ack handling
1214       return;
1215     }
1216   }
1217 #endif
1218
1219 //#if CMK_RECORD_REPLAY
1220   if (ck->watcher!=NULL) {
1221     if (!ck->watcher->processMessage(&env,ck)) return;
1222   }
1223 //#endif
1224 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1225         Chare *obj=NULL;
1226         CkObjID sender;
1227         MCount SN;
1228         MlogEntry *entry=NULL;
1229         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
1230            || env->getMsgtype() == ForArrayEltMsg
1231            || env->getMsgtype() == ForChareMsg) {
1232                 sender = env->sender;
1233                 SN = env->SN;
1234                 int result = preProcessReceivedMessage(env,&obj,&entry);
1235                 if(result == 0){
1236                         return;
1237                 }
1238         }
1239 #endif
1240 #if USE_CRITICAL_PATH_HEADER_ARRAY
1241   CK_CRITICALPATH_START(env)
1242 #endif
1243
1244   switch(env->getMsgtype()) {
1245 // Group support
1246     case BocInitMsg : // Group creation message
1247       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1248       // QD processing moved inside _processBocInitMsg because it is conditional
1249       //ck->process(); 
1250       if(env->isPacked()) CkUnpackMessage(&env);
1251       _processBocInitMsg(ck,env);
1252       break;
1253     case NodeBocInitMsg : // Nodegroup creation message
1254       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1255       if(env->isPacked()) CkUnpackMessage(&env);
1256       _processNodeBocInitMsg(ck,env);
1257       break;
1258     case ForBocMsg : // Group entry method message (non creation)
1259       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1260       // QD processing moved inside _processForBocMsg because it is conditional
1261       if(env->isPacked()) CkUnpackMessage(&env);
1262       _processForBocMsg(ck,env);
1263       // stats record moved inside _processForBocMsg because it is conditional
1264       break;
1265     case ForNodeBocMsg : // Nodegroup entry method message (non creation)
1266       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1267       // QD processing moved to _processForNodeBocMsg because it is conditional
1268       if(env->isPacked()) CkUnpackMessage(&env);
1269       _processForNodeBocMsg(ck,env);
1270       // stats record moved to _processForNodeBocMsg because it is conditional
1271       break;
1272
1273 // Array support
1274     case ForArrayEltMsg: // Array element entry method message
1275       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1276       if(env->isPacked()) CkUnpackMessage(&env);
1277       _processArrayEltMsg(ck,env);
1278       break;
1279
1280 // Chare support
1281     case NewChareMsg : // Singleton chare creation message
1282       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1283       if(env->isPacked()) CkUnpackMessage(&env);
1284       _processNewChareMsg(ck,env);
1285       break;
1286     case NewVChareMsg : // Singleton virtual chare creation message
1287       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1288       if(env->isPacked()) CkUnpackMessage(&env);
1289       _processNewVChareMsg(ck,env);
1290       break;
1291     case ForChareMsg : // Singeton chare entry method message (non creation)
1292       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1293       if(env->isPacked()) CkUnpackMessage(&env);
1294       _processForPlainChareMsg(ck,env);
1295       break;
1296     case ForVidMsg   : // Singleton virtual chare entry method message (non creation)
1297       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1298       _processForVidMsg(ck,env);
1299       break;
1300     case FillVidMsg  : // Message sent back from the real chare PE to the virtual chare PE to
1301                        // fill the VidBlock (called when the real chare is constructed)
1302       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1303       _processFillVidMsg(ck,env);
1304       break;
1305     case DeleteVidMsg  : // Message sent back from the real chare PE to the virtual chare PE to
1306                          // delete the Vidblock (called when the real chare is deleted by the destructor)
1307       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1308       _processDeleteVidMsg(ck,env);
1309       break;
1310
1311     default:
1312       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1313   }
1314 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1315         if(obj != NULL){
1316                 postProcessReceivedMessage(obj,sender,SN,entry);
1317         }
1318 #endif
1319
1320
1321 #if USE_CRITICAL_PATH_HEADER_ARRAY
1322   CK_CRITICALPATH_END()
1323 #endif
1324
1325 }
1326
1327
1328 /******************** Message Send **********************/
1329
1330 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1331              int *queueing, int *priobits, unsigned int **prioptr)
1332 {
1333   envelope *env = (envelope *)converseMsg;
1334   *pfn = (CldPackFn)CkPackMessage;
1335   *len = env->getTotalsize();
1336   *queueing = env->getQueueing();
1337   *priobits = env->getPriobits();
1338   *prioptr = (unsigned int *) env->getPrioPtr();
1339 }
1340
1341 void CkPackMessage(envelope **pEnv)
1342 {
1343   envelope *env = *pEnv;
1344   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1345     void *msg = EnvToUsr(env);
1346     _TRACE_BEGIN_PACK();
1347     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1348     _TRACE_END_PACK();
1349     env=UsrToEnv(msg);
1350     env->setPacked(1);
1351     *pEnv = env;
1352   }
1353 }
1354
1355 void CkUnpackMessage(envelope **pEnv)
1356 {
1357   envelope *env = *pEnv;
1358   int msgIdx = env->getMsgIdx();
1359   if(env->isPacked()) {
1360     void *msg = EnvToUsr(env);
1361     _TRACE_BEGIN_UNPACK();
1362     msg = _msgTable[msgIdx]->unpack(msg);
1363     _TRACE_END_UNPACK();
1364     env=UsrToEnv(msg);
1365     env->setPacked(0);
1366     *pEnv = env;
1367   }
1368 }
1369
1370 //There's no reason for most messages to go through the Cld--
1371 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1372 // Thus these accellerated versions of the Cld calls.
1373 #if CMK_OBJECT_QUEUE_AVAILABLE
1374 static int index_objectQHandler;
1375 #endif
1376 int index_tokenHandler;
1377 int index_skipCldHandler;
1378
1379 void _skipCldHandler(void *converseMsg)
1380 {
1381   envelope *env = (envelope *)(converseMsg);
1382   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1383 #if CMK_GRID_QUEUE_AVAILABLE
1384   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1385     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1386                        env, env->getQueueing (), env->getPriobits (),
1387                        (unsigned int *) env->getPrioPtr ());
1388   } else {
1389     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1390                        env, env->getQueueing (), env->getPriobits (),
1391                        (unsigned int *) env->getPrioPtr ());
1392   }
1393 #else
1394   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1395         env, env->getQueueing(),env->getPriobits(),
1396         (unsigned int *)env->getPrioPtr());
1397 #endif
1398 }
1399
1400
1401 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1402 // Made non-static to be used by ckmessagelogging
1403 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1404 {
1405 #if CMK_CHARMDEBUG
1406   if (!ConverseDeliver(pe)) {
1407     CmiFree(env);
1408     return;
1409   }
1410 #endif
1411
1412 #if CMK_ONESIDED_IMPL
1413   // Store source information to handle acknowledgements on completion
1414   if(CMI_IS_ZC_BCAST(env))
1415     CkRdmaPrepareBcastMsg(env);
1416 #endif
1417
1418 #if CMK_FAULT_EVAC
1419   if(pe == CkMyPe() ){
1420     if(!CmiNodeAlive(CkMyPe())){
1421         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1422 //      return;
1423     }
1424   }
1425 #endif
1426   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1427 #if CMK_OBJECT_QUEUE_AVAILABLE
1428     Chare *obj = CkFindObjectPtr(env);
1429     if (obj && obj->CkGetObjQueue().queue()) {
1430       _enqObjQueue(obj, env);
1431     }
1432     else
1433 #endif
1434     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1435         env, env->getQueueing(),env->getPriobits(),
1436         (unsigned int *)env->getPrioPtr());
1437 #if CMK_PERSISTENT_COMM
1438     CmiPersistentOneSend();
1439 #endif
1440   } else {
1441     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1442       CkPackMessage(&env);
1443     int len=env->getTotalsize();
1444     CmiSetXHandler(env,CmiGetHandler(env));
1445 #if CMK_OBJECT_QUEUE_AVAILABLE
1446     CmiSetHandler(env,index_objectQHandler);
1447 #else
1448     CmiSetHandler(env,index_skipCldHandler);
1449 #endif
1450     CmiSetInfo(env,infoFn);
1451     if (pe==CLD_BROADCAST) {
1452 #if CMK_MESSAGE_LOGGING
1453         if(env->flags & CK_FREE_MSG_MLOG)
1454                 CmiSyncBroadcastAndFree(len, (char *)env); 
1455         else
1456                 CmiSyncBroadcast(len, (char *)env);
1457 #else
1458                         CmiSyncBroadcastAndFree(len, (char *)env); 
1459 #endif
1460
1461 }
1462     else if (pe==CLD_BROADCAST_ALL) { 
1463 #if CMK_MESSAGE_LOGGING
1464         if(env->flags & CK_FREE_MSG_MLOG)
1465                 CmiSyncBroadcastAllAndFree(len, (char *)env);
1466         else
1467                 CmiSyncBroadcastAll(len, (char *)env);
1468 #else
1469                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1470 #endif
1471
1472 }
1473     else{
1474 #if CMK_MESSAGE_LOGGING
1475         if(env->flags & CK_FREE_MSG_MLOG)
1476                 CmiSyncSendAndFree(pe, len, (char *)env);
1477         else
1478                 CmiSyncSend(pe, len, (char *)env);
1479 #else
1480                         CmiSyncSendAndFree(pe, len, (char *)env);
1481 #endif
1482
1483                 }
1484   }
1485 }
1486
1487 #if CMK_BIGSIM_CHARM
1488 #   define  _skipCldEnqueue   _CldEnqueue
1489 #endif
1490
1491 // by pass Charm++ priority queue, send as Converse message
1492 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1493 {
1494 #if CMK_CHARMDEBUG
1495   if (!ConverseDeliver(-1)) {
1496     CmiFree(env);
1497     return;
1498   }
1499 #endif
1500   CkPackMessage(&env);
1501   int len=env->getTotalsize();
1502   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1503 }
1504
1505 static void _noCldEnqueue(int pe, envelope *env)
1506 {
1507 /*
1508   if (pe == CkMyPe()) {
1509     CmiHandleMessage(env);
1510   } else
1511 */
1512 #if CMK_CHARMDEBUG
1513   if (!ConverseDeliver(pe)) {
1514     CmiFree(env);
1515     return;
1516   }
1517 #endif
1518
1519 #if CMK_ONESIDED_IMPL
1520   // Store source information to handle acknowledgements on completion
1521   if(CMI_IS_ZC_BCAST(env))
1522     CkRdmaPrepareBcastMsg(env);
1523 #endif
1524
1525   CkPackMessage(&env);
1526   int len=env->getTotalsize();
1527   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1528   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1529   else CmiSyncSendAndFree(pe, len, (char *)env);
1530 }
1531
1532 //static void _noCldNodeEnqueue(int node, envelope *env)
1533 //Made non-static to be used by ckmessagelogging
1534 void _noCldNodeEnqueue(int node, envelope *env)
1535 {
1536 /*
1537   if (node == CkMyNode()) {
1538     CmiHandleMessage(env);
1539   } else {
1540 */
1541 #if CMK_CHARMDEBUG
1542   if (!ConverseDeliver(node)) {
1543     CmiFree(env);
1544     return;
1545   }
1546 #endif
1547
1548 #if CMK_ONESIDED_IMPL
1549   // Store source information to handle acknowledgements on completion
1550   if(CMI_IS_ZC_BCAST(env))
1551     CkRdmaPrepareBcastMsg(env);
1552 #endif
1553
1554   CkPackMessage(&env);
1555   int len=env->getTotalsize();
1556   if (node==CLD_BROADCAST) { 
1557 #if CMK_MESSAGE_LOGGING
1558         if(env->flags & CK_FREE_MSG_MLOG)
1559                 CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1560         else
1561                 CmiSyncNodeBroadcast(len, (char *)env);
1562 #else
1563         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1564 #endif
1565 }
1566   else if (node==CLD_BROADCAST_ALL) { 
1567 #if CMK_MESSAGE_LOGGING
1568         if(env->flags & CK_FREE_MSG_MLOG)
1569                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1570         else
1571                 CmiSyncNodeBroadcastAll(len, (char *)env);
1572 #else
1573                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1574 #endif
1575
1576 }
1577   else {
1578 #if CMK_MESSAGE_LOGGING
1579         if(env->flags & CK_FREE_MSG_MLOG)
1580                 CmiSyncNodeSendAndFree(node, len, (char *)env);
1581         else
1582                 CmiSyncNodeSend(node, len, (char *)env);
1583 #else
1584         CmiSyncNodeSendAndFree(node, len, (char *)env);
1585 #endif
1586   }
1587 }
1588
1589 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1590 {
1591   envelope *env = UsrToEnv(msg);
1592   _CHECK_USED(env);
1593   _SET_USED(env, 1);
1594 #if CMK_REPLAYSYSTEM
1595   setEventID(env);
1596 #endif
1597   env->setMsgtype(ForChareMsg);
1598   env->setEpIdx(eIdx);
1599   env->setSrcPe(CkMyPe());
1600
1601 #if USE_CRITICAL_PATH_HEADER_ARRAY
1602   CK_CRITICALPATH_SEND(env)
1603   //CK_AUTOMATE_PRIORITY(env)
1604 #endif
1605 #if CMK_CHARMDEBUG
1606   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1607 #endif
1608 #if CMK_OBJECT_QUEUE_AVAILABLE
1609   CmiSetHandler(env, index_objectQHandler);
1610 #else
1611   CmiSetHandler(env, _charmHandlerIdx);
1612 #endif
1613   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1614     int pe = -(pCid->onPE+1);
1615     if(pe==CkMyPe()) {
1616 #ifndef CMK_CHARE_USE_PTR
1617       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1618 #else
1619       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1620 #endif
1621       void *objPtr;
1622       if (NULL!=(objPtr=vblk->getLocalChare()))
1623       { //A ready local chare
1624         env->setObjPtr(objPtr);
1625         return pe;
1626       }
1627       else { //The vidblock is not ready-- forget it
1628         vblk->send(env);
1629         return -1;
1630       }
1631     } else { //Valid vidblock for another PE:
1632       env->setMsgtype(ForVidMsg);
1633       env->setVidPtr(pCid->objPtr);
1634       return pe;
1635     }
1636   }
1637   else {
1638     env->setObjPtr(pCid->objPtr);
1639     return pCid->onPE;
1640   }
1641 }
1642
1643 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1644 {
1645   int destPE = _prepareMsg(eIdx, msg, pCid);
1646   if (destPE != -1) {
1647     envelope *env = UsrToEnv(msg);
1648     //criticalPath_send(env);
1649 #if USE_CRITICAL_PATH_HEADER_ARRAY
1650     CK_CRITICALPATH_SEND(env)
1651     //CK_AUTOMATE_PRIORITY(env)
1652 #endif
1653     CmiBecomeImmediate(env);
1654   }
1655   return destPE;
1656 }
1657
1658 extern "C"
1659 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1660 {
1661   if (opts & CK_MSG_INLINE) {
1662     CkSendMsgInline(entryIdx, msg, pCid, opts);
1663     return;
1664   }
1665   envelope *env = UsrToEnv(msg);
1666 #if CMK_ERROR_CHECKING
1667   //Allow rdma metadata messages marked as immediate to go through
1668   if (opts & CK_MSG_IMMEDIATE)
1669 #if CMK_ONESIDED_IMPL
1670     if (CMI_ZC_MSGTYPE(env) == CMK_REG_NO_ZC_MSG)
1671 #endif
1672       CmiAbort("Immediate message is not allowed in Chare!");
1673 #endif
1674   int destPE=_prepareMsg(entryIdx,msg,pCid);
1675   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1676   // VidBlock was not yet filled). The problem is that the creation was never
1677   // traced later when the VidBlock was filled. One solution is to trace the
1678   // creation here, the other to trace it in VidBlock->msgDeliver().
1679 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1680   if (destPE!=-1) {
1681     CpvAccess(_qd)->create();
1682   }
1683         sendChareMsg(env,destPE,_infoIdx,pCid);
1684 #else
1685   _TRACE_CREATION_1(env);
1686   if (destPE!=-1) {
1687     CpvAccess(_qd)->create();
1688     if (opts & CK_MSG_SKIP_OR_IMM)
1689       _noCldEnqueue(destPE, env);
1690     else
1691       _CldEnqueue(destPE, env, _infoIdx);
1692   }
1693   _TRACE_CREATION_DONE(1);
1694 #endif
1695 }
1696
1697 extern "C"
1698 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1699 {
1700   if (pCid->onPE==CkMyPe())
1701   {
1702 #if CMK_FAULT_EVAC
1703     if(!CmiNodeAlive(CkMyPe())){
1704         return;
1705     }
1706 #endif
1707 #if CMK_CHARMDEBUG
1708     //Just in case we need to breakpoint or use the envelope in some way
1709     _prepareMsg(entryIndex,msg,pCid);
1710 #endif
1711                 //Just directly call the chare (skip QD handling & scheduler)
1712     envelope *env = UsrToEnv(msg);
1713     if (env->isPacked()) CkUnpackMessage(&env);
1714     _STATS_RECORD_PROCESS_MSG_1();
1715     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1716   }
1717   else {
1718     //No way to inline a cross-processor message:
1719     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1720   }
1721 }
1722
1723 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1724 {
1725   envelope *env = UsrToEnv(msg);
1726   /*#if CMK_ERROR_CHECKING
1727   CkNodeGroupID nodeRedMgr;
1728 #endif
1729   */
1730   _CHECK_USED(env);
1731   _SET_USED(env, 1);
1732 #if CMK_REPLAYSYSTEM
1733   setEventID(env);
1734 #endif
1735   env->setMsgtype(type);
1736   env->setEpIdx(eIdx);
1737   env->setGroupNum(gID);
1738   env->setSrcPe(CkMyPe());
1739   /*
1740 #if CMK_ERROR_CHECKING
1741   nodeRedMgr.setZero();
1742   env->setRednMgr(nodeRedMgr);
1743 #endif
1744 */
1745   //criticalPath_send(env);
1746 #if USE_CRITICAL_PATH_HEADER_ARRAY
1747   CK_CRITICALPATH_SEND(env)
1748   //CK_AUTOMATE_PRIORITY(env)
1749 #endif
1750 #if CMK_CHARMDEBUG
1751   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1752 #endif
1753   CmiSetHandler(env, _charmHandlerIdx);
1754   return env;
1755 }
1756
1757 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1758 {
1759   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1760 #if USE_CRITICAL_PATH_HEADER_ARRAY
1761   CK_CRITICALPATH_SEND(env)
1762   //CK_AUTOMATE_PRIORITY(env)
1763 #endif
1764   CmiBecomeImmediate(env);
1765   return env;
1766 }
1767
1768 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1769                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1770 {
1771   int numPes;
1772   envelope *env;
1773     if (opts & CK_MSG_IMMEDIATE) {
1774         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1775     }else
1776     {
1777         env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1778     }
1779
1780 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1781         sendGroupMsg(env,pe,_infoIdx);
1782 #else
1783   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1784   _TRACE_CREATION_N(env, numPes);
1785   if (opts & CK_MSG_SKIP_OR_IMM)
1786     _noCldEnqueue(pe, env);
1787   else
1788     _skipCldEnqueue(pe, env, _infoIdx);
1789   _TRACE_CREATION_DONE(1);
1790 #endif
1791 }
1792
1793 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1794                            int npes, int *pes)
1795 {
1796   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1797   _TRACE_CREATION_MULTICAST(env, npes, pes);
1798   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1799   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1800 }
1801
1802 extern "C"
1803 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1804 {
1805 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1806   if (destPE==CkMyPe())
1807   {
1808     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1809     return;
1810   }
1811   //Can't inline-- send the usual way
1812   envelope *env = UsrToEnv(msg);
1813   int numPes;
1814   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1815   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1816   _TRACE_CREATION_N(env, numPes);
1817   _noCldEnqueue(destPE, env);
1818   _STATS_RECORD_SEND_BRANCH_1();
1819   CkpvAccess(_coreState)->create();
1820   _TRACE_CREATION_DONE(1);
1821 #else
1822   // no support for immediate message, send inline
1823   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1824 #endif
1825 }
1826
1827 extern "C"
1828 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1829 {
1830   if (destPE==CkMyPe())
1831   {
1832 #if CMK_FAULT_EVAC
1833     if(!CmiNodeAlive(CkMyPe())){
1834         return;
1835     }
1836 #endif
1837     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1838     if (obj!=NULL)
1839     { //Just directly call the group:
1840 #if CMK_ERROR_CHECKING
1841       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1842 #else
1843       envelope *env=UsrToEnv(msg);
1844 #endif
1845       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1846       return;
1847     }
1848   }
1849   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1850   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1851 }
1852
1853 extern "C"
1854 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1855 {
1856   if (opts & CK_MSG_INLINE) {
1857     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1858     return;
1859   }
1860   envelope *env=UsrToEnv(msg);
1861   //Allow rdma metadata messages marked as immediate to go through
1862   if (opts & CK_MSG_IMMEDIATE) {
1863 #if CMK_ONESIDED_IMPL
1864     if (CMI_ZC_MSGTYPE(env) == CMK_REG_NO_ZC_MSG)
1865 #endif
1866     {
1867       CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1868       return;
1869     }
1870   }
1871   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1872   _STATS_RECORD_SEND_BRANCH_1();
1873   CkpvAccess(_coreState)->create();
1874 }
1875
1876 extern "C"
1877 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1878 {
1879 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1880   envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1881   _TRACE_CREATION_MULTICAST(env, npes, pes);
1882   _noCldEnqueueMulti(npes, pes, env);
1883   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1884 #else
1885   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1886   CpvAccess(_qd)->create(-npes);
1887 #endif
1888   _STATS_RECORD_SEND_BRANCH_N(npes);
1889   CpvAccess(_qd)->create(npes);
1890 }
1891
1892 extern "C"
1893 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1894 {
1895   if (opts & CK_MSG_IMMEDIATE) {
1896     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1897     return;
1898   }
1899     // normal mesg
1900   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1901   _STATS_RECORD_SEND_BRANCH_N(npes);
1902   CpvAccess(_qd)->create(npes);
1903 }
1904
1905 extern "C"
1906 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1907 {
1908   int npes;
1909   int *pes;
1910   if (opts & CK_MSG_IMMEDIATE) {
1911     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1912     return;
1913   }
1914     // normal mesg
1915   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1916   CmiLookupGroup(grp, &npes, &pes);
1917   _TRACE_CREATION_MULTICAST(env, npes, pes);
1918   _CldEnqueueGroup(grp, env, _infoIdx);
1919   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1920   _STATS_RECORD_SEND_BRANCH_N(npes);
1921   CpvAccess(_qd)->create(npes);
1922 }
1923
1924 extern "C"
1925 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1926 {
1927   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1928   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1929   CpvAccess(_qd)->create(CkNumPes());
1930 }
1931
1932 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1933                 int node=CLD_BROADCAST_ALL, int opts=0)
1934 {
1935     int numPes;
1936     envelope *env;
1937     if (opts & CK_MSG_IMMEDIATE) {
1938         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1939     }else
1940     {
1941         env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1942     }
1943 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1944     sendNodeGroupMsg(env,node,_infoIdx);
1945 #else
1946   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1947   _TRACE_CREATION_N(env, numPes);
1948   if (opts & CK_MSG_SKIP_OR_IMM) {
1949     _noCldNodeEnqueue(node, env);
1950   }
1951   else
1952     _CldNodeEnqueue(node, env, _infoIdx);
1953   _TRACE_CREATION_DONE(1);
1954 #endif
1955 }
1956
1957 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1958                            int npes, int *nodes)
1959 {
1960   envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1961   _TRACE_CREATION_N(env, npes);
1962   for (int i=0; i<npes; i++) {
1963     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1964   }
1965   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1966 }
1967
1968 extern "C"
1969 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1970 {
1971 #if CMK_IMMEDIATE_MSG
1972   if (node==CkMyNode())
1973   {
1974     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1975     return;
1976   }
1977   //Can't inline-- send the usual way
1978   envelope *env = UsrToEnv(msg);
1979   int numPes;
1980   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1981   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1982   _TRACE_CREATION_N(env, numPes);
1983   _noCldNodeEnqueue(node, env);
1984   _STATS_RECORD_SEND_BRANCH_1();
1985   CkpvAccess(_coreState)->create();
1986   _TRACE_CREATION_DONE(1);
1987 #else
1988   // no support for immediate message, send inline
1989   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1990 #endif
1991 }
1992
1993 extern "C"
1994 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1995 {
1996   if (node==CkMyNode()) {
1997 #if CMK_ONESIDED_IMPL
1998     if (CMI_ZC_MSGTYPE(msg) == CMK_REG_NO_ZC_MSG)
1999 #endif
2000     {
2001       CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
2002       void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2003       CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
2004       if (obj!=NULL)
2005       { //Just directly call the group:
2006 #if CMK_ERROR_CHECKING
2007         envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
2008 #else
2009         envelope *env=UsrToEnv(msg);
2010 #endif
2011         _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
2012         return;
2013       }
2014     }
2015   }
2016   //Can't inline-- send the usual way
2017   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
2018 }
2019
2020 extern "C"
2021 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
2022 {
2023   if (opts & CK_MSG_INLINE) {
2024     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
2025     return;
2026   }
2027   if (opts & CK_MSG_IMMEDIATE) {
2028     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
2029     return;
2030   }
2031   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
2032   _STATS_RECORD_SEND_NODE_BRANCH_1();
2033   CkpvAccess(_coreState)->create();
2034 }
2035
2036 extern "C"
2037 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
2038 {
2039 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
2040   envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
2041   _noCldEnqueueMulti(npes, nodes, env);
2042 #else
2043   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2044   CpvAccess(_qd)->create(-npes);
2045 #endif
2046   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2047   CpvAccess(_qd)->create(npes);
2048 }
2049
2050 extern "C"
2051 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
2052 {
2053   if (opts & CK_MSG_IMMEDIATE) {
2054     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
2055     return;
2056   }
2057     // normal mesg
2058   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2059   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2060   CpvAccess(_qd)->create(npes);
2061 }
2062
2063 extern "C"
2064 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
2065 {
2066   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
2067   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
2068   CpvAccess(_qd)->create(CkNumNodes());
2069 }
2070
2071 //Needed by delegation manager:
2072 extern "C"
2073 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
2074 { return _prepareMsg(eIdx,msg,pCid); }
2075 extern "C"
2076 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2077 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
2078 extern "C"
2079 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2080 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
2081
2082 void _ckModuleInit(void) {
2083         CmiAssignOnce(&index_skipCldHandler, CkRegisterHandler(_skipCldHandler));
2084 #if CMK_OBJECT_QUEUE_AVAILABLE
2085         CmiAssignOnce(&index_objectQHandler, CkRegisterHandler(_ObjectQHandler));
2086 #endif
2087         CmiAssignOnce(&index_tokenHandler, CkRegisterHandler(_TokenHandler));
2088         CkpvInitialize(TokenPool*, _tokenPool);
2089         CkpvAccess(_tokenPool) = new TokenPool;
2090 }
2091
2092
2093 /************** Send: Arrays *************/
2094
2095 static void _prepareOutgoingArrayMsg(envelope *env,int type)
2096 {
2097   _CHECK_USED(env);
2098   _SET_USED(env, 1);
2099   env->setMsgtype(type);
2100 #if CMK_CHARMDEBUG
2101   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
2102 #endif
2103   CmiSetHandler(env, _charmHandlerIdx);
2104   CpvAccess(_qd)->create();
2105 }
2106
2107 extern "C"
2108 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2109   envelope *env = UsrToEnv(msg);
2110   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2111 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2112         sendArrayMsg(env,pe,_infoIdx);
2113 #else
2114   if (opts & CK_MSG_IMMEDIATE)
2115     CmiBecomeImmediate(env);
2116   if (opts & CK_MSG_SKIP_OR_IMM)
2117     _noCldEnqueue(pe, env);
2118   else
2119     _skipCldEnqueue(pe, env, _infoIdx);
2120 #endif
2121 }
2122
2123 class ElementDestroyer : public CkLocIterator {
2124 private:
2125         CkLocMgr *locMgr;
2126 public:
2127         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2128         void addLocation(CkLocation &loc) {
2129           loc.destroyAll();
2130         }
2131 };
2132
2133 void CkDeleteChares() {
2134   int i;
2135   int numGroups = CkpvAccess(_groupIDTable)->size();
2136
2137   // delete all plain chares
2138 #ifndef CMK_CHARE_USE_PTR
2139   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2140         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2141         delete obj;
2142         CkpvAccess(chare_objs)[i] = NULL;
2143   }
2144   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2145         VidBlock *obj = CkpvAccess(vidblocks)[i];
2146         delete obj;
2147         CkpvAccess(vidblocks)[i] = NULL;
2148   }
2149 #endif
2150
2151   // delete all array elements
2152   for(i=0;i<numGroups;i++) {
2153     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2154     if(obj && obj->isLocMgr())  {
2155       CkLocMgr *mgr = (CkLocMgr*)obj;
2156       ElementDestroyer destroyer(mgr);
2157       mgr->iterate(destroyer);
2158     }
2159   }
2160
2161   // delete all groups
2162   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2163   for(i=0;i<numGroups;i++) {
2164     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2165     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2166     if (obj) delete obj;
2167   }
2168   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2169
2170   // delete all node groups
2171   if (CkMyRank() == 0) {
2172     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2173     for(i=0;i<numNodeGroups;i++) {
2174       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2175       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2176       if (obj) delete obj;
2177     }
2178   }
2179 }
2180
2181 #if CMK_BIGSIM_CHARM
2182 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2183                                    int pb,unsigned int *prio);
2184 #endif
2185
2186 //------------------- External client support (e.g. Charm4py) ----------------
2187
2188 static std::vector< std::vector<char> > ext_args;
2189 static std::vector<char*> ext_argv;
2190
2191 // This is just a wrapper for ConverseInit that copies command-line args into a private
2192 // buffer.
2193 // To be called from external clients like charm4py. This wrapper avoids issues with
2194 // ctypes and cffi.
2195 extern "C" void StartCharmExt(int argc, char **argv) {
2196 #if !defined(_WIN32) && !NODE_0_IS_CONVHOST
2197   // only do this in net layers if using charmrun, so that output of process 0
2198   // doesn't get duplicated
2199   char *ns = getenv("NETSTART");
2200   if (ns != 0) {
2201     int fd;
2202     if (-1 != (fd = open("/dev/null", O_RDWR))) {
2203       dup2(fd, 0);
2204       dup2(fd, 1);
2205       dup2(fd, 2);
2206     }
2207   }
2208 #endif
2209   ext_args.resize(argc);
2210   ext_argv.resize(argc + 1, NULL);
2211   for (int i=0; i < argc; i++) {
2212     ext_args[i].resize(strlen(argv[i]) + 1);
2213     strcpy(ext_args[i].data(), argv[i]);
2214     ext_argv[i] = ext_args[i].data();
2215   }
2216   ConverseInit(argc, ext_argv.data(), _initCharm, 0, 0);
2217 }
2218
2219 void (*CkRegisterMainModuleCallback)() = NULL;
2220 extern "C" void registerCkRegisterMainModuleCallback(void (*cb)()) {
2221   CkRegisterMainModuleCallback = cb;
2222 }
2223
2224 void (*MainchareCtorExtCallback)(int, void*, int, int, char **) = NULL;
2225 extern "C" void registerMainchareCtorExtCallback(void (*cb)(int, void*, int, int, char **)) {
2226   MainchareCtorExtCallback = cb;
2227 }
2228
2229 void (*ReadOnlyRecvExtCallback)(int, char*) = NULL;
2230 extern "C" void registerReadOnlyRecvExtCallback(void (*cb)(int, char*)) {
2231   ReadOnlyRecvExtCallback = cb;
2232 }
2233
2234 void* ReadOnlyExt::ro_data = NULL;
2235 size_t ReadOnlyExt::data_size = 0;
2236
2237 void (*ChareMsgRecvExtCallback)(int, void*, int, int, char *, int) = NULL;
2238 extern "C" void registerChareMsgRecvExtCallback(void (*cb)(int, void*, int, int, char *, int)) {
2239   ChareMsgRecvExtCallback = cb;
2240 }
2241
2242 void (*GroupMsgRecvExtCallback)(int, int, int, char *, int) = NULL;
2243 extern "C" void registerGroupMsgRecvExtCallback(void (*cb)(int, int, int, char *, int)) {
2244   GroupMsgRecvExtCallback = cb;
2245 }
2246
2247 void (*ArrayMsgRecvExtCallback)(int, int, int *, int, int, char *, int) = NULL;
2248 extern "C" void registerArrayMsgRecvExtCallback(void (*cb)(int, int, int *, int, int, char *, int)) {
2249   ArrayMsgRecvExtCallback = cb;
2250 }
2251
2252 int (*ArrayElemLeaveExt)(int, int, int *, char**, int) = NULL;
2253 extern "C" void registerArrayElemLeaveExtCallback(int (*cb)(int, int, int *, char**, int)) {
2254   ArrayElemLeaveExt = cb;
2255 }
2256
2257 void (*ArrayElemJoinExt)(int, int, int *, int, char*, int) = NULL;
2258 extern "C" void registerArrayElemJoinExtCallback(void (*cb)(int, int, int *, int, char*, int)) {
2259   ArrayElemJoinExt = cb;
2260 }
2261
2262 void (*ArrayResumeFromSyncExtCallback)(int, int, int *) = NULL;
2263 extern "C" void registerArrayResumeFromSyncExtCallback(void (*cb)(int, int, int *)) {
2264   ArrayResumeFromSyncExtCallback = cb;
2265 }
2266
2267 void (*CreateReductionTargetMsgExt)(void*, int, int, int, char**, int*) = NULL;
2268 extern "C" void registerCreateReductionTargetMsgExtCallback(void (*cb)(void*, int, int, int, char**, int*)) {
2269   CreateReductionTargetMsgExt = cb;
2270 }
2271
2272 int (*PyReductionExt)(char**, int*, int, char**) = NULL;
2273 extern "C" void registerPyReductionExtCallback(int (*cb)(char**, int*, int, char**)) {
2274     PyReductionExt = cb;
2275 }
2276
2277 int (*ArrayMapProcNumExtCallback)(int, int, const int *) = NULL;
2278 extern "C" void registerArrayMapProcNumExtCallback(int (*cb)(int, int, const int *)) {
2279   ArrayMapProcNumExtCallback = cb;
2280 }
2281
2282 extern "C" int CkMyPeHook() { return CkMyPe(); }
2283 extern "C" int CkNumPesHook() { return CkNumPes(); }
2284
2285 void ReadOnlyExt::setData(void *msg, size_t msgSize) {
2286   ro_data = malloc(msgSize);
2287   memcpy(ro_data, msg, msgSize);
2288   data_size = msgSize;
2289 }
2290
2291 void ReadOnlyExt::_roPup(void *pup_er) {
2292   PUP::er &p=*(PUP::er *)pup_er;
2293   if (!p.isUnpacking()) {
2294     //printf("[%d] Sizing/packing data, ro_data=%p, data_size=%d\n", CkMyPe(), ro_data, int(data_size));
2295     p | data_size;
2296     p((char*)ro_data, data_size);
2297   } else {
2298     CkAssert(CkMyPe() != 0);
2299     CkAssert(ro_data == NULL);
2300     PUP::fromMem &p_mem = *(PUP::fromMem *)pup_er;
2301     p_mem | data_size;
2302     //printf("[%d] Unpacking ro, size of data to unpack is %d\n", CkMyPe(), int(data_size));
2303     ReadOnlyRecvExtCallback(int(data_size), p_mem.get_current_pointer());
2304     p_mem.advance(data_size);
2305   }
2306 }
2307
2308 CkpvExtern(int, _currentChareType);
2309
2310 MainchareExt::MainchareExt(CkArgMsg *m) {
2311   int cIdx = CkpvAccess(_currentChareType);
2312   //printf("Constructor of MainchareExt, chareId=(%d,%p), chareIdx=%d\n", thishandle.onPE, thishandle.objPtr, cIdx);
2313   int ctorEpIdx =  _mainTable[_chareTable[cIdx]->mainChareType()]->entryIdx;
2314   MainchareCtorExtCallback(thishandle.onPE, thishandle.objPtr, ctorEpIdx, m->argc, m->argv);
2315   delete m;
2316 }
2317
2318 GroupExt::GroupExt(void *impl_msg) {
2319   //printf("Constructor of GroupExt, gid=%d\n", thisgroup.idx);
2320   //int chareIdx = CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
2321   int chareIdx = ckGetChareType();
2322   int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2323   CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2324   char *impl_buf = impl_msg_typed->msgBuf;
2325   PUP::fromMem implP(impl_buf);
2326   int msgSize; implP|msgSize;
2327   int dcopy_start; implP|dcopy_start;
2328   GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
2329                           dcopy_start);
2330 }
2331
2332 ArrayMapExt::ArrayMapExt(void *impl_msg) {
2333   //printf("Constructor of ArrayMapExt, gid=%d\n", thisgroup.idx);
2334   int chareIdx = ckGetChareType();
2335   int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2336   CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2337   char *impl_buf = impl_msg_typed->msgBuf;
2338   PUP::fromMem implP(impl_buf);
2339   int msgSize; implP|msgSize;
2340   int dcopy_start; implP|dcopy_start;
2341   GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
2342                           dcopy_start);
2343 }
2344
2345 // TODO options
2346 extern "C"
2347 int CkCreateGroupExt(int cIdx, int eIdx, int num_bufs, char **bufs, int *buf_sizes) {
2348   //static_cast<void>(impl_e_opts);
2349   CkAssert(num_bufs >= 1);
2350   int totalSize = 0;
2351   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2352   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2);
2353   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2354   PUP::toMem implP((void *)impl_msg->msgBuf);
2355   implP|totalSize;
2356   implP|buf_sizes[0];
2357   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2358   UsrToEnv(impl_msg)->setMsgtype(BocInitMsg);
2359   //if (impl_e_opts)
2360   //  UsrToEnv(impl_msg)->setGroupDep(impl_e_opts->getGroupDepID());
2361   CkGroupID gId = CkCreateGroup(cIdx, eIdx, impl_msg);
2362   return gId.idx;
2363 }
2364
2365 // TODO options
2366 extern "C"
2367 int CkCreateArrayExt(int cIdx, int ndims, int *dims, int eIdx, int num_bufs,
2368                      char **bufs, int *buf_sizes, int map_gid, char useAtSync) {
2369   //static_cast<void>(impl_e_opts);
2370   CkAssert(num_bufs >= 1);
2371   int totalSize = 0;
2372   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2373   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2374   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2375   PUP::toMem implP((void *)impl_msg->msgBuf);
2376   implP|useAtSync;
2377   implP|totalSize;
2378   implP|buf_sizes[0];
2379   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2380   CkArrayOptions opts;
2381   if (ndims != -1)
2382     opts = CkArrayOptions(ndims, dims);
2383   if (map_gid >= 0) {
2384     CkGroupID map_gId;
2385     map_gId.idx = map_gid;
2386     opts.setMap(CProxy_Group(map_gId));
2387   }
2388   UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2389   //CkArrayID gId = ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
2390   CkGroupID gId = CProxyElement_ArrayElement::ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
2391   return gId.idx;
2392 }
2393
2394 // TODO options
2395 extern "C"
2396 void CkInsertArrayExt(int aid, int ndims, int *index, int epIdx, int onPE, int num_bufs,
2397                       char **bufs, int *buf_sizes, char useAtSync) {
2398   CkAssert(num_bufs >= 1);
2399   int totalSize = 0;
2400   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2401   int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2402   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2403   PUP::toMem implP((void *)impl_msg->msgBuf);
2404   implP|useAtSync;
2405   implP|totalSize;
2406   implP|buf_sizes[0];
2407   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2408
2409   UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2410   CkArrayIndex newIdx(ndims, index);
2411   CkGroupID gId;
2412   gId.idx = aid;
2413   CProxy_ArrayBase(gId).ckInsertIdx((CkArrayMessage *)impl_msg, epIdx, onPE, newIdx);
2414 }
2415
2416 extern "C"
2417 void CkMigrateExt(int aid, int ndims, int *index, int toPe) {
2418   //printf("[charm] CkMigrateMeExt called with aid: %d, ndims: %d, index: %d, toPe: %d\n",
2419         //aid, ndims, *index, toPe);
2420   CkGroupID gId;
2421   gId.idx = aid;
2422   CkArrayIndex arrayIndex(ndims, index);
2423   CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2424   ArrayElement* arrayElement = arrayProxy.ckLocal();
2425   CkAssert(arrayElement != NULL);
2426   arrayElement->migrateMe(toPe);
2427 }
2428
2429 extern "C"
2430 void CkArrayDoneInsertingExt(int aid) {
2431   CkGroupID gId;
2432   gId.idx = aid;
2433   CProxy_ArrayBase(gId).doneInserting();
2434 }
2435
2436 extern "C"
2437 int CkGroupGetReductionNumber(int g_id) {
2438   CkGroupID gId;
2439   gId.idx = g_id;
2440   return ((Group*)CkLocalBranch(gId))->getRedNo();
2441 }
2442
2443 extern "C"
2444 int CkArrayGetReductionNumber(int aid, int ndims, int *index) {
2445   CkGroupID gId;
2446   gId.idx = aid;
2447   CkArrayIndex arrayIndex(ndims, index);
2448   CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2449   ArrayElement* arrayElement = arrayProxy.ckLocal();
2450   CkAssert(arrayElement != NULL);
2451   return arrayElement->getRedNo();
2452 }
2453
2454 extern "C"
2455 void CkChareExtSend(int onPE, void *objPtr, int epIdx, char *msg, int msgSize) {
2456   //ckCheck();    // checks that gid is not zero
2457   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2458   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2459   PUP::toMem implP((void *)impl_msg->msgBuf);
2460   implP|msgSize;
2461   implP|epIdx;
2462   int d=0; implP|d;
2463   implP(msg, msgSize);
2464   CkChareID chareID;
2465   chareID.onPE = onPE;
2466   chareID.objPtr = objPtr;
2467
2468   CkSendMsg(epIdx, impl_msg, &chareID);
2469 }
2470
2471 extern "C"
2472 void CkChareExtSend_multi(int onPE, void *objPtr, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2473   CkAssert(num_bufs >= 1);
2474   //ckCheck();    // checks that gid is not zero
2475   int totalSize = 0;
2476   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2477   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2478   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2479   PUP::toMem implP((void *)impl_msg->msgBuf);
2480   implP | totalSize;
2481   implP | epIdx;
2482   implP | buf_sizes[0];
2483   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2484   CkChareID chareID;
2485   chareID.onPE = onPE;
2486   chareID.objPtr = objPtr;
2487
2488   CkSendMsg(epIdx, impl_msg, &chareID);
2489 }
2490
2491 extern "C"
2492 void CkGroupExtSend(int gid, int pe, int epIdx, char *msg, int msgSize) {
2493   //ckCheck();    // checks that gid is not zero
2494   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2495   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2496   PUP::toMem implP((void *)impl_msg->msgBuf);
2497   implP|msgSize;
2498   implP|epIdx;
2499   int d=0; implP|d;
2500   implP(msg, msgSize);
2501   CkGroupID gId;
2502   gId.idx = gid;
2503
2504   if (pe == -1)
2505     CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2506   else
2507     CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
2508 }
2509
2510 extern "C"
2511 void CkGroupExtSend_multi(int gid, int pe, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2512   CkAssert(num_bufs >= 1);
2513   //ckCheck();    // checks that gid is not zero
2514   int totalSize = 0;
2515   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2516   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2517   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2518   PUP::toMem implP((void *)impl_msg->msgBuf);
2519   implP | totalSize;
2520   implP | epIdx;
2521   implP | buf_sizes[0];
2522   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2523   CkGroupID gId;
2524   gId.idx = gid;
2525
2526   if (pe == -1)
2527     CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2528   else
2529     CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
2530 }
2531
2532 extern "C"
2533 void CkArrayExtSend(int aid, int *idx, int ndims, int epIdx, char *msg, int msgSize) {
2534   int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2535   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2536   PUP::toMem implP((void *)impl_msg->msgBuf);
2537   implP|msgSize;
2538   implP|epIdx;
2539   int d=0; implP|d;
2540   implP(msg, msgSize);
2541   UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2542   CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2543   impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2544   CkGroupID gId;
2545   gId.idx = aid;
2546   if (ndims > 0) {
2547     CkArrayIndex arrIndex(ndims, idx);
2548     // TODO is there a better function for this?
2549     CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2550   } else { // broadcast
2551     CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
2552   }
2553 }
2554
2555 extern "C"
2556 void CkArrayExtSend_multi(int aid, int *idx, int ndims, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2557   CkAssert(num_bufs >= 1);
2558   int totalSize = 0;
2559   for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2560   int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2561   CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2562   PUP::toMem implP((void *)impl_msg->msgBuf);
2563   implP | totalSize;
2564   implP | epIdx;
2565   implP | buf_sizes[0];
2566   for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2567   UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2568   CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2569   impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2570   CkGroupID gId;
2571   gId.idx = aid;
2572   if (ndims > 0) {
2573     CkArrayIndex arrIndex(ndims, idx);
2574     // TODO is there a better function for this?
2575     CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2576   } else { // broadcast
2577     CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
2578   }
2579 }
2580
2581 //------------------- Message Watcher (record/replay) ----------------
2582
2583 #include "crc32.h"
2584
2585 CkpvDeclare(int, envelopeEventID);
2586 int _recplay_crc = 0;
2587 int _recplay_checksum = 0;
2588 int _recplay_logsize = 1024*1024;
2589
2590 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2591 #define REPLAYDEBUG(args) /* empty */
2592
2593 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2594
2595 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2596 #include "BaseLB.h" /* For LBMigrateMsg message */
2597
2598 #if CMK_REPLAYSYSTEM
2599 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2600   std::string fName = CkpvAccess(traceRoot);
2601   fName += prefix;
2602   fName += std::to_string(CkMyPe());
2603   fName += suffix;
2604   FILE *f = fopen(fName.c_str(), permissions);
2605   REPLAYDEBUG("openReplayfile " << fName.c_str());
2606   if (f==NULL) {
2607     CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2608              CkMyPe(), fName.c_str(), permissions);
2609     CkAbort("openReplayFile> Could not open replay file");
2610   }
2611   return f;
2612 }
2613
2614 class CkMessageRecorder : public CkMessageWatcher {
2615   unsigned int curpos;
2616   bool firstOpen;
2617   std::vector<char> buffer;
2618 public:
2619   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true), buffer(_recplay_logsize) { f=f_; }
2620   ~CkMessageRecorder() {
2621     flushLog(0);
2622     fprintf(f,"-1 -1 -1 ");
2623     fclose(f);
2624 #if 0
2625     FILE *stsfp = fopen("sts", "w");
2626     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2627     traceWriteSTS(stsfp, 0);
2628     fclose(stsfp);
2629 #endif
2630     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2631   }
2632
2633 private:
2634   void flushLog(int verbose=1) {
2635     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2636     fprintf(f, "%s", buffer.data());
2637     curpos=0;
2638   }
2639   virtual bool process(envelope **envptr,CkCoreState *ck) {
2640     if ((*envptr)->getEvent()) {
2641       bool wasPacked = (*envptr)->isPacked();
2642       if (!wasPacked) CkPackMessage(envptr);
2643       envelope *env = *envptr;
2644       unsigned int crc1=0, crc2=0;
2645       if (_recplay_crc) {
2646         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2647         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2648         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2649       } else if (_recplay_checksum) {
2650         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2651         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2652       }
2653       curpos+=sprintf(&buffer[curpos],"%d %d %d %d %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2654       if (curpos > _recplay_logsize-128) flushLog();
2655       if (!wasPacked) CkUnpackMessage(envptr);
2656     }
2657     return true;
2658   }
2659   virtual bool process(CthThreadToken *token,CkCoreState *ck) {
2660     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2661     if (curpos > _recplay_logsize-128) flushLog();
2662     return true;
2663   }
2664   
2665   virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2666     FILE *f;
2667     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2668     else f = openReplayFile("ckreplay_",".lb","a");
2669     firstOpen = false;
2670     if (f != NULL) {
2671       PUP::toDisk p(f);
2672       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2673       (*msg)->pup(p);
2674       fclose(f);
2675     }
2676     return true;
2677   }
2678 };
2679
2680 class CkMessageDetailRecorder : public CkMessageWatcher {
2681 public:
2682   CkMessageDetailRecorder(FILE *f_) {
2683     f=f_;
2684     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2685      * The value of 'x' is the pointer size.
2686      */
2687     CmiUInt2 little = sizeof(void*);
2688     fwrite(&little, 2, 1, f);
2689   }
2690   ~CkMessageDetailRecorder() {fclose(f);}
2691 private:
2692   virtual bool process(envelope **envptr, CkCoreState *ck) {
2693     bool wasPacked = (*envptr)->isPacked();
2694     if (!wasPacked) CkPackMessage(envptr);
2695     envelope *env = *envptr;
2696     CmiUInt4 size = env->getTotalsize();
2697     fwrite(&size, 4, 1, f);
2698     fwrite(env, env->getTotalsize(), 1, f);
2699     if (!wasPacked) CkUnpackMessage(envptr);
2700     return true;
2701   }
2702 };
2703
2704 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2705 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2706
2707 class CkMessageReplay : public CkMessageWatcher {
2708   int counter;
2709         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2710         int nextEP;
2711         unsigned int crc1, crc2;
2712         FILE *lbFile;
2713         /// Read the next message we need from the file:
2714         void getNext(void) {
2715           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2716           if (nextSize > 0) {
2717             // We are reading a regular message
2718             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2719               CkAbort("CkMessageReplay> Syntax error reading replay file");
2720             }
2721             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2722           } else if (nextSize == -2) {
2723             // We are reading a special message (right now only thread awaken)
2724             // Nothing to do since we have already read all info
2725             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2726           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2727             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2728             CkAbort("CkMessageReplay> Unrecognized input");
2729           }
2730             /*
2731                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2732                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2733                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2734                 }
2735                 */
2736                 counter++;
2737         }
2738         /// If this is the next message we need, advance and return true.
2739         bool isNext(envelope *env) {
2740                 if (nextPE!=env->getSrcPe()) return false;
2741                 if (nextEvent!=env->getEvent()) return false;
2742                 if (nextSize<0) return false; // not waiting for a regular message
2743 #if 1
2744                 if (nextEP != env->getEpIdx()) {
2745                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2746                         return false;
2747                 }
2748 #endif
2749 #if ! CMK_BIGSIM_CHARM
2750                 if (nextSize!=env->getTotalsize())
2751                 {
2752                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2753                         return false;
2754                 }
2755                 if (_recplay_crc || _recplay_checksum) {
2756                   bool wasPacked = env->isPacked();
2757                   if (!wasPacked) CkPackMessage(&env);
2758                   if (_recplay_crc) {
2759                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2760                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2761                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2762                     if (crcnew1 != crc1) {
2763                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2764                     }
2765                     if (crcnew2 != crc2) {
2766                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2767                     }
2768                   } else if (_recplay_checksum) {
2769             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2770             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2771             if (crcnew1 != crc1) {
2772               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2773             }
2774             if (crcnew2 != crc2) {
2775               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2776             }               
2777                   }
2778                   if (!wasPacked) CkUnpackMessage(&env);
2779                 }
2780 #endif
2781                 return true;
2782         }
2783         bool isNext(CthThreadToken *token) {
2784           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
2785           return false;
2786         }
2787
2788         /// This is a (short) list of messages we aren't yet ready for:
2789         CkQ<envelope *> delayedMessages;
2790         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2791         CkQ<CthThreadToken *> delayedTokens;
2792
2793         /// Try to flush out any delayed messages
2794         void flush(void) {
2795           if (nextSize>0) {
2796                 int len=delayedMessages.length();
2797                 for (int i=0;i<len;i++) {
2798                         envelope *env=delayedMessages.deq();
2799                         if (isNext(env)) { /* this is the next message: process it */
2800                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2801                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2802                                 return;
2803                         }
2804                         else /* Not ready yet-- put it back in the
2805                                 queue */
2806                           {
2807                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2808                                 delayedMessages.enq(env);
2809                           }
2810                 }
2811           } else if (nextSize==-2) {
2812             int len=delayedTokens.length();
2813             for (int i=0;i<len;++i) {
2814               CthThreadToken *token=delayedTokens.deq();
2815               if (isNext(token)) {
2816             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2817 #if ! CMK_BIGSIM_CHARM
2818                 CsdEnqueueLifo((void*)token);
2819 #else
2820                 CthEnqueueBigSimThread(token,0,0,NULL);
2821 #endif
2822                 return;
2823               } else {
2824             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2825                 delayedTokens.enq(token);
2826               }
2827             }
2828           }
2829         }
2830
2831 public:
2832         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2833           counter=0;
2834           f=f_;
2835           getNext();
2836           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2837 #if CMI_QD
2838           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2839 #endif
2840         }
2841         ~CkMessageReplay() {fclose(f);}
2842
2843 private:
2844         virtual bool process(envelope **envptr,CkCoreState *ck) {
2845           bool wasPacked = (*envptr)->isPacked();
2846           if (!wasPacked) CkPackMessage(envptr);
2847           envelope *env = *envptr;
2848           //CkAssert(*(int*)env == 0x34567890);
2849           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2850                 if (env->getEvent() == 0) return true;
2851                 if (isNext(env)) { /* This is the message we were expecting */
2852                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2853                         getNext(); /* Advance over this message */
2854                         flush(); /* try to process queued-up stuff */
2855                         if (!wasPacked) CkUnpackMessage(envptr);
2856                         return true;
2857                 }
2858 #if CMK_SMP
2859                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2860                          // try next rank, we can't just buffer the msg and left
2861                          // we need to keep unprocessed msg on the fly
2862                         int nextpe = CkMyPe()+1;
2863                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2864                         nextpe = CkNodeFirst(CkMyNode());
2865                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2866                         return false;
2867                 }
2868 #endif
2869                 else /*!isNext(env) */ {
2870                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2871                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2872                         delayedMessages.enq(env);
2873                         flush();
2874                         return false;
2875                 }
2876         }
2877         virtual bool process(CthThreadToken *token, CkCoreState *ck) {
2878       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2879           if (isNext(token)) {
2880         REPLAYDEBUG("Executing token: "<<token->serialNo)
2881             getNext();
2882             flush();
2883             return true;
2884           } else {
2885         REPLAYDEBUG("Queueing token: "<<token->serialNo
2886             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2887             delayedTokens.enq(token);
2888             return false;
2889           }
2890         }
2891
2892         virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2893           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2894           if (lbFile != NULL) {
2895             int num_moves = 0;
2896         PUP::fromDisk p(lbFile);
2897             p | num_moves;
2898             if (num_moves != (*msg)->n_moves) {
2899               delete *msg;
2900               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2901             }
2902             (*msg)->pup(p);
2903           }
2904           return true;
2905         }
2906 };
2907
2908 class CkMessageDetailReplay : public CkMessageWatcher {
2909   void *getNext() {
2910     CmiUInt4 size; size_t nread;
2911     if ((nread=fread(&size, 4, 1, f)) < 1) {
2912       if (feof(f)) return NULL;
2913       CkPrintf("Broken record file (metadata) got %d\n",nread);
2914       CkAbort("");
2915     }
2916     void *env = CmiAlloc(size);
2917     long tell = ftell(f);
2918     if ((nread=fread(env, size, 1, f)) < 1) {
2919       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2920       CkAbort("");
2921     }
2922     //*(int*)env = 0x34567890; // set first integer as magic
2923     return env;
2924   }
2925 public:
2926   double starttime;
2927   CkMessageDetailReplay(FILE *f_) {
2928     f=f_;
2929     starttime=CkWallTimer();
2930     /* This must match what CkMessageDetailRecorder did */
2931     CmiUInt2 little;
2932     fread(&little, 2, 1, f);
2933     if (little != sizeof(void*)) {
2934       CkAbort("Replaying on a different architecture from which recording was done!");
2935     }
2936
2937     CsdEnqueue(getNext());
2938
2939     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2940   }
2941   virtual bool process(envelope **env,CkCoreState *ck) {
2942     void *msg = getNext();
2943     if (msg != NULL) CsdEnqueue(msg);
2944     return true;
2945   }
2946 };
2947
2948 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2949 #if ! CMK_BIGSIM_CHARM
2950   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2951 #endif
2952   CkMessageReplay *replay = (CkMessageReplay*)rep;
2953   //CmiStartQD(CkMessageReplayQuiescence, replay);
2954 }
2955
2956 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2957   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2958   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2959   ConverseExit();
2960 }
2961 #endif
2962
2963 static bool CpdExecuteThreadResume(CthThreadToken *token) {
2964   CkCoreState *ck = CkpvAccess(_coreState);
2965   if (ck->watcher!=NULL) {
2966     return ck->watcher->processThread(token,ck);
2967   }
2968   return true;
2969 }
2970
2971 CpvExtern(int, CthResumeNormalThreadIdx);
2972 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2973 {
2974   CthThread t = token->thread;
2975
2976   if(t == NULL){
2977     free(token);
2978     return;
2979   }
2980 #if CMK_TRACE_ENABLED
2981 #if ! CMK_TRACE_IN_CHARM
2982   if(CpvAccess(traceOn))
2983     CthTraceResume(t);
2984 /*    if(CpvAccess(_traceCoreOn)) 
2985             resumeTraceCore();*/
2986 #endif
2987 #endif
2988 #if CMK_OMP
2989   CthSetPrev(t, CthSelf());
2990 #endif
2991   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2992   if (CpdExecuteThreadResume(token)) {
2993     CthResume(t);
2994   }
2995 #if CMK_OMP
2996   CthScheduledDecrement();
2997   CthSetPrev(CthSelf(), 0);
2998 #endif
2999 }
3000
3001 void CpdHandleLBMessage(LBMigrateMsg **msg) {
3002   CkCoreState *ck = CkpvAccess(_coreState);
3003   if (ck->watcher!=NULL) {
3004     ck->watcher->processLBMessage(msg, ck);
3005   }
3006 }
3007
3008 #if CMK_BIGSIM_CHARM
3009 CpvExtern(int      , CthResumeBigSimThreadIdx);
3010 #endif
3011
3012 #include "ckliststring.h"
3013 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
3014     CmiArgGroup("Charm++","Record/Replay");
3015     bool forceReplay = false;
3016     char *procs = NULL;
3017     _replaySystem = 0;
3018     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
3019       if(CmiMyRank() == 0) _recplay_crc = 1;
3020     }
3021     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
3022       if(CmiMyRank() == 0) _recplay_checksum = 1;
3023     }
3024     int tmplogsize;
3025     if(CmiGetArgIntDesc(argv,"+recplay-logsize",&tmplogsize,"Specify the size of the buffer used by the message recorder"))
3026       {
3027         if(CmiMyRank() == 0) _recplay_logsize = tmplogsize;
3028       }
3029     REPLAYDEBUG("CkMessageWatcherInit ");
3030     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
3031 #if CMK_REPLAYSYSTEM
3032         CkListString list(procs);
3033         if (list.includes(CkMyPe())) {
3034           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
3035           CpdSetInitializeMemory(1);
3036           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
3037         }
3038 #else
3039         CkAbort("Option `+record-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3040 #endif
3041     }
3042     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
3043 #if CMK_REPLAYSYSTEM
3044       if (CkMyPe() == 0) {
3045         CmiPrintf("Charm++> record mode.\n");
3046         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3047           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
3048           _recplay_crc = _recplay_checksum = 0;
3049         }
3050       }
3051       CpdSetInitializeMemory(1);
3052       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3053       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
3054 #else
3055       CkAbort("Option `+record' requires that record-replay support be enabled at configure time (--enable-replay)");
3056 #endif
3057     }
3058         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
3059 #if CMK_REPLAYSYSTEM
3060             forceReplay = true;
3061             CpdSetInitializeMemory(1);
3062             // Set the parameters of the processor
3063 #if CMK_SHARED_VARS_UNAVAILABLE
3064             _Cmi_mype = atoi(procs);
3065             while (procs[0]!='/') procs++;
3066             procs++;
3067             _Cmi_numpes = atoi(procs);
3068 #else
3069             CkAbort("+replay-detail available only for non-SMP build");
3070 #endif
3071             _replaySystem = 1;
3072             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
3073 #else
3074           CkAbort("Option `+replay-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3075 #endif
3076         }
3077         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
3078 #if CMK_REPLAYSYSTEM
3079           if (CkMyPe() == 0)  {
3080             CmiPrintf("Charm++> replay mode.\n");
3081             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3082               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
3083               _recplay_crc = _recplay_checksum = 0;
3084             }
3085           }
3086           CpdSetInitializeMemory(1);
3087 #if ! CMK_BIGSIM_CHARM
3088           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3089 #else
3090           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3091 #endif
3092           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
3093 #else
3094           CkAbort("Option `+replay' requires that record-replay support be enabled at configure time (--enable-replay)");
3095 #endif
3096         }
3097         if (_recplay_crc && _recplay_checksum) {
3098           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
3099         }
3100 }
3101
3102 extern "C"
3103 int CkMessageToEpIdx(void *msg) {
3104         envelope *env=UsrToEnv(msg);
3105         int ep=env->getEpIdx();
3106         if (ep==CkIndex_CkArray::recvBroadcast(0))
3107                 return env->getsetArrayBcastEp();
3108         else
3109                 return ep;
3110 }
3111
3112 extern "C"
3113 int getCharmEnvelopeSize() {
3114   return sizeof(envelope);
3115 }
3116
3117 /// Best-effort guess at whether @arg msg points at a charm envelope
3118 extern "C"
3119 int isCharmEnvelope(void *msg) {
3120     envelope *e = (envelope *)msg;
3121     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
3122     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
3123     if (e->getTotalsize() < sizeof(envelope)) return 0;
3124     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
3125 #if CMK_SMP
3126     if (e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
3127 #else
3128     if (e->getSrcPe()>=CkNumPes()) return 0;
3129 #endif
3130     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
3131     return 1;
3132 }
3133
3134 #include "CkMarshall.def.h"