Widen integers being stuffed into objPtr to silence warnings
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
14 #include "pathHistory.h"
15 void automaticallySetMessagePriority(envelope *env); // in control point framework.
16 #endif
17
18 #if CMK_LBDB_ON
19 #include "LBDatabase.h"
20 #endif // CMK_LBDB_ON
21
22 #ifndef CMK_CHARE_USE_PTR
23 #include <map>
24 CkpvDeclare(CkVec<void *>, chare_objs);
25 CkpvDeclare(CkVec<int>, chare_types);
26 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
27
28 typedef std::map<int, CkChareID>  Vidblockmap;
29 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
30 CkpvDeclare(int, currentChareIdx);
31 #endif
32
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 void _initChareTables()
47 {
48 #ifndef CMK_CHARE_USE_PTR
49           /* chare and vidblock table */
50   CkpvInitialize(CkVec<void *>, chare_objs);
51   CkpvInitialize(CkVec<int>, chare_types);
52   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
53   CkpvInitialize(Vidblockmap, vmap);
54   CkpvInitialize(int, currentChareIdx);
55   CkpvAccess(currentChareIdx) = -1;
56 #endif
57 }
58
59 //Charm++ virtual functions: declaring these here results in a smaller executable
60 Chare::Chare(void) {
61   thishandle.onPE=CkMyPe();
62   thishandle.objPtr=this;
63 #ifndef CMK_CHARE_USE_PTR
64      // for plain chare, objPtr is actually the index to chare obj table
65   if (CkpvAccess(currentChareIdx) >= 0) {
66     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
67   }
68   chareIdx = CkpvAccess(currentChareIdx);
69 #endif
70 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
71   mlogData = new ChareMlogData();
72   mlogData->objID.type = TypeChare;
73   mlogData->objID.data.chare.id = thishandle;
74 #endif
75 #if CMK_OBJECT_QUEUE_AVAILABLE
76   if (_defaultObjectQ)  CkEnableObjQ();
77 #endif
78 }
79
80 Chare::Chare(CkMigrateMessage* m) {
81   thishandle.onPE=CkMyPe();
82   thishandle.objPtr=this;
83
84 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
85         mlogData = NULL;
86 #endif
87
88 #if CMK_OBJECT_QUEUE_AVAILABLE
89   if (_defaultObjectQ)  CkEnableObjQ();
90 #endif
91 }
92
93 void Chare::CkEnableObjQ()
94 {
95 #if CMK_OBJECT_QUEUE_AVAILABLE
96   objQ.create();
97 #endif
98 }
99
100 Chare::~Chare() {
101 #ifndef CMK_CHARE_USE_PTR
102 /*
103   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
104 */
105   if (chareIdx != -1)
106   {
107     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
108     CkpvAccess(chare_objs)[chareIdx] = NULL;
109     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
110     if (iter != CkpvAccess(vmap).end()) {
111       register CkChareID *pCid = (CkChareID *)
112         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
113       int srcPe = iter->second.onPE;
114       *pCid = iter->second;
115       register envelope *ret = UsrToEnv(pCid);
116       ret->setVidPtr(iter->second.objPtr);
117       ret->setSrcPe(CkMyPe());
118       CmiSetHandler(ret, _charmHandlerIdx);
119       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
120       CpvAccess(_qd)->create();
121       CkpvAccess(vmap).erase(iter);
122     }
123   }
124 #endif
125 }
126
127 void Chare::pup(PUP::er &p)
128 {
129   p(thishandle.onPE);
130   thishandle.objPtr=(void *)this;
131 #ifndef CMK_CHARE_USE_PTR
132   p(chareIdx);
133   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
134 #endif
135 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
136         if(p.isUnpacking()){
137                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
138                 mlogData = new ChareMlogData();
139         }
140         mlogData->pup(p);
141 #endif
142 }
143
144 int Chare::ckGetChareType() const {
145   return -3;
146 }
147 char *Chare::ckDebugChareName(void) {
148   char buf[100];
149   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
150   return strdup(buf);
151 }
152 int Chare::ckDebugChareID(char *str, int limit) {
153   // pure chares for now do not have a valid ID
154   str[0] = 0;
155   return 1;
156 }
157 void Chare::ckDebugPup(PUP::er &p) {
158   pup(p);
159 }
160
161 /// This method is called before starting a [threaded] entry method.
162 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
163   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
164   traceAddThreadListeners(th, UsrToEnv(msg));
165 }
166
167 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
168   p.comment("Bytes");
169   int ts=UsrToEnv(msg)->getTotalsize();
170   int msgLen=ts-sizeof(envelope);
171   if (msgLen>0)
172     p((char*)msg,msgLen);
173 }
174
175 IrrGroup::IrrGroup(void) {
176   thisgroup = CkpvAccess(_currentGroup);
177 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
178         mlogData->objID.type = TypeGroup;
179         mlogData->objID.data.group.id = thisgroup;
180         mlogData->objID.data.group.onPE = CkMyPe();
181 #endif
182 }
183
184 IrrGroup::~IrrGroup() {
185   // remove the object pointer
186   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
187   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
188   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
189 }
190
191 void IrrGroup::pup(PUP::er &p)
192 {
193   Chare::pup(p);
194   p|thisgroup;
195 }
196
197 int IrrGroup::ckGetChareType() const {
198   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
199 }
200
201 int IrrGroup::ckDebugChareID(char *str, int limit) {
202   if (limit<5) return -1;
203   str[0] = 1;
204   *((int*)&str[1]) = thisgroup.idx;
205   return 5;
206 }
207
208 char *IrrGroup::ckDebugChareName() {
209   return strdup(_chareTable[ckGetChareType()]->name);
210 }
211
212 void IrrGroup::ckJustMigrated(void)
213 {
214 }
215
216 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
217   /* FIXME: **CW** not entirely sure what we should do here yet */
218 }
219
220 void Group::CkAddThreadListeners(CthThread th, void *msg) {
221   Chare::CkAddThreadListeners(th, msg);
222   CthSetThreadID(th, thisgroup.idx, 0, 0);
223 }
224
225 void Group::pup(PUP::er &p)
226 {
227   CkReductionMgr::pup(p);
228   reductionInfo.pup(p);
229 }
230
231 /**** Delegation Manager Group */
232 CkDelegateMgr::~CkDelegateMgr() { }
233
234 //Default delegator implementation: do not delegate-- send directly
235 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
236   { CkSendMsg(ep,m,c); }
237 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
238   { CkSendMsgBranch(ep,m,onPE,g); }
239 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
240   { CkBroadcastMsgBranch(ep,m,g); }
241 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
242   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
243 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
244   { CkSendMsgNodeBranch(ep,m,onNode,g); }
245 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
246   { CkBroadcastMsgNodeBranch(ep,m,g); }
247 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
248   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
249 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
250 {
251         CProxyElement_ArrayBase ap(a,idx);
252         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
253 }
254 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
255 {
256         CProxyElement_ArrayBase ap(a,idx);
257         ap.ckSend((CkArrayMessage *)m,ep);
258 }
259 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
260 {
261         CProxy_ArrayBase ap(a);
262         ap.ckBroadcast((CkArrayMessage *)m,ep);
263 }
264
265 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
266 {
267         CmiAbort("ArraySectionSend is not implemented!\n");
268 /*
269         CProxyElement_ArrayBase ap(a,idx);
270         ap.ckSend((CkArrayMessage *)m,ep);
271 */
272 }
273
274 /*** Proxy <-> delegator communication */
275 CkDelegateData::~CkDelegateData() {}
276
277 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
278   return pd; // default implementation ignores pup call
279 }
280
281 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
282    this tricky manual reference counting business... */
283
284 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
285         if (dPtr) dPtr->ref();
286         ckUndelegate();
287         delegatedMgr = dTo;
288         delegatedPtr = dPtr;
289         delegatedGroupId = delegatedMgr->CkGetGroupID();
290         isNodeGroup = delegatedMgr->isNodeGroup();
291 }
292 void CProxy::ckUndelegate(void) {
293         delegatedMgr=NULL;
294         delegatedGroupId.setZero();
295         if (delegatedPtr) delegatedPtr->unref();
296         delegatedPtr=NULL;
297 }
298
299 /// Copy constructor
300 CProxy::CProxy(const CProxy &src)
301   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
302    isNodeGroup(src.isNodeGroup) {
303     delegatedPtr = NULL;
304     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
305         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
306     }
307 }
308
309 /// Assignment operator
310 CProxy& CProxy::operator=(const CProxy &src) {
311         CkDelegateData *oldPtr=delegatedPtr;
312         ckUndelegate();
313         delegatedMgr=src.delegatedMgr;
314         delegatedGroupId = src.delegatedGroupId; 
315         isNodeGroup = src.isNodeGroup;
316
317         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
318             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
319         else
320             delegatedPtr = NULL;
321
322         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
323         if (oldPtr) oldPtr->unref();
324         return *this;
325 }
326
327 void CProxy::pup(PUP::er &p) {
328   if (!p.isUnpacking()) {
329     if (ckDelegatedTo() != NULL) {
330       delegatedGroupId = delegatedMgr->CkGetGroupID();
331       isNodeGroup = delegatedMgr->isNodeGroup();
332     }
333   }
334   p|delegatedGroupId;
335   if (!delegatedGroupId.isZero()) {
336     p|isNodeGroup;
337     if (p.isUnpacking()) {
338       delegatedMgr = ckDelegatedTo(); 
339     }
340
341     int migCtor, cIdx; 
342     if (!p.isUnpacking()) {
343       if (isNodeGroup) {
344         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
345         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
346         migCtor = _chareTable[cIdx]->migCtor; 
347         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
348       }
349       else  {
350         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
351         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
352         migCtor = _chareTable[cIdx]->migCtor; 
353         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
354       }         
355     }
356
357     p|migCtor;
358
359     // if delegated manager has not been created, construct a dummy
360     // object on which to call DelegatePointerPup
361     if (delegatedMgr == NULL) {
362
363       // create a dummy object for calling DelegatePointerPup
364       int objId = _entryTable[migCtor]->chareIdx; 
365       int objSize = _chareTable[objId]->size; 
366       void *obj = malloc(objSize); 
367       _entryTable[migCtor]->call(NULL, obj); 
368       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
369         ->DelegatePointerPup(p, delegatedPtr);           
370       free(obj);
371
372     }
373     else {
374
375       // delegated manager has been created, so we can use it
376       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
377
378     }
379
380     if (p.isUnpacking() && delegatedPtr) {
381       delegatedPtr->ref();
382     }
383   }
384 }
385
386 /**** Array sections */
387 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
388 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
389   _cookie.get_aid() = aid;      \
390   _cookie.get_pe() = CkMyPe();  \
391   _elems = new CkArrayIndex[nElems];    \
392   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
393   pelist = NULL;        \
394   npes  = 0;    \
395 }
396
397 CKSECTIONID_CONSTRUCTOR_DEF(1D)
398 CKSECTIONID_CONSTRUCTOR_DEF(2D)
399 CKSECTIONID_CONSTRUCTOR_DEF(3D)
400 CKSECTIONID_CONSTRUCTOR_DEF(4D)
401 CKSECTIONID_CONSTRUCTOR_DEF(5D)
402 CKSECTIONID_CONSTRUCTOR_DEF(6D)
403 CKSECTIONID_CONSTRUCTOR_DEF(Max)
404
405 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
406   pelist = new int[npes];
407   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
408   _cookie.get_aid() = gid;
409 }
410
411 CkSectionID::CkSectionID(const CkSectionID &sid) {
412   int i;
413   _cookie = sid._cookie;
414   _nElems = sid._nElems;
415   if (_nElems > 0) {
416     _elems = new CkArrayIndex[_nElems];
417     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
418   } else _elems = NULL;
419   npes = sid.npes;
420   if (npes > 0) {
421     pelist = new int[npes];
422     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
423   } else pelist = NULL;
424 }
425
426 void CkSectionID::operator=(const CkSectionID &sid) {
427   int i;
428   _cookie = sid._cookie;
429   _nElems = sid._nElems;
430   if (_nElems > 0) {
431     _elems = new CkArrayIndex[_nElems];
432     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
433   } else _elems = NULL;
434   npes = sid.npes;
435   if (npes > 0) {
436     pelist = new int[npes];
437     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
438   } else pelist = NULL;
439 }
440
441 void CkSectionID::pup(PUP::er &p) {
442     p | _cookie;
443     p(_nElems);
444     if (_nElems > 0) {
445       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
446       for (int i=0; i< _nElems; i++) p | _elems[i];
447       npes = 0;
448       pelist = NULL;
449     } else {
450       // If _nElems is zero, than this section describes processors instead of array elements
451       _elems = NULL;
452       p(npes);
453       if (p.isUnpacking()) pelist = new int[npes];
454       p(pelist, npes);
455     }
456 }
457
458 /**** Tiny random API routines */
459
460 #ifdef CMK_CUDA
461 void CUDACallbackManager(void *fn) {
462   if (fn != NULL) {
463     CkCallback *cb = (CkCallback*) fn;
464     cb->send();
465   }
466 }
467
468 #endif
469
470 extern "C"
471 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
472 {
473   UsrToEnv(msg)->setRef(ref);
474 }
475
476 extern "C"
477 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
478 {
479   return UsrToEnv(msg)->getRef();
480 }
481
482 extern "C"
483 int CkGetSrcPe(void *msg)
484 {
485   return UsrToEnv(msg)->getSrcPe();
486 }
487
488 extern "C"
489 int CkGetSrcNode(void *msg)
490 {
491   return CmiNodeOf(CkGetSrcPe(msg));
492 }
493
494 extern "C"
495 void *CkLocalBranch(CkGroupID gID) {
496   return _localBranch(gID);
497 }
498
499 static
500 void *_ckLocalNodeBranch(CkGroupID groupID) {
501   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
502   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
503   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
504   return retval;
505 }
506
507 extern "C"
508 void *CkLocalNodeBranch(CkGroupID groupID)
509 {
510   void *retval;
511   // we are called in a constructor
512   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
513     return CkpvAccess(_currentNodeGroupObj);
514   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
515   { // Nodegroup hasn't finished being created yet-- schedule...
516     CsdScheduler(0);
517   }
518   return retval;
519 }
520
521 extern "C"
522 void *CkLocalChare(const CkChareID *pCid)
523 {
524         int pe=pCid->onPE;
525         if (pe<0) { //A virtual chare ID
526                 if (pe!=(-(CkMyPe()+1)))
527                         return NULL;//VID block not on this PE
528 #ifdef CMK_CHARE_USE_PTR
529                 VidBlock *v=(VidBlock *)pCid->objPtr;
530 #else
531                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
532 #endif
533                 return v->getLocalChareObj();
534         }
535         else
536         { //An ordinary chare ID
537                 if (pe!=CkMyPe())
538                         return NULL;//Chare not on this PE
539 #ifdef CMK_CHARE_USE_PTR
540                 return pCid->objPtr;
541 #else
542                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
543 #endif
544         }
545 }
546
547 CkpvDeclare(char**,Ck_argv);
548
549 extern "C" char **CkGetArgv(void) {
550         return CkpvAccess(Ck_argv);
551 }
552 extern "C" int CkGetArgc(void) {
553         return CmiGetArgc(CkpvAccess(Ck_argv));
554 }
555
556 /******************** Basic support *****************/
557 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
558 {
559 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
560         CpvAccess(_currentObj) = (Chare *)obj;
561 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
562 #endif
563   //BIGSIM_OOC DEBUGGING
564   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
565   //fflush(stdout);
566 #if CMK_CHARMDEBUG
567   CpdBeforeEp(epIdx, obj, msg);
568 #endif    
569   _entryTable[epIdx]->call(msg, obj);
570 #if CMK_CHARMDEBUG
571   CpdAfterEp(epIdx);
572 #endif
573   if (_entryTable[epIdx]->noKeep)
574   { /* Method doesn't keep/delete the message, so we have to: */
575     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
576   }
577 }
578 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
579 {
580   //BIGSIM_OOC DEBUGGING
581   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
582   //fflush(stdout);
583
584   void *deliverMsg;
585 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
586         CpvAccess(_currentObj) = (Chare *)obj;
587 #endif
588   if (_entryTable[epIdx]->noKeep)
589   { /* Deliver a read-only copy of the message */
590     deliverMsg=(void *)msg;
591   } else
592   { /* Method needs a copy of the message to keep/delete */
593     void *oldMsg=(void *)msg;
594     deliverMsg=CkCopyMsg(&oldMsg);
595 #if CMK_ERROR_CHECKING
596     if (oldMsg!=msg)
597       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
598 #endif
599   }
600 #if CMK_CHARMDEBUG
601   CpdBeforeEp(epIdx, obj, (void*)msg);
602 #endif
603   _entryTable[epIdx]->call(deliverMsg, obj);
604 #if CMK_CHARMDEBUG
605   CpdAfterEp(epIdx);
606 #endif
607 }
608
609 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
610 {
611   register void *msg = EnvToUsr(env);
612   _SET_USED(env, 0);
613   CkDeliverMessageFree(epIdx,msg,obj);
614 }
615
616 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
617 {
618
619 #if CMK_TRACE_ENABLED 
620   if (_entryTable[epIdx]->traceEnabled) {
621     _TRACE_BEGIN_EXECUTE(env);
622     _invokeEntryNoTrace(epIdx,env,obj);
623     _TRACE_END_EXECUTE();
624   }
625   else
626 #endif
627     _invokeEntryNoTrace(epIdx,env,obj);
628
629 }
630
631 /********************* Creation ********************/
632
633 extern "C"
634 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
635 {
636   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
637   envelope *env = UsrToEnv(msg);
638   _CHECK_USED(env);
639   if(pCid == 0) {
640     env->setMsgtype(NewChareMsg);
641   } else {
642     pCid->onPE = (-(CkMyPe()+1));
643     //  pCid->magic = _GETIDX(cIdx);
644     pCid->objPtr = (void *) new VidBlock();
645     _MEMCHECK(pCid->objPtr);
646     env->setMsgtype(NewVChareMsg);
647     env->setVidPtr(pCid->objPtr);
648 #ifndef CMK_CHARE_USE_PTR
649     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
650     int idx = CkpvAccess(vidblocks).size()-1;
651     pCid->objPtr = (void *)(CmiIntPtr)idx;
652     env->setVidPtr((void *)(CmiIntPtr)idx);
653 #endif
654   }
655   env->setEpIdx(eIdx);
656   env->setSrcPe(CkMyPe());
657   CmiSetHandler(env, _charmHandlerIdx);
658   _TRACE_CREATION_1(env);
659   CpvAccess(_qd)->create();
660   _STATS_RECORD_CREATE_CHARE_1();
661   _SET_USED(env, 1);
662   if(destPE == CK_PE_ANY)
663     env->setForAnyPE(1);
664   else
665     env->setForAnyPE(0);
666   _CldEnqueue(destPE, env, _infoIdx);
667   _TRACE_CREATION_DONE(1);
668 }
669
670 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
671 {
672   register int gIdx = _entryTable[epIdx]->chareIdx;
673   register void *obj = malloc(_chareTable[gIdx]->size);
674   _MEMCHECK(obj);
675   setMemoryTypeChare(obj);
676   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
677   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
678   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
679   CkpvAccess(_groupIDTable)->push_back(groupID);
680   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
681   if(ptrq) {
682     void *pending;
683     while((pending=ptrq->deq())!=0)
684       _CldEnqueue(CkMyPe(), pending, _infoIdx);
685 //    delete ptrq;
686       CkpvAccess(_groupTable)->find(groupID).clearPending();
687   }
688   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
689
690   CkpvAccess(_currentGroup) = groupID;
691   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
692 #ifndef CMK_CHARE_USE_PTR
693   //((Chare *)obj)->chareIdx = -1;
694   CkpvAccess(currentChareIdx) = -1;
695 #endif
696   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
697   _STATS_RECORD_PROCESS_GROUP_1();
698 }
699
700 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
701 {
702   register int gIdx = _entryTable[epIdx]->chareIdx;
703   int objSize=_chareTable[gIdx]->size;
704   register void *obj = malloc(objSize);
705   _MEMCHECK(obj);
706   setMemoryTypeChare(obj);
707   CkpvAccess(_currentGroup) = groupID;
708
709 // Now that the NodeGroup is created, add it to the table.
710 //  NodeGroups can be accessed by multiple processors, so
711 //  this is in the opposite order from groups - invoking the constructor
712 //  before registering it.
713 // User may call CkLocalNodeBranch() inside the nodegroup constructor
714 //  store nodegroup into _currentNodeGroupObj
715   CkpvAccess(_currentNodeGroupObj) = obj;
716 #ifndef CMK_CHARE_USE_PTR
717   //((Chare *)obj)->chareIdx = -1;
718   CkpvAccess(currentChareIdx) = -1;
719 #endif
720   _invokeEntryNoTrace(epIdx,env,obj);
721   CkpvAccess(_currentNodeGroupObj) = NULL;
722   _STATS_RECORD_PROCESS_NODE_GROUP_1();
723
724   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
725   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
726   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
727   CksvAccess(_nodeGroupIDTable).push_back(groupID);
728
729   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
730   if(ptrq) {
731     void *pending;
732     while((pending=ptrq->deq())!=0)
733       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
734 //    delete ptrq;
735       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
736   }
737   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
738 }
739
740 void _createGroup(CkGroupID groupID, envelope *env)
741 {
742   _CHECK_USED(env);
743   _SET_USED(env, 1);
744   register int epIdx = env->getEpIdx();
745   int gIdx = _entryTable[epIdx]->chareIdx;
746   CkNodeGroupID rednMgr;
747   if(_chareTable[gIdx]->isIrr == 0){
748                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
749                 rednMgr = rednMgrProxy;
750 //              rednMgrProxy.setAttachedGroup(groupID);
751   }else{
752         rednMgr.setZero();
753   }
754   env->setGroupNum(groupID);
755   env->setSrcPe(CkMyPe());
756   env->setRednMgr(rednMgr);
757   env->setGroupEpoch(CkpvAccess(_charmEpoch));
758
759   if(CkNumPes()>1) {
760     CkPackMessage(&env);
761     CmiSetHandler(env, _bocHandlerIdx);
762     _numInitMsgs++;
763     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
764     CpvAccess(_qd)->create(CkNumPes()-1);
765     CkUnpackMessage(&env);
766   }
767   _STATS_RECORD_CREATE_GROUP_1();
768   CkCreateLocalGroup(groupID, epIdx, env);
769 }
770
771 void _createNodeGroup(CkGroupID groupID, envelope *env)
772 {
773   _CHECK_USED(env);
774   _SET_USED(env, 1);
775   register int epIdx = env->getEpIdx();
776   env->setGroupNum(groupID);
777   env->setSrcPe(CkMyPe());
778   env->setGroupEpoch(CkpvAccess(_charmEpoch));
779   if(CkNumNodes()>1) {
780     CkPackMessage(&env);
781     CmiSetHandler(env, _bocHandlerIdx);
782     _numInitMsgs++;
783     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
784     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
785     CpvAccess(_qd)->create(CkNumNodes()-1);
786     CkUnpackMessage(&env);
787   }
788   _STATS_RECORD_CREATE_NODE_GROUP_1();
789   CkCreateLocalNodeGroup(groupID, epIdx, env);
790 }
791
792 // new _groupCreate
793
794 static CkGroupID _groupCreate(envelope *env)
795 {
796   register CkGroupID groupNum;
797
798   // check CkMyPe(). if it is 0 then idx is _numGroups++
799   // if not, then something else...
800   if(CkMyPe() == 0)
801      groupNum.idx = CkpvAccess(_numGroups)++;
802   else
803      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
804   _createGroup(groupNum, env);
805   return groupNum;
806 }
807
808 // new _nodeGroupCreate
809 static CkGroupID _nodeGroupCreate(envelope *env)
810 {
811   register CkGroupID groupNum;
812   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
813   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
814           groupNum.idx = CksvAccess(_numNodeGroups)++;
815    else
816           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
817   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
818   _createNodeGroup(groupNum, env);
819   return groupNum;
820 }
821
822 /**** generate the group idx when group is creator pe is not pe0
823  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
824  **** remaining bits contain the group creator processor number and
825  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
826
827 int _getGroupIdx(int numNodes,int myNode,int numGroups)
828 {
829         int idx;
830         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
831         int n = 32 - (x+1);                                     // number of bits remaining for the index
832         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
833                                                                 // then add the next available index
834         // of course this won't work when int is 8 bytes long on T3E
835         //idx |= 0x80000000;                                      // set the most significant bit to 1
836         idx = - idx;
837                                                                 // if int is not 32 bits, wouldn't this be wrong?
838         return idx;
839 }
840
841 extern "C"
842 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
843 {
844   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
845   register envelope *env = UsrToEnv(msg);
846   env->setMsgtype(BocInitMsg);
847   env->setEpIdx(eIdx);
848   env->setSrcPe(CkMyPe());
849   _TRACE_CREATION_N(env, CkNumPes());
850   CkGroupID gid = _groupCreate(env);
851   _TRACE_CREATION_DONE(1);
852   return gid;
853 }
854
855 extern "C"
856 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
857 {
858   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
859   register envelope *env = UsrToEnv(msg);
860   env->setMsgtype(NodeBocInitMsg);
861   env->setEpIdx(eIdx);
862   env->setSrcPe(CkMyPe());
863   _TRACE_CREATION_N(env, CkNumNodes());
864   CkGroupID gid = _nodeGroupCreate(env);
865   _TRACE_CREATION_DONE(1);
866   return gid;
867 }
868
869 static inline void *_allocNewChare(envelope *env, int &idx)
870 {
871   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
872   void *tmp=malloc(_chareTable[chareIdx]->size);
873   _MEMCHECK(tmp);
874 #ifndef CMK_CHARE_USE_PTR
875   CkpvAccess(chare_objs).push_back(tmp);
876   CkpvAccess(chare_types).push_back(chareIdx);
877   idx = CkpvAccess(chare_objs).size()-1;
878 #endif
879   setMemoryTypeChare(tmp);
880   return tmp;
881 }
882
883 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
884 {
885   int idx;
886   register void *obj = _allocNewChare(env, idx);
887 #ifndef CMK_CHARE_USE_PTR
888   //((Chare *)obj)->chareIdx = idx;
889   CkpvAccess(currentChareIdx) = idx;
890 #endif
891   _invokeEntry(env->getEpIdx(),env,obj);
892 }
893
894 void CkCreateLocalChare(int epIdx, envelope *env)
895 {
896   env->setEpIdx(epIdx);
897   _processNewChareMsg(NULL, env);
898 }
899
900 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
901 {
902   int idx;
903   register void *obj = _allocNewChare(env, idx);
904   register CkChareID *pCid = (CkChareID *)
905       _allocMsg(FillVidMsg, sizeof(CkChareID));
906   pCid->onPE = CkMyPe();
907 #ifndef CMK_CHARE_USE_PTR
908   pCid->objPtr = (void*)(CmiIntPtr)idx;
909 #else
910   pCid->objPtr = obj;
911 #endif
912   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
913   register envelope *ret = UsrToEnv(pCid);
914   ret->setVidPtr(env->getVidPtr());
915   register int srcPe = env->getSrcPe();
916   ret->setSrcPe(CkMyPe());
917   CmiSetHandler(ret, _charmHandlerIdx);
918   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
919 #ifndef CMK_CHARE_USE_PTR
920   // register the remote vidblock for deletion when chare is deleted
921   CkChareID vid;
922   vid.onPE = srcPe;
923   vid.objPtr = env->getVidPtr();
924   CkpvAccess(vmap)[idx] = vid;    
925 #endif
926   CpvAccess(_qd)->create();
927 #ifndef CMK_CHARE_USE_PTR
928   //((Chare *)obj)->chareIdx = idx;
929   CkpvAccess(currentChareIdx) = idx;
930 #endif
931   _invokeEntry(env->getEpIdx(),env,obj);
932 }
933
934 /************** Receive: Chares *************/
935
936 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
937 {
938   register int epIdx = env->getEpIdx();
939   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
940   register void *obj;
941   if (mainIdx != -1)  {           // mainchare
942     CmiAssert(CkMyPe()==0);
943     obj = _mainTable[mainIdx]->getObj();
944   }
945   else {
946 #ifndef CMK_CHARE_USE_PTR
947     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
948       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
949     else
950       obj = env->getObjPtr();
951 #else
952     obj = env->getObjPtr();
953 #endif
954   }
955   _invokeEntry(epIdx,env,obj);
956 }
957
958 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
959 {
960   register int epIdx = env->getEpIdx();
961   register void *obj = env->getObjPtr();
962   _invokeEntry(epIdx,env,obj);
963 }
964
965 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
966 {
967 #ifndef CMK_CHARE_USE_PTR
968   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
969 #else
970   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
971   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
972 #endif
973   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
974   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
975   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
976   CmiFree(env);
977 }
978
979 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
980 {
981 #ifndef CMK_CHARE_USE_PTR
982   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
983 #else
984   VidBlock *vptr = (VidBlock *) env->getVidPtr();
985   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
986 #endif
987   _SET_USED(env, 1);
988   vptr->send(env);
989 }
990
991 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
992 {
993 #ifndef CMK_CHARE_USE_PTR
994   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
995   delete vptr;
996   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
997 #endif
998   CmiFree(env);
999 }
1000
1001 /************** Receive: Groups ****************/
1002
1003 /**
1004  Return a pointer to the local BOC of "groupID".
1005  The message "env" passed in has some known dependency on this groupID
1006  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1007  Therefore, if the return value is NULL, this function buffers the massage so that
1008  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1009  The message passed in must have its handlers correctly set so that it can be
1010  scheduled again.
1011 */
1012 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1013 {
1014
1015         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1016         IrrGroup *obj = ck->localBranch(groupID);
1017         if (obj==NULL) { /* groupmember not yet created: stash message */
1018                 ck->getGroupTable()->find(groupID).enqMsg(env);
1019         }
1020         else { /* will be able to process message */
1021                 ck->process();
1022         }
1023         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1024         return obj;
1025 }
1026
1027 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1028 {
1029 #if CMK_LBDB_ON
1030   // if there is a running obj being measured, stop it temporarily
1031   LDObjHandle objHandle;
1032   int objstopped = 0;
1033   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1034   if (the_lbdb->RunningObject(&objHandle)) {
1035     objstopped = 1;
1036     the_lbdb->ObjectStop(objHandle);
1037   }
1038 #endif
1039   _invokeEntry(epIdx,env,obj);
1040 #if CMK_LBDB_ON
1041   if (objstopped) the_lbdb->ObjectStart(objHandle);
1042 #endif
1043   _STATS_RECORD_PROCESS_BRANCH_1();
1044 }
1045
1046 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1047 {
1048   register CkGroupID groupID =  env->getGroupNum();
1049   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1050   if(obj) {
1051     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1052   }
1053 }
1054
1055 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1056 {
1057   env->setMsgtype(ForChareMsg);
1058   env->setObjPtr(obj);
1059   _processForChareMsg(ck,env);
1060   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1061 }
1062
1063 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1064 {
1065   env->setEpIdx(epIdx);
1066   _deliverForNodeBocMsg(ck,env, obj);
1067 }
1068
1069 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1070 {
1071   register CkGroupID groupID = env->getGroupNum();
1072   register void *obj;
1073
1074   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1075   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1076   if(!obj) { // groupmember not yet created
1077 #if CMK_IMMEDIATE_MSG
1078     if (CmiIsImmediate(env))     // buffer immediate message
1079       CmiDelayImmediate();
1080     else
1081 #endif
1082     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1083     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1084     return;
1085   }
1086   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1087 #if CMK_IMMEDIATE_MSG
1088   if (!CmiIsImmediate(env))
1089 #endif
1090   ck->process();
1091   env->setMsgtype(ForChareMsg);
1092   env->setObjPtr(obj);
1093   _processForChareMsg(ck,env);
1094   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1095 }
1096
1097 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1098 {
1099   register CkGroupID groupID = env->getGroupNum();
1100   register int epIdx = env->getEpIdx();
1101   if (!env->getGroupDep().isZero()) {      // dependence
1102     CkGroupID dep = env->getGroupDep();
1103     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1104     if (obj == NULL) return;
1105   }
1106   else
1107     ck->process();
1108   CkCreateLocalGroup(groupID, epIdx, env);
1109 }
1110
1111 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1112 {
1113   register CkGroupID groupID = env->getGroupNum();
1114   register int epIdx = env->getEpIdx();
1115   CkCreateLocalNodeGroup(groupID, epIdx, env);
1116 }
1117
1118 /************** Receive: Arrays *************/
1119
1120 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1121   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1122   if (mgr) {
1123     _SET_USED(env, 0);
1124     mgr->insertElement((CkMessage *)EnvToUsr(env));
1125   }
1126 }
1127 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1128   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1129   if (mgr) {
1130     _SET_USED(env, 0);
1131     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1132   }
1133 }
1134
1135 //BIGSIM_OOC DEBUGGING
1136 #define TELLMSGTYPE(x) //x
1137
1138 /**
1139  * This is the main converse-level handler used by all of Charm++.
1140  *
1141  * \addtogroup CriticalPathFramework
1142  */
1143 void _processHandler(void *converseMsg,CkCoreState *ck)
1144 {
1145   register envelope *env = (envelope *) converseMsg;
1146
1147   MESSAGE_PHASE_CHECK(env);
1148
1149 //#if CMK_RECORD_REPLAY
1150   if (ck->watcher!=NULL) {
1151     if (!ck->watcher->processMessage(&env,ck)) return;
1152   }
1153 //#endif
1154 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1155         Chare *obj=NULL;
1156         CkObjID sender;
1157         MCount SN;
1158         MlogEntry *entry=NULL;
1159         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1160         env->getMsgtype() == ForArrayEltMsg){
1161                 sender = env->sender;
1162                 SN = env->SN;
1163                 int result = preProcessReceivedMessage(env,&obj,&entry);
1164                 if(result == 0){
1165                         return;
1166                 }
1167         }
1168 #endif
1169
1170 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1171   //  CkPrintf("START\n");
1172   criticalPath_start(env);
1173 #endif
1174
1175
1176   switch(env->getMsgtype()) {
1177 // Group support
1178     case BocInitMsg :
1179       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1180       // QD processing moved inside _processBocInitMsg because it is conditional
1181       //ck->process(); 
1182       if(env->isPacked()) CkUnpackMessage(&env);
1183       _processBocInitMsg(ck,env);
1184       break;
1185     case NodeBocInitMsg :
1186       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1187       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1188       _processNodeBocInitMsg(ck,env);
1189       break;
1190     case ForBocMsg :
1191       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1192       // QD processing moved inside _processForBocMsg because it is conditional
1193       if(env->isPacked()) CkUnpackMessage(&env);
1194       _processForBocMsg(ck,env);
1195       // stats record moved inside _processForBocMsg because it is conditional
1196       break;
1197     case ForNodeBocMsg :
1198       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1199       // QD processing moved to _processForNodeBocMsg because it is conditional
1200       if(env->isPacked()) CkUnpackMessage(&env);
1201       _processForNodeBocMsg(ck,env);
1202       // stats record moved to _processForNodeBocMsg because it is conditional
1203       break;
1204
1205 // Array support
1206     case ArrayEltInitMsg:
1207       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1208       if(env->isPacked()) CkUnpackMessage(&env);
1209       _processArrayEltInitMsg(ck,env);
1210       break;
1211     case ForArrayEltMsg:
1212       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1213       if(env->isPacked()) CkUnpackMessage(&env);
1214       _processArrayEltMsg(ck,env);
1215       break;
1216
1217 // Chare support
1218     case NewChareMsg :
1219       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1220       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1221       _processNewChareMsg(ck,env);
1222       _STATS_RECORD_PROCESS_CHARE_1();
1223       break;
1224     case NewVChareMsg :
1225       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1226       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1227       _processNewVChareMsg(ck,env);
1228       _STATS_RECORD_PROCESS_CHARE_1();
1229       break;
1230     case ForChareMsg :
1231       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1232       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1233       _processForPlainChareMsg(ck,env);
1234       _STATS_RECORD_PROCESS_MSG_1();
1235       break;
1236     case ForVidMsg   :
1237       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1238       ck->process();
1239       _processForVidMsg(ck,env);
1240       break;
1241     case FillVidMsg  :
1242       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1243       ck->process();
1244       _processFillVidMsg(ck,env);
1245       break;
1246     case DeleteVidMsg  :
1247       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1248       ck->process();
1249       _processDeleteVidMsg(ck,env);
1250       break;
1251
1252     default:
1253       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1254   }
1255 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1256         if(obj != NULL){
1257                 postProcessReceivedMessage(obj,sender,SN,entry);
1258         }
1259 #endif
1260
1261
1262 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1263   criticalPath_end();
1264   //  CkPrintf("STOP\n");
1265 #endif
1266
1267
1268 }
1269
1270
1271 /******************** Message Send **********************/
1272
1273 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1274              int *queueing, int *priobits, unsigned int **prioptr)
1275 {
1276   register envelope *env = (envelope *)converseMsg;
1277   *pfn = (CldPackFn)CkPackMessage;
1278   *len = env->getTotalsize();
1279   *queueing = env->getQueueing();
1280   *priobits = env->getPriobits();
1281   *prioptr = (unsigned int *) env->getPrioPtr();
1282 }
1283
1284 void CkPackMessage(envelope **pEnv)
1285 {
1286   register envelope *env = *pEnv;
1287   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1288     register void *msg = EnvToUsr(env);
1289     _TRACE_BEGIN_PACK();
1290     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1291     _TRACE_END_PACK();
1292     env=UsrToEnv(msg);
1293     env->setPacked(1);
1294     *pEnv = env;
1295   }
1296 }
1297
1298 void CkUnpackMessage(envelope **pEnv)
1299 {
1300   register envelope *env = *pEnv;
1301   register int msgIdx = env->getMsgIdx();
1302   if(env->isPacked()) {
1303     register void *msg = EnvToUsr(env);
1304     _TRACE_BEGIN_UNPACK();
1305     msg = _msgTable[msgIdx]->unpack(msg);
1306     _TRACE_END_UNPACK();
1307     env=UsrToEnv(msg);
1308     env->setPacked(0);
1309     *pEnv = env;
1310   }
1311 }
1312
1313 //There's no reason for most messages to go through the Cld--
1314 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1315 // Thus these accellerated versions of the Cld calls.
1316 #if CMK_OBJECT_QUEUE_AVAILABLE
1317 static int index_objectQHandler;
1318 #endif
1319 int index_tokenHandler;
1320 int index_skipCldHandler;
1321
1322 void _skipCldHandler(void *converseMsg)
1323 {
1324   register envelope *env = (envelope *)(converseMsg);
1325   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1326 #if CMK_GRID_QUEUE_AVAILABLE
1327   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1328     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1329                        env, env->getQueueing (), env->getPriobits (),
1330                        (unsigned int *) env->getPrioPtr ());
1331   } else {
1332     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1333                        env, env->getQueueing (), env->getPriobits (),
1334                        (unsigned int *) env->getPrioPtr ());
1335   }
1336 #else
1337   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1338         env, env->getQueueing(),env->getPriobits(),
1339         (unsigned int *)env->getPrioPtr());
1340 #endif
1341 }
1342
1343
1344 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1345 // Made non-static to be used by ckmessagelogging
1346 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1347 {
1348 #if CMK_CHARMDEBUG
1349   if (!ConverseDeliver(pe)) {
1350     CmiFree(env);
1351     return;
1352   }
1353 #endif
1354   if(pe == CkMyPe() ){
1355     if(!CmiNodeAlive(CkMyPe())){
1356         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1357 //      return;
1358     }
1359   }
1360   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1361 #if CMK_OBJECT_QUEUE_AVAILABLE
1362     Chare *obj = CkFindObjectPtr(env);
1363     if (obj && obj->CkGetObjQueue().queue()) {
1364       _enqObjQueue(obj, env);
1365     }
1366     else
1367 #endif
1368     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1369         env, env->getQueueing(),env->getPriobits(),
1370         (unsigned int *)env->getPrioPtr());
1371   } else {
1372     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1373       CkPackMessage(&env);
1374     int len=env->getTotalsize();
1375     CmiSetXHandler(env,CmiGetHandler(env));
1376 #if CMK_OBJECT_QUEUE_AVAILABLE
1377     CmiSetHandler(env,index_objectQHandler);
1378 #else
1379     CmiSetHandler(env,index_skipCldHandler);
1380 #endif
1381     CmiSetInfo(env,infoFn);
1382     if (pe==CLD_BROADCAST) {
1383 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1384                         CmiSyncBroadcast(len, (char *)env);
1385 #else
1386                         CmiSyncBroadcastAndFree(len, (char *)env); 
1387 #endif
1388
1389 }
1390     else if (pe==CLD_BROADCAST_ALL) { 
1391 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1392                         CmiSyncBroadcastAll(len, (char *)env);
1393 #else
1394                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1395 #endif
1396
1397 }
1398     else{
1399 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1400                         CmiSyncSend(pe, len, (char *)env);
1401 #else
1402                         CmiSyncSendAndFree(pe, len, (char *)env);
1403 #endif
1404
1405                 }
1406   }
1407 }
1408
1409 #if CMK_BIGSIM_CHARM
1410 #   define  _skipCldEnqueue   _CldEnqueue
1411 #endif
1412
1413 // by pass Charm++ priority queue, send as Converse message
1414 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1415 {
1416 #if CMK_CHARMDEBUG
1417   if (!ConverseDeliver(-1)) {
1418     CmiFree(env);
1419     return;
1420   }
1421 #endif
1422   CkPackMessage(&env);
1423   int len=env->getTotalsize();
1424   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1425 }
1426
1427 static void _noCldEnqueue(int pe, envelope *env)
1428 {
1429 /*
1430   if (pe == CkMyPe()) {
1431     CmiHandleMessage(env);
1432   } else
1433 */
1434 #if CMK_CHARMDEBUG
1435   if (!ConverseDeliver(pe)) {
1436     CmiFree(env);
1437     return;
1438   }
1439 #endif
1440   CkPackMessage(&env);
1441   int len=env->getTotalsize();
1442   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1443   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1444   else CmiSyncSendAndFree(pe, len, (char *)env);
1445 }
1446
1447 //static void _noCldNodeEnqueue(int node, envelope *env)
1448 //Made non-static to be used by ckmessagelogging
1449 void _noCldNodeEnqueue(int node, envelope *env)
1450 {
1451 /*
1452   if (node == CkMyNode()) {
1453     CmiHandleMessage(env);
1454   } else {
1455 */
1456 #if CMK_CHARMDEBUG
1457   if (!ConverseDeliver(node)) {
1458     CmiFree(env);
1459     return;
1460   }
1461 #endif
1462   CkPackMessage(&env);
1463   int len=env->getTotalsize();
1464   if (node==CLD_BROADCAST) { 
1465 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1466         CmiSyncNodeBroadcast(len, (char *)env);
1467 #else
1468         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1469 #endif
1470 }
1471   else if (node==CLD_BROADCAST_ALL) { 
1472 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1473                 CmiSyncNodeBroadcastAll(len, (char *)env);
1474 #else
1475                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1476 #endif
1477
1478 }
1479   else {
1480 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1481         CmiSyncNodeSend(node, len, (char *)env);
1482 #else
1483         CmiSyncNodeSendAndFree(node, len, (char *)env);
1484 #endif
1485   }
1486 }
1487
1488 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1489 {
1490   register envelope *env = UsrToEnv(msg);
1491   _CHECK_USED(env);
1492   _SET_USED(env, 1);
1493 #if CMK_REPLAYSYSTEM
1494   setEventID(env);
1495 #endif
1496   env->setMsgtype(ForChareMsg);
1497   env->setEpIdx(eIdx);
1498   env->setSrcPe(CkMyPe());
1499 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1500   criticalPath_send(env);
1501   automaticallySetMessagePriority(env);
1502 #endif
1503 #if CMK_CHARMDEBUG
1504   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1505 #endif
1506 #if CMK_OBJECT_QUEUE_AVAILABLE
1507   CmiSetHandler(env, index_objectQHandler);
1508 #else
1509   CmiSetHandler(env, _charmHandlerIdx);
1510 #endif
1511   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1512     register int pe = -(pCid->onPE+1);
1513     if(pe==CkMyPe()) {
1514 #ifndef CMK_CHARE_USE_PTR
1515       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1516 #else
1517       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1518 #endif
1519       void *objPtr;
1520       if (NULL!=(objPtr=vblk->getLocalChare()))
1521       { //A ready local chare
1522         env->setObjPtr(objPtr);
1523         return pe;
1524       }
1525       else { //The vidblock is not ready-- forget it
1526         vblk->send(env);
1527         return -1;
1528       }
1529     } else { //Valid vidblock for another PE:
1530       env->setMsgtype(ForVidMsg);
1531       env->setVidPtr(pCid->objPtr);
1532       return pe;
1533     }
1534   }
1535   else {
1536     env->setObjPtr(pCid->objPtr);
1537     return pCid->onPE;
1538   }
1539 }
1540
1541 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1542 {
1543   int destPE = _prepareMsg(eIdx, msg, pCid);
1544   if (destPE != -1) {
1545     register envelope *env = UsrToEnv(msg);
1546 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1547     criticalPath_send(env);
1548     automaticallySetMessagePriority(env);
1549 #endif
1550     CmiBecomeImmediate(env);
1551   }
1552   return destPE;
1553 }
1554
1555 extern "C"
1556 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1557 {
1558   if (opts & CK_MSG_INLINE) {
1559     CkSendMsgInline(entryIdx, msg, pCid, opts);
1560     return;
1561   }
1562 #if CMK_ERROR_CHECKING
1563   if (opts & CK_MSG_IMMEDIATE) {
1564     CmiAbort("Immediate message is not allowed in Chare!");
1565   }
1566 #endif
1567   register envelope *env = UsrToEnv(msg);
1568   int destPE=_prepareMsg(entryIdx,msg,pCid);
1569   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1570   // VidBlock was not yet filled). The problem is that the creation was never
1571   // traced later when the VidBlock was filled. One solution is to trace the
1572   // creation here, the other to trace it in VidBlock->msgDeliver().
1573   _TRACE_CREATION_1(env);
1574   if (destPE!=-1) {
1575     CpvAccess(_qd)->create();
1576     if (opts & CK_MSG_SKIP_OR_IMM)
1577       _noCldEnqueue(destPE, env);
1578     else
1579       _CldEnqueue(destPE, env, _infoIdx);
1580   }
1581   _TRACE_CREATION_DONE(1);
1582 }
1583
1584 extern "C"
1585 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1586 {
1587   if (pCid->onPE==CkMyPe())
1588   {
1589     if(!CmiNodeAlive(CkMyPe())){
1590         return;
1591     }
1592 #if CMK_CHARMDEBUG
1593     //Just in case we need to breakpoint or use the envelope in some way
1594     _prepareMsg(entryIndex,msg,pCid);
1595 #endif
1596                 //Just directly call the chare (skip QD handling & scheduler)
1597     register envelope *env = UsrToEnv(msg);
1598     if (env->isPacked()) CkUnpackMessage(&env);
1599     _STATS_RECORD_PROCESS_MSG_1();
1600     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1601   }
1602   else {
1603     //No way to inline a cross-processor message:
1604     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1605   }
1606 }
1607
1608 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1609 {
1610   register envelope *env = UsrToEnv(msg);
1611 #if CMK_ERROR_CHECKING
1612   CkNodeGroupID nodeRedMgr;
1613 #endif
1614   _CHECK_USED(env);
1615   _SET_USED(env, 1);
1616 #if CMK_REPLAYSYSTEM
1617   setEventID(env);
1618 #endif
1619   env->setMsgtype(type);
1620   env->setEpIdx(eIdx);
1621   env->setGroupNum(gID);
1622   env->setSrcPe(CkMyPe());
1623 #if CMK_ERROR_CHECKING
1624   nodeRedMgr.setZero();
1625   env->setRednMgr(nodeRedMgr);
1626 #endif
1627 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1628   criticalPath_send(env);
1629   automaticallySetMessagePriority(env);
1630 #endif
1631 #if CMK_CHARMDEBUG
1632   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1633 #endif
1634   CmiSetHandler(env, _charmHandlerIdx);
1635   return env;
1636 }
1637
1638 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1639 {
1640   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1641 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1642   criticalPath_send(env);
1643   automaticallySetMessagePriority(env);
1644 #endif
1645   CmiBecomeImmediate(env);
1646   return env;
1647 }
1648
1649 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1650                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1651 {
1652   int numPes;
1653   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1654 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1655   sendTicketGroupRequest(env,pe,_infoIdx);
1656 #else
1657   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1658   _TRACE_CREATION_N(env, numPes);
1659   if (opts & CK_MSG_SKIP_OR_IMM)
1660     _noCldEnqueue(pe, env);
1661   else
1662     _skipCldEnqueue(pe, env, _infoIdx);
1663   _TRACE_CREATION_DONE(1);
1664 #endif
1665 }
1666
1667 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1668                            int npes, int *pes)
1669 {
1670   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1671   _TRACE_CREATION_MULTICAST(env, npes, pes);
1672   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1673   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1674 }
1675
1676 extern "C"
1677 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1678 {
1679 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1680   if (destPE==CkMyPe())
1681   {
1682     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1683     return;
1684   }
1685   //Can't inline-- send the usual way
1686   register envelope *env = UsrToEnv(msg);
1687   int numPes;
1688   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1689   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1690   _TRACE_CREATION_N(env, numPes);
1691   _noCldEnqueue(destPE, env);
1692   _STATS_RECORD_SEND_BRANCH_1();
1693   CkpvAccess(_coreState)->create();
1694   _TRACE_CREATION_DONE(1);
1695 #else
1696   // no support for immediate message, send inline
1697   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1698 #endif
1699 }
1700
1701 extern "C"
1702 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1703 {
1704   if (destPE==CkMyPe())
1705   {
1706     if(!CmiNodeAlive(CkMyPe())){
1707         return;
1708     }
1709     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1710     if (obj!=NULL)
1711     { //Just directly call the group:
1712 #if CMK_ERROR_CHECKING
1713       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1714 #else
1715       envelope *env=UsrToEnv(msg);
1716 #endif
1717       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1718       return;
1719     }
1720   }
1721   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1722   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1723 }
1724
1725 extern "C"
1726 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1727 {
1728   if (opts & CK_MSG_INLINE) {
1729     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1730     return;
1731   }
1732   if (opts & CK_MSG_IMMEDIATE) {
1733     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1734     return;
1735   }
1736   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1737   _STATS_RECORD_SEND_BRANCH_1();
1738   CkpvAccess(_coreState)->create();
1739 }
1740
1741 extern "C"
1742 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1743 {
1744 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1745   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1746   _TRACE_CREATION_MULTICAST(env, npes, pes);
1747   _noCldEnqueueMulti(npes, pes, env);
1748   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1749 #else
1750   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1751   CpvAccess(_qd)->create(-npes);
1752 #endif
1753   _STATS_RECORD_SEND_BRANCH_N(npes);
1754   CpvAccess(_qd)->create(npes);
1755 }
1756
1757 extern "C"
1758 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1759 {
1760   if (opts & CK_MSG_IMMEDIATE) {
1761     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1762     return;
1763   }
1764     // normal mesg
1765   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1766   _STATS_RECORD_SEND_BRANCH_N(npes);
1767   CpvAccess(_qd)->create(npes);
1768 }
1769
1770 extern "C"
1771 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1772 {
1773   int npes;
1774   int *pes;
1775   if (opts & CK_MSG_IMMEDIATE) {
1776     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1777     return;
1778   }
1779     // normal mesg
1780   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1781   CmiLookupGroup(grp, &npes, &pes);
1782   _TRACE_CREATION_MULTICAST(env, npes, pes);
1783   _CldEnqueueGroup(grp, env, _infoIdx);
1784   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1785   _STATS_RECORD_SEND_BRANCH_N(npes);
1786   CpvAccess(_qd)->create(npes);
1787 }
1788
1789 extern "C"
1790 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1791 {
1792   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1793   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1794   CpvAccess(_qd)->create(CkNumPes());
1795 }
1796
1797 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1798                 int node=CLD_BROADCAST_ALL, int opts=0)
1799 {
1800   int numPes;
1801   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1802 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1803         sendTicketNodeGroupRequest(env,node,_infoIdx);
1804 #else
1805   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1806   _TRACE_CREATION_N(env, numPes);
1807   if (opts & CK_MSG_SKIP_OR_IMM) {
1808     _noCldNodeEnqueue(node, env);
1809     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1810       CkpvAccess(_coreState)->create(-numPes);
1811     }
1812   }
1813   else
1814     _CldNodeEnqueue(node, env, _infoIdx);
1815   _TRACE_CREATION_DONE(1);
1816 #endif
1817 }
1818
1819 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1820                            int npes, int *nodes)
1821 {
1822   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1823   _TRACE_CREATION_N(env, npes);
1824   for (int i=0; i<npes; i++) {
1825     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1826   }
1827   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1828 }
1829
1830 extern "C"
1831 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1832 {
1833 #if CMK_IMMEDIATE_MSG
1834   if (node==CkMyNode())
1835   {
1836     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1837     return;
1838   }
1839   //Can't inline-- send the usual way
1840   register envelope *env = UsrToEnv(msg);
1841   int numPes;
1842   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1843   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1844   _TRACE_CREATION_N(env, numPes);
1845   _noCldNodeEnqueue(node, env);
1846   _STATS_RECORD_SEND_BRANCH_1();
1847   /* immeidate message is invisible to QD */
1848 //  CkpvAccess(_coreState)->create();
1849   _TRACE_CREATION_DONE(1);
1850 #else
1851   // no support for immediate message, send inline
1852   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1853 #endif
1854 }
1855
1856 extern "C"
1857 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1858 {
1859   if (node==CkMyNode())
1860   {
1861     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1862     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1863     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1864     if (obj!=NULL)
1865     { //Just directly call the group:
1866 #if CMK_ERROR_CHECKING
1867       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1868 #else
1869       envelope *env=UsrToEnv(msg);
1870 #endif
1871       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1872       return;
1873     }
1874   }
1875   //Can't inline-- send the usual way
1876   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1877 }
1878
1879 extern "C"
1880 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1881 {
1882   if (opts & CK_MSG_INLINE) {
1883     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1884     return;
1885   }
1886   if (opts & CK_MSG_IMMEDIATE) {
1887     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1888     return;
1889   }
1890   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1891   _STATS_RECORD_SEND_NODE_BRANCH_1();
1892   CkpvAccess(_coreState)->create();
1893 }
1894
1895 extern "C"
1896 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1897 {
1898 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1899   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1900   _noCldEnqueueMulti(npes, nodes, env);
1901 #else
1902   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1903   CpvAccess(_qd)->create(-npes);
1904 #endif
1905   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1906   CpvAccess(_qd)->create(npes);
1907 }
1908
1909 extern "C"
1910 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1911 {
1912   if (opts & CK_MSG_IMMEDIATE) {
1913     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1914     return;
1915   }
1916     // normal mesg
1917   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1918   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1919   CpvAccess(_qd)->create(npes);
1920 }
1921
1922 extern "C"
1923 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1924 {
1925   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1926   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1927   CpvAccess(_qd)->create(CkNumNodes());
1928 }
1929
1930 //Needed by delegation manager:
1931 extern "C"
1932 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1933 { return _prepareMsg(eIdx,msg,pCid); }
1934 extern "C"
1935 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1936 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1937 extern "C"
1938 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1939 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1940
1941 void _ckModuleInit(void) {
1942         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1943 #if CMK_OBJECT_QUEUE_AVAILABLE
1944         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1945 #endif
1946         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1947         CkpvInitialize(TokenPool*, _tokenPool);
1948         CkpvAccess(_tokenPool) = new TokenPool;
1949 }
1950
1951
1952 /************** Send: Arrays *************/
1953
1954 extern void CkArrayManagerInsert(int onPe,void *msg);
1955 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1956
1957 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1958 {
1959   _CHECK_USED(env);
1960   _SET_USED(env, 1);
1961   env->setMsgtype(type);
1962 #if CMK_CHARMDEBUG
1963   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1964 #endif
1965   CmiSetHandler(env, _charmHandlerIdx);
1966   CpvAccess(_qd)->create();
1967 }
1968
1969 extern "C"
1970 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1971   register envelope *env = UsrToEnv(msg);
1972   env->getsetArrayMgr()=aID;
1973   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1974   _CldEnqueue(pe, env, _infoIdx);
1975 }
1976
1977 extern "C"
1978 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1979   register envelope *env = UsrToEnv(msg);
1980   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1981 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1982    sendTicketArrayRequest(env,pe,_infoIdx);
1983 #else
1984   if (opts & CK_MSG_IMMEDIATE)
1985     CmiBecomeImmediate(env);
1986   if (opts & CK_MSG_SKIP_OR_IMM)
1987     _noCldEnqueue(pe, env);
1988   else
1989     _skipCldEnqueue(pe, env, _infoIdx);
1990 #endif
1991 }
1992
1993 class ElementDestroyer : public CkLocIterator {
1994 private:
1995         CkLocMgr *locMgr;
1996 public:
1997         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1998         void addLocation(CkLocation &loc) {
1999           loc.destroyAll();
2000         }
2001 };
2002
2003 void CkDeleteChares() {
2004   int i;
2005   int numGroups = CkpvAccess(_groupIDTable)->size();
2006
2007   // delete all plain chares
2008 #ifndef CMK_CHARE_USE_PTR
2009   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2010         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2011         delete obj;
2012         CkpvAccess(chare_objs)[i] = NULL;
2013   }
2014   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2015         VidBlock *obj = CkpvAccess(vidblocks)[i];
2016         delete obj;
2017         CkpvAccess(vidblocks)[i] = NULL;
2018   }
2019 #endif
2020
2021   // delete all array elements
2022   for(i=0;i<numGroups;i++) {
2023     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2024     if(obj && obj->isLocMgr())  {
2025       CkLocMgr *mgr = (CkLocMgr*)obj;
2026       ElementDestroyer destroyer(mgr);
2027       mgr->iterate(destroyer);
2028     }
2029   }
2030
2031   // delete all groups
2032   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2033   for(i=0;i<numGroups;i++) {
2034     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2035     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2036     if (obj) delete obj;
2037   }
2038   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2039
2040   // delete all node groups
2041   if (CkMyRank() == 0) {
2042     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2043     for(i=0;i<numNodeGroups;i++) {
2044       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2045       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2046       if (obj) delete obj;
2047     }
2048   }
2049 }
2050
2051 //------------------- Message Watcher (record/replay) ----------------
2052
2053 #include "crc32.h"
2054
2055 CkpvDeclare(int, envelopeEventID);
2056 int _recplay_crc = 0;
2057 int _recplay_checksum = 0;
2058 int _recplay_logsize = 1024*1024;
2059
2060 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2061 #define REPLAYDEBUG(args) /* empty */
2062
2063 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2064
2065 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2066
2067 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2068
2069     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2070     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2071     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2072     FILE *f=fopen(fName,permissions);
2073     REPLAYDEBUG("openReplayfile "<<fName);
2074     if (f==NULL) {
2075         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2076             CkMyPe(),fName,permissions);
2077         CkAbort("openReplayFile> Could not open replay file");
2078     }
2079     return f;
2080 }
2081
2082 #include "BaseLB.h" /* For LBMigrateMsg message */
2083
2084 class CkMessageRecorder : public CkMessageWatcher {
2085   char *buffer;
2086   unsigned int curpos;
2087   bool firstOpen;
2088 public:
2089   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2090   ~CkMessageRecorder() {
2091     flushLog(0);
2092     fprintf(f,"-1 -1 -1 ");
2093     fclose(f);
2094     delete[] buffer;
2095 #if 0
2096     FILE *stsfp = fopen("sts", "w");
2097     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2098     traceWriteSTS(stsfp, 0);
2099     fclose(stsfp);
2100 #endif
2101     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2102   }
2103
2104 private:
2105   void flushLog(int verbose=1) {
2106     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2107     fprintf(f, "%s", buffer);
2108     curpos=0;
2109   }
2110   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2111     if ((*envptr)->getEvent()) {
2112       bool wasPacked = (*envptr)->isPacked();
2113       if (!wasPacked) CkPackMessage(envptr);
2114       envelope *env = *envptr;
2115       unsigned int crc1=0, crc2=0;
2116       if (_recplay_crc) {
2117         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2118         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2119         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2120       } else if (_recplay_checksum) {
2121         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2122         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2123       }
2124       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2125       if (curpos > _recplay_logsize-128) flushLog();
2126       if (!wasPacked) CkUnpackMessage(envptr);
2127     }
2128     return CmiTrue;
2129   }
2130   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2131     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2132     if (curpos > _recplay_logsize-128) flushLog();
2133     return CmiTrue;
2134   }
2135   
2136   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2137     FILE *f;
2138     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2139     else f = openReplayFile("ckreplay_",".lb","a");
2140     firstOpen = false;
2141     if (f != NULL) {
2142       PUP::toDisk p(f);
2143       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2144       (*msg)->pup(p);
2145       fclose(f);
2146     }
2147     return CmiTrue;
2148   }
2149 };
2150
2151 class CkMessageDetailRecorder : public CkMessageWatcher {
2152 public:
2153   CkMessageDetailRecorder(FILE *f_) {
2154     f=f_;
2155     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2156      * The value of 'x' is the pointer size.
2157      */
2158     CmiUInt2 little = sizeof(void*);
2159     fwrite(&little, 2, 1, f);
2160   }
2161   ~CkMessageDetailRecorder() {fclose(f);}
2162 private:
2163   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2164     bool wasPacked = (*envptr)->isPacked();
2165     if (!wasPacked) CkPackMessage(envptr);
2166     envelope *env = *envptr;
2167     CmiUInt4 size = env->getTotalsize();
2168     fwrite(&size, 4, 1, f);
2169     fwrite(env, env->getTotalsize(), 1, f);
2170     if (!wasPacked) CkUnpackMessage(envptr);
2171     return CmiTrue;
2172   }
2173 };
2174
2175 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2176 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2177
2178 #if CMK_BIGSIM_CHARM
2179 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2180                                    int pb,unsigned int *prio);
2181 #endif
2182
2183 class CkMessageReplay : public CkMessageWatcher {
2184   int counter;
2185         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2186         int nextEP;
2187         unsigned int crc1, crc2;
2188         FILE *lbFile;
2189         /// Read the next message we need from the file:
2190         void getNext(void) {
2191           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2192           if (nextSize > 0) {
2193             // We are reading a regular message
2194             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2195               CkAbort("CkMessageReplay> Syntax error reading replay file");
2196             }
2197             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2198           } else if (nextSize == -2) {
2199             // We are reading a special message (right now only thread awaken)
2200             // Nothing to do since we have already read all info
2201             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2202           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2203             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2204             CkAbort("CkMessageReplay> Unrecognized input");
2205           }
2206             /*
2207                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2208                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2209                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2210                 }
2211                 */
2212                 counter++;
2213         }
2214         /// If this is the next message we need, advance and return CmiTrue.
2215         CmiBool isNext(envelope *env) {
2216                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2217                 if (nextEvent!=env->getEvent()) return CmiFalse;
2218                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2219 #if 1
2220                 if (nextEP != env->getEpIdx()) {
2221                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2222                         return CmiFalse;
2223                 }
2224 #endif
2225 #if ! CMK_BIGSIM_CHARM
2226                 if (nextSize!=env->getTotalsize())
2227                 {
2228                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2229                         return CmiFalse;
2230                 }
2231                 if (_recplay_crc || _recplay_checksum) {
2232                   bool wasPacked = env->isPacked();
2233                   if (!wasPacked) CkPackMessage(&env);
2234                   if (_recplay_crc) {
2235                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2236                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2237                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2238                     if (crcnew1 != crc1) {
2239                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2240                     }
2241                     if (crcnew2 != crc2) {
2242                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2243                     }
2244                   } else if (_recplay_checksum) {
2245             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2246             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2247             if (crcnew1 != crc1) {
2248               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2249             }
2250             if (crcnew2 != crc2) {
2251               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2252             }               
2253                   }
2254                   if (!wasPacked) CkUnpackMessage(&env);
2255                 }
2256 #endif
2257                 return CmiTrue;
2258         }
2259         CmiBool isNext(CthThreadToken *token) {
2260           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2261           return CmiFalse;
2262         }
2263
2264         /// This is a (short) list of messages we aren't yet ready for:
2265         CkQ<envelope *> delayedMessages;
2266         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2267         CkQ<CthThreadToken *> delayedTokens;
2268
2269         /// Try to flush out any delayed messages
2270         void flush(void) {
2271           if (nextSize>0) {
2272                 int len=delayedMessages.length();
2273                 for (int i=0;i<len;i++) {
2274                         envelope *env=delayedMessages.deq();
2275                         if (isNext(env)) { /* this is the next message: process it */
2276                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2277                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2278                                 return;
2279                         }
2280                         else /* Not ready yet-- put it back in the
2281                                 queue */
2282                           {
2283                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2284                                 delayedMessages.enq(env);
2285                           }
2286                 }
2287           } else if (nextSize==-2) {
2288             int len=delayedTokens.length();
2289             for (int i=0;i<len;++i) {
2290               CthThreadToken *token=delayedTokens.deq();
2291               if (isNext(token)) {
2292             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2293 #if ! CMK_BIGSIM_CHARM
2294                 CsdEnqueueLifo((void*)token);
2295 #else
2296                 CthEnqueueBigSimThread(token,0,0,NULL);
2297 #endif
2298                 return;
2299               } else {
2300             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2301                 delayedTokens.enq(token);
2302               }
2303             }
2304           }
2305         }
2306
2307 public:
2308         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2309           counter=0;
2310           f=f_;
2311           getNext();
2312           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2313           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2314         }
2315         ~CkMessageReplay() {fclose(f);}
2316
2317 private:
2318         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2319           bool wasPacked = (*envptr)->isPacked();
2320           if (!wasPacked) CkPackMessage(envptr);
2321           envelope *env = *envptr;
2322           //CkAssert(*(int*)env == 0x34567890);
2323           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2324                 if (env->getEvent() == 0) return CmiTrue;
2325                 if (isNext(env)) { /* This is the message we were expecting */
2326                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2327                         getNext(); /* Advance over this message */
2328                         flush(); /* try to process queued-up stuff */
2329                         if (!wasPacked) CkUnpackMessage(envptr);
2330                         return CmiTrue;
2331                 }
2332 #if CMK_SMP
2333                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2334                          // try next rank, we can't just buffer the msg and left
2335                          // we need to keep unprocessed msg on the fly
2336                         int nextpe = CkMyPe()+1;
2337                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2338                         nextpe = CkNodeFirst(CkMyNode());
2339                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2340                         return CmiFalse;
2341                 }
2342 #endif
2343                 else /*!isNext(env) */ {
2344                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2345                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2346                         delayedMessages.enq(env);
2347                         flush();
2348                         return CmiFalse;
2349                 }
2350         }
2351         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2352       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2353           if (isNext(token)) {
2354         REPLAYDEBUG("Executing token: "<<token->serialNo)
2355             getNext();
2356             flush();
2357             return CmiTrue;
2358           } else {
2359         REPLAYDEBUG("Queueing token: "<<token->serialNo
2360             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2361             delayedTokens.enq(token);
2362             return CmiFalse;
2363           }
2364         }
2365
2366         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2367           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2368           if (lbFile != NULL) {
2369             int num_moves;
2370         PUP::fromDisk p(lbFile);
2371             p | num_moves;
2372             if (num_moves != (*msg)->n_moves) {
2373               delete *msg;
2374               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2375             }
2376             (*msg)->pup(p);
2377           }
2378           return CmiTrue;
2379         }
2380 };
2381
2382 class CkMessageDetailReplay : public CkMessageWatcher {
2383   void *getNext() {
2384     CmiUInt4 size; size_t nread;
2385     if ((nread=fread(&size, 4, 1, f)) < 1) {
2386       if (feof(f)) return NULL;
2387       CkPrintf("Broken record file (metadata) got %d\n",nread);
2388       CkAbort("");
2389     }
2390     void *env = CmiAlloc(size);
2391     long tell = ftell(f);
2392     if ((nread=fread(env, size, 1, f)) < 1) {
2393       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2394       CkAbort("");
2395     }
2396     //*(int*)env = 0x34567890; // set first integer as magic
2397     return env;
2398   }
2399 public:
2400   double starttime;
2401   CkMessageDetailReplay(FILE *f_) {
2402     f=f_;
2403     starttime=CkWallTimer();
2404     /* This must match what CkMessageDetailRecorder did */
2405     CmiUInt2 little;
2406     fread(&little, 2, 1, f);
2407     if (little != sizeof(void*)) {
2408       CkAbort("Replaying on a different architecture from which recording was done!");
2409     }
2410
2411     CsdEnqueue(getNext());
2412
2413     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2414   }
2415   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2416     void *msg = getNext();
2417     if (msg != NULL) CsdEnqueue(msg);
2418     return CmiTrue;
2419   }
2420 };
2421
2422 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2423 #if ! CMK_BIGSIM_CHARM
2424   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2425 #endif
2426   CkMessageReplay *replay = (CkMessageReplay*)rep;
2427   //CmiStartQD(CkMessageReplayQuiescence, replay);
2428 }
2429
2430 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2431   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2432   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2433   ConverseExit();
2434 }
2435
2436 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2437   CkCoreState *ck = CkpvAccess(_coreState);
2438   if (ck->watcher!=NULL) {
2439     return ck->watcher->processThread(token,ck);
2440   }
2441   return CmiTrue;
2442 }
2443
2444 CpvCExtern(int, CthResumeNormalThreadIdx);
2445 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2446 {
2447   CthThread t = token->thread;
2448
2449   if(t == NULL){
2450     free(token);
2451     return;
2452   }
2453 #if CMK_TRACE_ENABLED
2454 #if ! CMK_TRACE_IN_CHARM
2455   if(CpvAccess(traceOn))
2456     CthTraceResume(t);
2457 /*    if(CpvAccess(_traceCoreOn)) 
2458             resumeTraceCore();*/
2459 #endif
2460 #endif
2461   
2462   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2463   if (CpdExecuteThreadResume(token)) {
2464     CthResume(t);
2465   }
2466 }
2467
2468 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2469   CkCoreState *ck = CkpvAccess(_coreState);
2470   if (ck->watcher!=NULL) {
2471     ck->watcher->processLBMessage(msg, ck);
2472   }
2473 }
2474
2475 #if CMK_BIGSIM_CHARM
2476 CpvExtern(int      , CthResumeBigSimThreadIdx);
2477 #endif
2478
2479 #include "ckliststring.h"
2480 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2481     CmiArgGroup("Charm++","Record/Replay");
2482     CmiBool forceReplay = CmiFalse;
2483     char *procs = NULL;
2484     _replaySystem = 0;
2485     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2486       _recplay_crc = 1;
2487     }
2488     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2489       _recplay_checksum = 1;
2490     }
2491     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2492     REPLAYDEBUG("CkMessageWatcherInit ");
2493     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2494         CkListString list(procs);
2495         if (list.includes(CkMyPe())) {
2496           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2497           CpdSetInitializeMemory(1);
2498           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2499         }
2500     }
2501     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2502       if (CkMyPe() == 0) {
2503         CmiPrintf("Charm++> record mode.\n");
2504         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2505           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2506           _recplay_crc = _recplay_checksum = 0;
2507         }
2508       }
2509       CpdSetInitializeMemory(1);
2510       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2511       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2512     }
2513         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2514             forceReplay = CmiTrue;
2515             CpdSetInitializeMemory(1);
2516             // Set the parameters of the processor
2517 #if CMK_SHARED_VARS_UNAVAILABLE
2518             _Cmi_mype = atoi(procs);
2519             while (procs[0]!='/') procs++;
2520             procs++;
2521             _Cmi_numpes = atoi(procs);
2522 #else
2523             CkAbort("+replay-detail available only for non-SMP build");
2524 #endif
2525             _replaySystem = 1;
2526             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2527         }
2528         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2529           if (CkMyPe() == 0)  {
2530             CmiPrintf("Charm++> replay mode.\n");
2531             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2532               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2533               _recplay_crc = _recplay_checksum = 0;
2534             }
2535           }
2536           CpdSetInitializeMemory(1);
2537 #if ! CMK_BIGSIM_CHARM
2538           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2539 #else
2540           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2541 #endif
2542           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2543         }
2544         if (_recplay_crc && _recplay_checksum) {
2545           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2546         }
2547 }
2548
2549 extern "C"
2550 int CkMessageToEpIdx(void *msg) {
2551         envelope *env=UsrToEnv(msg);
2552         int ep=env->getEpIdx();
2553         if (ep==CkIndex_CkArray::recvBroadcast(0))
2554                 return env->getsetArrayBcastEp();
2555         else
2556                 return ep;
2557 }
2558
2559 extern "C"
2560 int getCharmEnvelopeSize() {
2561   return sizeof(envelope);
2562 }
2563
2564
2565 #include "CkMarshall.def.h"
2566