Making use of both message-logging techniques homogeneous in Charm++ core.
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
14 #include "pathHistory.h"
15 void automaticallySetMessagePriority(envelope *env); // in control point framework.
16 #endif
17
18 #if CMK_LBDB_ON
19 #include "LBDatabase.h"
20 #endif // CMK_LBDB_ON
21
22 #ifndef CMK_CHARE_USE_PTR
23 #include <map>
24 CkpvDeclare(CkVec<void *>, chare_objs);
25 CkpvDeclare(CkVec<int>, chare_types);
26 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
27
28 typedef std::map<int, CkChareID>  Vidblockmap;
29 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
30 CkpvDeclare(int, currentChareIdx);
31 #endif
32
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 void _initChareTables()
47 {
48 #ifndef CMK_CHARE_USE_PTR
49           /* chare and vidblock table */
50   CkpvInitialize(CkVec<void *>, chare_objs);
51   CkpvInitialize(CkVec<int>, chare_types);
52   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
53   CkpvInitialize(Vidblockmap, vmap);
54   CkpvInitialize(int, currentChareIdx);
55   CkpvAccess(currentChareIdx) = -1;
56 #endif
57 }
58
59 //Charm++ virtual functions: declaring these here results in a smaller executable
60 Chare::Chare(void) {
61   thishandle.onPE=CkMyPe();
62   thishandle.objPtr=this;
63 #if CMK_ERROR_CHECKING
64   magic = CHARE_MAGIC;
65 #endif
66 #ifndef CMK_CHARE_USE_PTR
67      // for plain chare, objPtr is actually the index to chare obj table
68   if (CkpvAccess(currentChareIdx) >= 0) {
69     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
70   }
71   chareIdx = CkpvAccess(currentChareIdx);
72 #endif
73 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
74   mlogData = new ChareMlogData();
75   mlogData->objID.type = TypeChare;
76   mlogData->objID.data.chare.id = thishandle;
77 #endif
78 #if CMK_OBJECT_QUEUE_AVAILABLE
79   if (_defaultObjectQ)  CkEnableObjQ();
80 #endif
81 }
82
83 Chare::Chare(CkMigrateMessage* m) {
84   thishandle.onPE=CkMyPe();
85   thishandle.objPtr=this;
86 #if CMK_ERROR_CHECKING
87   magic = 0;
88 #endif
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 #if CMK_ERROR_CHECKING
149   p(magic);
150 #endif
151 }
152
153 int Chare::ckGetChareType() const {
154   return -3;
155 }
156 char *Chare::ckDebugChareName(void) {
157   char buf[100];
158   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
159   return strdup(buf);
160 }
161 int Chare::ckDebugChareID(char *str, int limit) {
162   // pure chares for now do not have a valid ID
163   str[0] = 0;
164   return 1;
165 }
166 void Chare::ckDebugPup(PUP::er &p) {
167   pup(p);
168 }
169
170 /// This method is called before starting a [threaded] entry method.
171 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
172   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
173   traceAddThreadListeners(th, UsrToEnv(msg));
174 }
175
176 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
177   p.comment("Bytes");
178   int ts=UsrToEnv(msg)->getTotalsize();
179   int msgLen=ts-sizeof(envelope);
180   if (msgLen>0)
181     p((char*)msg,msgLen);
182 }
183
184 IrrGroup::IrrGroup(void) {
185   thisgroup = CkpvAccess(_currentGroup);
186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
187         mlogData->objID.type = TypeGroup;
188         mlogData->objID.data.group.id = thisgroup;
189         mlogData->objID.data.group.onPE = CkMyPe();
190 #endif
191 }
192
193 IrrGroup::~IrrGroup() {
194   // remove the object pointer
195   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
196   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
197   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
198 }
199
200 void IrrGroup::pup(PUP::er &p)
201 {
202   Chare::pup(p);
203   p|thisgroup;
204 }
205
206 int IrrGroup::ckGetChareType() const {
207   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
208 }
209
210 int IrrGroup::ckDebugChareID(char *str, int limit) {
211   if (limit<5) return -1;
212   str[0] = 1;
213   *((int*)&str[1]) = thisgroup.idx;
214   return 5;
215 }
216
217 char *IrrGroup::ckDebugChareName() {
218   return strdup(_chareTable[ckGetChareType()]->name);
219 }
220
221 void IrrGroup::ckJustMigrated(void)
222 {
223 }
224
225 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
226   /* FIXME: **CW** not entirely sure what we should do here yet */
227 }
228
229 void Group::CkAddThreadListeners(CthThread th, void *msg) {
230   Chare::CkAddThreadListeners(th, msg);
231   CthSetThreadID(th, thisgroup.idx, 0, 0);
232 }
233
234 void Group::pup(PUP::er &p)
235 {
236   CkReductionMgr::pup(p);
237   reductionInfo.pup(p);
238 }
239
240 /**** Delegation Manager Group */
241 CkDelegateMgr::~CkDelegateMgr() { }
242
243 //Default delegator implementation: do not delegate-- send directly
244 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
245   { CkSendMsg(ep,m,c); }
246 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
247   { CkSendMsgBranch(ep,m,onPE,g); }
248 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
249   { CkBroadcastMsgBranch(ep,m,g); }
250 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
251   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
252 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
253   { CkSendMsgNodeBranch(ep,m,onNode,g); }
254 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
255   { CkBroadcastMsgNodeBranch(ep,m,g); }
256 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
257   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
258 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
259 {
260         CProxyElement_ArrayBase ap(a,idx);
261         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
262 }
263 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
264 {
265         CProxyElement_ArrayBase ap(a,idx);
266         ap.ckSend((CkArrayMessage *)m,ep);
267 }
268 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
269 {
270         CProxy_ArrayBase ap(a);
271         ap.ckBroadcast((CkArrayMessage *)m,ep);
272 }
273
274 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
275 {
276         CmiAbort("ArraySectionSend is not implemented!\n");
277 /*
278         CProxyElement_ArrayBase ap(a,idx);
279         ap.ckSend((CkArrayMessage *)m,ep);
280 */
281 }
282
283 /*** Proxy <-> delegator communication */
284 CkDelegateData::~CkDelegateData() {}
285
286 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
287   return pd; // default implementation ignores pup call
288 }
289
290 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
291    this tricky manual reference counting business... */
292
293 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
294         if (dPtr) dPtr->ref();
295         ckUndelegate();
296         delegatedMgr = dTo;
297         delegatedPtr = dPtr;
298         delegatedGroupId = delegatedMgr->CkGetGroupID();
299         isNodeGroup = delegatedMgr->isNodeGroup();
300 }
301 void CProxy::ckUndelegate(void) {
302         delegatedMgr=NULL;
303         delegatedGroupId.setZero();
304         if (delegatedPtr) delegatedPtr->unref();
305         delegatedPtr=NULL;
306 }
307
308 /// Copy constructor
309 CProxy::CProxy(const CProxy &src)
310   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
311    isNodeGroup(src.isNodeGroup) {
312     delegatedPtr = NULL;
313     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
314         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
315     }
316 }
317
318 /// Assignment operator
319 CProxy& CProxy::operator=(const CProxy &src) {
320         CkDelegateData *oldPtr=delegatedPtr;
321         ckUndelegate();
322         delegatedMgr=src.delegatedMgr;
323         delegatedGroupId = src.delegatedGroupId; 
324         isNodeGroup = src.isNodeGroup;
325
326         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
327             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
328         else
329             delegatedPtr = NULL;
330
331         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
332         if (oldPtr) oldPtr->unref();
333         return *this;
334 }
335
336 void CProxy::pup(PUP::er &p) {
337   if (!p.isUnpacking()) {
338     if (ckDelegatedTo() != NULL) {
339       delegatedGroupId = delegatedMgr->CkGetGroupID();
340       isNodeGroup = delegatedMgr->isNodeGroup();
341     }
342   }
343   p|delegatedGroupId;
344   if (!delegatedGroupId.isZero()) {
345     p|isNodeGroup;
346     if (p.isUnpacking()) {
347       delegatedMgr = ckDelegatedTo(); 
348     }
349
350     int migCtor, cIdx; 
351     if (!p.isUnpacking()) {
352       if (isNodeGroup) {
353         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
354         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
355         migCtor = _chareTable[cIdx]->migCtor; 
356         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
357       }
358       else  {
359         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
360         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
361         migCtor = _chareTable[cIdx]->migCtor; 
362         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
363       }         
364     }
365
366     p|migCtor;
367
368     // if delegated manager has not been created, construct a dummy
369     // object on which to call DelegatePointerPup
370     if (delegatedMgr == NULL) {
371
372       // create a dummy object for calling DelegatePointerPup
373       int objId = _entryTable[migCtor]->chareIdx; 
374       int objSize = _chareTable[objId]->size; 
375       void *obj = malloc(objSize); 
376       _entryTable[migCtor]->call(NULL, obj); 
377       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
378         ->DelegatePointerPup(p, delegatedPtr);           
379       free(obj);
380
381     }
382     else {
383
384       // delegated manager has been created, so we can use it
385       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
386
387     }
388
389     if (p.isUnpacking() && delegatedPtr) {
390       delegatedPtr->ref();
391     }
392   }
393 }
394
395 /**** Array sections */
396 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
397 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
398   _cookie.get_aid() = aid;      \
399   _cookie.get_pe() = CkMyPe();  \
400   _elems = new CkArrayIndex[nElems];    \
401   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
402   pelist = NULL;        \
403   npes  = 0;    \
404 }
405
406 CKSECTIONID_CONSTRUCTOR_DEF(1D)
407 CKSECTIONID_CONSTRUCTOR_DEF(2D)
408 CKSECTIONID_CONSTRUCTOR_DEF(3D)
409 CKSECTIONID_CONSTRUCTOR_DEF(4D)
410 CKSECTIONID_CONSTRUCTOR_DEF(5D)
411 CKSECTIONID_CONSTRUCTOR_DEF(6D)
412 CKSECTIONID_CONSTRUCTOR_DEF(Max)
413
414 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
415   pelist = new int[npes];
416   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
417   _cookie.get_aid() = gid;
418 }
419
420 CkSectionID::CkSectionID(const CkSectionID &sid) {
421   int i;
422   _cookie = sid._cookie;
423   _nElems = sid._nElems;
424   if (_nElems > 0) {
425     _elems = new CkArrayIndex[_nElems];
426     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
427   } else _elems = NULL;
428   npes = sid.npes;
429   if (npes > 0) {
430     pelist = new int[npes];
431     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
432   } else pelist = NULL;
433 }
434
435 void CkSectionID::operator=(const CkSectionID &sid) {
436   int i;
437   _cookie = sid._cookie;
438   _nElems = sid._nElems;
439   if (_nElems > 0) {
440     _elems = new CkArrayIndex[_nElems];
441     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
442   } else _elems = NULL;
443   npes = sid.npes;
444   if (npes > 0) {
445     pelist = new int[npes];
446     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
447   } else pelist = NULL;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451     p | _cookie;
452     p(_nElems);
453     if (_nElems > 0) {
454       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
455       for (int i=0; i< _nElems; i++) p | _elems[i];
456       npes = 0;
457       pelist = NULL;
458     } else {
459       // If _nElems is zero, than this section describes processors instead of array elements
460       _elems = NULL;
461       p(npes);
462       if (p.isUnpacking()) pelist = new int[npes];
463       p(pelist, npes);
464     }
465 }
466
467 /**** Tiny random API routines */
468
469 #ifdef CMK_CUDA
470 void CUDACallbackManager(void *fn) {
471   if (fn != NULL) {
472     CkCallback *cb = (CkCallback*) fn;
473     cb->send();
474   }
475 }
476
477 #endif
478
479 extern "C"
480 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
481 {
482   UsrToEnv(msg)->setRef(ref);
483 }
484
485 extern "C"
486 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
487 {
488   return UsrToEnv(msg)->getRef();
489 }
490
491 extern "C"
492 int CkGetSrcPe(void *msg)
493 {
494   return UsrToEnv(msg)->getSrcPe();
495 }
496
497 extern "C"
498 int CkGetSrcNode(void *msg)
499 {
500   return CmiNodeOf(CkGetSrcPe(msg));
501 }
502
503 extern "C"
504 void *CkLocalBranch(CkGroupID gID) {
505   return _localBranch(gID);
506 }
507
508 static
509 void *_ckLocalNodeBranch(CkGroupID groupID) {
510   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
511   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
512   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
513   return retval;
514 }
515
516 extern "C"
517 void *CkLocalNodeBranch(CkGroupID groupID)
518 {
519   void *retval;
520   // we are called in a constructor
521   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
522     return CkpvAccess(_currentNodeGroupObj);
523   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
524   { // Nodegroup hasn't finished being created yet-- schedule...
525     CsdScheduler(0);
526   }
527   return retval;
528 }
529
530 extern "C"
531 void *CkLocalChare(const CkChareID *pCid)
532 {
533         int pe=pCid->onPE;
534         if (pe<0) { //A virtual chare ID
535                 if (pe!=(-(CkMyPe()+1)))
536                         return NULL;//VID block not on this PE
537 #ifdef CMK_CHARE_USE_PTR
538                 VidBlock *v=(VidBlock *)pCid->objPtr;
539 #else
540                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
541 #endif
542                 return v->getLocalChareObj();
543         }
544         else
545         { //An ordinary chare ID
546                 if (pe!=CkMyPe())
547                         return NULL;//Chare not on this PE
548 #ifdef CMK_CHARE_USE_PTR
549                 return pCid->objPtr;
550 #else
551                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
552 #endif
553         }
554 }
555
556 CkpvDeclare(char**,Ck_argv);
557
558 extern "C" char **CkGetArgv(void) {
559         return CkpvAccess(Ck_argv);
560 }
561 extern "C" int CkGetArgc(void) {
562         return CmiGetArgc(CkpvAccess(Ck_argv));
563 }
564
565 /******************** Basic support *****************/
566 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
567 {
568 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
569         CpvAccess(_currentObj) = (Chare *)obj;
570 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
571 #endif
572   //BIGSIM_OOC DEBUGGING
573   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
574   //fflush(stdout);
575 #if CMK_CHARMDEBUG
576   CpdBeforeEp(epIdx, obj, msg);
577 #endif    
578   _entryTable[epIdx]->call(msg, obj);
579 #if CMK_CHARMDEBUG
580   CpdAfterEp(epIdx);
581 #endif
582   if (_entryTable[epIdx]->noKeep)
583   { /* Method doesn't keep/delete the message, so we have to: */
584     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
585   }
586 }
587 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
588 {
589   //BIGSIM_OOC DEBUGGING
590   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
591   //fflush(stdout);
592
593   void *deliverMsg;
594 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
595         CpvAccess(_currentObj) = (Chare *)obj;
596 #endif
597   if (_entryTable[epIdx]->noKeep)
598   { /* Deliver a read-only copy of the message */
599     deliverMsg=(void *)msg;
600   } else
601   { /* Method needs a copy of the message to keep/delete */
602     void *oldMsg=(void *)msg;
603     deliverMsg=CkCopyMsg(&oldMsg);
604 #if CMK_ERROR_CHECKING
605     if (oldMsg!=msg)
606       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
607 #endif
608   }
609 #if CMK_CHARMDEBUG
610   CpdBeforeEp(epIdx, obj, (void*)msg);
611 #endif
612   _entryTable[epIdx]->call(deliverMsg, obj);
613 #if CMK_CHARMDEBUG
614   CpdAfterEp(epIdx);
615 #endif
616 }
617
618 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
619 {
620   register void *msg = EnvToUsr(env);
621   _SET_USED(env, 0);
622   CkDeliverMessageFree(epIdx,msg,obj);
623 }
624
625 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
626 {
627
628 #if CMK_TRACE_ENABLED 
629   if (_entryTable[epIdx]->traceEnabled) {
630     _TRACE_BEGIN_EXECUTE(env);
631     _invokeEntryNoTrace(epIdx,env,obj);
632     _TRACE_END_EXECUTE();
633   }
634   else
635 #endif
636     _invokeEntryNoTrace(epIdx,env,obj);
637
638 }
639
640 /********************* Creation ********************/
641
642 extern "C"
643 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
644 {
645   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
646   envelope *env = UsrToEnv(msg);
647   _CHECK_USED(env);
648   if(pCid == 0) {
649     env->setMsgtype(NewChareMsg);
650   } else {
651     pCid->onPE = (-(CkMyPe()+1));
652     //  pCid->magic = _GETIDX(cIdx);
653     pCid->objPtr = (void *) new VidBlock();
654     _MEMCHECK(pCid->objPtr);
655     env->setMsgtype(NewVChareMsg);
656     env->setVidPtr(pCid->objPtr);
657 #ifndef CMK_CHARE_USE_PTR
658     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
659     int idx = CkpvAccess(vidblocks).size()-1;
660     pCid->objPtr = (void *)(CmiIntPtr)idx;
661     env->setVidPtr((void *)(CmiIntPtr)idx);
662 #endif
663   }
664   env->setEpIdx(eIdx);
665   env->setByPe(CkMyPe());
666   env->setSrcPe(CkMyPe());
667   CmiSetHandler(env, _charmHandlerIdx);
668   _TRACE_CREATION_1(env);
669   CpvAccess(_qd)->create();
670   _STATS_RECORD_CREATE_CHARE_1();
671   _SET_USED(env, 1);
672   if(destPE == CK_PE_ANY)
673     env->setForAnyPE(1);
674   else
675     env->setForAnyPE(0);
676   _CldEnqueue(destPE, env, _infoIdx);
677   _TRACE_CREATION_DONE(1);
678 }
679
680 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
681 {
682   register int gIdx = _entryTable[epIdx]->chareIdx;
683   register void *obj = malloc(_chareTable[gIdx]->size);
684   _MEMCHECK(obj);
685   setMemoryTypeChare(obj);
686   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
687   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
688   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
689   CkpvAccess(_groupIDTable)->push_back(groupID);
690   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
691   if(ptrq) {
692     void *pending;
693     while((pending=ptrq->deq())!=0)
694       _CldEnqueue(CkMyPe(), pending, _infoIdx);
695 //    delete ptrq;
696       CkpvAccess(_groupTable)->find(groupID).clearPending();
697   }
698   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
699
700   CkpvAccess(_currentGroup) = groupID;
701   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
702 #ifndef CMK_CHARE_USE_PTR
703   //((Chare *)obj)->chareIdx = -1;
704   CkpvAccess(currentChareIdx) = -1;
705 #endif
706   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
707   _STATS_RECORD_PROCESS_GROUP_1();
708 }
709
710 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
711 {
712   register int gIdx = _entryTable[epIdx]->chareIdx;
713   int objSize=_chareTable[gIdx]->size;
714   register void *obj = malloc(objSize);
715   _MEMCHECK(obj);
716   setMemoryTypeChare(obj);
717   CkpvAccess(_currentGroup) = groupID;
718
719 // Now that the NodeGroup is created, add it to the table.
720 //  NodeGroups can be accessed by multiple processors, so
721 //  this is in the opposite order from groups - invoking the constructor
722 //  before registering it.
723 // User may call CkLocalNodeBranch() inside the nodegroup constructor
724 //  store nodegroup into _currentNodeGroupObj
725   CkpvAccess(_currentNodeGroupObj) = obj;
726 #ifndef CMK_CHARE_USE_PTR
727   //((Chare *)obj)->chareIdx = -1;
728   CkpvAccess(currentChareIdx) = -1;
729 #endif
730   _invokeEntryNoTrace(epIdx,env,obj);
731   CkpvAccess(_currentNodeGroupObj) = NULL;
732   _STATS_RECORD_PROCESS_NODE_GROUP_1();
733
734   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
735   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
736   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
737   CksvAccess(_nodeGroupIDTable).push_back(groupID);
738
739   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
740   if(ptrq) {
741     void *pending;
742     while((pending=ptrq->deq())!=0)
743       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
744 //    delete ptrq;
745       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
746   }
747   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
748 }
749
750 void _createGroup(CkGroupID groupID, envelope *env)
751 {
752   _CHECK_USED(env);
753   _SET_USED(env, 1);
754   register int epIdx = env->getEpIdx();
755   int gIdx = _entryTable[epIdx]->chareIdx;
756   CkNodeGroupID rednMgr;
757   if(_chareTable[gIdx]->isIrr == 0){
758                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
759                 rednMgr = rednMgrProxy;
760 //              rednMgrProxy.setAttachedGroup(groupID);
761   }else{
762         rednMgr.setZero();
763   }
764   env->setGroupNum(groupID);
765   env->setSrcPe(CkMyPe());
766   env->setRednMgr(rednMgr);
767   env->setGroupEpoch(CkpvAccess(_charmEpoch));
768
769   if(CkNumPes()>1) {
770     CkPackMessage(&env);
771     CmiSetHandler(env, _bocHandlerIdx);
772     _numInitMsgs++;
773     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
774     CpvAccess(_qd)->create(CkNumPes()-1);
775     CkUnpackMessage(&env);
776   }
777   _STATS_RECORD_CREATE_GROUP_1();
778   CkCreateLocalGroup(groupID, epIdx, env);
779 }
780
781 void _createNodeGroup(CkGroupID groupID, envelope *env)
782 {
783   _CHECK_USED(env);
784   _SET_USED(env, 1);
785   register int epIdx = env->getEpIdx();
786   env->setGroupNum(groupID);
787   env->setSrcPe(CkMyPe());
788   env->setGroupEpoch(CkpvAccess(_charmEpoch));
789   if(CkNumNodes()>1) {
790     CkPackMessage(&env);
791     CmiSetHandler(env, _bocHandlerIdx);
792     _numInitMsgs++;
793     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
794     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
795     CpvAccess(_qd)->create(CkNumNodes()-1);
796     CkUnpackMessage(&env);
797   }
798   _STATS_RECORD_CREATE_NODE_GROUP_1();
799   CkCreateLocalNodeGroup(groupID, epIdx, env);
800 }
801
802 // new _groupCreate
803
804 static CkGroupID _groupCreate(envelope *env)
805 {
806   register CkGroupID groupNum;
807
808   // check CkMyPe(). if it is 0 then idx is _numGroups++
809   // if not, then something else...
810   if(CkMyPe() == 0)
811      groupNum.idx = CkpvAccess(_numGroups)++;
812   else
813      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
814   _createGroup(groupNum, env);
815   return groupNum;
816 }
817
818 // new _nodeGroupCreate
819 static CkGroupID _nodeGroupCreate(envelope *env)
820 {
821   register CkGroupID groupNum;
822   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
823   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
824           groupNum.idx = CksvAccess(_numNodeGroups)++;
825    else
826           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
827   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
828   _createNodeGroup(groupNum, env);
829   return groupNum;
830 }
831
832 /**** generate the group idx when group is creator pe is not pe0
833  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
834  **** remaining bits contain the group creator processor number and
835  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
836
837 int _getGroupIdx(int numNodes,int myNode,int numGroups)
838 {
839         int idx;
840         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
841         int n = 32 - (x+1);                                     // number of bits remaining for the index
842         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
843                                                                 // then add the next available index
844         // of course this won't work when int is 8 bytes long on T3E
845         //idx |= 0x80000000;                                      // set the most significant bit to 1
846         idx = - idx;
847                                                                 // if int is not 32 bits, wouldn't this be wrong?
848         return idx;
849 }
850
851 extern "C"
852 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
853 {
854   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
855   register envelope *env = UsrToEnv(msg);
856   env->setMsgtype(BocInitMsg);
857   env->setEpIdx(eIdx);
858   env->setSrcPe(CkMyPe());
859   _TRACE_CREATION_N(env, CkNumPes());
860   CkGroupID gid = _groupCreate(env);
861   _TRACE_CREATION_DONE(1);
862   return gid;
863 }
864
865 extern "C"
866 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
867 {
868   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
869   register envelope *env = UsrToEnv(msg);
870   env->setMsgtype(NodeBocInitMsg);
871   env->setEpIdx(eIdx);
872   env->setSrcPe(CkMyPe());
873   _TRACE_CREATION_N(env, CkNumNodes());
874   CkGroupID gid = _nodeGroupCreate(env);
875   _TRACE_CREATION_DONE(1);
876   return gid;
877 }
878
879 static inline void *_allocNewChare(envelope *env, int &idx)
880 {
881   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
882   void *tmp=malloc(_chareTable[chareIdx]->size);
883   _MEMCHECK(tmp);
884 #ifndef CMK_CHARE_USE_PTR
885   CkpvAccess(chare_objs).push_back(tmp);
886   CkpvAccess(chare_types).push_back(chareIdx);
887   idx = CkpvAccess(chare_objs).size()-1;
888 #endif
889   setMemoryTypeChare(tmp);
890   return tmp;
891 }
892
893 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
894 {
895   int idx;
896   register void *obj = _allocNewChare(env, idx);
897 #ifndef CMK_CHARE_USE_PTR
898   //((Chare *)obj)->chareIdx = idx;
899   CkpvAccess(currentChareIdx) = idx;
900 #endif
901   _invokeEntry(env->getEpIdx(),env,obj);
902 }
903
904 void CkCreateLocalChare(int epIdx, envelope *env)
905 {
906   env->setEpIdx(epIdx);
907   _processNewChareMsg(NULL, env);
908 }
909
910 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
911 {
912   int idx;
913   register void *obj = _allocNewChare(env, idx);
914   register CkChareID *pCid = (CkChareID *)
915       _allocMsg(FillVidMsg, sizeof(CkChareID));
916   pCid->onPE = CkMyPe();
917 #ifndef CMK_CHARE_USE_PTR
918   pCid->objPtr = (void*)(CmiIntPtr)idx;
919 #else
920   pCid->objPtr = obj;
921 #endif
922   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
923   register envelope *ret = UsrToEnv(pCid);
924   ret->setVidPtr(env->getVidPtr());
925   register int srcPe = env->getByPe();
926   ret->setSrcPe(CkMyPe());
927   CmiSetHandler(ret, _charmHandlerIdx);
928   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
929 #ifndef CMK_CHARE_USE_PTR
930   // register the remote vidblock for deletion when chare is deleted
931   CkChareID vid;
932   vid.onPE = srcPe;
933   vid.objPtr = env->getVidPtr();
934   CkpvAccess(vmap)[idx] = vid;    
935 #endif
936   CpvAccess(_qd)->create();
937 #ifndef CMK_CHARE_USE_PTR
938   //((Chare *)obj)->chareIdx = idx;
939   CkpvAccess(currentChareIdx) = idx;
940 #endif
941   _invokeEntry(env->getEpIdx(),env,obj);
942 }
943
944 /************** Receive: Chares *************/
945
946 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
947 {
948   register int epIdx = env->getEpIdx();
949   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
950   register void *obj;
951   if (mainIdx != -1)  {           // mainchare
952     CmiAssert(CkMyPe()==0);
953     obj = _mainTable[mainIdx]->getObj();
954   }
955   else {
956 #ifndef CMK_CHARE_USE_PTR
957     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
958       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
959     else
960       obj = env->getObjPtr();
961 #else
962     obj = env->getObjPtr();
963 #endif
964   }
965   _invokeEntry(epIdx,env,obj);
966 }
967
968 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
969 {
970   register int epIdx = env->getEpIdx();
971   register void *obj = env->getObjPtr();
972   _invokeEntry(epIdx,env,obj);
973 }
974
975 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
976 {
977 #ifndef CMK_CHARE_USE_PTR
978   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
979 #else
980   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
981   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
982 #endif
983   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
984   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
985   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
986   CmiFree(env);
987 }
988
989 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
990 {
991 #ifndef CMK_CHARE_USE_PTR
992   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
993 #else
994   VidBlock *vptr = (VidBlock *) env->getVidPtr();
995   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
996 #endif
997   _SET_USED(env, 1);
998   vptr->send(env);
999 }
1000
1001 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1002 {
1003 #ifndef CMK_CHARE_USE_PTR
1004   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1005   delete vptr;
1006   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1007 #endif
1008   CmiFree(env);
1009 }
1010
1011 /************** Receive: Groups ****************/
1012
1013 /**
1014  Return a pointer to the local BOC of "groupID".
1015  The message "env" passed in has some known dependency on this groupID
1016  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1017  Therefore, if the return value is NULL, this function buffers the massage so that
1018  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1019  The message passed in must have its handlers correctly set so that it can be
1020  scheduled again.
1021 */
1022 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1023 {
1024
1025         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1026         IrrGroup *obj = ck->localBranch(groupID);
1027         if (obj==NULL) { /* groupmember not yet created: stash message */
1028                 ck->getGroupTable()->find(groupID).enqMsg(env);
1029         }
1030         else { /* will be able to process message */
1031                 ck->process();
1032         }
1033         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1034         return obj;
1035 }
1036
1037 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1038 {
1039   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1040 }
1041
1042 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1043 {
1044 #if CMK_LBDB_ON
1045   // if there is a running obj being measured, stop it temporarily
1046   LDObjHandle objHandle;
1047   int objstopped = 0;
1048   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1049   if (the_lbdb->RunningObject(&objHandle)) {
1050     objstopped = 1;
1051     the_lbdb->ObjectStop(objHandle);
1052   }
1053 #endif
1054   _invokeEntry(epIdx,env,obj);
1055 #if CMK_LBDB_ON
1056   if (objstopped) the_lbdb->ObjectStart(objHandle);
1057 #endif
1058   _STATS_RECORD_PROCESS_BRANCH_1();
1059 }
1060
1061 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1062 {
1063   register CkGroupID groupID =  env->getGroupNum();
1064   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1065   if(obj) {
1066     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1067   }
1068 }
1069
1070 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1071 {
1072   env->setMsgtype(ForChareMsg);
1073   env->setObjPtr(obj);
1074   _processForChareMsg(ck,env);
1075   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1076 }
1077
1078 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1079 {
1080   env->setEpIdx(epIdx);
1081   _deliverForNodeBocMsg(ck,env, obj);
1082 }
1083
1084 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1085 {
1086   register CkGroupID groupID = env->getGroupNum();
1087   register void *obj;
1088
1089   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1090   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1091   if(!obj) { // groupmember not yet created
1092 #if CMK_IMMEDIATE_MSG
1093     if (CmiIsImmediate(env))     // buffer immediate message
1094       CmiDelayImmediate();
1095     else
1096 #endif
1097     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1098     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1099     return;
1100   }
1101   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1102 #if CMK_IMMEDIATE_MSG
1103   if (!CmiIsImmediate(env))
1104 #endif
1105   ck->process();
1106   env->setMsgtype(ForChareMsg);
1107   env->setObjPtr(obj);
1108   _processForChareMsg(ck,env);
1109   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1110 }
1111
1112 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1113 {
1114   register CkGroupID groupID = env->getGroupNum();
1115   register int epIdx = env->getEpIdx();
1116   if (!env->getGroupDep().isZero()) {      // dependence
1117     CkGroupID dep = env->getGroupDep();
1118     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1119     if (obj == NULL) return;
1120   }
1121   else
1122     ck->process();
1123   CkCreateLocalGroup(groupID, epIdx, env);
1124 }
1125
1126 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1127 {
1128   register CkGroupID groupID = env->getGroupNum();
1129   register int epIdx = env->getEpIdx();
1130   CkCreateLocalNodeGroup(groupID, epIdx, env);
1131 }
1132
1133 /************** Receive: Arrays *************/
1134
1135 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1136   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1137   if (mgr) {
1138     _SET_USED(env, 0);
1139     mgr->insertElement((CkMessage *)EnvToUsr(env));
1140   }
1141 }
1142 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1143   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1144   if (mgr) {
1145     _SET_USED(env, 0);
1146     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1147   }
1148 }
1149
1150 //BIGSIM_OOC DEBUGGING
1151 #define TELLMSGTYPE(x) //x
1152
1153 /**
1154  * This is the main converse-level handler used by all of Charm++.
1155  *
1156  * \addtogroup CriticalPathFramework
1157  */
1158 void _processHandler(void *converseMsg,CkCoreState *ck)
1159 {
1160   register envelope *env = (envelope *) converseMsg;
1161
1162   MESSAGE_PHASE_CHECK(env);
1163
1164 //#if CMK_RECORD_REPLAY
1165   if (ck->watcher!=NULL) {
1166     if (!ck->watcher->processMessage(&env,ck)) return;
1167   }
1168 //#endif
1169 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1170         Chare *obj=NULL;
1171         CkObjID sender;
1172         MCount SN;
1173         MlogEntry *entry=NULL;
1174         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1175         env->getMsgtype() == ForArrayEltMsg || env->getMsgtype() == ForChareMsg){
1176                 sender = env->sender;
1177                 SN = env->SN;
1178                 int result = preProcessReceivedMessage(env,&obj,&entry);
1179                 if(result == 0){
1180                         return;
1181                 }
1182         }
1183 #endif
1184
1185 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1186   //  CkPrintf("START\n");
1187   criticalPath_start(env);
1188 #endif
1189
1190
1191   switch(env->getMsgtype()) {
1192 // Group support
1193     case BocInitMsg :
1194       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1195       // QD processing moved inside _processBocInitMsg because it is conditional
1196       //ck->process(); 
1197       if(env->isPacked()) CkUnpackMessage(&env);
1198       _processBocInitMsg(ck,env);
1199       break;
1200     case NodeBocInitMsg :
1201       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1202       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1203       _processNodeBocInitMsg(ck,env);
1204       break;
1205     case ForBocMsg :
1206       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1207       // QD processing moved inside _processForBocMsg because it is conditional
1208       if(env->isPacked()) CkUnpackMessage(&env);
1209       _processForBocMsg(ck,env);
1210       // stats record moved inside _processForBocMsg because it is conditional
1211       break;
1212     case ForNodeBocMsg :
1213       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1214       // QD processing moved to _processForNodeBocMsg because it is conditional
1215       if(env->isPacked()) CkUnpackMessage(&env);
1216       _processForNodeBocMsg(ck,env);
1217       // stats record moved to _processForNodeBocMsg because it is conditional
1218       break;
1219
1220 // Array support
1221     case ArrayEltInitMsg:
1222       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1223       if(env->isPacked()) CkUnpackMessage(&env);
1224       _processArrayEltInitMsg(ck,env);
1225       break;
1226     case ForArrayEltMsg:
1227       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1228       if(env->isPacked()) CkUnpackMessage(&env);
1229       _processArrayEltMsg(ck,env);
1230       break;
1231
1232 // Chare support
1233     case NewChareMsg :
1234       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1235       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1236       _processNewChareMsg(ck,env);
1237       _STATS_RECORD_PROCESS_CHARE_1();
1238       break;
1239     case NewVChareMsg :
1240       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1241       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1242       _processNewVChareMsg(ck,env);
1243       _STATS_RECORD_PROCESS_CHARE_1();
1244       break;
1245     case ForChareMsg :
1246       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1247       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1248       _processForPlainChareMsg(ck,env);
1249       _STATS_RECORD_PROCESS_MSG_1();
1250       break;
1251     case ForVidMsg   :
1252       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1253       ck->process();
1254       _processForVidMsg(ck,env);
1255       break;
1256     case FillVidMsg  :
1257       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1258       ck->process();
1259       _processFillVidMsg(ck,env);
1260       break;
1261     case DeleteVidMsg  :
1262       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1263       ck->process();
1264       _processDeleteVidMsg(ck,env);
1265       break;
1266
1267     default:
1268       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1269   }
1270 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1271         if(obj != NULL){
1272                 postProcessReceivedMessage(obj,sender,SN,entry);
1273         }
1274 #endif
1275
1276
1277 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1278   criticalPath_end();
1279   //  CkPrintf("STOP\n");
1280 #endif
1281
1282
1283 }
1284
1285
1286 /******************** Message Send **********************/
1287
1288 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1289              int *queueing, int *priobits, unsigned int **prioptr)
1290 {
1291   register envelope *env = (envelope *)converseMsg;
1292   *pfn = (CldPackFn)CkPackMessage;
1293   *len = env->getTotalsize();
1294   *queueing = env->getQueueing();
1295   *priobits = env->getPriobits();
1296   *prioptr = (unsigned int *) env->getPrioPtr();
1297 }
1298
1299 void CkPackMessage(envelope **pEnv)
1300 {
1301   register envelope *env = *pEnv;
1302   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1303     register void *msg = EnvToUsr(env);
1304     _TRACE_BEGIN_PACK();
1305     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1306     _TRACE_END_PACK();
1307     env=UsrToEnv(msg);
1308     env->setPacked(1);
1309     *pEnv = env;
1310   }
1311 }
1312
1313 void CkUnpackMessage(envelope **pEnv)
1314 {
1315   register envelope *env = *pEnv;
1316   register int msgIdx = env->getMsgIdx();
1317   if(env->isPacked()) {
1318     register void *msg = EnvToUsr(env);
1319     _TRACE_BEGIN_UNPACK();
1320     msg = _msgTable[msgIdx]->unpack(msg);
1321     _TRACE_END_UNPACK();
1322     env=UsrToEnv(msg);
1323     env->setPacked(0);
1324     *pEnv = env;
1325   }
1326 }
1327
1328 //There's no reason for most messages to go through the Cld--
1329 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1330 // Thus these accellerated versions of the Cld calls.
1331 #if CMK_OBJECT_QUEUE_AVAILABLE
1332 static int index_objectQHandler;
1333 #endif
1334 int index_tokenHandler;
1335 int index_skipCldHandler;
1336
1337 void _skipCldHandler(void *converseMsg)
1338 {
1339   register envelope *env = (envelope *)(converseMsg);
1340   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1341 #if CMK_GRID_QUEUE_AVAILABLE
1342   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1343     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1344                        env, env->getQueueing (), env->getPriobits (),
1345                        (unsigned int *) env->getPrioPtr ());
1346   } else {
1347     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1348                        env, env->getQueueing (), env->getPriobits (),
1349                        (unsigned int *) env->getPrioPtr ());
1350   }
1351 #else
1352   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1353         env, env->getQueueing(),env->getPriobits(),
1354         (unsigned int *)env->getPrioPtr());
1355 #endif
1356 }
1357
1358
1359 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1360 // Made non-static to be used by ckmessagelogging
1361 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1362 {
1363 #if CMK_CHARMDEBUG
1364   if (!ConverseDeliver(pe)) {
1365     CmiFree(env);
1366     return;
1367   }
1368 #endif
1369   if(pe == CkMyPe() ){
1370     if(!CmiNodeAlive(CkMyPe())){
1371         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1372 //      return;
1373     }
1374   }
1375   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1376 #if CMK_OBJECT_QUEUE_AVAILABLE
1377     Chare *obj = CkFindObjectPtr(env);
1378     if (obj && obj->CkGetObjQueue().queue()) {
1379       _enqObjQueue(obj, env);
1380     }
1381     else
1382 #endif
1383     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1384         env, env->getQueueing(),env->getPriobits(),
1385         (unsigned int *)env->getPrioPtr());
1386 #if CMK_PERSISTENT_COMM
1387     CmiPersistentOneSend();
1388 #endif
1389   } else {
1390     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1391       CkPackMessage(&env);
1392     int len=env->getTotalsize();
1393     CmiSetXHandler(env,CmiGetHandler(env));
1394 #if CMK_OBJECT_QUEUE_AVAILABLE
1395     CmiSetHandler(env,index_objectQHandler);
1396 #else
1397     CmiSetHandler(env,index_skipCldHandler);
1398 #endif
1399     CmiSetInfo(env,infoFn);
1400     if (pe==CLD_BROADCAST) {
1401 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1402                         CmiSyncBroadcast(len, (char *)env);
1403 #else
1404                         CmiSyncBroadcastAndFree(len, (char *)env); 
1405 #endif
1406
1407 }
1408     else if (pe==CLD_BROADCAST_ALL) { 
1409 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1410                         CmiSyncBroadcastAll(len, (char *)env);
1411 #else
1412                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1413 #endif
1414
1415 }
1416     else{
1417 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1418                         CmiSyncSend(pe, len, (char *)env);
1419 #else
1420                         CmiSyncSendAndFree(pe, len, (char *)env);
1421 #endif
1422
1423                 }
1424   }
1425 }
1426
1427 #if CMK_BIGSIM_CHARM
1428 #   define  _skipCldEnqueue   _CldEnqueue
1429 #endif
1430
1431 // by pass Charm++ priority queue, send as Converse message
1432 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1433 {
1434 #if CMK_CHARMDEBUG
1435   if (!ConverseDeliver(-1)) {
1436     CmiFree(env);
1437     return;
1438   }
1439 #endif
1440   CkPackMessage(&env);
1441   int len=env->getTotalsize();
1442   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1443 }
1444
1445 static void _noCldEnqueue(int pe, envelope *env)
1446 {
1447 /*
1448   if (pe == CkMyPe()) {
1449     CmiHandleMessage(env);
1450   } else
1451 */
1452 #if CMK_CHARMDEBUG
1453   if (!ConverseDeliver(pe)) {
1454     CmiFree(env);
1455     return;
1456   }
1457 #endif
1458   CkPackMessage(&env);
1459   int len=env->getTotalsize();
1460   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1461   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1462   else CmiSyncSendAndFree(pe, len, (char *)env);
1463 }
1464
1465 //static void _noCldNodeEnqueue(int node, envelope *env)
1466 //Made non-static to be used by ckmessagelogging
1467 void _noCldNodeEnqueue(int node, envelope *env)
1468 {
1469 /*
1470   if (node == CkMyNode()) {
1471     CmiHandleMessage(env);
1472   } else {
1473 */
1474 #if CMK_CHARMDEBUG
1475   if (!ConverseDeliver(node)) {
1476     CmiFree(env);
1477     return;
1478   }
1479 #endif
1480   CkPackMessage(&env);
1481   int len=env->getTotalsize();
1482   if (node==CLD_BROADCAST) { 
1483 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1484         CmiSyncNodeBroadcast(len, (char *)env);
1485 #else
1486         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1487 #endif
1488 }
1489   else if (node==CLD_BROADCAST_ALL) { 
1490 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1491                 CmiSyncNodeBroadcastAll(len, (char *)env);
1492 #else
1493                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1494 #endif
1495
1496 }
1497   else {
1498 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1499         CmiSyncNodeSend(node, len, (char *)env);
1500 #else
1501         CmiSyncNodeSendAndFree(node, len, (char *)env);
1502 #endif
1503   }
1504 }
1505
1506 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1507 {
1508   register envelope *env = UsrToEnv(msg);
1509   _CHECK_USED(env);
1510   _SET_USED(env, 1);
1511 #if CMK_REPLAYSYSTEM
1512   setEventID(env);
1513 #endif
1514   env->setMsgtype(ForChareMsg);
1515   env->setEpIdx(eIdx);
1516   env->setSrcPe(CkMyPe());
1517 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1518   criticalPath_send(env);
1519   automaticallySetMessagePriority(env);
1520 #endif
1521 #if CMK_CHARMDEBUG
1522   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1523 #endif
1524 #if CMK_OBJECT_QUEUE_AVAILABLE
1525   CmiSetHandler(env, index_objectQHandler);
1526 #else
1527   CmiSetHandler(env, _charmHandlerIdx);
1528 #endif
1529   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1530     register int pe = -(pCid->onPE+1);
1531     if(pe==CkMyPe()) {
1532 #ifndef CMK_CHARE_USE_PTR
1533       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1534 #else
1535       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1536 #endif
1537       void *objPtr;
1538       if (NULL!=(objPtr=vblk->getLocalChare()))
1539       { //A ready local chare
1540         env->setObjPtr(objPtr);
1541         return pe;
1542       }
1543       else { //The vidblock is not ready-- forget it
1544         vblk->send(env);
1545         return -1;
1546       }
1547     } else { //Valid vidblock for another PE:
1548       env->setMsgtype(ForVidMsg);
1549       env->setVidPtr(pCid->objPtr);
1550       return pe;
1551     }
1552   }
1553   else {
1554     env->setObjPtr(pCid->objPtr);
1555     return pCid->onPE;
1556   }
1557 }
1558
1559 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1560 {
1561   int destPE = _prepareMsg(eIdx, msg, pCid);
1562   if (destPE != -1) {
1563     register envelope *env = UsrToEnv(msg);
1564 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1565     criticalPath_send(env);
1566     automaticallySetMessagePriority(env);
1567 #endif
1568     CmiBecomeImmediate(env);
1569   }
1570   return destPE;
1571 }
1572
1573 extern "C"
1574 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1575 {
1576   if (opts & CK_MSG_INLINE) {
1577     CkSendMsgInline(entryIdx, msg, pCid, opts);
1578     return;
1579   }
1580 #if CMK_ERROR_CHECKING
1581   if (opts & CK_MSG_IMMEDIATE) {
1582     CmiAbort("Immediate message is not allowed in Chare!");
1583   }
1584 #endif
1585   register envelope *env = UsrToEnv(msg);
1586   int destPE=_prepareMsg(entryIdx,msg,pCid);
1587   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1588   // VidBlock was not yet filled). The problem is that the creation was never
1589   // traced later when the VidBlock was filled. One solution is to trace the
1590   // creation here, the other to trace it in VidBlock->msgDeliver().
1591 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1592         sendChareMsg(env,destPE,_infoIdx,pCid);
1593 #else
1594   _TRACE_CREATION_1(env);
1595   if (destPE!=-1) {
1596     CpvAccess(_qd)->create();
1597     if (opts & CK_MSG_SKIP_OR_IMM)
1598       _noCldEnqueue(destPE, env);
1599     else
1600       _CldEnqueue(destPE, env, _infoIdx);
1601   }
1602   _TRACE_CREATION_DONE(1);
1603 #endif
1604 }
1605
1606 extern "C"
1607 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1608 {
1609   if (pCid->onPE==CkMyPe())
1610   {
1611     if(!CmiNodeAlive(CkMyPe())){
1612         return;
1613     }
1614 #if CMK_CHARMDEBUG
1615     //Just in case we need to breakpoint or use the envelope in some way
1616     _prepareMsg(entryIndex,msg,pCid);
1617 #endif
1618                 //Just directly call the chare (skip QD handling & scheduler)
1619     register envelope *env = UsrToEnv(msg);
1620     if (env->isPacked()) CkUnpackMessage(&env);
1621     _STATS_RECORD_PROCESS_MSG_1();
1622     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1623   }
1624   else {
1625     //No way to inline a cross-processor message:
1626     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1627   }
1628 }
1629
1630 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1631 {
1632   register envelope *env = UsrToEnv(msg);
1633 #if CMK_ERROR_CHECKING
1634   CkNodeGroupID nodeRedMgr;
1635 #endif
1636   _CHECK_USED(env);
1637   _SET_USED(env, 1);
1638 #if CMK_REPLAYSYSTEM
1639   setEventID(env);
1640 #endif
1641   env->setMsgtype(type);
1642   env->setEpIdx(eIdx);
1643   env->setGroupNum(gID);
1644   env->setSrcPe(CkMyPe());
1645 #if CMK_ERROR_CHECKING
1646   nodeRedMgr.setZero();
1647   env->setRednMgr(nodeRedMgr);
1648 #endif
1649 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1650   criticalPath_send(env);
1651   automaticallySetMessagePriority(env);
1652 #endif
1653 #if CMK_CHARMDEBUG
1654   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1655 #endif
1656   CmiSetHandler(env, _charmHandlerIdx);
1657   return env;
1658 }
1659
1660 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1661 {
1662   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1663 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1664   criticalPath_send(env);
1665   automaticallySetMessagePriority(env);
1666 #endif
1667   CmiBecomeImmediate(env);
1668   return env;
1669 }
1670
1671 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1672                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1673 {
1674   int numPes;
1675   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1676 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1677         sendGroupMsg(env,pe,_infoIdx);
1678 #else
1679   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1680   _TRACE_CREATION_N(env, numPes);
1681   if (opts & CK_MSG_SKIP_OR_IMM)
1682     _noCldEnqueue(pe, env);
1683   else
1684     _skipCldEnqueue(pe, env, _infoIdx);
1685   _TRACE_CREATION_DONE(1);
1686 #endif
1687 }
1688
1689 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1690                            int npes, int *pes)
1691 {
1692   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1693   _TRACE_CREATION_MULTICAST(env, npes, pes);
1694   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1695   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1696 }
1697
1698 extern "C"
1699 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1700 {
1701 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1702   if (destPE==CkMyPe())
1703   {
1704     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1705     return;
1706   }
1707   //Can't inline-- send the usual way
1708   register envelope *env = UsrToEnv(msg);
1709   int numPes;
1710   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1711   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1712   _TRACE_CREATION_N(env, numPes);
1713   _noCldEnqueue(destPE, env);
1714   _STATS_RECORD_SEND_BRANCH_1();
1715   CkpvAccess(_coreState)->create();
1716   _TRACE_CREATION_DONE(1);
1717 #else
1718   // no support for immediate message, send inline
1719   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1720 #endif
1721 }
1722
1723 extern "C"
1724 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1725 {
1726   if (destPE==CkMyPe())
1727   {
1728     if(!CmiNodeAlive(CkMyPe())){
1729         return;
1730     }
1731     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1732     if (obj!=NULL)
1733     { //Just directly call the group:
1734 #if CMK_ERROR_CHECKING
1735       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1736 #else
1737       envelope *env=UsrToEnv(msg);
1738 #endif
1739       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1740       return;
1741     }
1742   }
1743   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1744   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1745 }
1746
1747 extern "C"
1748 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1749 {
1750   if (opts & CK_MSG_INLINE) {
1751     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1752     return;
1753   }
1754   if (opts & CK_MSG_IMMEDIATE) {
1755     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1756     return;
1757   }
1758   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1759   _STATS_RECORD_SEND_BRANCH_1();
1760   CkpvAccess(_coreState)->create();
1761 }
1762
1763 extern "C"
1764 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1765 {
1766 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1767   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1768   _TRACE_CREATION_MULTICAST(env, npes, pes);
1769   _noCldEnqueueMulti(npes, pes, env);
1770   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1771 #else
1772   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1773   CpvAccess(_qd)->create(-npes);
1774 #endif
1775   _STATS_RECORD_SEND_BRANCH_N(npes);
1776   CpvAccess(_qd)->create(npes);
1777 }
1778
1779 extern "C"
1780 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1781 {
1782   if (opts & CK_MSG_IMMEDIATE) {
1783     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1784     return;
1785   }
1786     // normal mesg
1787   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1788   _STATS_RECORD_SEND_BRANCH_N(npes);
1789   CpvAccess(_qd)->create(npes);
1790 }
1791
1792 extern "C"
1793 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1794 {
1795   int npes;
1796   int *pes;
1797   if (opts & CK_MSG_IMMEDIATE) {
1798     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1799     return;
1800   }
1801     // normal mesg
1802   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1803   CmiLookupGroup(grp, &npes, &pes);
1804   _TRACE_CREATION_MULTICAST(env, npes, pes);
1805   _CldEnqueueGroup(grp, env, _infoIdx);
1806   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1807   _STATS_RECORD_SEND_BRANCH_N(npes);
1808   CpvAccess(_qd)->create(npes);
1809 }
1810
1811 extern "C"
1812 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1813 {
1814   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1815   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1816   CpvAccess(_qd)->create(CkNumPes());
1817 }
1818
1819 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1820                 int node=CLD_BROADCAST_ALL, int opts=0)
1821 {
1822   int numPes;
1823   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1824 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1825         sendNodeGroupMsg(env,node,_infoIdx);
1826 #else
1827   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1828   _TRACE_CREATION_N(env, numPes);
1829   if (opts & CK_MSG_SKIP_OR_IMM) {
1830     _noCldNodeEnqueue(node, env);
1831     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1832       CkpvAccess(_coreState)->create(-numPes);
1833     }
1834   }
1835   else
1836     _CldNodeEnqueue(node, env, _infoIdx);
1837   _TRACE_CREATION_DONE(1);
1838 #endif
1839 }
1840
1841 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1842                            int npes, int *nodes)
1843 {
1844   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1845   _TRACE_CREATION_N(env, npes);
1846   for (int i=0; i<npes; i++) {
1847     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1848   }
1849   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1850 }
1851
1852 extern "C"
1853 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1854 {
1855 #if CMK_IMMEDIATE_MSG
1856   if (node==CkMyNode())
1857   {
1858     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1859     return;
1860   }
1861   //Can't inline-- send the usual way
1862   register envelope *env = UsrToEnv(msg);
1863   int numPes;
1864   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1865   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1866   _TRACE_CREATION_N(env, numPes);
1867   _noCldNodeEnqueue(node, env);
1868   _STATS_RECORD_SEND_BRANCH_1();
1869   /* immeidate message is invisible to QD */
1870 //  CkpvAccess(_coreState)->create();
1871   _TRACE_CREATION_DONE(1);
1872 #else
1873   // no support for immediate message, send inline
1874   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1875 #endif
1876 }
1877
1878 extern "C"
1879 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1880 {
1881   if (node==CkMyNode())
1882   {
1883     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1884     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1885     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1886     if (obj!=NULL)
1887     { //Just directly call the group:
1888 #if CMK_ERROR_CHECKING
1889       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1890 #else
1891       envelope *env=UsrToEnv(msg);
1892 #endif
1893       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1894       return;
1895     }
1896   }
1897   //Can't inline-- send the usual way
1898   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1899 }
1900
1901 extern "C"
1902 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1903 {
1904   if (opts & CK_MSG_INLINE) {
1905     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1906     return;
1907   }
1908   if (opts & CK_MSG_IMMEDIATE) {
1909     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1910     return;
1911   }
1912   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1913   _STATS_RECORD_SEND_NODE_BRANCH_1();
1914   CkpvAccess(_coreState)->create();
1915 }
1916
1917 extern "C"
1918 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1919 {
1920 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1921   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1922   _noCldEnqueueMulti(npes, nodes, env);
1923 #else
1924   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1925   CpvAccess(_qd)->create(-npes);
1926 #endif
1927   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1928   CpvAccess(_qd)->create(npes);
1929 }
1930
1931 extern "C"
1932 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1933 {
1934   if (opts & CK_MSG_IMMEDIATE) {
1935     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1936     return;
1937   }
1938     // normal mesg
1939   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1940   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1941   CpvAccess(_qd)->create(npes);
1942 }
1943
1944 extern "C"
1945 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1946 {
1947   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1948   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1949   CpvAccess(_qd)->create(CkNumNodes());
1950 }
1951
1952 //Needed by delegation manager:
1953 extern "C"
1954 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1955 { return _prepareMsg(eIdx,msg,pCid); }
1956 extern "C"
1957 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1958 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1959 extern "C"
1960 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1961 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1962
1963 void _ckModuleInit(void) {
1964         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1965 #if CMK_OBJECT_QUEUE_AVAILABLE
1966         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1967 #endif
1968         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1969         CkpvInitialize(TokenPool*, _tokenPool);
1970         CkpvAccess(_tokenPool) = new TokenPool;
1971 }
1972
1973
1974 /************** Send: Arrays *************/
1975
1976 extern void CkArrayManagerInsert(int onPe,void *msg);
1977 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1978
1979 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1980 {
1981   _CHECK_USED(env);
1982   _SET_USED(env, 1);
1983   env->setMsgtype(type);
1984 #if CMK_CHARMDEBUG
1985   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1986 #endif
1987   CmiSetHandler(env, _charmHandlerIdx);
1988   CpvAccess(_qd)->create();
1989 }
1990
1991 extern "C"
1992 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1993   register envelope *env = UsrToEnv(msg);
1994   env->getsetArrayMgr()=aID;
1995   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1996   _CldEnqueue(pe, env, _infoIdx);
1997 }
1998
1999 extern "C"
2000 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2001   register envelope *env = UsrToEnv(msg);
2002   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2003 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2004         sendArrayMsg(env,pe,_infoIdx);
2005 #else
2006   if (opts & CK_MSG_IMMEDIATE)
2007     CmiBecomeImmediate(env);
2008   if (opts & CK_MSG_SKIP_OR_IMM)
2009     _noCldEnqueue(pe, env);
2010   else
2011     _skipCldEnqueue(pe, env, _infoIdx);
2012 #endif
2013 }
2014
2015 class ElementDestroyer : public CkLocIterator {
2016 private:
2017         CkLocMgr *locMgr;
2018 public:
2019         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2020         void addLocation(CkLocation &loc) {
2021           loc.destroyAll();
2022         }
2023 };
2024
2025 void CkDeleteChares() {
2026   int i;
2027   int numGroups = CkpvAccess(_groupIDTable)->size();
2028
2029   // delete all plain chares
2030 #ifndef CMK_CHARE_USE_PTR
2031   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2032         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2033         delete obj;
2034         CkpvAccess(chare_objs)[i] = NULL;
2035   }
2036   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2037         VidBlock *obj = CkpvAccess(vidblocks)[i];
2038         delete obj;
2039         CkpvAccess(vidblocks)[i] = NULL;
2040   }
2041 #endif
2042
2043   // delete all array elements
2044   for(i=0;i<numGroups;i++) {
2045     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2046     if(obj && obj->isLocMgr())  {
2047       CkLocMgr *mgr = (CkLocMgr*)obj;
2048       ElementDestroyer destroyer(mgr);
2049       mgr->iterate(destroyer);
2050     }
2051   }
2052
2053   // delete all groups
2054   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2055   for(i=0;i<numGroups;i++) {
2056     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2057     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2058     if (obj) delete obj;
2059   }
2060   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2061
2062   // delete all node groups
2063   if (CkMyRank() == 0) {
2064     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2065     for(i=0;i<numNodeGroups;i++) {
2066       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2067       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2068       if (obj) delete obj;
2069     }
2070   }
2071 }
2072
2073 //------------------- Message Watcher (record/replay) ----------------
2074
2075 #include "crc32.h"
2076
2077 CkpvDeclare(int, envelopeEventID);
2078 int _recplay_crc = 0;
2079 int _recplay_checksum = 0;
2080 int _recplay_logsize = 1024*1024;
2081
2082 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2083 #define REPLAYDEBUG(args) /* empty */
2084
2085 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2086
2087 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2088
2089 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2090
2091     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2092     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2093     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2094     FILE *f=fopen(fName,permissions);
2095     REPLAYDEBUG("openReplayfile "<<fName);
2096     if (f==NULL) {
2097         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2098             CkMyPe(),fName,permissions);
2099         CkAbort("openReplayFile> Could not open replay file");
2100     }
2101     return f;
2102 }
2103
2104 #include "BaseLB.h" /* For LBMigrateMsg message */
2105
2106 class CkMessageRecorder : public CkMessageWatcher {
2107   char *buffer;
2108   unsigned int curpos;
2109   bool firstOpen;
2110 public:
2111   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2112   ~CkMessageRecorder() {
2113     flushLog(0);
2114     fprintf(f,"-1 -1 -1 ");
2115     fclose(f);
2116     delete[] buffer;
2117 #if 0
2118     FILE *stsfp = fopen("sts", "w");
2119     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2120     traceWriteSTS(stsfp, 0);
2121     fclose(stsfp);
2122 #endif
2123     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2124   }
2125
2126 private:
2127   void flushLog(int verbose=1) {
2128     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2129     fprintf(f, "%s", buffer);
2130     curpos=0;
2131   }
2132   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2133     if ((*envptr)->getEvent()) {
2134       bool wasPacked = (*envptr)->isPacked();
2135       if (!wasPacked) CkPackMessage(envptr);
2136       envelope *env = *envptr;
2137       unsigned int crc1=0, crc2=0;
2138       if (_recplay_crc) {
2139         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2140         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2141         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2142       } else if (_recplay_checksum) {
2143         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2144         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2145       }
2146       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2147       if (curpos > _recplay_logsize-128) flushLog();
2148       if (!wasPacked) CkUnpackMessage(envptr);
2149     }
2150     return CmiTrue;
2151   }
2152   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2153     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2154     if (curpos > _recplay_logsize-128) flushLog();
2155     return CmiTrue;
2156   }
2157   
2158   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2159     FILE *f;
2160     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2161     else f = openReplayFile("ckreplay_",".lb","a");
2162     firstOpen = false;
2163     if (f != NULL) {
2164       PUP::toDisk p(f);
2165       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2166       (*msg)->pup(p);
2167       fclose(f);
2168     }
2169     return CmiTrue;
2170   }
2171 };
2172
2173 class CkMessageDetailRecorder : public CkMessageWatcher {
2174 public:
2175   CkMessageDetailRecorder(FILE *f_) {
2176     f=f_;
2177     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2178      * The value of 'x' is the pointer size.
2179      */
2180     CmiUInt2 little = sizeof(void*);
2181     fwrite(&little, 2, 1, f);
2182   }
2183   ~CkMessageDetailRecorder() {fclose(f);}
2184 private:
2185   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2186     bool wasPacked = (*envptr)->isPacked();
2187     if (!wasPacked) CkPackMessage(envptr);
2188     envelope *env = *envptr;
2189     CmiUInt4 size = env->getTotalsize();
2190     fwrite(&size, 4, 1, f);
2191     fwrite(env, env->getTotalsize(), 1, f);
2192     if (!wasPacked) CkUnpackMessage(envptr);
2193     return CmiTrue;
2194   }
2195 };
2196
2197 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2198 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2199
2200 #if CMK_BIGSIM_CHARM
2201 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2202                                    int pb,unsigned int *prio);
2203 #endif
2204
2205 class CkMessageReplay : public CkMessageWatcher {
2206   int counter;
2207         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2208         int nextEP;
2209         unsigned int crc1, crc2;
2210         FILE *lbFile;
2211         /// Read the next message we need from the file:
2212         void getNext(void) {
2213           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2214           if (nextSize > 0) {
2215             // We are reading a regular message
2216             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2217               CkAbort("CkMessageReplay> Syntax error reading replay file");
2218             }
2219             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2220           } else if (nextSize == -2) {
2221             // We are reading a special message (right now only thread awaken)
2222             // Nothing to do since we have already read all info
2223             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2224           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2225             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2226             CkAbort("CkMessageReplay> Unrecognized input");
2227           }
2228             /*
2229                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2230                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2231                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2232                 }
2233                 */
2234                 counter++;
2235         }
2236         /// If this is the next message we need, advance and return CmiTrue.
2237         CmiBool isNext(envelope *env) {
2238                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2239                 if (nextEvent!=env->getEvent()) return CmiFalse;
2240                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2241 #if 1
2242                 if (nextEP != env->getEpIdx()) {
2243                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2244                         return CmiFalse;
2245                 }
2246 #endif
2247 #if ! CMK_BIGSIM_CHARM
2248                 if (nextSize!=env->getTotalsize())
2249                 {
2250                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2251                         return CmiFalse;
2252                 }
2253                 if (_recplay_crc || _recplay_checksum) {
2254                   bool wasPacked = env->isPacked();
2255                   if (!wasPacked) CkPackMessage(&env);
2256                   if (_recplay_crc) {
2257                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2258                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2259                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2260                     if (crcnew1 != crc1) {
2261                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2262                     }
2263                     if (crcnew2 != crc2) {
2264                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2265                     }
2266                   } else if (_recplay_checksum) {
2267             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2268             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2269             if (crcnew1 != crc1) {
2270               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2271             }
2272             if (crcnew2 != crc2) {
2273               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2274             }               
2275                   }
2276                   if (!wasPacked) CkUnpackMessage(&env);
2277                 }
2278 #endif
2279                 return CmiTrue;
2280         }
2281         CmiBool isNext(CthThreadToken *token) {
2282           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2283           return CmiFalse;
2284         }
2285
2286         /// This is a (short) list of messages we aren't yet ready for:
2287         CkQ<envelope *> delayedMessages;
2288         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2289         CkQ<CthThreadToken *> delayedTokens;
2290
2291         /// Try to flush out any delayed messages
2292         void flush(void) {
2293           if (nextSize>0) {
2294                 int len=delayedMessages.length();
2295                 for (int i=0;i<len;i++) {
2296                         envelope *env=delayedMessages.deq();
2297                         if (isNext(env)) { /* this is the next message: process it */
2298                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2299                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2300                                 return;
2301                         }
2302                         else /* Not ready yet-- put it back in the
2303                                 queue */
2304                           {
2305                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2306                                 delayedMessages.enq(env);
2307                           }
2308                 }
2309           } else if (nextSize==-2) {
2310             int len=delayedTokens.length();
2311             for (int i=0;i<len;++i) {
2312               CthThreadToken *token=delayedTokens.deq();
2313               if (isNext(token)) {
2314             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2315 #if ! CMK_BIGSIM_CHARM
2316                 CsdEnqueueLifo((void*)token);
2317 #else
2318                 CthEnqueueBigSimThread(token,0,0,NULL);
2319 #endif
2320                 return;
2321               } else {
2322             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2323                 delayedTokens.enq(token);
2324               }
2325             }
2326           }
2327         }
2328
2329 public:
2330         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2331           counter=0;
2332           f=f_;
2333           getNext();
2334           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2335           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2336         }
2337         ~CkMessageReplay() {fclose(f);}
2338
2339 private:
2340         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2341           bool wasPacked = (*envptr)->isPacked();
2342           if (!wasPacked) CkPackMessage(envptr);
2343           envelope *env = *envptr;
2344           //CkAssert(*(int*)env == 0x34567890);
2345           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2346                 if (env->getEvent() == 0) return CmiTrue;
2347                 if (isNext(env)) { /* This is the message we were expecting */
2348                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2349                         getNext(); /* Advance over this message */
2350                         flush(); /* try to process queued-up stuff */
2351                         if (!wasPacked) CkUnpackMessage(envptr);
2352                         return CmiTrue;
2353                 }
2354 #if CMK_SMP
2355                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2356                          // try next rank, we can't just buffer the msg and left
2357                          // we need to keep unprocessed msg on the fly
2358                         int nextpe = CkMyPe()+1;
2359                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2360                         nextpe = CkNodeFirst(CkMyNode());
2361                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2362                         return CmiFalse;
2363                 }
2364 #endif
2365                 else /*!isNext(env) */ {
2366                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2367                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2368                         delayedMessages.enq(env);
2369                         flush();
2370                         return CmiFalse;
2371                 }
2372         }
2373         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2374       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2375           if (isNext(token)) {
2376         REPLAYDEBUG("Executing token: "<<token->serialNo)
2377             getNext();
2378             flush();
2379             return CmiTrue;
2380           } else {
2381         REPLAYDEBUG("Queueing token: "<<token->serialNo
2382             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2383             delayedTokens.enq(token);
2384             return CmiFalse;
2385           }
2386         }
2387
2388         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2389           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2390           if (lbFile != NULL) {
2391             int num_moves;
2392         PUP::fromDisk p(lbFile);
2393             p | num_moves;
2394             if (num_moves != (*msg)->n_moves) {
2395               delete *msg;
2396               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2397             }
2398             (*msg)->pup(p);
2399           }
2400           return CmiTrue;
2401         }
2402 };
2403
2404 class CkMessageDetailReplay : public CkMessageWatcher {
2405   void *getNext() {
2406     CmiUInt4 size; size_t nread;
2407     if ((nread=fread(&size, 4, 1, f)) < 1) {
2408       if (feof(f)) return NULL;
2409       CkPrintf("Broken record file (metadata) got %d\n",nread);
2410       CkAbort("");
2411     }
2412     void *env = CmiAlloc(size);
2413     long tell = ftell(f);
2414     if ((nread=fread(env, size, 1, f)) < 1) {
2415       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2416       CkAbort("");
2417     }
2418     //*(int*)env = 0x34567890; // set first integer as magic
2419     return env;
2420   }
2421 public:
2422   double starttime;
2423   CkMessageDetailReplay(FILE *f_) {
2424     f=f_;
2425     starttime=CkWallTimer();
2426     /* This must match what CkMessageDetailRecorder did */
2427     CmiUInt2 little;
2428     fread(&little, 2, 1, f);
2429     if (little != sizeof(void*)) {
2430       CkAbort("Replaying on a different architecture from which recording was done!");
2431     }
2432
2433     CsdEnqueue(getNext());
2434
2435     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2436   }
2437   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2438     void *msg = getNext();
2439     if (msg != NULL) CsdEnqueue(msg);
2440     return CmiTrue;
2441   }
2442 };
2443
2444 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2445 #if ! CMK_BIGSIM_CHARM
2446   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2447 #endif
2448   CkMessageReplay *replay = (CkMessageReplay*)rep;
2449   //CmiStartQD(CkMessageReplayQuiescence, replay);
2450 }
2451
2452 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2453   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2454   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2455   ConverseExit();
2456 }
2457
2458 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2459   CkCoreState *ck = CkpvAccess(_coreState);
2460   if (ck->watcher!=NULL) {
2461     return ck->watcher->processThread(token,ck);
2462   }
2463   return CmiTrue;
2464 }
2465
2466 CpvCExtern(int, CthResumeNormalThreadIdx);
2467 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2468 {
2469   CthThread t = token->thread;
2470
2471   if(t == NULL){
2472     free(token);
2473     return;
2474   }
2475 #if CMK_TRACE_ENABLED
2476 #if ! CMK_TRACE_IN_CHARM
2477   if(CpvAccess(traceOn))
2478     CthTraceResume(t);
2479 /*    if(CpvAccess(_traceCoreOn)) 
2480             resumeTraceCore();*/
2481 #endif
2482 #endif
2483   
2484   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2485   if (CpdExecuteThreadResume(token)) {
2486     CthResume(t);
2487   }
2488 }
2489
2490 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2491   CkCoreState *ck = CkpvAccess(_coreState);
2492   if (ck->watcher!=NULL) {
2493     ck->watcher->processLBMessage(msg, ck);
2494   }
2495 }
2496
2497 #if CMK_BIGSIM_CHARM
2498 CpvExtern(int      , CthResumeBigSimThreadIdx);
2499 #endif
2500
2501 #include "ckliststring.h"
2502 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2503     CmiArgGroup("Charm++","Record/Replay");
2504     CmiBool forceReplay = CmiFalse;
2505     char *procs = NULL;
2506     _replaySystem = 0;
2507     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2508       _recplay_crc = 1;
2509     }
2510     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2511       _recplay_checksum = 1;
2512     }
2513     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2514     REPLAYDEBUG("CkMessageWatcherInit ");
2515     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2516         CkListString list(procs);
2517         if (list.includes(CkMyPe())) {
2518           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2519           CpdSetInitializeMemory(1);
2520           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2521         }
2522     }
2523     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2524       if (CkMyPe() == 0) {
2525         CmiPrintf("Charm++> record mode.\n");
2526         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2527           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2528           _recplay_crc = _recplay_checksum = 0;
2529         }
2530       }
2531       CpdSetInitializeMemory(1);
2532       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2533       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2534     }
2535         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2536             forceReplay = CmiTrue;
2537             CpdSetInitializeMemory(1);
2538             // Set the parameters of the processor
2539 #if CMK_SHARED_VARS_UNAVAILABLE
2540             _Cmi_mype = atoi(procs);
2541             while (procs[0]!='/') procs++;
2542             procs++;
2543             _Cmi_numpes = atoi(procs);
2544 #else
2545             CkAbort("+replay-detail available only for non-SMP build");
2546 #endif
2547             _replaySystem = 1;
2548             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2549         }
2550         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2551           if (CkMyPe() == 0)  {
2552             CmiPrintf("Charm++> replay mode.\n");
2553             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2554               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2555               _recplay_crc = _recplay_checksum = 0;
2556             }
2557           }
2558           CpdSetInitializeMemory(1);
2559 #if ! CMK_BIGSIM_CHARM
2560           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2561 #else
2562           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2563 #endif
2564           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2565         }
2566         if (_recplay_crc && _recplay_checksum) {
2567           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2568         }
2569 }
2570
2571 extern "C"
2572 int CkMessageToEpIdx(void *msg) {
2573         envelope *env=UsrToEnv(msg);
2574         int ep=env->getEpIdx();
2575         if (ep==CkIndex_CkArray::recvBroadcast(0))
2576                 return env->getsetArrayBcastEp();
2577         else
2578                 return ep;
2579 }
2580
2581 extern "C"
2582 int getCharmEnvelopeSize() {
2583   return sizeof(envelope);
2584 }
2585
2586 extern "C"
2587 int isCharmEnvelope(void *msg) {
2588       // best efford guessing if this is a charm envelope
2589     envelope *e = (envelope *)msg;
2590     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
2591     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
2592     if (e->getTotalsize() < sizeof(envelope)) return 0;
2593     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
2594 #if CMK_SMP
2595     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
2596 #else
2597     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()) return 0;
2598 #endif
2599     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
2600     return 1;
2601 }
2602
2603
2604 #include "CkMarshall.def.h"
2605