fixed a race condition when an insert element message arrives before CkArray is created
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
14 #include "pathHistory.h"
15 void automaticallySetMessagePriority(envelope *env); // in control point framework.
16 #endif
17
18 #if CMK_LBDB_ON
19 #include "LBDatabase.h"
20 #endif // CMK_LBDB_ON
21
22 #ifndef CMK_CHARE_USE_PTR
23 #include <map>
24 CkpvDeclare(CkVec<void *>, chare_objs);
25 CkpvDeclare(CkVec<int>, chare_types);
26 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
27
28 typedef std::map<int, CkChareID>  Vidblockmap;
29 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
30 CkpvDeclare(int, currentChareIdx);
31 #endif
32
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 void _initChareTables()
47 {
48 #ifndef CMK_CHARE_USE_PTR
49           /* chare and vidblock table */
50   CkpvInitialize(CkVec<void *>, chare_objs);
51   CkpvInitialize(CkVec<int>, chare_types);
52   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
53   CkpvInitialize(Vidblockmap, vmap);
54   CkpvInitialize(int, currentChareIdx);
55   CkpvAccess(currentChareIdx) = -1;
56 #endif
57 }
58
59 //Charm++ virtual functions: declaring these here results in a smaller executable
60 Chare::Chare(void) {
61   thishandle.onPE=CkMyPe();
62   thishandle.objPtr=this;
63 #if CMK_ERROR_CHECKING
64   magic = CHARE_MAGIC;
65 #endif
66 #ifndef CMK_CHARE_USE_PTR
67      // for plain chare, objPtr is actually the index to chare obj table
68   if (CkpvAccess(currentChareIdx) >= 0) {
69     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
70   }
71   chareIdx = CkpvAccess(currentChareIdx);
72 #endif
73 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
74   mlogData = new ChareMlogData();
75   mlogData->objID.type = TypeChare;
76   mlogData->objID.data.chare.id = thishandle;
77 #endif
78 #if CMK_OBJECT_QUEUE_AVAILABLE
79   if (_defaultObjectQ)  CkEnableObjQ();
80 #endif
81 }
82
83 Chare::Chare(CkMigrateMessage* m) {
84   thishandle.onPE=CkMyPe();
85   thishandle.objPtr=this;
86 #if CMK_ERROR_CHECKING
87   magic = 0;
88 #endif
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 #if CMK_ERROR_CHECKING
149   p(magic);
150 #endif
151 }
152
153 int Chare::ckGetChareType() const {
154   return -3;
155 }
156 char *Chare::ckDebugChareName(void) {
157   char buf[100];
158   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
159   return strdup(buf);
160 }
161 int Chare::ckDebugChareID(char *str, int limit) {
162   // pure chares for now do not have a valid ID
163   str[0] = 0;
164   return 1;
165 }
166 void Chare::ckDebugPup(PUP::er &p) {
167   pup(p);
168 }
169
170 /// This method is called before starting a [threaded] entry method.
171 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
172   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
173   traceAddThreadListeners(th, UsrToEnv(msg));
174 }
175
176 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
177   p.comment("Bytes");
178   int ts=UsrToEnv(msg)->getTotalsize();
179   int msgLen=ts-sizeof(envelope);
180   if (msgLen>0)
181     p((char*)msg,msgLen);
182 }
183
184 IrrGroup::IrrGroup(void) {
185   thisgroup = CkpvAccess(_currentGroup);
186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
187         mlogData->objID.type = TypeGroup;
188         mlogData->objID.data.group.id = thisgroup;
189         mlogData->objID.data.group.onPE = CkMyPe();
190 #endif
191 }
192
193 IrrGroup::~IrrGroup() {
194   // remove the object pointer
195   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
196   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
197   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
198 }
199
200 void IrrGroup::pup(PUP::er &p)
201 {
202   Chare::pup(p);
203   p|thisgroup;
204 }
205
206 int IrrGroup::ckGetChareType() const {
207   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
208 }
209
210 int IrrGroup::ckDebugChareID(char *str, int limit) {
211   if (limit<5) return -1;
212   str[0] = 1;
213   *((int*)&str[1]) = thisgroup.idx;
214   return 5;
215 }
216
217 char *IrrGroup::ckDebugChareName() {
218   return strdup(_chareTable[ckGetChareType()]->name);
219 }
220
221 void IrrGroup::ckJustMigrated(void)
222 {
223 }
224
225 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
226   /* FIXME: **CW** not entirely sure what we should do here yet */
227 }
228
229 void Group::CkAddThreadListeners(CthThread th, void *msg) {
230   Chare::CkAddThreadListeners(th, msg);
231   CthSetThreadID(th, thisgroup.idx, 0, 0);
232 }
233
234 void Group::pup(PUP::er &p)
235 {
236   CkReductionMgr::pup(p);
237   reductionInfo.pup(p);
238 }
239
240 /**** Delegation Manager Group */
241 CkDelegateMgr::~CkDelegateMgr() { }
242
243 //Default delegator implementation: do not delegate-- send directly
244 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
245   { CkSendMsg(ep,m,c); }
246 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
247   { CkSendMsgBranch(ep,m,onPE,g); }
248 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
249   { CkBroadcastMsgBranch(ep,m,g); }
250 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
251   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
252 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
253   { CkSendMsgNodeBranch(ep,m,onNode,g); }
254 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
255   { CkBroadcastMsgNodeBranch(ep,m,g); }
256 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
257   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
258 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
259 {
260         CProxyElement_ArrayBase ap(a,idx);
261         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
262 }
263 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
264 {
265         CProxyElement_ArrayBase ap(a,idx);
266         ap.ckSend((CkArrayMessage *)m,ep);
267 }
268 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
269 {
270         CProxy_ArrayBase ap(a);
271         ap.ckBroadcast((CkArrayMessage *)m,ep);
272 }
273
274 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
275 {
276         CmiAbort("ArraySectionSend is not implemented!\n");
277 /*
278         CProxyElement_ArrayBase ap(a,idx);
279         ap.ckSend((CkArrayMessage *)m,ep);
280 */
281 }
282
283 /*** Proxy <-> delegator communication */
284 CkDelegateData::~CkDelegateData() {}
285
286 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
287   return pd; // default implementation ignores pup call
288 }
289
290 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
291    this tricky manual reference counting business... */
292
293 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
294         if (dPtr) dPtr->ref();
295         ckUndelegate();
296         delegatedMgr = dTo;
297         delegatedPtr = dPtr;
298         delegatedGroupId = delegatedMgr->CkGetGroupID();
299         isNodeGroup = delegatedMgr->isNodeGroup();
300 }
301 void CProxy::ckUndelegate(void) {
302         delegatedMgr=NULL;
303         delegatedGroupId.setZero();
304         if (delegatedPtr) delegatedPtr->unref();
305         delegatedPtr=NULL;
306 }
307
308 /// Copy constructor
309 CProxy::CProxy(const CProxy &src)
310   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
311    isNodeGroup(src.isNodeGroup) {
312     delegatedPtr = NULL;
313     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
314         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
315     }
316 }
317
318 /// Assignment operator
319 CProxy& CProxy::operator=(const CProxy &src) {
320         CkDelegateData *oldPtr=delegatedPtr;
321         ckUndelegate();
322         delegatedMgr=src.delegatedMgr;
323         delegatedGroupId = src.delegatedGroupId; 
324         isNodeGroup = src.isNodeGroup;
325
326         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
327             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
328         else
329             delegatedPtr = NULL;
330
331         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
332         if (oldPtr) oldPtr->unref();
333         return *this;
334 }
335
336 void CProxy::pup(PUP::er &p) {
337   if (!p.isUnpacking()) {
338     if (ckDelegatedTo() != NULL) {
339       delegatedGroupId = delegatedMgr->CkGetGroupID();
340       isNodeGroup = delegatedMgr->isNodeGroup();
341     }
342   }
343   p|delegatedGroupId;
344   if (!delegatedGroupId.isZero()) {
345     p|isNodeGroup;
346     if (p.isUnpacking()) {
347       delegatedMgr = ckDelegatedTo(); 
348     }
349
350     int migCtor, cIdx; 
351     if (!p.isUnpacking()) {
352       if (isNodeGroup) {
353         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
354         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
355         migCtor = _chareTable[cIdx]->migCtor; 
356         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
357       }
358       else  {
359         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
360         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
361         migCtor = _chareTable[cIdx]->migCtor; 
362         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
363       }         
364     }
365
366     p|migCtor;
367
368     // if delegated manager has not been created, construct a dummy
369     // object on which to call DelegatePointerPup
370     if (delegatedMgr == NULL) {
371
372       // create a dummy object for calling DelegatePointerPup
373       int objId = _entryTable[migCtor]->chareIdx; 
374       int objSize = _chareTable[objId]->size; 
375       void *obj = malloc(objSize); 
376       _entryTable[migCtor]->call(NULL, obj); 
377       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
378         ->DelegatePointerPup(p, delegatedPtr);           
379       free(obj);
380
381     }
382     else {
383
384       // delegated manager has been created, so we can use it
385       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
386
387     }
388
389     if (p.isUnpacking() && delegatedPtr) {
390       delegatedPtr->ref();
391     }
392   }
393 }
394
395 /**** Array sections */
396 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
397 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
398   _cookie.get_aid() = aid;      \
399   _cookie.get_pe() = CkMyPe();  \
400   _elems = new CkArrayIndex[nElems];    \
401   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
402   pelist = NULL;        \
403   npes  = 0;    \
404 }
405
406 CKSECTIONID_CONSTRUCTOR_DEF(1D)
407 CKSECTIONID_CONSTRUCTOR_DEF(2D)
408 CKSECTIONID_CONSTRUCTOR_DEF(3D)
409 CKSECTIONID_CONSTRUCTOR_DEF(4D)
410 CKSECTIONID_CONSTRUCTOR_DEF(5D)
411 CKSECTIONID_CONSTRUCTOR_DEF(6D)
412 CKSECTIONID_CONSTRUCTOR_DEF(Max)
413
414 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
415   pelist = new int[npes];
416   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
417   _cookie.get_aid() = gid;
418 }
419
420 CkSectionID::CkSectionID(const CkSectionID &sid) {
421   int i;
422   _cookie = sid._cookie;
423   _nElems = sid._nElems;
424   if (_nElems > 0) {
425     _elems = new CkArrayIndex[_nElems];
426     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
427   } else _elems = NULL;
428   npes = sid.npes;
429   if (npes > 0) {
430     pelist = new int[npes];
431     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
432   } else pelist = NULL;
433 }
434
435 void CkSectionID::operator=(const CkSectionID &sid) {
436   int i;
437   _cookie = sid._cookie;
438   _nElems = sid._nElems;
439   if (_nElems > 0) {
440     _elems = new CkArrayIndex[_nElems];
441     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
442   } else _elems = NULL;
443   npes = sid.npes;
444   if (npes > 0) {
445     pelist = new int[npes];
446     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
447   } else pelist = NULL;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451     p | _cookie;
452     p(_nElems);
453     if (_nElems > 0) {
454       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
455       for (int i=0; i< _nElems; i++) p | _elems[i];
456       npes = 0;
457       pelist = NULL;
458     } else {
459       // If _nElems is zero, than this section describes processors instead of array elements
460       _elems = NULL;
461       p(npes);
462       if (p.isUnpacking()) pelist = new int[npes];
463       p(pelist, npes);
464     }
465 }
466
467 /**** Tiny random API routines */
468
469 #ifdef CMK_CUDA
470 void CUDACallbackManager(void *fn) {
471   if (fn != NULL) {
472     CkCallback *cb = (CkCallback*) fn;
473     cb->send();
474   }
475 }
476
477 #endif
478
479 extern "C"
480 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
481 {
482   UsrToEnv(msg)->setRef(ref);
483 }
484
485 extern "C"
486 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
487 {
488   return UsrToEnv(msg)->getRef();
489 }
490
491 extern "C"
492 int CkGetSrcPe(void *msg)
493 {
494   return UsrToEnv(msg)->getSrcPe();
495 }
496
497 extern "C"
498 int CkGetSrcNode(void *msg)
499 {
500   return CmiNodeOf(CkGetSrcPe(msg));
501 }
502
503 extern "C"
504 void *CkLocalBranch(CkGroupID gID) {
505   return _localBranch(gID);
506 }
507
508 static
509 void *_ckLocalNodeBranch(CkGroupID groupID) {
510   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
511   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
512   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
513   return retval;
514 }
515
516 extern "C"
517 void *CkLocalNodeBranch(CkGroupID groupID)
518 {
519   void *retval;
520   // we are called in a constructor
521   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
522     return CkpvAccess(_currentNodeGroupObj);
523   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
524   { // Nodegroup hasn't finished being created yet-- schedule...
525     CsdScheduler(0);
526   }
527   return retval;
528 }
529
530 extern "C"
531 void *CkLocalChare(const CkChareID *pCid)
532 {
533         int pe=pCid->onPE;
534         if (pe<0) { //A virtual chare ID
535                 if (pe!=(-(CkMyPe()+1)))
536                         return NULL;//VID block not on this PE
537 #ifdef CMK_CHARE_USE_PTR
538                 VidBlock *v=(VidBlock *)pCid->objPtr;
539 #else
540                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
541 #endif
542                 return v->getLocalChareObj();
543         }
544         else
545         { //An ordinary chare ID
546                 if (pe!=CkMyPe())
547                         return NULL;//Chare not on this PE
548 #ifdef CMK_CHARE_USE_PTR
549                 return pCid->objPtr;
550 #else
551                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
552 #endif
553         }
554 }
555
556 CkpvDeclare(char**,Ck_argv);
557
558 extern "C" char **CkGetArgv(void) {
559         return CkpvAccess(Ck_argv);
560 }
561 extern "C" int CkGetArgc(void) {
562         return CmiGetArgc(CkpvAccess(Ck_argv));
563 }
564
565 /******************** Basic support *****************/
566 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
567 {
568 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
569         CpvAccess(_currentObj) = (Chare *)obj;
570 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
571 #endif
572   //BIGSIM_OOC DEBUGGING
573   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
574   //fflush(stdout);
575 #if CMK_CHARMDEBUG
576   CpdBeforeEp(epIdx, obj, msg);
577 #endif    
578   _entryTable[epIdx]->call(msg, obj);
579 #if CMK_CHARMDEBUG
580   CpdAfterEp(epIdx);
581 #endif
582   if (_entryTable[epIdx]->noKeep)
583   { /* Method doesn't keep/delete the message, so we have to: */
584     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
585   }
586 }
587 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
588 {
589   //BIGSIM_OOC DEBUGGING
590   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
591   //fflush(stdout);
592
593   void *deliverMsg;
594 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
595         CpvAccess(_currentObj) = (Chare *)obj;
596 #endif
597   if (_entryTable[epIdx]->noKeep)
598   { /* Deliver a read-only copy of the message */
599     deliverMsg=(void *)msg;
600   } else
601   { /* Method needs a copy of the message to keep/delete */
602     void *oldMsg=(void *)msg;
603     deliverMsg=CkCopyMsg(&oldMsg);
604 #if CMK_ERROR_CHECKING
605     if (oldMsg!=msg)
606       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
607 #endif
608   }
609 #if CMK_CHARMDEBUG
610   CpdBeforeEp(epIdx, obj, (void*)msg);
611 #endif
612   _entryTable[epIdx]->call(deliverMsg, obj);
613 #if CMK_CHARMDEBUG
614   CpdAfterEp(epIdx);
615 #endif
616 }
617
618 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
619 {
620   register void *msg = EnvToUsr(env);
621   _SET_USED(env, 0);
622   CkDeliverMessageFree(epIdx,msg,obj);
623 }
624
625 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
626 {
627
628 #if CMK_TRACE_ENABLED 
629   if (_entryTable[epIdx]->traceEnabled) {
630     _TRACE_BEGIN_EXECUTE(env);
631     _invokeEntryNoTrace(epIdx,env,obj);
632     _TRACE_END_EXECUTE();
633   }
634   else
635 #endif
636     _invokeEntryNoTrace(epIdx,env,obj);
637
638 }
639
640 /********************* Creation ********************/
641
642 extern "C"
643 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
644 {
645   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
646   envelope *env = UsrToEnv(msg);
647   _CHECK_USED(env);
648   if(pCid == 0) {
649     env->setMsgtype(NewChareMsg);
650   } else {
651     pCid->onPE = (-(CkMyPe()+1));
652     //  pCid->magic = _GETIDX(cIdx);
653     pCid->objPtr = (void *) new VidBlock();
654     _MEMCHECK(pCid->objPtr);
655     env->setMsgtype(NewVChareMsg);
656     env->setVidPtr(pCid->objPtr);
657 #ifndef CMK_CHARE_USE_PTR
658     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
659     int idx = CkpvAccess(vidblocks).size()-1;
660     pCid->objPtr = (void *)(CmiIntPtr)idx;
661     env->setVidPtr((void *)(CmiIntPtr)idx);
662 #endif
663   }
664   env->setEpIdx(eIdx);
665   env->setByPe(CkMyPe());
666   env->setSrcPe(CkMyPe());
667   CmiSetHandler(env, _charmHandlerIdx);
668   _TRACE_CREATION_1(env);
669   CpvAccess(_qd)->create();
670   _STATS_RECORD_CREATE_CHARE_1();
671   _SET_USED(env, 1);
672   if(destPE == CK_PE_ANY)
673     env->setForAnyPE(1);
674   else
675     env->setForAnyPE(0);
676   _CldEnqueue(destPE, env, _infoIdx);
677   _TRACE_CREATION_DONE(1);
678 }
679
680 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
681 {
682   register int gIdx = _entryTable[epIdx]->chareIdx;
683   register void *obj = malloc(_chareTable[gIdx]->size);
684   _MEMCHECK(obj);
685   setMemoryTypeChare(obj);
686   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
687   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
688   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
689   CkpvAccess(_groupIDTable)->push_back(groupID);
690   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
691   if(ptrq) {
692     void *pending;
693     while((pending=ptrq->deq())!=0)
694       _CldEnqueue(CkMyPe(), pending, _infoIdx);
695 //    delete ptrq;
696       CkpvAccess(_groupTable)->find(groupID).clearPending();
697   }
698   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
699
700   CkpvAccess(_currentGroup) = groupID;
701   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
702 #ifndef CMK_CHARE_USE_PTR
703   //((Chare *)obj)->chareIdx = -1;
704   CkpvAccess(currentChareIdx) = -1;
705 #endif
706   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
707   _STATS_RECORD_PROCESS_GROUP_1();
708 }
709
710 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
711 {
712   register int gIdx = _entryTable[epIdx]->chareIdx;
713   int objSize=_chareTable[gIdx]->size;
714   register void *obj = malloc(objSize);
715   _MEMCHECK(obj);
716   setMemoryTypeChare(obj);
717   CkpvAccess(_currentGroup) = groupID;
718
719 // Now that the NodeGroup is created, add it to the table.
720 //  NodeGroups can be accessed by multiple processors, so
721 //  this is in the opposite order from groups - invoking the constructor
722 //  before registering it.
723 // User may call CkLocalNodeBranch() inside the nodegroup constructor
724 //  store nodegroup into _currentNodeGroupObj
725   CkpvAccess(_currentNodeGroupObj) = obj;
726 #ifndef CMK_CHARE_USE_PTR
727   //((Chare *)obj)->chareIdx = -1;
728   CkpvAccess(currentChareIdx) = -1;
729 #endif
730   _invokeEntryNoTrace(epIdx,env,obj);
731   CkpvAccess(_currentNodeGroupObj) = NULL;
732   _STATS_RECORD_PROCESS_NODE_GROUP_1();
733
734   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
735   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
736   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
737   CksvAccess(_nodeGroupIDTable).push_back(groupID);
738
739   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
740   if(ptrq) {
741     void *pending;
742     while((pending=ptrq->deq())!=0)
743       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
744 //    delete ptrq;
745       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
746   }
747   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
748 }
749
750 void _createGroup(CkGroupID groupID, envelope *env)
751 {
752   _CHECK_USED(env);
753   _SET_USED(env, 1);
754   register int epIdx = env->getEpIdx();
755   int gIdx = _entryTable[epIdx]->chareIdx;
756   CkNodeGroupID rednMgr;
757   if(_chareTable[gIdx]->isIrr == 0){
758                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
759                 rednMgr = rednMgrProxy;
760 //              rednMgrProxy.setAttachedGroup(groupID);
761   }else{
762         rednMgr.setZero();
763   }
764   env->setGroupNum(groupID);
765   env->setSrcPe(CkMyPe());
766   env->setRednMgr(rednMgr);
767   env->setGroupEpoch(CkpvAccess(_charmEpoch));
768
769   if(CkNumPes()>1) {
770     CkPackMessage(&env);
771     CmiSetHandler(env, _bocHandlerIdx);
772     _numInitMsgs++;
773     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
774     CpvAccess(_qd)->create(CkNumPes()-1);
775     CkUnpackMessage(&env);
776   }
777   _STATS_RECORD_CREATE_GROUP_1();
778   CkCreateLocalGroup(groupID, epIdx, env);
779 }
780
781 void _createNodeGroup(CkGroupID groupID, envelope *env)
782 {
783   _CHECK_USED(env);
784   _SET_USED(env, 1);
785   register int epIdx = env->getEpIdx();
786   env->setGroupNum(groupID);
787   env->setSrcPe(CkMyPe());
788   env->setGroupEpoch(CkpvAccess(_charmEpoch));
789   if(CkNumNodes()>1) {
790     CkPackMessage(&env);
791     CmiSetHandler(env, _bocHandlerIdx);
792     _numInitMsgs++;
793     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
794     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
795     CpvAccess(_qd)->create(CkNumNodes()-1);
796     CkUnpackMessage(&env);
797   }
798   _STATS_RECORD_CREATE_NODE_GROUP_1();
799   CkCreateLocalNodeGroup(groupID, epIdx, env);
800 }
801
802 // new _groupCreate
803
804 static CkGroupID _groupCreate(envelope *env)
805 {
806   register CkGroupID groupNum;
807
808   // check CkMyPe(). if it is 0 then idx is _numGroups++
809   // if not, then something else...
810   if(CkMyPe() == 0)
811      groupNum.idx = CkpvAccess(_numGroups)++;
812   else
813      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
814   _createGroup(groupNum, env);
815   return groupNum;
816 }
817
818 // new _nodeGroupCreate
819 static CkGroupID _nodeGroupCreate(envelope *env)
820 {
821   register CkGroupID groupNum;
822   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
823   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
824           groupNum.idx = CksvAccess(_numNodeGroups)++;
825    else
826           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
827   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
828   _createNodeGroup(groupNum, env);
829   return groupNum;
830 }
831
832 /**** generate the group idx when group is creator pe is not pe0
833  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
834  **** remaining bits contain the group creator processor number and
835  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
836
837 int _getGroupIdx(int numNodes,int myNode,int numGroups)
838 {
839         int idx;
840         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
841         int n = 32 - (x+1);                                     // number of bits remaining for the index
842         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
843                                                                 // then add the next available index
844         // of course this won't work when int is 8 bytes long on T3E
845         //idx |= 0x80000000;                                      // set the most significant bit to 1
846         idx = - idx;
847                                                                 // if int is not 32 bits, wouldn't this be wrong?
848         return idx;
849 }
850
851 extern "C"
852 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
853 {
854   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
855   register envelope *env = UsrToEnv(msg);
856   env->setMsgtype(BocInitMsg);
857   env->setEpIdx(eIdx);
858   env->setSrcPe(CkMyPe());
859   _TRACE_CREATION_N(env, CkNumPes());
860   CkGroupID gid = _groupCreate(env);
861   _TRACE_CREATION_DONE(1);
862   return gid;
863 }
864
865 extern "C"
866 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
867 {
868   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
869   register envelope *env = UsrToEnv(msg);
870   env->setMsgtype(NodeBocInitMsg);
871   env->setEpIdx(eIdx);
872   env->setSrcPe(CkMyPe());
873   _TRACE_CREATION_N(env, CkNumNodes());
874   CkGroupID gid = _nodeGroupCreate(env);
875   _TRACE_CREATION_DONE(1);
876   return gid;
877 }
878
879 static inline void *_allocNewChare(envelope *env, int &idx)
880 {
881   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
882   void *tmp=malloc(_chareTable[chareIdx]->size);
883   _MEMCHECK(tmp);
884 #ifndef CMK_CHARE_USE_PTR
885   CkpvAccess(chare_objs).push_back(tmp);
886   CkpvAccess(chare_types).push_back(chareIdx);
887   idx = CkpvAccess(chare_objs).size()-1;
888 #endif
889   setMemoryTypeChare(tmp);
890   return tmp;
891 }
892
893 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
894 {
895   int idx;
896   register void *obj = _allocNewChare(env, idx);
897 #ifndef CMK_CHARE_USE_PTR
898   //((Chare *)obj)->chareIdx = idx;
899   CkpvAccess(currentChareIdx) = idx;
900 #endif
901   _invokeEntry(env->getEpIdx(),env,obj);
902 }
903
904 void CkCreateLocalChare(int epIdx, envelope *env)
905 {
906   env->setEpIdx(epIdx);
907   _processNewChareMsg(NULL, env);
908 }
909
910 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
911 {
912   int idx;
913   register void *obj = _allocNewChare(env, idx);
914   register CkChareID *pCid = (CkChareID *)
915       _allocMsg(FillVidMsg, sizeof(CkChareID));
916   pCid->onPE = CkMyPe();
917 #ifndef CMK_CHARE_USE_PTR
918   pCid->objPtr = (void*)(CmiIntPtr)idx;
919 #else
920   pCid->objPtr = obj;
921 #endif
922   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
923   register envelope *ret = UsrToEnv(pCid);
924   ret->setVidPtr(env->getVidPtr());
925   register int srcPe = env->getByPe();
926   ret->setSrcPe(CkMyPe());
927   CmiSetHandler(ret, _charmHandlerIdx);
928   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
929 #ifndef CMK_CHARE_USE_PTR
930   // register the remote vidblock for deletion when chare is deleted
931   CkChareID vid;
932   vid.onPE = srcPe;
933   vid.objPtr = env->getVidPtr();
934   CkpvAccess(vmap)[idx] = vid;    
935 #endif
936   CpvAccess(_qd)->create();
937 #ifndef CMK_CHARE_USE_PTR
938   //((Chare *)obj)->chareIdx = idx;
939   CkpvAccess(currentChareIdx) = idx;
940 #endif
941   _invokeEntry(env->getEpIdx(),env,obj);
942 }
943
944 /************** Receive: Chares *************/
945
946 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
947 {
948   register int epIdx = env->getEpIdx();
949   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
950   register void *obj;
951   if (mainIdx != -1)  {           // mainchare
952     CmiAssert(CkMyPe()==0);
953     obj = _mainTable[mainIdx]->getObj();
954   }
955   else {
956 #ifndef CMK_CHARE_USE_PTR
957     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
958       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
959     else
960       obj = env->getObjPtr();
961 #else
962     obj = env->getObjPtr();
963 #endif
964   }
965   _invokeEntry(epIdx,env,obj);
966 }
967
968 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
969 {
970   register int epIdx = env->getEpIdx();
971   register void *obj = env->getObjPtr();
972   _invokeEntry(epIdx,env,obj);
973 }
974
975 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
976 {
977 #ifndef CMK_CHARE_USE_PTR
978   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
979 #else
980   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
981   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
982 #endif
983   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
984   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
985   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
986   CmiFree(env);
987 }
988
989 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
990 {
991 #ifndef CMK_CHARE_USE_PTR
992   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
993 #else
994   VidBlock *vptr = (VidBlock *) env->getVidPtr();
995   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
996 #endif
997   _SET_USED(env, 1);
998   vptr->send(env);
999 }
1000
1001 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1002 {
1003 #ifndef CMK_CHARE_USE_PTR
1004   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1005   delete vptr;
1006   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1007 #endif
1008   CmiFree(env);
1009 }
1010
1011 /************** Receive: Groups ****************/
1012
1013 /**
1014  Return a pointer to the local BOC of "groupID".
1015  The message "env" passed in has some known dependency on this groupID
1016  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1017  Therefore, if the return value is NULL, this function buffers the massage so that
1018  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1019  The message passed in must have its handlers correctly set so that it can be
1020  scheduled again.
1021 */
1022 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1023 {
1024
1025         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1026         IrrGroup *obj = ck->localBranch(groupID);
1027         if (obj==NULL) { /* groupmember not yet created: stash message */
1028                 ck->getGroupTable()->find(groupID).enqMsg(env);
1029         }
1030         else { /* will be able to process message */
1031                 ck->process();
1032         }
1033         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1034         return obj;
1035 }
1036
1037 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1038 {
1039   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1040 }
1041
1042 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1043 {
1044 #if CMK_LBDB_ON
1045   // if there is a running obj being measured, stop it temporarily
1046   LDObjHandle objHandle;
1047   int objstopped = 0;
1048   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1049   if (the_lbdb->RunningObject(&objHandle)) {
1050     objstopped = 1;
1051     the_lbdb->ObjectStop(objHandle);
1052   }
1053 #endif
1054   _invokeEntry(epIdx,env,obj);
1055 #if CMK_LBDB_ON
1056   if (objstopped) the_lbdb->ObjectStart(objHandle);
1057 #endif
1058   _STATS_RECORD_PROCESS_BRANCH_1();
1059 }
1060
1061 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1062 {
1063   register CkGroupID groupID =  env->getGroupNum();
1064   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1065   if(obj) {
1066     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1067   }
1068 }
1069
1070 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1071 {
1072   env->setMsgtype(ForChareMsg);
1073   env->setObjPtr(obj);
1074   _processForChareMsg(ck,env);
1075   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1076 }
1077
1078 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1079 {
1080   env->setEpIdx(epIdx);
1081   _deliverForNodeBocMsg(ck,env, obj);
1082 }
1083
1084 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1085 {
1086   register CkGroupID groupID = env->getGroupNum();
1087   register void *obj;
1088
1089   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1090   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1091   if(!obj) { // groupmember not yet created
1092 #if CMK_IMMEDIATE_MSG
1093     if (CmiIsImmediate(env))     // buffer immediate message
1094       CmiDelayImmediate();
1095     else
1096 #endif
1097     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1098     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1099     return;
1100   }
1101   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1102 #if CMK_IMMEDIATE_MSG
1103   if (!CmiIsImmediate(env))
1104 #endif
1105   ck->process();
1106   env->setMsgtype(ForChareMsg);
1107   env->setObjPtr(obj);
1108   _processForChareMsg(ck,env);
1109   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1110 }
1111
1112 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1113 {
1114   register CkGroupID groupID = env->getGroupNum();
1115   register int epIdx = env->getEpIdx();
1116   if (!env->getGroupDep().isZero()) {      // dependence
1117     CkGroupID dep = env->getGroupDep();
1118     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1119     if (obj == NULL) return;
1120   }
1121   else
1122     ck->process();
1123   CkCreateLocalGroup(groupID, epIdx, env);
1124 }
1125
1126 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1127 {
1128   register CkGroupID groupID = env->getGroupNum();
1129   register int epIdx = env->getEpIdx();
1130   CkCreateLocalNodeGroup(groupID, epIdx, env);
1131 }
1132
1133 /************** Receive: Arrays *************/
1134
1135 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1136   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1137   if (mgr) {
1138     _SET_USED(env, 0);
1139     mgr->insertElement((CkMessage *)EnvToUsr(env));
1140   }
1141 }
1142 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1143   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1144   if (mgr) {
1145     _SET_USED(env, 0);
1146     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1147   }
1148 }
1149
1150 //BIGSIM_OOC DEBUGGING
1151 #define TELLMSGTYPE(x) //x
1152
1153 /**
1154  * This is the main converse-level handler used by all of Charm++.
1155  *
1156  * \addtogroup CriticalPathFramework
1157  */
1158 void _processHandler(void *converseMsg,CkCoreState *ck)
1159 {
1160   register envelope *env = (envelope *) converseMsg;
1161
1162   MESSAGE_PHASE_CHECK(env);
1163
1164 //#if CMK_RECORD_REPLAY
1165   if (ck->watcher!=NULL) {
1166     if (!ck->watcher->processMessage(&env,ck)) return;
1167   }
1168 //#endif
1169 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1170         Chare *obj=NULL;
1171         CkObjID sender;
1172         MCount SN;
1173         MlogEntry *entry=NULL;
1174         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1175         env->getMsgtype() == ForArrayEltMsg || env->getMsgtype() == ForChareMsg){
1176                 sender = env->sender;
1177                 SN = env->SN;
1178                 int result = preProcessReceivedMessage(env,&obj,&entry);
1179                 if(result == 0){
1180                         return;
1181                 }
1182         }
1183 #endif
1184
1185 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1186   //  CkPrintf("START\n");
1187   criticalPath_start(env);
1188 #endif
1189
1190
1191   switch(env->getMsgtype()) {
1192 // Group support
1193     case BocInitMsg :
1194       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1195       // QD processing moved inside _processBocInitMsg because it is conditional
1196       //ck->process(); 
1197       if(env->isPacked()) CkUnpackMessage(&env);
1198       _processBocInitMsg(ck,env);
1199       break;
1200     case NodeBocInitMsg :
1201       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1202       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1203       _processNodeBocInitMsg(ck,env);
1204       break;
1205     case ForBocMsg :
1206       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1207       // QD processing moved inside _processForBocMsg because it is conditional
1208       if(env->isPacked()) CkUnpackMessage(&env);
1209       _processForBocMsg(ck,env);
1210       // stats record moved inside _processForBocMsg because it is conditional
1211       break;
1212     case ForNodeBocMsg :
1213       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1214       // QD processing moved to _processForNodeBocMsg because it is conditional
1215       if(env->isPacked()) CkUnpackMessage(&env);
1216       _processForNodeBocMsg(ck,env);
1217       // stats record moved to _processForNodeBocMsg because it is conditional
1218       break;
1219
1220 // Array support
1221     case ArrayEltInitMsg:
1222       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1223       if(env->isPacked()) CkUnpackMessage(&env);
1224       _processArrayEltInitMsg(ck,env);
1225       break;
1226     case ForArrayEltMsg:
1227       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1228       if(env->isPacked()) CkUnpackMessage(&env);
1229       _processArrayEltMsg(ck,env);
1230       break;
1231
1232 // Chare support
1233     case NewChareMsg :
1234       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1235       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1236       _processNewChareMsg(ck,env);
1237       _STATS_RECORD_PROCESS_CHARE_1();
1238       break;
1239     case NewVChareMsg :
1240       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1241       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1242       _processNewVChareMsg(ck,env);
1243       _STATS_RECORD_PROCESS_CHARE_1();
1244       break;
1245     case ForChareMsg :
1246       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1247       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1248       _processForPlainChareMsg(ck,env);
1249       _STATS_RECORD_PROCESS_MSG_1();
1250       break;
1251     case ForVidMsg   :
1252       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1253       ck->process();
1254       _processForVidMsg(ck,env);
1255       break;
1256     case FillVidMsg  :
1257       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1258       ck->process();
1259       _processFillVidMsg(ck,env);
1260       break;
1261     case DeleteVidMsg  :
1262       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1263       ck->process();
1264       _processDeleteVidMsg(ck,env);
1265       break;
1266
1267     default:
1268       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1269   }
1270 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1271         if(obj != NULL){
1272                 postProcessReceivedMessage(obj,sender,SN,entry);
1273         }
1274 #endif
1275
1276
1277 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1278   criticalPath_end();
1279   //  CkPrintf("STOP\n");
1280 #endif
1281
1282
1283 }
1284
1285
1286 /******************** Message Send **********************/
1287
1288 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1289              int *queueing, int *priobits, unsigned int **prioptr)
1290 {
1291   register envelope *env = (envelope *)converseMsg;
1292   *pfn = (CldPackFn)CkPackMessage;
1293   *len = env->getTotalsize();
1294   *queueing = env->getQueueing();
1295   *priobits = env->getPriobits();
1296   *prioptr = (unsigned int *) env->getPrioPtr();
1297 }
1298
1299 void CkPackMessage(envelope **pEnv)
1300 {
1301   register envelope *env = *pEnv;
1302   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1303     register void *msg = EnvToUsr(env);
1304     _TRACE_BEGIN_PACK();
1305     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1306     _TRACE_END_PACK();
1307     env=UsrToEnv(msg);
1308     env->setPacked(1);
1309     *pEnv = env;
1310   }
1311 }
1312
1313 void CkUnpackMessage(envelope **pEnv)
1314 {
1315   register envelope *env = *pEnv;
1316   register int msgIdx = env->getMsgIdx();
1317   if(env->isPacked()) {
1318     register void *msg = EnvToUsr(env);
1319     _TRACE_BEGIN_UNPACK();
1320     msg = _msgTable[msgIdx]->unpack(msg);
1321     _TRACE_END_UNPACK();
1322     env=UsrToEnv(msg);
1323     env->setPacked(0);
1324     *pEnv = env;
1325   }
1326 }
1327
1328 //There's no reason for most messages to go through the Cld--
1329 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1330 // Thus these accellerated versions of the Cld calls.
1331 #if CMK_OBJECT_QUEUE_AVAILABLE
1332 static int index_objectQHandler;
1333 #endif
1334 int index_tokenHandler;
1335 int index_skipCldHandler;
1336
1337 void _skipCldHandler(void *converseMsg)
1338 {
1339   register envelope *env = (envelope *)(converseMsg);
1340   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1341 #if CMK_GRID_QUEUE_AVAILABLE
1342   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1343     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1344                        env, env->getQueueing (), env->getPriobits (),
1345                        (unsigned int *) env->getPrioPtr ());
1346   } else {
1347     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1348                        env, env->getQueueing (), env->getPriobits (),
1349                        (unsigned int *) env->getPrioPtr ());
1350   }
1351 #else
1352   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1353         env, env->getQueueing(),env->getPriobits(),
1354         (unsigned int *)env->getPrioPtr());
1355 #endif
1356 }
1357
1358
1359 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1360 // Made non-static to be used by ckmessagelogging
1361 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1362 {
1363 #if CMK_CHARMDEBUG
1364   if (!ConverseDeliver(pe)) {
1365     CmiFree(env);
1366     return;
1367   }
1368 #endif
1369   if(pe == CkMyPe() ){
1370     if(!CmiNodeAlive(CkMyPe())){
1371         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1372 //      return;
1373     }
1374   }
1375   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1376 #if CMK_OBJECT_QUEUE_AVAILABLE
1377     Chare *obj = CkFindObjectPtr(env);
1378     if (obj && obj->CkGetObjQueue().queue()) {
1379       _enqObjQueue(obj, env);
1380     }
1381     else
1382 #endif
1383     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1384         env, env->getQueueing(),env->getPriobits(),
1385         (unsigned int *)env->getPrioPtr());
1386 #if CMK_PERSISTENT_COMM
1387     CmiPersistentOneSend();
1388 #endif
1389   } else {
1390     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1391       CkPackMessage(&env);
1392     int len=env->getTotalsize();
1393     CmiSetXHandler(env,CmiGetHandler(env));
1394 #if CMK_OBJECT_QUEUE_AVAILABLE
1395     CmiSetHandler(env,index_objectQHandler);
1396 #else
1397     CmiSetHandler(env,index_skipCldHandler);
1398 #endif
1399     CmiSetInfo(env,infoFn);
1400     if (pe==CLD_BROADCAST) {
1401 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1402                         CmiSyncBroadcast(len, (char *)env);
1403 #else
1404                         CmiSyncBroadcastAndFree(len, (char *)env); 
1405 #endif
1406
1407 }
1408     else if (pe==CLD_BROADCAST_ALL) { 
1409 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1410                         CmiSyncBroadcastAll(len, (char *)env);
1411 #else
1412                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1413 #endif
1414
1415 }
1416     else{
1417 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1418                         CmiSyncSend(pe, len, (char *)env);
1419 #else
1420                         CmiSyncSendAndFree(pe, len, (char *)env);
1421 #endif
1422
1423                 }
1424   }
1425 }
1426
1427 #if CMK_BIGSIM_CHARM
1428 #   define  _skipCldEnqueue   _CldEnqueue
1429 #endif
1430
1431 // by pass Charm++ priority queue, send as Converse message
1432 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1433 {
1434 #if CMK_CHARMDEBUG
1435   if (!ConverseDeliver(-1)) {
1436     CmiFree(env);
1437     return;
1438   }
1439 #endif
1440   CkPackMessage(&env);
1441   int len=env->getTotalsize();
1442   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1443 }
1444
1445 static void _noCldEnqueue(int pe, envelope *env)
1446 {
1447 /*
1448   if (pe == CkMyPe()) {
1449     CmiHandleMessage(env);
1450   } else
1451 */
1452 #if CMK_CHARMDEBUG
1453   if (!ConverseDeliver(pe)) {
1454     CmiFree(env);
1455     return;
1456   }
1457 #endif
1458   CkPackMessage(&env);
1459   int len=env->getTotalsize();
1460   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1461   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1462   else CmiSyncSendAndFree(pe, len, (char *)env);
1463 }
1464
1465 //static void _noCldNodeEnqueue(int node, envelope *env)
1466 //Made non-static to be used by ckmessagelogging
1467 void _noCldNodeEnqueue(int node, envelope *env)
1468 {
1469 /*
1470   if (node == CkMyNode()) {
1471     CmiHandleMessage(env);
1472   } else {
1473 */
1474 #if CMK_CHARMDEBUG
1475   if (!ConverseDeliver(node)) {
1476     CmiFree(env);
1477     return;
1478   }
1479 #endif
1480   CkPackMessage(&env);
1481   int len=env->getTotalsize();
1482   if (node==CLD_BROADCAST) { 
1483 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1484         CmiSyncNodeBroadcast(len, (char *)env);
1485 #else
1486         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1487 #endif
1488 }
1489   else if (node==CLD_BROADCAST_ALL) { 
1490 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1491                 CmiSyncNodeBroadcastAll(len, (char *)env);
1492 #else
1493                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1494 #endif
1495
1496 }
1497   else {
1498 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1499         CmiSyncNodeSend(node, len, (char *)env);
1500 #else
1501         CmiSyncNodeSendAndFree(node, len, (char *)env);
1502 #endif
1503   }
1504 }
1505
1506 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1507 {
1508   register envelope *env = UsrToEnv(msg);
1509   _CHECK_USED(env);
1510   _SET_USED(env, 1);
1511 #if CMK_REPLAYSYSTEM
1512   setEventID(env);
1513 #endif
1514   env->setMsgtype(ForChareMsg);
1515   env->setEpIdx(eIdx);
1516   env->setSrcPe(CkMyPe());
1517 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1518   criticalPath_send(env);
1519   automaticallySetMessagePriority(env);
1520 #endif
1521 #if CMK_CHARMDEBUG
1522   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1523 #endif
1524 #if CMK_OBJECT_QUEUE_AVAILABLE
1525   CmiSetHandler(env, index_objectQHandler);
1526 #else
1527   CmiSetHandler(env, _charmHandlerIdx);
1528 #endif
1529   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1530     register int pe = -(pCid->onPE+1);
1531     if(pe==CkMyPe()) {
1532 #ifndef CMK_CHARE_USE_PTR
1533       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1534 #else
1535       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1536 #endif
1537       void *objPtr;
1538       if (NULL!=(objPtr=vblk->getLocalChare()))
1539       { //A ready local chare
1540         env->setObjPtr(objPtr);
1541         return pe;
1542       }
1543       else { //The vidblock is not ready-- forget it
1544         vblk->send(env);
1545         return -1;
1546       }
1547     } else { //Valid vidblock for another PE:
1548       env->setMsgtype(ForVidMsg);
1549       env->setVidPtr(pCid->objPtr);
1550       return pe;
1551     }
1552   }
1553   else {
1554     env->setObjPtr(pCid->objPtr);
1555     return pCid->onPE;
1556   }
1557 }
1558
1559 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1560 {
1561   int destPE = _prepareMsg(eIdx, msg, pCid);
1562   if (destPE != -1) {
1563     register envelope *env = UsrToEnv(msg);
1564 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1565     criticalPath_send(env);
1566     automaticallySetMessagePriority(env);
1567 #endif
1568     CmiBecomeImmediate(env);
1569   }
1570   return destPE;
1571 }
1572
1573 extern "C"
1574 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1575 {
1576   if (opts & CK_MSG_INLINE) {
1577     CkSendMsgInline(entryIdx, msg, pCid, opts);
1578     return;
1579   }
1580 #if CMK_ERROR_CHECKING
1581   if (opts & CK_MSG_IMMEDIATE) {
1582     CmiAbort("Immediate message is not allowed in Chare!");
1583   }
1584 #endif
1585   register envelope *env = UsrToEnv(msg);
1586   int destPE=_prepareMsg(entryIdx,msg,pCid);
1587   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1588   // VidBlock was not yet filled). The problem is that the creation was never
1589   // traced later when the VidBlock was filled. One solution is to trace the
1590   // creation here, the other to trace it in VidBlock->msgDeliver().
1591 #if defined(_FAULT_CAUSAL_)
1592         sendChareMsg(env,destPE,_infoIdx,pCid);
1593 #else
1594   _TRACE_CREATION_1(env);
1595   if (destPE!=-1) {
1596     CpvAccess(_qd)->create();
1597     if (opts & CK_MSG_SKIP_OR_IMM)
1598       _noCldEnqueue(destPE, env);
1599     else
1600       _CldEnqueue(destPE, env, _infoIdx);
1601   }
1602   _TRACE_CREATION_DONE(1);
1603 #endif
1604 }
1605
1606 extern "C"
1607 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1608 {
1609   if (pCid->onPE==CkMyPe())
1610   {
1611     if(!CmiNodeAlive(CkMyPe())){
1612         return;
1613     }
1614 #if CMK_CHARMDEBUG
1615     //Just in case we need to breakpoint or use the envelope in some way
1616     _prepareMsg(entryIndex,msg,pCid);
1617 #endif
1618                 //Just directly call the chare (skip QD handling & scheduler)
1619     register envelope *env = UsrToEnv(msg);
1620     if (env->isPacked()) CkUnpackMessage(&env);
1621     _STATS_RECORD_PROCESS_MSG_1();
1622     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1623   }
1624   else {
1625     //No way to inline a cross-processor message:
1626     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1627   }
1628 }
1629
1630 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1631 {
1632   register envelope *env = UsrToEnv(msg);
1633 #if CMK_ERROR_CHECKING
1634   CkNodeGroupID nodeRedMgr;
1635 #endif
1636   _CHECK_USED(env);
1637   _SET_USED(env, 1);
1638 #if CMK_REPLAYSYSTEM
1639   setEventID(env);
1640 #endif
1641   env->setMsgtype(type);
1642   env->setEpIdx(eIdx);
1643   env->setGroupNum(gID);
1644   env->setSrcPe(CkMyPe());
1645 #if CMK_ERROR_CHECKING
1646   nodeRedMgr.setZero();
1647   env->setRednMgr(nodeRedMgr);
1648 #endif
1649 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1650   criticalPath_send(env);
1651   automaticallySetMessagePriority(env);
1652 #endif
1653 #if CMK_CHARMDEBUG
1654   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1655 #endif
1656   CmiSetHandler(env, _charmHandlerIdx);
1657   return env;
1658 }
1659
1660 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1661 {
1662   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1663 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1664   criticalPath_send(env);
1665   automaticallySetMessagePriority(env);
1666 #endif
1667   CmiBecomeImmediate(env);
1668   return env;
1669 }
1670
1671 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1672                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1673 {
1674   int numPes;
1675   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1676 #if defined(_FAULT_MLOG_) 
1677   sendTicketGroupRequest(env,pe,_infoIdx);
1678 #elif defined(_FAULT_CAUSAL_)
1679         sendGroupMsg(env,pe,_infoIdx);
1680 #else
1681   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1682   _TRACE_CREATION_N(env, numPes);
1683   if (opts & CK_MSG_SKIP_OR_IMM)
1684     _noCldEnqueue(pe, env);
1685   else
1686     _skipCldEnqueue(pe, env, _infoIdx);
1687   _TRACE_CREATION_DONE(1);
1688 #endif
1689 }
1690
1691 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1692                            int npes, int *pes)
1693 {
1694   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1695   _TRACE_CREATION_MULTICAST(env, npes, pes);
1696   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1697   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1698 }
1699
1700 extern "C"
1701 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1702 {
1703 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1704   if (destPE==CkMyPe())
1705   {
1706     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1707     return;
1708   }
1709   //Can't inline-- send the usual way
1710   register envelope *env = UsrToEnv(msg);
1711   int numPes;
1712   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1713   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1714   _TRACE_CREATION_N(env, numPes);
1715   _noCldEnqueue(destPE, env);
1716   _STATS_RECORD_SEND_BRANCH_1();
1717   CkpvAccess(_coreState)->create();
1718   _TRACE_CREATION_DONE(1);
1719 #else
1720   // no support for immediate message, send inline
1721   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1722 #endif
1723 }
1724
1725 extern "C"
1726 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1727 {
1728   if (destPE==CkMyPe())
1729   {
1730     if(!CmiNodeAlive(CkMyPe())){
1731         return;
1732     }
1733     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1734     if (obj!=NULL)
1735     { //Just directly call the group:
1736 #if CMK_ERROR_CHECKING
1737       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1738 #else
1739       envelope *env=UsrToEnv(msg);
1740 #endif
1741       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1742       return;
1743     }
1744   }
1745   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1746   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1747 }
1748
1749 extern "C"
1750 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1751 {
1752   if (opts & CK_MSG_INLINE) {
1753     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1754     return;
1755   }
1756   if (opts & CK_MSG_IMMEDIATE) {
1757     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1758     return;
1759   }
1760   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1761   _STATS_RECORD_SEND_BRANCH_1();
1762   CkpvAccess(_coreState)->create();
1763 }
1764
1765 extern "C"
1766 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1767 {
1768 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1769   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1770   _TRACE_CREATION_MULTICAST(env, npes, pes);
1771   _noCldEnqueueMulti(npes, pes, env);
1772   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1773 #else
1774   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1775   CpvAccess(_qd)->create(-npes);
1776 #endif
1777   _STATS_RECORD_SEND_BRANCH_N(npes);
1778   CpvAccess(_qd)->create(npes);
1779 }
1780
1781 extern "C"
1782 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1783 {
1784   if (opts & CK_MSG_IMMEDIATE) {
1785     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1786     return;
1787   }
1788     // normal mesg
1789   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1790   _STATS_RECORD_SEND_BRANCH_N(npes);
1791   CpvAccess(_qd)->create(npes);
1792 }
1793
1794 extern "C"
1795 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1796 {
1797   int npes;
1798   int *pes;
1799   if (opts & CK_MSG_IMMEDIATE) {
1800     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1801     return;
1802   }
1803     // normal mesg
1804   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1805   CmiLookupGroup(grp, &npes, &pes);
1806   _TRACE_CREATION_MULTICAST(env, npes, pes);
1807   _CldEnqueueGroup(grp, env, _infoIdx);
1808   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1809   _STATS_RECORD_SEND_BRANCH_N(npes);
1810   CpvAccess(_qd)->create(npes);
1811 }
1812
1813 extern "C"
1814 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1815 {
1816   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1817   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1818   CpvAccess(_qd)->create(CkNumPes());
1819 }
1820
1821 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1822                 int node=CLD_BROADCAST_ALL, int opts=0)
1823 {
1824   int numPes;
1825   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1826 #if defined(_FAULT_MLOG_)
1827         sendTicketNodeGroupRequest(env,node,_infoIdx);
1828 #elif defined(_FAULT_CAUSAL_)
1829         sendNodeGroupMsg(env,node,_infoIdx);
1830 #else
1831   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1832   _TRACE_CREATION_N(env, numPes);
1833   if (opts & CK_MSG_SKIP_OR_IMM) {
1834     _noCldNodeEnqueue(node, env);
1835     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1836       CkpvAccess(_coreState)->create(-numPes);
1837     }
1838   }
1839   else
1840     _CldNodeEnqueue(node, env, _infoIdx);
1841   _TRACE_CREATION_DONE(1);
1842 #endif
1843 }
1844
1845 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1846                            int npes, int *nodes)
1847 {
1848   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1849   _TRACE_CREATION_N(env, npes);
1850   for (int i=0; i<npes; i++) {
1851     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1852   }
1853   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1854 }
1855
1856 extern "C"
1857 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1858 {
1859 #if CMK_IMMEDIATE_MSG
1860   if (node==CkMyNode())
1861   {
1862     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1863     return;
1864   }
1865   //Can't inline-- send the usual way
1866   register envelope *env = UsrToEnv(msg);
1867   int numPes;
1868   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1869   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1870   _TRACE_CREATION_N(env, numPes);
1871   _noCldNodeEnqueue(node, env);
1872   _STATS_RECORD_SEND_BRANCH_1();
1873   /* immeidate message is invisible to QD */
1874 //  CkpvAccess(_coreState)->create();
1875   _TRACE_CREATION_DONE(1);
1876 #else
1877   // no support for immediate message, send inline
1878   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1879 #endif
1880 }
1881
1882 extern "C"
1883 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1884 {
1885   if (node==CkMyNode())
1886   {
1887     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1888     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1889     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1890     if (obj!=NULL)
1891     { //Just directly call the group:
1892 #if CMK_ERROR_CHECKING
1893       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1894 #else
1895       envelope *env=UsrToEnv(msg);
1896 #endif
1897       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1898       return;
1899     }
1900   }
1901   //Can't inline-- send the usual way
1902   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1903 }
1904
1905 extern "C"
1906 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1907 {
1908   if (opts & CK_MSG_INLINE) {
1909     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1910     return;
1911   }
1912   if (opts & CK_MSG_IMMEDIATE) {
1913     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1914     return;
1915   }
1916   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1917   _STATS_RECORD_SEND_NODE_BRANCH_1();
1918   CkpvAccess(_coreState)->create();
1919 }
1920
1921 extern "C"
1922 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1923 {
1924 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1925   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1926   _noCldEnqueueMulti(npes, nodes, env);
1927 #else
1928   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1929   CpvAccess(_qd)->create(-npes);
1930 #endif
1931   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1932   CpvAccess(_qd)->create(npes);
1933 }
1934
1935 extern "C"
1936 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1937 {
1938   if (opts & CK_MSG_IMMEDIATE) {
1939     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1940     return;
1941   }
1942     // normal mesg
1943   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1944   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1945   CpvAccess(_qd)->create(npes);
1946 }
1947
1948 extern "C"
1949 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1950 {
1951   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1952   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1953   CpvAccess(_qd)->create(CkNumNodes());
1954 }
1955
1956 //Needed by delegation manager:
1957 extern "C"
1958 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1959 { return _prepareMsg(eIdx,msg,pCid); }
1960 extern "C"
1961 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1962 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1963 extern "C"
1964 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1965 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1966
1967 void _ckModuleInit(void) {
1968         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1969 #if CMK_OBJECT_QUEUE_AVAILABLE
1970         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1971 #endif
1972         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1973         CkpvInitialize(TokenPool*, _tokenPool);
1974         CkpvAccess(_tokenPool) = new TokenPool;
1975 }
1976
1977
1978 /************** Send: Arrays *************/
1979
1980 extern void CkArrayManagerInsert(int onPe,void *msg);
1981 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1982
1983 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1984 {
1985   _CHECK_USED(env);
1986   _SET_USED(env, 1);
1987   env->setMsgtype(type);
1988 #if CMK_CHARMDEBUG
1989   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1990 #endif
1991   CmiSetHandler(env, _charmHandlerIdx);
1992   CpvAccess(_qd)->create();
1993 }
1994
1995 extern "C"
1996 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1997   register envelope *env = UsrToEnv(msg);
1998   env->getsetArrayMgr()=aID;
1999   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
2000   _CldEnqueue(pe, env, _infoIdx);
2001 }
2002
2003 extern "C"
2004 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2005   register envelope *env = UsrToEnv(msg);
2006   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2007 #if defined(_FAULT_MLOG_)
2008    sendTicketArrayRequest(env,pe,_infoIdx);
2009 #elif defined(_FAULT_CAUSAL_)
2010         sendArrayMsg(env,pe,_infoIdx);
2011 #else
2012   if (opts & CK_MSG_IMMEDIATE)
2013     CmiBecomeImmediate(env);
2014   if (opts & CK_MSG_SKIP_OR_IMM)
2015     _noCldEnqueue(pe, env);
2016   else
2017     _skipCldEnqueue(pe, env, _infoIdx);
2018 #endif
2019 }
2020
2021 class ElementDestroyer : public CkLocIterator {
2022 private:
2023         CkLocMgr *locMgr;
2024 public:
2025         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2026         void addLocation(CkLocation &loc) {
2027           loc.destroyAll();
2028         }
2029 };
2030
2031 void CkDeleteChares() {
2032   int i;
2033   int numGroups = CkpvAccess(_groupIDTable)->size();
2034
2035   // delete all plain chares
2036 #ifndef CMK_CHARE_USE_PTR
2037   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2038         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2039         delete obj;
2040         CkpvAccess(chare_objs)[i] = NULL;
2041   }
2042   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2043         VidBlock *obj = CkpvAccess(vidblocks)[i];
2044         delete obj;
2045         CkpvAccess(vidblocks)[i] = NULL;
2046   }
2047 #endif
2048
2049   // delete all array elements
2050   for(i=0;i<numGroups;i++) {
2051     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2052     if(obj && obj->isLocMgr())  {
2053       CkLocMgr *mgr = (CkLocMgr*)obj;
2054       ElementDestroyer destroyer(mgr);
2055       mgr->iterate(destroyer);
2056     }
2057   }
2058
2059   // delete all groups
2060   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2061   for(i=0;i<numGroups;i++) {
2062     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2063     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2064     if (obj) delete obj;
2065   }
2066   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2067
2068   // delete all node groups
2069   if (CkMyRank() == 0) {
2070     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2071     for(i=0;i<numNodeGroups;i++) {
2072       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2073       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2074       if (obj) delete obj;
2075     }
2076   }
2077 }
2078
2079 //------------------- Message Watcher (record/replay) ----------------
2080
2081 #include "crc32.h"
2082
2083 CkpvDeclare(int, envelopeEventID);
2084 int _recplay_crc = 0;
2085 int _recplay_checksum = 0;
2086 int _recplay_logsize = 1024*1024;
2087
2088 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2089 #define REPLAYDEBUG(args) /* empty */
2090
2091 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2092
2093 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2094
2095 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2096
2097     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2098     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2099     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2100     FILE *f=fopen(fName,permissions);
2101     REPLAYDEBUG("openReplayfile "<<fName);
2102     if (f==NULL) {
2103         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2104             CkMyPe(),fName,permissions);
2105         CkAbort("openReplayFile> Could not open replay file");
2106     }
2107     return f;
2108 }
2109
2110 #include "BaseLB.h" /* For LBMigrateMsg message */
2111
2112 class CkMessageRecorder : public CkMessageWatcher {
2113   char *buffer;
2114   unsigned int curpos;
2115   bool firstOpen;
2116 public:
2117   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2118   ~CkMessageRecorder() {
2119     flushLog(0);
2120     fprintf(f,"-1 -1 -1 ");
2121     fclose(f);
2122     delete[] buffer;
2123 #if 0
2124     FILE *stsfp = fopen("sts", "w");
2125     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2126     traceWriteSTS(stsfp, 0);
2127     fclose(stsfp);
2128 #endif
2129     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2130   }
2131
2132 private:
2133   void flushLog(int verbose=1) {
2134     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2135     fprintf(f, "%s", buffer);
2136     curpos=0;
2137   }
2138   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2139     if ((*envptr)->getEvent()) {
2140       bool wasPacked = (*envptr)->isPacked();
2141       if (!wasPacked) CkPackMessage(envptr);
2142       envelope *env = *envptr;
2143       unsigned int crc1=0, crc2=0;
2144       if (_recplay_crc) {
2145         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2146         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2147         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2148       } else if (_recplay_checksum) {
2149         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2150         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2151       }
2152       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2153       if (curpos > _recplay_logsize-128) flushLog();
2154       if (!wasPacked) CkUnpackMessage(envptr);
2155     }
2156     return CmiTrue;
2157   }
2158   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2159     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2160     if (curpos > _recplay_logsize-128) flushLog();
2161     return CmiTrue;
2162   }
2163   
2164   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2165     FILE *f;
2166     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2167     else f = openReplayFile("ckreplay_",".lb","a");
2168     firstOpen = false;
2169     if (f != NULL) {
2170       PUP::toDisk p(f);
2171       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2172       (*msg)->pup(p);
2173       fclose(f);
2174     }
2175     return CmiTrue;
2176   }
2177 };
2178
2179 class CkMessageDetailRecorder : public CkMessageWatcher {
2180 public:
2181   CkMessageDetailRecorder(FILE *f_) {
2182     f=f_;
2183     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2184      * The value of 'x' is the pointer size.
2185      */
2186     CmiUInt2 little = sizeof(void*);
2187     fwrite(&little, 2, 1, f);
2188   }
2189   ~CkMessageDetailRecorder() {fclose(f);}
2190 private:
2191   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2192     bool wasPacked = (*envptr)->isPacked();
2193     if (!wasPacked) CkPackMessage(envptr);
2194     envelope *env = *envptr;
2195     CmiUInt4 size = env->getTotalsize();
2196     fwrite(&size, 4, 1, f);
2197     fwrite(env, env->getTotalsize(), 1, f);
2198     if (!wasPacked) CkUnpackMessage(envptr);
2199     return CmiTrue;
2200   }
2201 };
2202
2203 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2204 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2205
2206 #if CMK_BIGSIM_CHARM
2207 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2208                                    int pb,unsigned int *prio);
2209 #endif
2210
2211 class CkMessageReplay : public CkMessageWatcher {
2212   int counter;
2213         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2214         int nextEP;
2215         unsigned int crc1, crc2;
2216         FILE *lbFile;
2217         /// Read the next message we need from the file:
2218         void getNext(void) {
2219           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2220           if (nextSize > 0) {
2221             // We are reading a regular message
2222             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2223               CkAbort("CkMessageReplay> Syntax error reading replay file");
2224             }
2225             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2226           } else if (nextSize == -2) {
2227             // We are reading a special message (right now only thread awaken)
2228             // Nothing to do since we have already read all info
2229             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2230           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2231             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2232             CkAbort("CkMessageReplay> Unrecognized input");
2233           }
2234             /*
2235                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2236                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2237                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2238                 }
2239                 */
2240                 counter++;
2241         }
2242         /// If this is the next message we need, advance and return CmiTrue.
2243         CmiBool isNext(envelope *env) {
2244                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2245                 if (nextEvent!=env->getEvent()) return CmiFalse;
2246                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2247 #if 1
2248                 if (nextEP != env->getEpIdx()) {
2249                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2250                         return CmiFalse;
2251                 }
2252 #endif
2253 #if ! CMK_BIGSIM_CHARM
2254                 if (nextSize!=env->getTotalsize())
2255                 {
2256                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2257                         return CmiFalse;
2258                 }
2259                 if (_recplay_crc || _recplay_checksum) {
2260                   bool wasPacked = env->isPacked();
2261                   if (!wasPacked) CkPackMessage(&env);
2262                   if (_recplay_crc) {
2263                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2264                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2265                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2266                     if (crcnew1 != crc1) {
2267                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2268                     }
2269                     if (crcnew2 != crc2) {
2270                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2271                     }
2272                   } else if (_recplay_checksum) {
2273             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2274             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2275             if (crcnew1 != crc1) {
2276               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2277             }
2278             if (crcnew2 != crc2) {
2279               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2280             }               
2281                   }
2282                   if (!wasPacked) CkUnpackMessage(&env);
2283                 }
2284 #endif
2285                 return CmiTrue;
2286         }
2287         CmiBool isNext(CthThreadToken *token) {
2288           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2289           return CmiFalse;
2290         }
2291
2292         /// This is a (short) list of messages we aren't yet ready for:
2293         CkQ<envelope *> delayedMessages;
2294         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2295         CkQ<CthThreadToken *> delayedTokens;
2296
2297         /// Try to flush out any delayed messages
2298         void flush(void) {
2299           if (nextSize>0) {
2300                 int len=delayedMessages.length();
2301                 for (int i=0;i<len;i++) {
2302                         envelope *env=delayedMessages.deq();
2303                         if (isNext(env)) { /* this is the next message: process it */
2304                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2305                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2306                                 return;
2307                         }
2308                         else /* Not ready yet-- put it back in the
2309                                 queue */
2310                           {
2311                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2312                                 delayedMessages.enq(env);
2313                           }
2314                 }
2315           } else if (nextSize==-2) {
2316             int len=delayedTokens.length();
2317             for (int i=0;i<len;++i) {
2318               CthThreadToken *token=delayedTokens.deq();
2319               if (isNext(token)) {
2320             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2321 #if ! CMK_BIGSIM_CHARM
2322                 CsdEnqueueLifo((void*)token);
2323 #else
2324                 CthEnqueueBigSimThread(token,0,0,NULL);
2325 #endif
2326                 return;
2327               } else {
2328             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2329                 delayedTokens.enq(token);
2330               }
2331             }
2332           }
2333         }
2334
2335 public:
2336         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2337           counter=0;
2338           f=f_;
2339           getNext();
2340           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2341           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2342         }
2343         ~CkMessageReplay() {fclose(f);}
2344
2345 private:
2346         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2347           bool wasPacked = (*envptr)->isPacked();
2348           if (!wasPacked) CkPackMessage(envptr);
2349           envelope *env = *envptr;
2350           //CkAssert(*(int*)env == 0x34567890);
2351           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2352                 if (env->getEvent() == 0) return CmiTrue;
2353                 if (isNext(env)) { /* This is the message we were expecting */
2354                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2355                         getNext(); /* Advance over this message */
2356                         flush(); /* try to process queued-up stuff */
2357                         if (!wasPacked) CkUnpackMessage(envptr);
2358                         return CmiTrue;
2359                 }
2360 #if CMK_SMP
2361                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2362                          // try next rank, we can't just buffer the msg and left
2363                          // we need to keep unprocessed msg on the fly
2364                         int nextpe = CkMyPe()+1;
2365                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2366                         nextpe = CkNodeFirst(CkMyNode());
2367                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2368                         return CmiFalse;
2369                 }
2370 #endif
2371                 else /*!isNext(env) */ {
2372                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2373                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2374                         delayedMessages.enq(env);
2375                         flush();
2376                         return CmiFalse;
2377                 }
2378         }
2379         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2380       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2381           if (isNext(token)) {
2382         REPLAYDEBUG("Executing token: "<<token->serialNo)
2383             getNext();
2384             flush();
2385             return CmiTrue;
2386           } else {
2387         REPLAYDEBUG("Queueing token: "<<token->serialNo
2388             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2389             delayedTokens.enq(token);
2390             return CmiFalse;
2391           }
2392         }
2393
2394         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2395           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2396           if (lbFile != NULL) {
2397             int num_moves;
2398         PUP::fromDisk p(lbFile);
2399             p | num_moves;
2400             if (num_moves != (*msg)->n_moves) {
2401               delete *msg;
2402               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2403             }
2404             (*msg)->pup(p);
2405           }
2406           return CmiTrue;
2407         }
2408 };
2409
2410 class CkMessageDetailReplay : public CkMessageWatcher {
2411   void *getNext() {
2412     CmiUInt4 size; size_t nread;
2413     if ((nread=fread(&size, 4, 1, f)) < 1) {
2414       if (feof(f)) return NULL;
2415       CkPrintf("Broken record file (metadata) got %d\n",nread);
2416       CkAbort("");
2417     }
2418     void *env = CmiAlloc(size);
2419     long tell = ftell(f);
2420     if ((nread=fread(env, size, 1, f)) < 1) {
2421       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2422       CkAbort("");
2423     }
2424     //*(int*)env = 0x34567890; // set first integer as magic
2425     return env;
2426   }
2427 public:
2428   double starttime;
2429   CkMessageDetailReplay(FILE *f_) {
2430     f=f_;
2431     starttime=CkWallTimer();
2432     /* This must match what CkMessageDetailRecorder did */
2433     CmiUInt2 little;
2434     fread(&little, 2, 1, f);
2435     if (little != sizeof(void*)) {
2436       CkAbort("Replaying on a different architecture from which recording was done!");
2437     }
2438
2439     CsdEnqueue(getNext());
2440
2441     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2442   }
2443   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2444     void *msg = getNext();
2445     if (msg != NULL) CsdEnqueue(msg);
2446     return CmiTrue;
2447   }
2448 };
2449
2450 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2451 #if ! CMK_BIGSIM_CHARM
2452   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2453 #endif
2454   CkMessageReplay *replay = (CkMessageReplay*)rep;
2455   //CmiStartQD(CkMessageReplayQuiescence, replay);
2456 }
2457
2458 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2459   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2460   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2461   ConverseExit();
2462 }
2463
2464 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2465   CkCoreState *ck = CkpvAccess(_coreState);
2466   if (ck->watcher!=NULL) {
2467     return ck->watcher->processThread(token,ck);
2468   }
2469   return CmiTrue;
2470 }
2471
2472 CpvCExtern(int, CthResumeNormalThreadIdx);
2473 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2474 {
2475   CthThread t = token->thread;
2476
2477   if(t == NULL){
2478     free(token);
2479     return;
2480   }
2481 #if CMK_TRACE_ENABLED
2482 #if ! CMK_TRACE_IN_CHARM
2483   if(CpvAccess(traceOn))
2484     CthTraceResume(t);
2485 /*    if(CpvAccess(_traceCoreOn)) 
2486             resumeTraceCore();*/
2487 #endif
2488 #endif
2489   
2490   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2491   if (CpdExecuteThreadResume(token)) {
2492     CthResume(t);
2493   }
2494 }
2495
2496 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2497   CkCoreState *ck = CkpvAccess(_coreState);
2498   if (ck->watcher!=NULL) {
2499     ck->watcher->processLBMessage(msg, ck);
2500   }
2501 }
2502
2503 #if CMK_BIGSIM_CHARM
2504 CpvExtern(int      , CthResumeBigSimThreadIdx);
2505 #endif
2506
2507 #include "ckliststring.h"
2508 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2509     CmiArgGroup("Charm++","Record/Replay");
2510     CmiBool forceReplay = CmiFalse;
2511     char *procs = NULL;
2512     _replaySystem = 0;
2513     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2514       _recplay_crc = 1;
2515     }
2516     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2517       _recplay_checksum = 1;
2518     }
2519     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2520     REPLAYDEBUG("CkMessageWatcherInit ");
2521     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2522         CkListString list(procs);
2523         if (list.includes(CkMyPe())) {
2524           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2525           CpdSetInitializeMemory(1);
2526           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2527         }
2528     }
2529     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2530       if (CkMyPe() == 0) {
2531         CmiPrintf("Charm++> record mode.\n");
2532         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2533           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2534           _recplay_crc = _recplay_checksum = 0;
2535         }
2536       }
2537       CpdSetInitializeMemory(1);
2538       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2539       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2540     }
2541         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2542             forceReplay = CmiTrue;
2543             CpdSetInitializeMemory(1);
2544             // Set the parameters of the processor
2545 #if CMK_SHARED_VARS_UNAVAILABLE
2546             _Cmi_mype = atoi(procs);
2547             while (procs[0]!='/') procs++;
2548             procs++;
2549             _Cmi_numpes = atoi(procs);
2550 #else
2551             CkAbort("+replay-detail available only for non-SMP build");
2552 #endif
2553             _replaySystem = 1;
2554             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2555         }
2556         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2557           if (CkMyPe() == 0)  {
2558             CmiPrintf("Charm++> replay mode.\n");
2559             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2560               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2561               _recplay_crc = _recplay_checksum = 0;
2562             }
2563           }
2564           CpdSetInitializeMemory(1);
2565 #if ! CMK_BIGSIM_CHARM
2566           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2567 #else
2568           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2569 #endif
2570           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2571         }
2572         if (_recplay_crc && _recplay_checksum) {
2573           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2574         }
2575 }
2576
2577 extern "C"
2578 int CkMessageToEpIdx(void *msg) {
2579         envelope *env=UsrToEnv(msg);
2580         int ep=env->getEpIdx();
2581         if (ep==CkIndex_CkArray::recvBroadcast(0))
2582                 return env->getsetArrayBcastEp();
2583         else
2584                 return ep;
2585 }
2586
2587 extern "C"
2588 int getCharmEnvelopeSize() {
2589   return sizeof(envelope);
2590 }
2591
2592 extern "C"
2593 int isCharmEnvelope(void *msg) {
2594       // best efford guessing if this is a charm envelope
2595     envelope *e = (envelope *)msg;
2596     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
2597     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
2598     if (e->getTotalsize() < sizeof(envelope)) return 0;
2599     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
2600 #if CMK_SMP
2601     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
2602 #else
2603     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()) return 0;
2604 #endif
2605     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
2606     return 1;
2607 }
2608
2609
2610 #include "CkMarshall.def.h"
2611