Critical path header changes for the pics merge
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #include "pathHistory.h"
14
15 #if CMK_LBDB_ON
16 #include "LBDatabase.h"
17 #endif // CMK_LBDB_ON
18
19 #ifndef CMK_CHARE_USE_PTR
20 #include <map>
21 CkpvDeclare(CkVec<void *>, chare_objs);
22 CkpvDeclare(CkVec<int>, chare_types);
23 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
24
25 typedef std::map<int, CkChareID>  Vidblockmap;
26 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
27 CkpvDeclare(int, currentChareIdx);
28 #endif
29
30
31 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
32
33 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
34
35 int CMessage_CkMessage::__idx=-1;
36 int CMessage_CkArgMsg::__idx=0;
37 int CkIndex_Chare::__idx;
38 int CkIndex_Group::__idx;
39 int CkIndex_ArrayBase::__idx=-1;
40
41 extern int _defaultObjectQ;
42
43 void _initChareTables()
44 {
45 #ifndef CMK_CHARE_USE_PTR
46           /* chare and vidblock table */
47   CkpvInitialize(CkVec<void *>, chare_objs);
48   CkpvInitialize(CkVec<int>, chare_types);
49   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
50   CkpvInitialize(Vidblockmap, vmap);
51   CkpvInitialize(int, currentChareIdx);
52   CkpvAccess(currentChareIdx) = -1;
53 #endif
54 }
55
56 //Charm++ virtual functions: declaring these here results in a smaller executable
57 Chare::Chare(void) {
58   thishandle.onPE=CkMyPe();
59   thishandle.objPtr=this;
60 #if CMK_ERROR_CHECKING
61   magic = CHARE_MAGIC;
62 #endif
63 #ifndef CMK_CHARE_USE_PTR
64      // for plain chare, objPtr is actually the index to chare obj table
65   if (CkpvAccess(currentChareIdx) >= 0) {
66     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
67   }
68   chareIdx = CkpvAccess(currentChareIdx);
69 #endif
70 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
71   mlogData = new ChareMlogData();
72   mlogData->objID.type = TypeChare;
73   mlogData->objID.data.chare.id = thishandle;
74 #endif
75 #if CMK_OBJECT_QUEUE_AVAILABLE
76   if (_defaultObjectQ)  CkEnableObjQ();
77 #endif
78 }
79
80 Chare::Chare(CkMigrateMessage* m) {
81   thishandle.onPE=CkMyPe();
82   thishandle.objPtr=this;
83 #if CMK_ERROR_CHECKING
84   magic = 0;
85 #endif
86
87 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
88         mlogData = NULL;
89 #endif
90
91 #if CMK_OBJECT_QUEUE_AVAILABLE
92   if (_defaultObjectQ)  CkEnableObjQ();
93 #endif
94 }
95
96 void Chare::CkEnableObjQ()
97 {
98 #if CMK_OBJECT_QUEUE_AVAILABLE
99   objQ.create();
100 #endif
101 }
102
103 Chare::~Chare() {
104 #ifndef CMK_CHARE_USE_PTR
105 /*
106   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
107 */
108   if (chareIdx != -1)
109   {
110     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
111     CkpvAccess(chare_objs)[chareIdx] = NULL;
112     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
113     if (iter != CkpvAccess(vmap).end()) {
114       register CkChareID *pCid = (CkChareID *)
115         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
116       int srcPe = iter->second.onPE;
117       *pCid = iter->second;
118       register envelope *ret = UsrToEnv(pCid);
119       ret->setVidPtr(iter->second.objPtr);
120       ret->setSrcPe(CkMyPe());
121       CmiSetHandler(ret, _charmHandlerIdx);
122       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
123       CpvAccess(_qd)->create();
124       CkpvAccess(vmap).erase(iter);
125     }
126   }
127 #endif
128 }
129
130 void Chare::pup(PUP::er &p)
131 {
132   p(thishandle.onPE);
133   thishandle.objPtr=(void *)this;
134 #ifndef CMK_CHARE_USE_PTR
135   p(chareIdx);
136   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
137 #endif
138 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
139         if(p.isUnpacking()){
140                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
141                 mlogData = new ChareMlogData();
142         }
143         mlogData->pup(p);
144 #endif
145 #if CMK_ERROR_CHECKING
146   p(magic);
147 #endif
148 }
149
150 int Chare::ckGetChareType() const {
151   return -3;
152 }
153 char *Chare::ckDebugChareName(void) {
154   char buf[100];
155   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
156   return strdup(buf);
157 }
158 int Chare::ckDebugChareID(char *str, int limit) {
159   // pure chares for now do not have a valid ID
160   str[0] = 0;
161   return 1;
162 }
163 void Chare::ckDebugPup(PUP::er &p) {
164   pup(p);
165 }
166
167 /// This method is called before starting a [threaded] entry method.
168 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
169   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
170   traceAddThreadListeners(th, UsrToEnv(msg));
171 }
172
173 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
174   p.comment("Bytes");
175   int ts=UsrToEnv(msg)->getTotalsize();
176   int msgLen=ts-sizeof(envelope);
177   if (msgLen>0)
178     p((char*)msg,msgLen);
179 }
180
181 IrrGroup::IrrGroup(void) {
182   thisgroup = CkpvAccess(_currentGroup);
183 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
184         mlogData->objID.type = TypeGroup;
185         mlogData->objID.data.group.id = thisgroup;
186         mlogData->objID.data.group.onPE = CkMyPe();
187 #endif
188 }
189
190 IrrGroup::~IrrGroup() {
191   // remove the object pointer
192   if (CkpvAccess(_destroyingNodeGroup)) {
193     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
194     CksvAccess(_nodeGroupTable)->find(thisgroup).setObj(NULL);
195     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
196     CkpvAccess(_destroyingNodeGroup) = false;
197   } else {
198     CmiImmediateLock(CkpvAccess(_groupTableImmLock));
199     CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
200     CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
201   }
202 }
203
204 void IrrGroup::pup(PUP::er &p)
205 {
206   Chare::pup(p);
207   p|thisgroup;
208 }
209
210 int IrrGroup::ckGetChareType() const {
211   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
212 }
213
214 int IrrGroup::ckDebugChareID(char *str, int limit) {
215   if (limit<5) return -1;
216   str[0] = 1;
217   *((int*)&str[1]) = thisgroup.idx;
218   return 5;
219 }
220
221 char *IrrGroup::ckDebugChareName() {
222   return strdup(_chareTable[ckGetChareType()]->name);
223 }
224
225 void IrrGroup::ckJustMigrated(void)
226 {
227 }
228
229 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
230   /* FIXME: **CW** not entirely sure what we should do here yet */
231 }
232
233 void Group::CkAddThreadListeners(CthThread th, void *msg) {
234   Chare::CkAddThreadListeners(th, msg);
235   CthSetThreadID(th, thisgroup.idx, 0, 0);
236 }
237
238 void Group::pup(PUP::er &p)
239 {
240   CkReductionMgr::pup(p);
241   reductionInfo.pup(p);
242 }
243
244 /**** Delegation Manager Group */
245 CkDelegateMgr::~CkDelegateMgr() { }
246
247 //Default delegator implementation: do not delegate-- send directly
248 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
249   { CkSendMsg(ep,m,c); }
250 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
251   { CkSendMsgBranch(ep,m,onPE,g); }
252 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
253   { CkBroadcastMsgBranch(ep,m,g); }
254 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
255   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
256 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
257   { CkSendMsgNodeBranch(ep,m,onNode,g); }
258 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
259   { CkBroadcastMsgNodeBranch(ep,m,g); }
260 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
261   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
262 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
263 {
264         CProxyElement_ArrayBase ap(a,idx);
265         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
266 }
267 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
268 {
269         CProxyElement_ArrayBase ap(a,idx);
270         ap.ckSend((CkArrayMessage *)m,ep);
271 }
272 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
273 {
274         CProxy_ArrayBase ap(a);
275         ap.ckBroadcast((CkArrayMessage *)m,ep);
276 }
277
278 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
279 {
280         CmiAbort("ArraySectionSend is not implemented!\n");
281 /*
282         CProxyElement_ArrayBase ap(a,idx);
283         ap.ckSend((CkArrayMessage *)m,ep);
284 */
285 }
286
287 /*** Proxy <-> delegator communication */
288 CkDelegateData::~CkDelegateData() {}
289
290 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
291   return pd; // default implementation ignores pup call
292 }
293
294 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
295    this tricky manual reference counting business... */
296
297 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
298         if (dPtr) dPtr->ref();
299         ckUndelegate();
300         delegatedMgr = dTo;
301         delegatedPtr = dPtr;
302         delegatedGroupId = delegatedMgr->CkGetGroupID();
303         isNodeGroup = delegatedMgr->isNodeGroup();
304 }
305 void CProxy::ckUndelegate(void) {
306         delegatedMgr=NULL;
307         delegatedGroupId.setZero();
308         if (delegatedPtr) delegatedPtr->unref();
309         delegatedPtr=NULL;
310 }
311
312 /// Copy constructor
313 CProxy::CProxy(const CProxy &src)
314   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
315    isNodeGroup(src.isNodeGroup) {
316     delegatedPtr = NULL;
317     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
318         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
319     }
320 }
321
322 /// Assignment operator
323 CProxy& CProxy::operator=(const CProxy &src) {
324         CkDelegateData *oldPtr=delegatedPtr;
325         ckUndelegate();
326         delegatedMgr=src.delegatedMgr;
327         delegatedGroupId = src.delegatedGroupId; 
328         isNodeGroup = src.isNodeGroup;
329
330         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
331             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
332         else
333             delegatedPtr = NULL;
334
335         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
336         if (oldPtr) oldPtr->unref();
337         return *this;
338 }
339
340 void CProxy::pup(PUP::er &p) {
341   if (!p.isUnpacking()) {
342     if (ckDelegatedTo() != NULL) {
343       delegatedGroupId = delegatedMgr->CkGetGroupID();
344       isNodeGroup = delegatedMgr->isNodeGroup();
345     }
346   }
347   p|delegatedGroupId;
348   if (!delegatedGroupId.isZero()) {
349     p|isNodeGroup;
350     if (p.isUnpacking()) {
351       delegatedMgr = ckDelegatedTo(); 
352     }
353
354     int migCtor = 0, cIdx; 
355     if (!p.isUnpacking()) {
356       if (isNodeGroup) {
357         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
358         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
359         migCtor = _chareTable[cIdx]->migCtor; 
360         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
361       }
362       else  {
363         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
364         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
365         migCtor = _chareTable[cIdx]->migCtor; 
366         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
367       }         
368     }
369
370     p|migCtor;
371
372     // if delegated manager has not been created, construct a dummy
373     // object on which to call DelegatePointerPup
374     if (delegatedMgr == NULL) {
375
376       // create a dummy object for calling DelegatePointerPup
377       int objId = _entryTable[migCtor]->chareIdx; 
378       int objSize = _chareTable[objId]->size; 
379       void *obj = malloc(objSize); 
380       _entryTable[migCtor]->call(NULL, obj); 
381       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
382         ->DelegatePointerPup(p, delegatedPtr);           
383       free(obj);
384
385     }
386     else {
387
388       // delegated manager has been created, so we can use it
389       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
390
391     }
392
393     if (p.isUnpacking() && delegatedPtr) {
394       delegatedPtr->ref();
395     }
396   }
397 }
398
399 /**** Array sections */
400 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
401 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
402   _cookie.get_aid() = aid;      \
403   _cookie.get_pe() = CkMyPe();  \
404   _elems = new CkArrayIndex[nElems];    \
405   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
406   pelist = NULL;        \
407   npes  = 0;    \
408 }
409
410 CKSECTIONID_CONSTRUCTOR_DEF(1D)
411 CKSECTIONID_CONSTRUCTOR_DEF(2D)
412 CKSECTIONID_CONSTRUCTOR_DEF(3D)
413 CKSECTIONID_CONSTRUCTOR_DEF(4D)
414 CKSECTIONID_CONSTRUCTOR_DEF(5D)
415 CKSECTIONID_CONSTRUCTOR_DEF(6D)
416 CKSECTIONID_CONSTRUCTOR_DEF(Max)
417
418 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
419   pelist = new int[npes];
420   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
421   _cookie.get_aid() = gid;
422 }
423
424 CkSectionID::CkSectionID(const CkSectionID &sid) {
425   int i;
426   _cookie = sid._cookie;
427   _nElems = sid._nElems;
428   if (_nElems > 0) {
429     _elems = new CkArrayIndex[_nElems];
430     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
431   } else _elems = NULL;
432   npes = sid.npes;
433   if (npes > 0) {
434     pelist = new int[npes];
435     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
436   } else pelist = NULL;
437 }
438
439 void CkSectionID::operator=(const CkSectionID &sid) {
440   int i;
441   _cookie = sid._cookie;
442   _nElems = sid._nElems;
443   if (_nElems > 0) {
444     _elems = new CkArrayIndex[_nElems];
445     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
446   } else _elems = NULL;
447   npes = sid.npes;
448   if (npes > 0) {
449     pelist = new int[npes];
450     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
451   } else pelist = NULL;
452 }
453
454 void CkSectionID::pup(PUP::er &p) {
455     p | _cookie;
456     p(_nElems);
457     if (_nElems > 0) {
458       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
459       for (int i=0; i< _nElems; i++) p | _elems[i];
460       npes = 0;
461       pelist = NULL;
462     } else {
463       // If _nElems is zero, than this section describes processors instead of array elements
464       _elems = NULL;
465       p(npes);
466       if (p.isUnpacking()) pelist = new int[npes];
467       p(pelist, npes);
468     }
469 }
470
471 /**** Tiny random API routines */
472
473 #if CMK_CUDA
474 void CUDACallbackManager(void *fn) {
475   if (fn != NULL) {
476     CkCallback *cb = (CkCallback*) fn;
477     cb->send();
478   }
479 }
480
481 #endif
482
483 void QdCreate(int n) {
484   CpvAccess(_qd)->create(n);
485 }
486
487 void QdProcess(int n) {
488   CpvAccess(_qd)->process(n);
489 }
490
491 extern "C"
492 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
493 {
494   UsrToEnv(msg)->setRef(ref);
495 }
496
497 extern "C"
498 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
499 {
500   return UsrToEnv(msg)->getRef();
501 }
502
503 extern "C"
504 int CkGetSrcPe(void *msg)
505 {
506   return UsrToEnv(msg)->getSrcPe();
507 }
508
509 extern "C"
510 int CkGetSrcNode(void *msg)
511 {
512   return CmiNodeOf(CkGetSrcPe(msg));
513 }
514
515 extern "C"
516 void *CkLocalBranch(CkGroupID gID) {
517   return _localBranch(gID);
518 }
519
520 static
521 void *_ckLocalNodeBranch(CkGroupID groupID) {
522   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
523   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
524   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
525   return retval;
526 }
527
528 extern "C"
529 void *CkLocalNodeBranch(CkGroupID groupID)
530 {
531   void *retval;
532   // we are called in a constructor
533   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
534     return CkpvAccess(_currentNodeGroupObj);
535   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
536   { // Nodegroup hasn't finished being created yet-- schedule...
537     CsdScheduler(0);
538   }
539   return retval;
540 }
541
542 extern "C"
543 void *CkLocalChare(const CkChareID *pCid)
544 {
545         int pe=pCid->onPE;
546         if (pe<0) { //A virtual chare ID
547                 if (pe!=(-(CkMyPe()+1)))
548                         return NULL;//VID block not on this PE
549 #ifdef CMK_CHARE_USE_PTR
550                 VidBlock *v=(VidBlock *)pCid->objPtr;
551 #else
552                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
553 #endif
554                 return v->getLocalChareObj();
555         }
556         else
557         { //An ordinary chare ID
558                 if (pe!=CkMyPe())
559                         return NULL;//Chare not on this PE
560 #ifdef CMK_CHARE_USE_PTR
561                 return pCid->objPtr;
562 #else
563                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
564 #endif
565         }
566 }
567
568 CkpvDeclare(char**,Ck_argv);
569
570 extern "C" char **CkGetArgv(void) {
571         return CkpvAccess(Ck_argv);
572 }
573 extern "C" int CkGetArgc(void) {
574         return CmiGetArgc(CkpvAccess(Ck_argv));
575 }
576
577 /******************** Basic support *****************/
578 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
579 {
580 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
581         CpvAccess(_currentObj) = (Chare *)obj;
582 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
583 #endif
584   //BIGSIM_OOC DEBUGGING
585   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
586   //fflush(stdout);
587 #if CMK_CHARMDEBUG
588   CpdBeforeEp(epIdx, obj, msg);
589 #endif    
590   _entryTable[epIdx]->call(msg, obj);
591 #if CMK_CHARMDEBUG
592   CpdAfterEp(epIdx);
593 #endif
594   if (_entryTable[epIdx]->noKeep)
595   { /* Method doesn't keep/delete the message, so we have to: */
596     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
597   }
598 }
599 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
600 {
601   //BIGSIM_OOC DEBUGGING
602   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
603   //fflush(stdout);
604
605   void *deliverMsg;
606 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
607         CpvAccess(_currentObj) = (Chare *)obj;
608 #endif
609   if (_entryTable[epIdx]->noKeep)
610   { /* Deliver a read-only copy of the message */
611     deliverMsg=(void *)msg;
612   } else
613   { /* Method needs a copy of the message to keep/delete */
614     void *oldMsg=(void *)msg;
615     deliverMsg=CkCopyMsg(&oldMsg);
616 #if CMK_ERROR_CHECKING
617     if (oldMsg!=msg)
618       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
619 #endif
620   }
621 #if CMK_CHARMDEBUG
622   CpdBeforeEp(epIdx, obj, (void*)msg);
623 #endif
624   _entryTable[epIdx]->call(deliverMsg, obj);
625 #if CMK_CHARMDEBUG
626   CpdAfterEp(epIdx);
627 #endif
628 }
629
630 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
631 {
632   register void *msg = EnvToUsr(env);
633   _SET_USED(env, 0);
634   CkDeliverMessageFree(epIdx,msg,obj);
635 }
636
637 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
638 {
639
640 #if CMK_TRACE_ENABLED 
641   if (_entryTable[epIdx]->traceEnabled) {
642     _TRACE_BEGIN_EXECUTE(env, obj);
643     if(_entryTable[epIdx]->appWork)
644         _TRACE_BEGIN_APPWORK();
645     _invokeEntryNoTrace(epIdx,env,obj);
646     if(_entryTable[epIdx]->appWork)
647         _TRACE_END_APPWORK();
648     _TRACE_END_EXECUTE();
649   }
650   else
651 #endif
652     _invokeEntryNoTrace(epIdx,env,obj);
653
654 }
655
656 /********************* Creation ********************/
657
658 extern "C"
659 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
660 {
661   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
662   envelope *env = UsrToEnv(msg);
663   _CHECK_USED(env);
664   if(pCid == 0) {
665     env->setMsgtype(NewChareMsg);
666   } else {
667     pCid->onPE = (-(CkMyPe()+1));
668     //  pCid->magic = _GETIDX(cIdx);
669     pCid->objPtr = (void *) new VidBlock();
670     _MEMCHECK(pCid->objPtr);
671     env->setMsgtype(NewVChareMsg);
672     env->setVidPtr(pCid->objPtr);
673 #ifndef CMK_CHARE_USE_PTR
674     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
675     int idx = CkpvAccess(vidblocks).size()-1;
676     pCid->objPtr = (void *)(CmiIntPtr)idx;
677     env->setVidPtr((void *)(CmiIntPtr)idx);
678 #endif
679   }
680   env->setEpIdx(eIdx);
681   env->setByPe(CkMyPe());
682   env->setSrcPe(CkMyPe());
683   CmiSetHandler(env, _charmHandlerIdx);
684   _TRACE_CREATION_1(env);
685   CpvAccess(_qd)->create();
686   _STATS_RECORD_CREATE_CHARE_1();
687   _SET_USED(env, 1);
688   if(destPE == CK_PE_ANY)
689     env->setForAnyPE(1);
690   else
691     env->setForAnyPE(0);
692   _CldEnqueue(destPE, env, _infoIdx);
693   _TRACE_CREATION_DONE(1);
694 }
695
696 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
697 {
698   register int gIdx = _entryTable[epIdx]->chareIdx;
699   register void *obj = malloc(_chareTable[gIdx]->size);
700   _MEMCHECK(obj);
701   setMemoryTypeChare(obj);
702   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
703   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
704   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
705   CkpvAccess(_groupIDTable)->push_back(groupID);
706   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
707   if(ptrq) {
708     void *pending;
709     while((pending=ptrq->deq())!=0) {
710 #if CMK_BIGSIM_CHARM
711       //In BigSim, CpvAccess(CsdSchedQueue) is not used. _CldEnqueue resets the
712       //handler to converse-level BigSim handler.
713       _CldEnqueue(CkMyPe(), pending, _infoIdx);
714 #else
715       CsdEnqueueGeneral(pending, CQS_QUEUEING_FIFO, 0, 0);
716 #endif
717     }
718     CkpvAccess(_groupTable)->find(groupID).clearPending();
719   }
720   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
721
722   CkpvAccess(_currentGroup) = groupID;
723   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
724
725 #ifndef CMK_CHARE_USE_PTR
726   int callingChareIdx = CkpvAccess(currentChareIdx);
727   CkpvAccess(currentChareIdx) = -1;
728 #endif
729
730   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
731
732 #ifndef CMK_CHARE_USE_PTR
733   CkpvAccess(currentChareIdx) = callingChareIdx;
734 #endif
735
736   _STATS_RECORD_PROCESS_GROUP_1();
737 }
738
739 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
740 {
741   register int gIdx = _entryTable[epIdx]->chareIdx;
742   int objSize=_chareTable[gIdx]->size;
743   register void *obj = malloc(objSize);
744   _MEMCHECK(obj);
745   setMemoryTypeChare(obj);
746   CkpvAccess(_currentGroup) = groupID;
747
748 // Now that the NodeGroup is created, add it to the table.
749 //  NodeGroups can be accessed by multiple processors, so
750 //  this is in the opposite order from groups - invoking the constructor
751 //  before registering it.
752 // User may call CkLocalNodeBranch() inside the nodegroup constructor
753 //  store nodegroup into _currentNodeGroupObj
754   CkpvAccess(_currentNodeGroupObj) = obj;
755
756 #ifndef CMK_CHARE_USE_PTR
757   int callingChareIdx = CkpvAccess(currentChareIdx);
758   CkpvAccess(currentChareIdx) = -1;
759 #endif
760
761   _invokeEntryNoTrace(epIdx,env,obj);
762
763 #ifndef CMK_CHARE_USE_PTR
764   CkpvAccess(currentChareIdx) = callingChareIdx;
765 #endif
766
767   CkpvAccess(_currentNodeGroupObj) = NULL;
768   _STATS_RECORD_PROCESS_NODE_GROUP_1();
769
770   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
771   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
772   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
773   CksvAccess(_nodeGroupIDTable).push_back(groupID);
774
775   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
776   if(ptrq) {
777     void *pending;
778     while((pending=ptrq->deq())!=0) {
779       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
780     }
781     CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
782   }
783   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
784 }
785
786 void _createGroup(CkGroupID groupID, envelope *env)
787 {
788   _CHECK_USED(env);
789   _SET_USED(env, 1);
790   register int epIdx = env->getEpIdx();
791   int gIdx = _entryTable[epIdx]->chareIdx;
792   CkNodeGroupID rednMgr;
793 #if !GROUP_LEVEL_REDUCTION
794   if(_chareTable[gIdx]->isIrr == 0){
795                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
796                 rednMgr = rednMgrProxy;
797 //              rednMgrProxy.setAttachedGroup(groupID);
798   }
799   else
800 #endif
801   {
802         rednMgr.setZero();
803   }
804   env->setGroupNum(groupID);
805   env->setSrcPe(CkMyPe());
806   env->setRednMgr(rednMgr);
807   env->setGroupEpoch(CkpvAccess(_charmEpoch));
808
809   if(CkNumPes()>1) {
810     CkPackMessage(&env);
811     CmiSetHandler(env, _bocHandlerIdx);
812     _numInitMsgs++;
813     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
814     CpvAccess(_qd)->create(CkNumPes()-1);
815     CkUnpackMessage(&env);
816   }
817   _STATS_RECORD_CREATE_GROUP_1();
818   CkCreateLocalGroup(groupID, epIdx, env);
819 }
820
821 void _createNodeGroup(CkGroupID groupID, envelope *env)
822 {
823   _CHECK_USED(env);
824   _SET_USED(env, 1);
825   register int epIdx = env->getEpIdx();
826   env->setGroupNum(groupID);
827   env->setSrcPe(CkMyPe());
828   env->setGroupEpoch(CkpvAccess(_charmEpoch));
829   if(CkNumNodes()>1) {
830     CkPackMessage(&env);
831     CmiSetHandler(env, _bocHandlerIdx);
832     _numInitMsgs++;
833     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
834     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
835     CpvAccess(_qd)->create(CkNumNodes()-1);
836     CkUnpackMessage(&env);
837   }
838   _STATS_RECORD_CREATE_NODE_GROUP_1();
839   CkCreateLocalNodeGroup(groupID, epIdx, env);
840 }
841
842 // new _groupCreate
843
844 static CkGroupID _groupCreate(envelope *env)
845 {
846   register CkGroupID groupNum;
847
848   // check CkMyPe(). if it is 0 then idx is _numGroups++
849   // if not, then something else...
850   if(CkMyPe() == 0)
851      groupNum.idx = CkpvAccess(_numGroups)++;
852   else
853      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
854   _createGroup(groupNum, env);
855   return groupNum;
856 }
857
858 // new _nodeGroupCreate
859 static CkGroupID _nodeGroupCreate(envelope *env)
860 {
861   register CkGroupID groupNum;
862   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
863   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
864           groupNum.idx = CksvAccess(_numNodeGroups)++;
865    else
866           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
867   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
868   _createNodeGroup(groupNum, env);
869   return groupNum;
870 }
871
872 /**** generate the group idx when group is creator pe is not pe0
873  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
874  **** remaining bits contain the group creator processor number and
875  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
876
877 int _getGroupIdx(int numNodes,int myNode,int numGroups)
878 {
879         int idx;
880         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
881         int n = 32 - (x+1);                                     // number of bits remaining for the index
882         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
883                                                                 // then add the next available index
884         // of course this won't work when int is 8 bytes long on T3E
885         //idx |= 0x80000000;                                      // set the most significant bit to 1
886         idx = - idx;
887                                                                 // if int is not 32 bits, wouldn't this be wrong?
888         return idx;
889 }
890
891 extern "C"
892 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
893 {
894   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
895   register envelope *env = UsrToEnv(msg);
896   env->setMsgtype(BocInitMsg);
897   env->setEpIdx(eIdx);
898   env->setSrcPe(CkMyPe());
899   _TRACE_CREATION_N(env, CkNumPes());
900   CkGroupID gid = _groupCreate(env);
901   _TRACE_CREATION_DONE(1);
902   return gid;
903 }
904
905 extern "C"
906 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
907 {
908   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
909   register envelope *env = UsrToEnv(msg);
910   env->setMsgtype(NodeBocInitMsg);
911   env->setEpIdx(eIdx);
912   env->setSrcPe(CkMyPe());
913   _TRACE_CREATION_N(env, CkNumNodes());
914   CkGroupID gid = _nodeGroupCreate(env);
915   _TRACE_CREATION_DONE(1);
916   return gid;
917 }
918
919 static inline void *_allocNewChare(envelope *env, int &idx)
920 {
921   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
922   void *tmp=malloc(_chareTable[chareIdx]->size);
923   _MEMCHECK(tmp);
924 #ifndef CMK_CHARE_USE_PTR
925   CkpvAccess(chare_objs).push_back(tmp);
926   CkpvAccess(chare_types).push_back(chareIdx);
927   idx = CkpvAccess(chare_objs).size()-1;
928 #endif
929   setMemoryTypeChare(tmp);
930   return tmp;
931 }
932
933 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
934 {
935   int idx;
936   register void *obj = _allocNewChare(env, idx);
937 #ifndef CMK_CHARE_USE_PTR
938   //((Chare *)obj)->chareIdx = idx;
939   CkpvAccess(currentChareIdx) = idx;
940 #endif
941   _invokeEntry(env->getEpIdx(),env,obj);
942 }
943
944 void CkCreateLocalChare(int epIdx, envelope *env)
945 {
946   env->setEpIdx(epIdx);
947   _processNewChareMsg(NULL, env);
948 }
949
950 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
951 {
952   int idx;
953   register void *obj = _allocNewChare(env, idx);
954   register CkChareID *pCid = (CkChareID *)
955       _allocMsg(FillVidMsg, sizeof(CkChareID));
956   pCid->onPE = CkMyPe();
957 #ifndef CMK_CHARE_USE_PTR
958   pCid->objPtr = (void*)(CmiIntPtr)idx;
959 #else
960   pCid->objPtr = obj;
961 #endif
962   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
963   register envelope *ret = UsrToEnv(pCid);
964   ret->setVidPtr(env->getVidPtr());
965   register int srcPe = env->getByPe();
966   ret->setSrcPe(CkMyPe());
967   CmiSetHandler(ret, _charmHandlerIdx);
968   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
969 #ifndef CMK_CHARE_USE_PTR
970   // register the remote vidblock for deletion when chare is deleted
971   CkChareID vid;
972   vid.onPE = srcPe;
973   vid.objPtr = env->getVidPtr();
974   CkpvAccess(vmap)[idx] = vid;    
975 #endif
976   CpvAccess(_qd)->create();
977 #ifndef CMK_CHARE_USE_PTR
978   //((Chare *)obj)->chareIdx = idx;
979   CkpvAccess(currentChareIdx) = idx;
980 #endif
981   _invokeEntry(env->getEpIdx(),env,obj);
982 }
983
984 /************** Receive: Chares *************/
985
986 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
987 {
988   register int epIdx = env->getEpIdx();
989   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
990   register void *obj;
991   if (mainIdx != -1)  {           // mainchare
992     CmiAssert(CkMyPe()==0);
993     obj = _mainTable[mainIdx]->getObj();
994   }
995   else {
996 #ifndef CMK_CHARE_USE_PTR
997     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
998       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
999     else
1000       obj = env->getObjPtr();
1001 #else
1002     obj = env->getObjPtr();
1003 #endif
1004   }
1005   _invokeEntry(epIdx,env,obj);
1006 }
1007
1008 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
1009 {
1010   register int epIdx = env->getEpIdx();
1011   register void *obj = env->getObjPtr();
1012   _invokeEntry(epIdx,env,obj);
1013 }
1014
1015 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
1016 {
1017 #ifndef CMK_CHARE_USE_PTR
1018   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1019 #else
1020   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
1021   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
1022 #endif
1023   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
1024   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
1025   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
1026   CmiFree(env);
1027 }
1028
1029 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
1030 {
1031 #ifndef CMK_CHARE_USE_PTR
1032   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1033 #else
1034   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1035   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
1036 #endif
1037   _SET_USED(env, 1);
1038   vptr->send(env);
1039 }
1040
1041 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1042 {
1043 #ifndef CMK_CHARE_USE_PTR
1044   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1045   delete vptr;
1046   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1047 #endif
1048   CmiFree(env);
1049 }
1050
1051 /************** Receive: Groups ****************/
1052
1053 /**
1054  Return a pointer to the local BOC of "groupID".
1055  The message "env" passed in has some known dependency on this groupID
1056  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1057  Therefore, if the return value is NULL, this function buffers the message so that
1058  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1059  The message passed in must have its handlers correctly set so that it can be
1060  scheduled again.
1061 */
1062 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1063 {
1064
1065         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1066         IrrGroup *obj = ck->localBranch(groupID);
1067         if (obj==NULL) { /* groupmember not yet created: stash message */
1068                 ck->getGroupTable()->find(groupID).enqMsg(env);
1069         }
1070         else { /* will be able to process message */
1071                 ck->process();
1072         }
1073         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1074         return obj;
1075 }
1076
1077 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1078 {
1079   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1080 }
1081
1082 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1083 {
1084 #if CMK_LBDB_ON
1085   // if there is a running obj being measured, stop it temporarily
1086   LDObjHandle objHandle;
1087   int objstopped = 0;
1088   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1089   if (the_lbdb->RunningObject(&objHandle)) {
1090     objstopped = 1;
1091     the_lbdb->ObjectStop(objHandle);
1092   }
1093 #endif
1094   _invokeEntry(epIdx,env,obj);
1095 #if CMK_LBDB_ON
1096   if (objstopped) the_lbdb->ObjectStart(objHandle);
1097 #endif
1098   _STATS_RECORD_PROCESS_BRANCH_1();
1099 }
1100
1101 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1102 {
1103   register CkGroupID groupID =  env->getGroupNum();
1104   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1105   if(obj) {
1106     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1107   }
1108 }
1109
1110 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1111 {
1112   env->setMsgtype(ForChareMsg);
1113   env->setObjPtr(obj);
1114   _processForChareMsg(ck,env);
1115   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1116 }
1117
1118 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1119 {
1120   env->setEpIdx(epIdx);
1121   _deliverForNodeBocMsg(ck,env, obj);
1122 }
1123
1124 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1125 {
1126   register CkGroupID groupID = env->getGroupNum();
1127   register void *obj;
1128
1129   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1130   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1131   if(!obj) { // groupmember not yet created
1132 #if CMK_IMMEDIATE_MSG
1133     if (CmiIsImmediate(env)) {
1134       //CmiDelayImmediate();        /* buffer immediate message */
1135       CmiResetImmediate(env);        // note: this may not be SIG IO safe !
1136     }
1137 #endif
1138     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1139     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1140     return;
1141   }
1142   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1143   ck->process();
1144   env->setMsgtype(ForChareMsg);
1145   env->setObjPtr(obj);
1146   _processForChareMsg(ck,env);
1147   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1148 }
1149
1150 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1151 {
1152   register CkGroupID groupID = env->getGroupNum();
1153   register int epIdx = env->getEpIdx();
1154   if (!env->getGroupDep().isZero()) {      // dependence
1155     CkGroupID dep = env->getGroupDep();
1156     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1157     if (obj == NULL) return;
1158   }
1159   else
1160     ck->process();
1161   CkCreateLocalGroup(groupID, epIdx, env);
1162 }
1163
1164 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1165 {
1166   register CkGroupID groupID = env->getGroupNum();
1167   register int epIdx = env->getEpIdx();
1168   CkCreateLocalNodeGroup(groupID, epIdx, env);
1169 }
1170
1171 /************** Receive: Arrays *************/
1172
1173 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1174   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1175   if (mgr) {
1176     _SET_USED(env, 0);
1177     mgr->insertElement((CkMessage *)EnvToUsr(env));
1178   }
1179 }
1180 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1181   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1182   if (mgr) {
1183     _SET_USED(env, 0);
1184     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1185   }
1186 }
1187
1188 //BIGSIM_OOC DEBUGGING
1189 #define TELLMSGTYPE(x) //x
1190
1191 /**
1192  * This is the main converse-level handler used by all of Charm++.
1193  *
1194  * \addtogroup CriticalPathFramework
1195  */
1196 void _processHandler(void *converseMsg,CkCoreState *ck)
1197 {
1198   register envelope *env = (envelope *) converseMsg;
1199
1200   MESSAGE_PHASE_CHECK(env);
1201
1202 //#if CMK_RECORD_REPLAY
1203   if (ck->watcher!=NULL) {
1204     if (!ck->watcher->processMessage(&env,ck)) return;
1205   }
1206 //#endif
1207 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1208         Chare *obj=NULL;
1209         CkObjID sender;
1210         MCount SN;
1211         MlogEntry *entry=NULL;
1212         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
1213            || env->getMsgtype() == ForArrayEltMsg || env->getMsgtype() == ForIDedObjMsg
1214            || env->getMsgtype() == ForChareMsg) {
1215                 sender = env->sender;
1216                 SN = env->SN;
1217                 int result = preProcessReceivedMessage(env,&obj,&entry);
1218                 if(result == 0){
1219                         return;
1220                 }
1221         }
1222 #endif
1223 #if USE_CRITICAL_PATH_HEADER_ARRAY
1224   CK_CRITICALPATH_START(env)
1225 #endif
1226
1227   switch(env->getMsgtype()) {
1228 // Group support
1229     case BocInitMsg :
1230       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1231       // QD processing moved inside _processBocInitMsg because it is conditional
1232       //ck->process(); 
1233       if(env->isPacked()) CkUnpackMessage(&env);
1234       _processBocInitMsg(ck,env);
1235       break;
1236     case NodeBocInitMsg :
1237       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1238       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1239       _processNodeBocInitMsg(ck,env);
1240       break;
1241     case ForBocMsg :
1242       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1243       // QD processing moved inside _processForBocMsg because it is conditional
1244       if(env->isPacked()) CkUnpackMessage(&env);
1245       _processForBocMsg(ck,env);
1246       // stats record moved inside _processForBocMsg because it is conditional
1247       break;
1248     case ForNodeBocMsg :
1249       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1250       // QD processing moved to _processForNodeBocMsg because it is conditional
1251       if(env->isPacked()) CkUnpackMessage(&env);
1252       _processForNodeBocMsg(ck,env);
1253       // stats record moved to _processForNodeBocMsg because it is conditional
1254       break;
1255
1256 // Array support
1257     case ArrayEltInitMsg:
1258       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1259       if(env->isPacked()) CkUnpackMessage(&env);
1260       _processArrayEltInitMsg(ck,env);
1261       break;
1262     case ForIDedObjMsg:
1263     case ForArrayEltMsg:
1264       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1265       if(env->isPacked()) CkUnpackMessage(&env);
1266       _processArrayEltMsg(ck,env);
1267       break;
1268
1269 // Chare support
1270     case NewChareMsg :
1271       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1272       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1273       _processNewChareMsg(ck,env);
1274       _STATS_RECORD_PROCESS_CHARE_1();
1275       break;
1276     case NewVChareMsg :
1277       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1278       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1279       _processNewVChareMsg(ck,env);
1280       _STATS_RECORD_PROCESS_CHARE_1();
1281       break;
1282     case ForChareMsg :
1283       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1284       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1285       _processForPlainChareMsg(ck,env);
1286       _STATS_RECORD_PROCESS_MSG_1();
1287       break;
1288     case ForVidMsg   :
1289       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1290       ck->process();
1291       _processForVidMsg(ck,env);
1292       break;
1293     case FillVidMsg  :
1294       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1295       ck->process();
1296       _processFillVidMsg(ck,env);
1297       break;
1298     case DeleteVidMsg  :
1299       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1300       ck->process();
1301       _processDeleteVidMsg(ck,env);
1302       break;
1303
1304     default:
1305       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1306   }
1307 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1308         if(obj != NULL){
1309                 postProcessReceivedMessage(obj,sender,SN,entry);
1310         }
1311 #endif
1312
1313
1314 #if USE_CRITICAL_PATH_HEADER_ARRAY
1315   CK_CRITICALPATH_END()
1316 #endif
1317
1318 }
1319
1320
1321 /******************** Message Send **********************/
1322
1323 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1324              int *queueing, int *priobits, unsigned int **prioptr)
1325 {
1326   register envelope *env = (envelope *)converseMsg;
1327   *pfn = (CldPackFn)CkPackMessage;
1328   *len = env->getTotalsize();
1329   *queueing = env->getQueueing();
1330   *priobits = env->getPriobits();
1331   *prioptr = (unsigned int *) env->getPrioPtr();
1332 }
1333
1334 void CkPackMessage(envelope **pEnv)
1335 {
1336   register envelope *env = *pEnv;
1337   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1338     register void *msg = EnvToUsr(env);
1339     _TRACE_BEGIN_PACK();
1340     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1341     _TRACE_END_PACK();
1342     env=UsrToEnv(msg);
1343     env->setPacked(1);
1344     *pEnv = env;
1345   }
1346 }
1347
1348 void CkUnpackMessage(envelope **pEnv)
1349 {
1350   register envelope *env = *pEnv;
1351   register int msgIdx = env->getMsgIdx();
1352   if(env->isPacked()) {
1353     register void *msg = EnvToUsr(env);
1354     _TRACE_BEGIN_UNPACK();
1355     msg = _msgTable[msgIdx]->unpack(msg);
1356     _TRACE_END_UNPACK();
1357     env=UsrToEnv(msg);
1358     env->setPacked(0);
1359     *pEnv = env;
1360   }
1361 }
1362
1363 //There's no reason for most messages to go through the Cld--
1364 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1365 // Thus these accellerated versions of the Cld calls.
1366 #if CMK_OBJECT_QUEUE_AVAILABLE
1367 static int index_objectQHandler;
1368 #endif
1369 int index_tokenHandler;
1370 int index_skipCldHandler;
1371
1372 void _skipCldHandler(void *converseMsg)
1373 {
1374   register envelope *env = (envelope *)(converseMsg);
1375   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1376 #if CMK_GRID_QUEUE_AVAILABLE
1377   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1378     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1379                        env, env->getQueueing (), env->getPriobits (),
1380                        (unsigned int *) env->getPrioPtr ());
1381   } else {
1382     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1383                        env, env->getQueueing (), env->getPriobits (),
1384                        (unsigned int *) env->getPrioPtr ());
1385   }
1386 #else
1387   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1388         env, env->getQueueing(),env->getPriobits(),
1389         (unsigned int *)env->getPrioPtr());
1390 #endif
1391 }
1392
1393
1394 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1395 // Made non-static to be used by ckmessagelogging
1396 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1397 {
1398 #if CMK_CHARMDEBUG
1399   if (!ConverseDeliver(pe)) {
1400     CmiFree(env);
1401     return;
1402   }
1403 #endif
1404   if(pe == CkMyPe() ){
1405     if(!CmiNodeAlive(CkMyPe())){
1406         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1407 //      return;
1408     }
1409   }
1410   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1411 #if CMK_OBJECT_QUEUE_AVAILABLE
1412     Chare *obj = CkFindObjectPtr(env);
1413     if (obj && obj->CkGetObjQueue().queue()) {
1414       _enqObjQueue(obj, env);
1415     }
1416     else
1417 #endif
1418     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1419         env, env->getQueueing(),env->getPriobits(),
1420         (unsigned int *)env->getPrioPtr());
1421 #if CMK_PERSISTENT_COMM
1422     CmiPersistentOneSend();
1423 #endif
1424   } else {
1425     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1426       CkPackMessage(&env);
1427     int len=env->getTotalsize();
1428     CmiSetXHandler(env,CmiGetHandler(env));
1429 #if CMK_OBJECT_QUEUE_AVAILABLE
1430     CmiSetHandler(env,index_objectQHandler);
1431 #else
1432     CmiSetHandler(env,index_skipCldHandler);
1433 #endif
1434     CmiSetInfo(env,infoFn);
1435     if (pe==CLD_BROADCAST) {
1436 #if CMK_MESSAGE_LOGGING
1437         if(env->flags & CK_FREE_MSG_MLOG)
1438                 CmiSyncBroadcastAndFree(len, (char *)env); 
1439         else
1440                 CmiSyncBroadcast(len, (char *)env);
1441 #else
1442                         CmiSyncBroadcastAndFree(len, (char *)env); 
1443 #endif
1444
1445 }
1446     else if (pe==CLD_BROADCAST_ALL) { 
1447 #if CMK_MESSAGE_LOGGING
1448         if(env->flags & CK_FREE_MSG_MLOG)
1449                 CmiSyncBroadcastAllAndFree(len, (char *)env);
1450         else
1451                 CmiSyncBroadcastAll(len, (char *)env);
1452 #else
1453                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1454 #endif
1455
1456 }
1457     else{
1458 #if CMK_MESSAGE_LOGGING
1459         if(env->flags & CK_FREE_MSG_MLOG)
1460                 CmiSyncSendAndFree(pe, len, (char *)env);
1461         else
1462                 CmiSyncSend(pe, len, (char *)env);
1463 #else
1464                         CmiSyncSendAndFree(pe, len, (char *)env);
1465 #endif
1466
1467                 }
1468   }
1469 }
1470
1471 #if CMK_BIGSIM_CHARM
1472 #   define  _skipCldEnqueue   _CldEnqueue
1473 #endif
1474
1475 // by pass Charm++ priority queue, send as Converse message
1476 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1477 {
1478 #if CMK_CHARMDEBUG
1479   if (!ConverseDeliver(-1)) {
1480     CmiFree(env);
1481     return;
1482   }
1483 #endif
1484   CkPackMessage(&env);
1485   int len=env->getTotalsize();
1486   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1487 }
1488
1489 static void _noCldEnqueue(int pe, envelope *env)
1490 {
1491 /*
1492   if (pe == CkMyPe()) {
1493     CmiHandleMessage(env);
1494   } else
1495 */
1496 #if CMK_CHARMDEBUG
1497   if (!ConverseDeliver(pe)) {
1498     CmiFree(env);
1499     return;
1500   }
1501 #endif
1502   CkPackMessage(&env);
1503   int len=env->getTotalsize();
1504   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1505   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1506   else CmiSyncSendAndFree(pe, len, (char *)env);
1507 }
1508
1509 //static void _noCldNodeEnqueue(int node, envelope *env)
1510 //Made non-static to be used by ckmessagelogging
1511 void _noCldNodeEnqueue(int node, envelope *env)
1512 {
1513 /*
1514   if (node == CkMyNode()) {
1515     CmiHandleMessage(env);
1516   } else {
1517 */
1518 #if CMK_CHARMDEBUG
1519   if (!ConverseDeliver(node)) {
1520     CmiFree(env);
1521     return;
1522   }
1523 #endif
1524   CkPackMessage(&env);
1525   int len=env->getTotalsize();
1526   if (node==CLD_BROADCAST) { 
1527 #if CMK_MESSAGE_LOGGING
1528         if(env->flags & CK_FREE_MSG_MLOG)
1529                 CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1530         else
1531                 CmiSyncNodeBroadcast(len, (char *)env);
1532 #else
1533         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1534 #endif
1535 }
1536   else if (node==CLD_BROADCAST_ALL) { 
1537 #if CMK_MESSAGE_LOGGING
1538         if(env->flags & CK_FREE_MSG_MLOG)
1539                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1540         else
1541                 CmiSyncNodeBroadcastAll(len, (char *)env);
1542 #else
1543                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1544 #endif
1545
1546 }
1547   else {
1548 #if CMK_MESSAGE_LOGGING
1549         if(env->flags & CK_FREE_MSG_MLOG)
1550                 CmiSyncNodeSendAndFree(node, len, (char *)env);
1551         else
1552                 CmiSyncNodeSend(node, len, (char *)env);
1553 #else
1554         CmiSyncNodeSendAndFree(node, len, (char *)env);
1555 #endif
1556   }
1557 }
1558
1559 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1560 {
1561   register envelope *env = UsrToEnv(msg);
1562   _CHECK_USED(env);
1563   _SET_USED(env, 1);
1564 #if CMK_REPLAYSYSTEM
1565   setEventID(env);
1566 #endif
1567   env->setMsgtype(ForChareMsg);
1568   env->setEpIdx(eIdx);
1569   env->setSrcPe(CkMyPe());
1570
1571 #if USE_CRITICAL_PATH_HEADER_ARRAY
1572   CK_CRITICALPATH_SEND(env)
1573   //CK_AUTOMATE_PRIORITY(env)
1574 #endif
1575 #if CMK_CHARMDEBUG
1576   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1577 #endif
1578 #if CMK_OBJECT_QUEUE_AVAILABLE
1579   CmiSetHandler(env, index_objectQHandler);
1580 #else
1581   CmiSetHandler(env, _charmHandlerIdx);
1582 #endif
1583   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1584     register int pe = -(pCid->onPE+1);
1585     if(pe==CkMyPe()) {
1586 #ifndef CMK_CHARE_USE_PTR
1587       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1588 #else
1589       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1590 #endif
1591       void *objPtr;
1592       if (NULL!=(objPtr=vblk->getLocalChare()))
1593       { //A ready local chare
1594         env->setObjPtr(objPtr);
1595         return pe;
1596       }
1597       else { //The vidblock is not ready-- forget it
1598         vblk->send(env);
1599         return -1;
1600       }
1601     } else { //Valid vidblock for another PE:
1602       env->setMsgtype(ForVidMsg);
1603       env->setVidPtr(pCid->objPtr);
1604       return pe;
1605     }
1606   }
1607   else {
1608     env->setObjPtr(pCid->objPtr);
1609     return pCid->onPE;
1610   }
1611 }
1612
1613 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1614 {
1615   int destPE = _prepareMsg(eIdx, msg, pCid);
1616   if (destPE != -1) {
1617     register envelope *env = UsrToEnv(msg);
1618     //criticalPath_send(env);
1619 #if USE_CRITICAL_PATH_HEADER_ARRAY
1620     CK_CRITICALPATH_SEND(env)
1621     //CK_AUTOMATE_PRIORITY(env)
1622 #endif
1623     CmiBecomeImmediate(env);
1624   }
1625   return destPE;
1626 }
1627
1628 extern "C"
1629 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1630 {
1631   if (opts & CK_MSG_INLINE) {
1632     CkSendMsgInline(entryIdx, msg, pCid, opts);
1633     return;
1634   }
1635 #if CMK_ERROR_CHECKING
1636   if (opts & CK_MSG_IMMEDIATE) {
1637     CmiAbort("Immediate message is not allowed in Chare!");
1638   }
1639 #endif
1640   register envelope *env = UsrToEnv(msg);
1641   int destPE=_prepareMsg(entryIdx,msg,pCid);
1642   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1643   // VidBlock was not yet filled). The problem is that the creation was never
1644   // traced later when the VidBlock was filled. One solution is to trace the
1645   // creation here, the other to trace it in VidBlock->msgDeliver().
1646 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1647   if (destPE!=-1) {
1648     CpvAccess(_qd)->create();
1649   }
1650         sendChareMsg(env,destPE,_infoIdx,pCid);
1651 #else
1652   _TRACE_CREATION_1(env);
1653   if (destPE!=-1) {
1654     CpvAccess(_qd)->create();
1655     if (opts & CK_MSG_SKIP_OR_IMM)
1656       _noCldEnqueue(destPE, env);
1657     else
1658       _CldEnqueue(destPE, env, _infoIdx);
1659   }
1660   _TRACE_CREATION_DONE(1);
1661 #endif
1662 }
1663
1664 extern "C"
1665 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1666 {
1667   if (pCid->onPE==CkMyPe())
1668   {
1669     if(!CmiNodeAlive(CkMyPe())){
1670         return;
1671     }
1672 #if CMK_CHARMDEBUG
1673     //Just in case we need to breakpoint or use the envelope in some way
1674     _prepareMsg(entryIndex,msg,pCid);
1675 #endif
1676                 //Just directly call the chare (skip QD handling & scheduler)
1677     register envelope *env = UsrToEnv(msg);
1678     if (env->isPacked()) CkUnpackMessage(&env);
1679     _STATS_RECORD_PROCESS_MSG_1();
1680     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1681   }
1682   else {
1683     //No way to inline a cross-processor message:
1684     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1685   }
1686 }
1687
1688 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1689 {
1690   register envelope *env = UsrToEnv(msg);
1691   /*#if CMK_ERROR_CHECKING
1692   CkNodeGroupID nodeRedMgr;
1693 #endif
1694   */
1695   _CHECK_USED(env);
1696   _SET_USED(env, 1);
1697 #if CMK_REPLAYSYSTEM
1698   setEventID(env);
1699 #endif
1700   env->setMsgtype(type);
1701   env->setEpIdx(eIdx);
1702   env->setGroupNum(gID);
1703   env->setSrcPe(CkMyPe());
1704   /*
1705 #if CMK_ERROR_CHECKING
1706   nodeRedMgr.setZero();
1707   env->setRednMgr(nodeRedMgr);
1708 #endif
1709 */
1710   //criticalPath_send(env);
1711 #if USE_CRITICAL_PATH_HEADER_ARRAY
1712   CK_CRITICALPATH_SEND(env)
1713   //CK_AUTOMATE_PRIORITY(env)
1714 #endif
1715 #if CMK_CHARMDEBUG
1716   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1717 #endif
1718   CmiSetHandler(env, _charmHandlerIdx);
1719   return env;
1720 }
1721
1722 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1723 {
1724   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1725 #if USE_CRITICAL_PATH_HEADER_ARRAY
1726   CK_CRITICALPATH_SEND(env)
1727   //CK_AUTOMATE_PRIORITY(env)
1728 #endif
1729   CmiBecomeImmediate(env);
1730   return env;
1731 }
1732
1733 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1734                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1735 {
1736   int numPes;
1737   register envelope *env;
1738     if (opts & CK_MSG_IMMEDIATE) {
1739         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1740     }else
1741     {
1742         env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1743     }
1744
1745 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1746         sendGroupMsg(env,pe,_infoIdx);
1747 #else
1748   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1749   _TRACE_CREATION_N(env, numPes);
1750   if (opts & CK_MSG_SKIP_OR_IMM)
1751     _noCldEnqueue(pe, env);
1752   else
1753     _skipCldEnqueue(pe, env, _infoIdx);
1754   _TRACE_CREATION_DONE(1);
1755 #endif
1756 }
1757
1758 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1759                            int npes, int *pes)
1760 {
1761   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1762   _TRACE_CREATION_MULTICAST(env, npes, pes);
1763   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1764   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1765 }
1766
1767 extern "C"
1768 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1769 {
1770 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1771   if (destPE==CkMyPe())
1772   {
1773     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1774     return;
1775   }
1776   //Can't inline-- send the usual way
1777   register envelope *env = UsrToEnv(msg);
1778   int numPes;
1779   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1780   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1781   _TRACE_CREATION_N(env, numPes);
1782   _noCldEnqueue(destPE, env);
1783   _STATS_RECORD_SEND_BRANCH_1();
1784   CkpvAccess(_coreState)->create();
1785   _TRACE_CREATION_DONE(1);
1786 #else
1787   // no support for immediate message, send inline
1788   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1789 #endif
1790 }
1791
1792 extern "C"
1793 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1794 {
1795   if (destPE==CkMyPe())
1796   {
1797     if(!CmiNodeAlive(CkMyPe())){
1798         return;
1799     }
1800     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1801     if (obj!=NULL)
1802     { //Just directly call the group:
1803 #if CMK_ERROR_CHECKING
1804       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1805 #else
1806       envelope *env=UsrToEnv(msg);
1807 #endif
1808       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1809       return;
1810     }
1811   }
1812   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1813   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1814 }
1815
1816 extern "C"
1817 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1818 {
1819   if (opts & CK_MSG_INLINE) {
1820     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1821     return;
1822   }
1823   if (opts & CK_MSG_IMMEDIATE) {
1824     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1825     return;
1826   }
1827   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1828   _STATS_RECORD_SEND_BRANCH_1();
1829   CkpvAccess(_coreState)->create();
1830 }
1831
1832 extern "C"
1833 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1834 {
1835 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1836   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1837   _TRACE_CREATION_MULTICAST(env, npes, pes);
1838   _noCldEnqueueMulti(npes, pes, env);
1839   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1840 #else
1841   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1842   CpvAccess(_qd)->create(-npes);
1843 #endif
1844   _STATS_RECORD_SEND_BRANCH_N(npes);
1845   CpvAccess(_qd)->create(npes);
1846 }
1847
1848 extern "C"
1849 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1850 {
1851   if (opts & CK_MSG_IMMEDIATE) {
1852     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1853     return;
1854   }
1855     // normal mesg
1856   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1857   _STATS_RECORD_SEND_BRANCH_N(npes);
1858   CpvAccess(_qd)->create(npes);
1859 }
1860
1861 extern "C"
1862 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1863 {
1864   int npes;
1865   int *pes;
1866   if (opts & CK_MSG_IMMEDIATE) {
1867     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1868     return;
1869   }
1870     // normal mesg
1871   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1872   CmiLookupGroup(grp, &npes, &pes);
1873   _TRACE_CREATION_MULTICAST(env, npes, pes);
1874   _CldEnqueueGroup(grp, env, _infoIdx);
1875   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1876   _STATS_RECORD_SEND_BRANCH_N(npes);
1877   CpvAccess(_qd)->create(npes);
1878 }
1879
1880 extern "C"
1881 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1882 {
1883   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1884   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1885   CpvAccess(_qd)->create(CkNumPes());
1886 }
1887
1888 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1889                 int node=CLD_BROADCAST_ALL, int opts=0)
1890 {
1891     int numPes;
1892     register envelope *env;
1893     if (opts & CK_MSG_IMMEDIATE) {
1894         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1895     }else
1896     {
1897         env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1898     }
1899 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1900     sendNodeGroupMsg(env,node,_infoIdx);
1901 #else
1902   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1903   _TRACE_CREATION_N(env, numPes);
1904   if (opts & CK_MSG_SKIP_OR_IMM) {
1905     _noCldNodeEnqueue(node, env);
1906   }
1907   else
1908     _CldNodeEnqueue(node, env, _infoIdx);
1909   _TRACE_CREATION_DONE(1);
1910 #endif
1911 }
1912
1913 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1914                            int npes, int *nodes)
1915 {
1916   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1917   _TRACE_CREATION_N(env, npes);
1918   for (int i=0; i<npes; i++) {
1919     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1920   }
1921   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1922 }
1923
1924 extern "C"
1925 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1926 {
1927 #if CMK_IMMEDIATE_MSG
1928   if (node==CkMyNode())
1929   {
1930     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1931     return;
1932   }
1933   //Can't inline-- send the usual way
1934   register envelope *env = UsrToEnv(msg);
1935   int numPes;
1936   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1937   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1938   _TRACE_CREATION_N(env, numPes);
1939   _noCldNodeEnqueue(node, env);
1940   _STATS_RECORD_SEND_BRANCH_1();
1941   CkpvAccess(_coreState)->create();
1942   _TRACE_CREATION_DONE(1);
1943 #else
1944   // no support for immediate message, send inline
1945   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1946 #endif
1947 }
1948
1949 extern "C"
1950 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1951 {
1952   if (node==CkMyNode())
1953   {
1954     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1955     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1956     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1957     if (obj!=NULL)
1958     { //Just directly call the group:
1959 #if CMK_ERROR_CHECKING
1960       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1961 #else
1962       envelope *env=UsrToEnv(msg);
1963 #endif
1964       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1965       return;
1966     }
1967   }
1968   //Can't inline-- send the usual way
1969   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1970 }
1971
1972 extern "C"
1973 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1974 {
1975   if (opts & CK_MSG_INLINE) {
1976     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1977     return;
1978   }
1979   if (opts & CK_MSG_IMMEDIATE) {
1980     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1981     return;
1982   }
1983   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1984   _STATS_RECORD_SEND_NODE_BRANCH_1();
1985   CkpvAccess(_coreState)->create();
1986 }
1987
1988 extern "C"
1989 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1990 {
1991 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1992   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1993   _noCldEnqueueMulti(npes, nodes, env);
1994 #else
1995   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1996   CpvAccess(_qd)->create(-npes);
1997 #endif
1998   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1999   CpvAccess(_qd)->create(npes);
2000 }
2001
2002 extern "C"
2003 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
2004 {
2005   if (opts & CK_MSG_IMMEDIATE) {
2006     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
2007     return;
2008   }
2009     // normal mesg
2010   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2011   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2012   CpvAccess(_qd)->create(npes);
2013 }
2014
2015 extern "C"
2016 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
2017 {
2018   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
2019   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
2020   CpvAccess(_qd)->create(CkNumNodes());
2021 }
2022
2023 //Needed by delegation manager:
2024 extern "C"
2025 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
2026 { return _prepareMsg(eIdx,msg,pCid); }
2027 extern "C"
2028 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2029 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
2030 extern "C"
2031 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2032 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
2033
2034 void _ckModuleInit(void) {
2035         index_skipCldHandler = CkRegisterHandler(_skipCldHandler);
2036 #if CMK_OBJECT_QUEUE_AVAILABLE
2037         index_objectQHandler = CkRegisterHandler(_ObjectQHandler);
2038 #endif
2039         index_tokenHandler = CkRegisterHandler(_TokenHandler);
2040         CkpvInitialize(TokenPool*, _tokenPool);
2041         CkpvAccess(_tokenPool) = new TokenPool;
2042 }
2043
2044
2045 /************** Send: Arrays *************/
2046
2047 extern void CkArrayManagerInsert(int onPe,void *msg);
2048 //extern void CkArrayManagerDeliver(int onPe,void *msg);
2049
2050 static void _prepareOutgoingArrayMsg(envelope *env,int type)
2051 {
2052   _CHECK_USED(env);
2053   _SET_USED(env, 1);
2054   env->setMsgtype(type);
2055 #if CMK_CHARMDEBUG
2056   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
2057 #endif
2058   CmiSetHandler(env, _charmHandlerIdx);
2059   CpvAccess(_qd)->create();
2060 }
2061
2062 extern "C"
2063 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
2064   register envelope *env = UsrToEnv(msg);
2065   env->setArrayMgr(aID);
2066   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
2067   _CldEnqueue(pe, env, _infoIdx);
2068 }
2069
2070 extern "C"
2071 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2072   register envelope *env = UsrToEnv(msg);
2073   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2074 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2075         sendArrayMsg(env,pe,_infoIdx);
2076 #else
2077   if (opts & CK_MSG_IMMEDIATE)
2078     CmiBecomeImmediate(env);
2079   if (opts & CK_MSG_SKIP_OR_IMM)
2080     _noCldEnqueue(pe, env);
2081   else
2082     _skipCldEnqueue(pe, env, _infoIdx);
2083 #endif
2084 }
2085
2086 class ElementDestroyer : public CkLocIterator {
2087 private:
2088         CkLocMgr *locMgr;
2089 public:
2090         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2091         void addLocation(CkLocation &loc) {
2092           loc.destroyAll();
2093         }
2094 };
2095
2096 void CkDeleteChares() {
2097   int i;
2098   int numGroups = CkpvAccess(_groupIDTable)->size();
2099
2100   // delete all plain chares
2101 #ifndef CMK_CHARE_USE_PTR
2102   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2103         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2104         delete obj;
2105         CkpvAccess(chare_objs)[i] = NULL;
2106   }
2107   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2108         VidBlock *obj = CkpvAccess(vidblocks)[i];
2109         delete obj;
2110         CkpvAccess(vidblocks)[i] = NULL;
2111   }
2112 #endif
2113
2114   // delete all array elements
2115   for(i=0;i<numGroups;i++) {
2116     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2117     if(obj && obj->isLocMgr())  {
2118       CkLocMgr *mgr = (CkLocMgr*)obj;
2119       ElementDestroyer destroyer(mgr);
2120       mgr->iterate(destroyer);
2121     }
2122   }
2123
2124   // delete all groups
2125   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2126   for(i=0;i<numGroups;i++) {
2127     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2128     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2129     if (obj) delete obj;
2130   }
2131   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2132
2133   // delete all node groups
2134   if (CkMyRank() == 0) {
2135     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2136     for(i=0;i<numNodeGroups;i++) {
2137       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2138       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2139       if (obj) delete obj;
2140     }
2141   }
2142 }
2143
2144 #if CMK_BIGSIM_CHARM
2145 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2146                                    int pb,unsigned int *prio);
2147 #endif
2148
2149 //------------------- Message Watcher (record/replay) ----------------
2150
2151 #include "crc32.h"
2152
2153 CkpvDeclare(int, envelopeEventID);
2154 int _recplay_crc = 0;
2155 int _recplay_checksum = 0;
2156 int _recplay_logsize = 1024*1024;
2157
2158 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2159 #define REPLAYDEBUG(args) /* empty */
2160
2161 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2162
2163 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2164 #include "BaseLB.h" /* For LBMigrateMsg message */
2165
2166 #if CMK_REPLAYSYSTEM
2167 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2168
2169     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2170     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2171     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2172     FILE *f=fopen(fName,permissions);
2173     REPLAYDEBUG("openReplayfile "<<fName);
2174     if (f==NULL) {
2175         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2176             CkMyPe(),fName,permissions);
2177         CkAbort("openReplayFile> Could not open replay file");
2178     }
2179     return f;
2180 }
2181
2182 class CkMessageRecorder : public CkMessageWatcher {
2183   char *buffer;
2184   unsigned int curpos;
2185   bool firstOpen;
2186 public:
2187   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2188   ~CkMessageRecorder() {
2189     flushLog(0);
2190     fprintf(f,"-1 -1 -1 ");
2191     fclose(f);
2192     delete[] buffer;
2193 #if 0
2194     FILE *stsfp = fopen("sts", "w");
2195     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2196     traceWriteSTS(stsfp, 0);
2197     fclose(stsfp);
2198 #endif
2199     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2200   }
2201
2202 private:
2203   void flushLog(int verbose=1) {
2204     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2205     fprintf(f, "%s", buffer);
2206     curpos=0;
2207   }
2208   virtual bool process(envelope **envptr,CkCoreState *ck) {
2209     if ((*envptr)->getEvent()) {
2210       bool wasPacked = (*envptr)->isPacked();
2211       if (!wasPacked) CkPackMessage(envptr);
2212       envelope *env = *envptr;
2213       unsigned int crc1=0, crc2=0;
2214       if (_recplay_crc) {
2215         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2216         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2217         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2218       } else if (_recplay_checksum) {
2219         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2220         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2221       }
2222       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2223       if (curpos > _recplay_logsize-128) flushLog();
2224       if (!wasPacked) CkUnpackMessage(envptr);
2225     }
2226     return true;
2227   }
2228   virtual bool process(CthThreadToken *token,CkCoreState *ck) {
2229     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2230     if (curpos > _recplay_logsize-128) flushLog();
2231     return true;
2232   }
2233   
2234   virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2235     FILE *f;
2236     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2237     else f = openReplayFile("ckreplay_",".lb","a");
2238     firstOpen = false;
2239     if (f != NULL) {
2240       PUP::toDisk p(f);
2241       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2242       (*msg)->pup(p);
2243       fclose(f);
2244     }
2245     return true;
2246   }
2247 };
2248
2249 class CkMessageDetailRecorder : public CkMessageWatcher {
2250 public:
2251   CkMessageDetailRecorder(FILE *f_) {
2252     f=f_;
2253     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2254      * The value of 'x' is the pointer size.
2255      */
2256     CmiUInt2 little = sizeof(void*);
2257     fwrite(&little, 2, 1, f);
2258   }
2259   ~CkMessageDetailRecorder() {fclose(f);}
2260 private:
2261   virtual bool process(envelope **envptr, CkCoreState *ck) {
2262     bool wasPacked = (*envptr)->isPacked();
2263     if (!wasPacked) CkPackMessage(envptr);
2264     envelope *env = *envptr;
2265     CmiUInt4 size = env->getTotalsize();
2266     fwrite(&size, 4, 1, f);
2267     fwrite(env, env->getTotalsize(), 1, f);
2268     if (!wasPacked) CkUnpackMessage(envptr);
2269     return true;
2270   }
2271 };
2272
2273 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2274 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2275
2276 class CkMessageReplay : public CkMessageWatcher {
2277   int counter;
2278         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2279         int nextEP;
2280         unsigned int crc1, crc2;
2281         FILE *lbFile;
2282         /// Read the next message we need from the file:
2283         void getNext(void) {
2284           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2285           if (nextSize > 0) {
2286             // We are reading a regular message
2287             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2288               CkAbort("CkMessageReplay> Syntax error reading replay file");
2289             }
2290             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2291           } else if (nextSize == -2) {
2292             // We are reading a special message (right now only thread awaken)
2293             // Nothing to do since we have already read all info
2294             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2295           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2296             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2297             CkAbort("CkMessageReplay> Unrecognized input");
2298           }
2299             /*
2300                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2301                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2302                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2303                 }
2304                 */
2305                 counter++;
2306         }
2307         /// If this is the next message we need, advance and return true.
2308         bool isNext(envelope *env) {
2309                 if (nextPE!=env->getSrcPe()) return false;
2310                 if (nextEvent!=env->getEvent()) return false;
2311                 if (nextSize<0) return false; // not waiting for a regular message
2312 #if 1
2313                 if (nextEP != env->getEpIdx()) {
2314                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2315                         return false;
2316                 }
2317 #endif
2318 #if ! CMK_BIGSIM_CHARM
2319                 if (nextSize!=env->getTotalsize())
2320                 {
2321                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2322                         return false;
2323                 }
2324                 if (_recplay_crc || _recplay_checksum) {
2325                   bool wasPacked = env->isPacked();
2326                   if (!wasPacked) CkPackMessage(&env);
2327                   if (_recplay_crc) {
2328                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2329                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2330                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2331                     if (crcnew1 != crc1) {
2332                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2333                     }
2334                     if (crcnew2 != crc2) {
2335                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2336                     }
2337                   } else if (_recplay_checksum) {
2338             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2339             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2340             if (crcnew1 != crc1) {
2341               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2342             }
2343             if (crcnew2 != crc2) {
2344               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2345             }               
2346                   }
2347                   if (!wasPacked) CkUnpackMessage(&env);
2348                 }
2349 #endif
2350                 return true;
2351         }
2352         bool isNext(CthThreadToken *token) {
2353           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
2354           return false;
2355         }
2356
2357         /// This is a (short) list of messages we aren't yet ready for:
2358         CkQ<envelope *> delayedMessages;
2359         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2360         CkQ<CthThreadToken *> delayedTokens;
2361
2362         /// Try to flush out any delayed messages
2363         void flush(void) {
2364           if (nextSize>0) {
2365                 int len=delayedMessages.length();
2366                 for (int i=0;i<len;i++) {
2367                         envelope *env=delayedMessages.deq();
2368                         if (isNext(env)) { /* this is the next message: process it */
2369                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2370                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2371                                 return;
2372                         }
2373                         else /* Not ready yet-- put it back in the
2374                                 queue */
2375                           {
2376                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2377                                 delayedMessages.enq(env);
2378                           }
2379                 }
2380           } else if (nextSize==-2) {
2381             int len=delayedTokens.length();
2382             for (int i=0;i<len;++i) {
2383               CthThreadToken *token=delayedTokens.deq();
2384               if (isNext(token)) {
2385             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2386 #if ! CMK_BIGSIM_CHARM
2387                 CsdEnqueueLifo((void*)token);
2388 #else
2389                 CthEnqueueBigSimThread(token,0,0,NULL);
2390 #endif
2391                 return;
2392               } else {
2393             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2394                 delayedTokens.enq(token);
2395               }
2396             }
2397           }
2398         }
2399
2400 public:
2401         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2402           counter=0;
2403           f=f_;
2404           getNext();
2405           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2406           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2407         }
2408         ~CkMessageReplay() {fclose(f);}
2409
2410 private:
2411         virtual bool process(envelope **envptr,CkCoreState *ck) {
2412           bool wasPacked = (*envptr)->isPacked();
2413           if (!wasPacked) CkPackMessage(envptr);
2414           envelope *env = *envptr;
2415           //CkAssert(*(int*)env == 0x34567890);
2416           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2417                 if (env->getEvent() == 0) return true;
2418                 if (isNext(env)) { /* This is the message we were expecting */
2419                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2420                         getNext(); /* Advance over this message */
2421                         flush(); /* try to process queued-up stuff */
2422                         if (!wasPacked) CkUnpackMessage(envptr);
2423                         return true;
2424                 }
2425 #if CMK_SMP
2426                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2427                          // try next rank, we can't just buffer the msg and left
2428                          // we need to keep unprocessed msg on the fly
2429                         int nextpe = CkMyPe()+1;
2430                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2431                         nextpe = CkNodeFirst(CkMyNode());
2432                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2433                         return false;
2434                 }
2435 #endif
2436                 else /*!isNext(env) */ {
2437                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2438                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2439                         delayedMessages.enq(env);
2440                         flush();
2441                         return false;
2442                 }
2443         }
2444         virtual bool process(CthThreadToken *token, CkCoreState *ck) {
2445       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2446           if (isNext(token)) {
2447         REPLAYDEBUG("Executing token: "<<token->serialNo)
2448             getNext();
2449             flush();
2450             return true;
2451           } else {
2452         REPLAYDEBUG("Queueing token: "<<token->serialNo
2453             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2454             delayedTokens.enq(token);
2455             return false;
2456           }
2457         }
2458
2459         virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2460           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2461           if (lbFile != NULL) {
2462             int num_moves = 0;
2463         PUP::fromDisk p(lbFile);
2464             p | num_moves;
2465             if (num_moves != (*msg)->n_moves) {
2466               delete *msg;
2467               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2468             }
2469             (*msg)->pup(p);
2470           }
2471           return true;
2472         }
2473 };
2474
2475 class CkMessageDetailReplay : public CkMessageWatcher {
2476   void *getNext() {
2477     CmiUInt4 size; size_t nread;
2478     if ((nread=fread(&size, 4, 1, f)) < 1) {
2479       if (feof(f)) return NULL;
2480       CkPrintf("Broken record file (metadata) got %d\n",nread);
2481       CkAbort("");
2482     }
2483     void *env = CmiAlloc(size);
2484     long tell = ftell(f);
2485     if ((nread=fread(env, size, 1, f)) < 1) {
2486       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2487       CkAbort("");
2488     }
2489     //*(int*)env = 0x34567890; // set first integer as magic
2490     return env;
2491   }
2492 public:
2493   double starttime;
2494   CkMessageDetailReplay(FILE *f_) {
2495     f=f_;
2496     starttime=CkWallTimer();
2497     /* This must match what CkMessageDetailRecorder did */
2498     CmiUInt2 little;
2499     fread(&little, 2, 1, f);
2500     if (little != sizeof(void*)) {
2501       CkAbort("Replaying on a different architecture from which recording was done!");
2502     }
2503
2504     CsdEnqueue(getNext());
2505
2506     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2507   }
2508   virtual bool process(envelope **env,CkCoreState *ck) {
2509     void *msg = getNext();
2510     if (msg != NULL) CsdEnqueue(msg);
2511     return true;
2512   }
2513 };
2514
2515 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2516 #if ! CMK_BIGSIM_CHARM
2517   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2518 #endif
2519   CkMessageReplay *replay = (CkMessageReplay*)rep;
2520   //CmiStartQD(CkMessageReplayQuiescence, replay);
2521 }
2522
2523 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2524   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2525   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2526   ConverseExit();
2527 }
2528 #endif
2529
2530 static bool CpdExecuteThreadResume(CthThreadToken *token) {
2531   CkCoreState *ck = CkpvAccess(_coreState);
2532   if (ck->watcher!=NULL) {
2533     return ck->watcher->processThread(token,ck);
2534   }
2535   return true;
2536 }
2537
2538 CpvCExtern(int, CthResumeNormalThreadIdx);
2539 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2540 {
2541   CthThread t = token->thread;
2542
2543   if(t == NULL){
2544     free(token);
2545     return;
2546   }
2547 #if CMK_TRACE_ENABLED
2548 #if ! CMK_TRACE_IN_CHARM
2549   if(CpvAccess(traceOn))
2550     CthTraceResume(t);
2551 /*    if(CpvAccess(_traceCoreOn)) 
2552             resumeTraceCore();*/
2553 #endif
2554 #endif
2555   
2556   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2557   if (CpdExecuteThreadResume(token)) {
2558     CthResume(t);
2559   }
2560 }
2561
2562 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2563   CkCoreState *ck = CkpvAccess(_coreState);
2564   if (ck->watcher!=NULL) {
2565     ck->watcher->processLBMessage(msg, ck);
2566   }
2567 }
2568
2569 #if CMK_BIGSIM_CHARM
2570 CpvExtern(int      , CthResumeBigSimThreadIdx);
2571 #endif
2572
2573 #include "ckliststring.h"
2574 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2575     CmiArgGroup("Charm++","Record/Replay");
2576     bool forceReplay = false;
2577     char *procs = NULL;
2578     _replaySystem = 0;
2579     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2580       if(CmiMyRank() == 0) _recplay_crc = 1;
2581     }
2582     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2583       if(CmiMyRank() == 0) _recplay_checksum = 1;
2584     }
2585     int tmplogsize;
2586     if(CmiGetArgIntDesc(argv,"+recplay-logsize",&tmplogsize,"Specify the size of the buffer used by the message recorder"))
2587       {
2588         if(CmiMyRank() == 0) _recplay_logsize = tmplogsize;
2589       }
2590     REPLAYDEBUG("CkMessageWatcherInit ");
2591     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2592 #if CMK_REPLAYSYSTEM
2593         CkListString list(procs);
2594         if (list.includes(CkMyPe())) {
2595           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2596           CpdSetInitializeMemory(1);
2597           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2598         }
2599 #else
2600         CkAbort("Option `+record-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
2601 #endif
2602     }
2603     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2604 #if CMK_REPLAYSYSTEM
2605       if (CkMyPe() == 0) {
2606         CmiPrintf("Charm++> record mode.\n");
2607         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2608           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2609           _recplay_crc = _recplay_checksum = 0;
2610         }
2611       }
2612       CpdSetInitializeMemory(1);
2613       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2614       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2615 #else
2616       CkAbort("Option `+record' requires that record-replay support be enabled at configure time (--enable-replay)");
2617 #endif
2618     }
2619         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2620 #if CMK_REPLAYSYSTEM
2621             forceReplay = true;
2622             CpdSetInitializeMemory(1);
2623             // Set the parameters of the processor
2624 #if CMK_SHARED_VARS_UNAVAILABLE
2625             _Cmi_mype = atoi(procs);
2626             while (procs[0]!='/') procs++;
2627             procs++;
2628             _Cmi_numpes = atoi(procs);
2629 #else
2630             CkAbort("+replay-detail available only for non-SMP build");
2631 #endif
2632             _replaySystem = 1;
2633             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2634 #else
2635           CkAbort("Option `+replay-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
2636 #endif
2637         }
2638         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2639 #if CMK_REPLAYSYSTEM
2640           if (CkMyPe() == 0)  {
2641             CmiPrintf("Charm++> replay mode.\n");
2642             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2643               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2644               _recplay_crc = _recplay_checksum = 0;
2645             }
2646           }
2647           CpdSetInitializeMemory(1);
2648 #if ! CMK_BIGSIM_CHARM
2649           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2650 #else
2651           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2652 #endif
2653           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2654 #else
2655           CkAbort("Option `+replay' requires that record-replay support be enabled at configure time (--enable-replay)");
2656 #endif
2657         }
2658         if (_recplay_crc && _recplay_checksum) {
2659           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2660         }
2661 }
2662
2663 extern "C"
2664 int CkMessageToEpIdx(void *msg) {
2665         envelope *env=UsrToEnv(msg);
2666         int ep=env->getEpIdx();
2667         if (ep==CkIndex_CkArray::recvBroadcast(0))
2668                 return env->getsetArrayBcastEp();
2669         else
2670                 return ep;
2671 }
2672
2673 extern "C"
2674 int getCharmEnvelopeSize() {
2675   return sizeof(envelope);
2676 }
2677
2678 /// Best-effort guess at whether @arg msg points at a charm envelope
2679 extern "C"
2680 int isCharmEnvelope(void *msg) {
2681     envelope *e = (envelope *)msg;
2682     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
2683     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
2684     if (e->getTotalsize() < sizeof(envelope)) return 0;
2685     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
2686 #if CMK_SMP
2687     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
2688 #else
2689     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()) return 0;
2690 #endif
2691     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
2692     return 1;
2693 }
2694
2695 #include "CkMarshall.def.h"