282b45e26ee3b2ef11033b8b6360e75e6cc20303
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
14 #include "pathHistory.h"
15 void automaticallySetMessagePriority(envelope *env); // in control point framework.
16 #endif
17
18 #if CMK_LBDB_ON
19 #include "LBDatabase.h"
20 #endif // CMK_LBDB_ON
21
22 #ifndef CMK_CHARE_USE_PTR
23 #include <map>
24 CkpvDeclare(CkVec<void *>, chare_objs);
25 CkpvDeclare(CkVec<int>, chare_types);
26 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
27
28 typedef std::map<int, CkChareID>  Vidblockmap;
29 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
30 CkpvDeclare(int, currentChareIdx);
31 #endif
32
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 void _initChareTables()
47 {
48 #ifndef CMK_CHARE_USE_PTR
49           /* chare and vidblock table */
50   CkpvInitialize(CkVec<void *>, chare_objs);
51   CkpvInitialize(CkVec<int>, chare_types);
52   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
53   CkpvInitialize(Vidblockmap, vmap);
54   CkpvInitialize(int, currentChareIdx);
55   CkpvAccess(currentChareIdx) = -1;
56 #endif
57 }
58
59 //Charm++ virtual functions: declaring these here results in a smaller executable
60 Chare::Chare(void) {
61   thishandle.onPE=CkMyPe();
62   thishandle.objPtr=this;
63 #if CMK_ERROR_CHECKING
64   magic = CHARE_MAGIC;
65 #endif
66 #ifndef CMK_CHARE_USE_PTR
67      // for plain chare, objPtr is actually the index to chare obj table
68   if (CkpvAccess(currentChareIdx) >= 0) {
69     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
70   }
71   chareIdx = CkpvAccess(currentChareIdx);
72 #endif
73 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
74   mlogData = new ChareMlogData();
75   mlogData->objID.type = TypeChare;
76   mlogData->objID.data.chare.id = thishandle;
77 #endif
78 #if CMK_OBJECT_QUEUE_AVAILABLE
79   if (_defaultObjectQ)  CkEnableObjQ();
80 #endif
81 }
82
83 Chare::Chare(CkMigrateMessage* m) {
84   thishandle.onPE=CkMyPe();
85   thishandle.objPtr=this;
86 #if CMK_ERROR_CHECKING
87   magic = 0;
88 #endif
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 #if CMK_ERROR_CHECKING
149   p(magic);
150 #endif
151 }
152
153 int Chare::ckGetChareType() const {
154   return -3;
155 }
156 char *Chare::ckDebugChareName(void) {
157   char buf[100];
158   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
159   return strdup(buf);
160 }
161 int Chare::ckDebugChareID(char *str, int limit) {
162   // pure chares for now do not have a valid ID
163   str[0] = 0;
164   return 1;
165 }
166 void Chare::ckDebugPup(PUP::er &p) {
167   pup(p);
168 }
169
170 /// This method is called before starting a [threaded] entry method.
171 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
172   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
173   traceAddThreadListeners(th, UsrToEnv(msg));
174 }
175
176 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
177   p.comment("Bytes");
178   int ts=UsrToEnv(msg)->getTotalsize();
179   int msgLen=ts-sizeof(envelope);
180   if (msgLen>0)
181     p((char*)msg,msgLen);
182 }
183
184 IrrGroup::IrrGroup(void) {
185   thisgroup = CkpvAccess(_currentGroup);
186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
187         mlogData->objID.type = TypeGroup;
188         mlogData->objID.data.group.id = thisgroup;
189         mlogData->objID.data.group.onPE = CkMyPe();
190 #endif
191 }
192
193 IrrGroup::~IrrGroup() {
194   // remove the object pointer
195   if (CkpvAccess(_destroyingNodeGroup)) {
196     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
197     CksvAccess(_nodeGroupTable)->find(thisgroup).setObj(NULL);
198     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
199     CkpvAccess(_destroyingNodeGroup) = false;
200   } else {
201     CmiImmediateLock(CkpvAccess(_groupTableImmLock));
202     CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
203     CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
204   }
205 }
206
207 void IrrGroup::pup(PUP::er &p)
208 {
209   Chare::pup(p);
210   p|thisgroup;
211 }
212
213 int IrrGroup::ckGetChareType() const {
214   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
215 }
216
217 int IrrGroup::ckDebugChareID(char *str, int limit) {
218   if (limit<5) return -1;
219   str[0] = 1;
220   *((int*)&str[1]) = thisgroup.idx;
221   return 5;
222 }
223
224 char *IrrGroup::ckDebugChareName() {
225   return strdup(_chareTable[ckGetChareType()]->name);
226 }
227
228 void IrrGroup::ckJustMigrated(void)
229 {
230 }
231
232 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
233   /* FIXME: **CW** not entirely sure what we should do here yet */
234 }
235
236 void Group::CkAddThreadListeners(CthThread th, void *msg) {
237   Chare::CkAddThreadListeners(th, msg);
238   CthSetThreadID(th, thisgroup.idx, 0, 0);
239 }
240
241 void Group::pup(PUP::er &p)
242 {
243   CkReductionMgr::pup(p);
244   reductionInfo.pup(p);
245 }
246
247 /**** Delegation Manager Group */
248 CkDelegateMgr::~CkDelegateMgr() { }
249
250 //Default delegator implementation: do not delegate-- send directly
251 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
252   { CkSendMsg(ep,m,c); }
253 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
254   { CkSendMsgBranch(ep,m,onPE,g); }
255 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
256   { CkBroadcastMsgBranch(ep,m,g); }
257 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
258   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
259 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
260   { CkSendMsgNodeBranch(ep,m,onNode,g); }
261 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
262   { CkBroadcastMsgNodeBranch(ep,m,g); }
263 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
264   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
265 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
266 {
267         CProxyElement_ArrayBase ap(a,idx);
268         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
269 }
270 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
271 {
272         CProxyElement_ArrayBase ap(a,idx);
273         ap.ckSend((CkArrayMessage *)m,ep);
274 }
275 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
276 {
277         CProxy_ArrayBase ap(a);
278         ap.ckBroadcast((CkArrayMessage *)m,ep);
279 }
280
281 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
282 {
283         CmiAbort("ArraySectionSend is not implemented!\n");
284 /*
285         CProxyElement_ArrayBase ap(a,idx);
286         ap.ckSend((CkArrayMessage *)m,ep);
287 */
288 }
289
290 /*** Proxy <-> delegator communication */
291 CkDelegateData::~CkDelegateData() {}
292
293 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
294   return pd; // default implementation ignores pup call
295 }
296
297 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
298    this tricky manual reference counting business... */
299
300 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
301         if (dPtr) dPtr->ref();
302         ckUndelegate();
303         delegatedMgr = dTo;
304         delegatedPtr = dPtr;
305         delegatedGroupId = delegatedMgr->CkGetGroupID();
306         isNodeGroup = delegatedMgr->isNodeGroup();
307 }
308 void CProxy::ckUndelegate(void) {
309         delegatedMgr=NULL;
310         delegatedGroupId.setZero();
311         if (delegatedPtr) delegatedPtr->unref();
312         delegatedPtr=NULL;
313 }
314
315 /// Copy constructor
316 CProxy::CProxy(const CProxy &src)
317   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
318    isNodeGroup(src.isNodeGroup) {
319     delegatedPtr = NULL;
320     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
321         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
322     }
323 }
324
325 /// Assignment operator
326 CProxy& CProxy::operator=(const CProxy &src) {
327         CkDelegateData *oldPtr=delegatedPtr;
328         ckUndelegate();
329         delegatedMgr=src.delegatedMgr;
330         delegatedGroupId = src.delegatedGroupId; 
331         isNodeGroup = src.isNodeGroup;
332
333         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
334             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
335         else
336             delegatedPtr = NULL;
337
338         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
339         if (oldPtr) oldPtr->unref();
340         return *this;
341 }
342
343 void CProxy::pup(PUP::er &p) {
344   if (!p.isUnpacking()) {
345     if (ckDelegatedTo() != NULL) {
346       delegatedGroupId = delegatedMgr->CkGetGroupID();
347       isNodeGroup = delegatedMgr->isNodeGroup();
348     }
349   }
350   p|delegatedGroupId;
351   if (!delegatedGroupId.isZero()) {
352     p|isNodeGroup;
353     if (p.isUnpacking()) {
354       delegatedMgr = ckDelegatedTo(); 
355     }
356
357     int migCtor = 0, cIdx; 
358     if (!p.isUnpacking()) {
359       if (isNodeGroup) {
360         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
361         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
362         migCtor = _chareTable[cIdx]->migCtor; 
363         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
364       }
365       else  {
366         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
367         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
368         migCtor = _chareTable[cIdx]->migCtor; 
369         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
370       }         
371     }
372
373     p|migCtor;
374
375     // if delegated manager has not been created, construct a dummy
376     // object on which to call DelegatePointerPup
377     if (delegatedMgr == NULL) {
378
379       // create a dummy object for calling DelegatePointerPup
380       int objId = _entryTable[migCtor]->chareIdx; 
381       int objSize = _chareTable[objId]->size; 
382       void *obj = malloc(objSize); 
383       _entryTable[migCtor]->call(NULL, obj); 
384       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
385         ->DelegatePointerPup(p, delegatedPtr);           
386       free(obj);
387
388     }
389     else {
390
391       // delegated manager has been created, so we can use it
392       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
393
394     }
395
396     if (p.isUnpacking() && delegatedPtr) {
397       delegatedPtr->ref();
398     }
399   }
400 }
401
402 /**** Array sections */
403 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
404 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
405   _cookie.get_aid() = aid;      \
406   _cookie.get_pe() = CkMyPe();  \
407   _elems = new CkArrayIndex[nElems];    \
408   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
409   pelist = NULL;        \
410   npes  = 0;    \
411 }
412
413 CKSECTIONID_CONSTRUCTOR_DEF(1D)
414 CKSECTIONID_CONSTRUCTOR_DEF(2D)
415 CKSECTIONID_CONSTRUCTOR_DEF(3D)
416 CKSECTIONID_CONSTRUCTOR_DEF(4D)
417 CKSECTIONID_CONSTRUCTOR_DEF(5D)
418 CKSECTIONID_CONSTRUCTOR_DEF(6D)
419 CKSECTIONID_CONSTRUCTOR_DEF(Max)
420
421 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
422   pelist = new int[npes];
423   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
424   _cookie.get_aid() = gid;
425 }
426
427 CkSectionID::CkSectionID(const CkSectionID &sid) {
428   int i;
429   _cookie = sid._cookie;
430   _nElems = sid._nElems;
431   if (_nElems > 0) {
432     _elems = new CkArrayIndex[_nElems];
433     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
434   } else _elems = NULL;
435   npes = sid.npes;
436   if (npes > 0) {
437     pelist = new int[npes];
438     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
439   } else pelist = NULL;
440 }
441
442 void CkSectionID::operator=(const CkSectionID &sid) {
443   int i;
444   _cookie = sid._cookie;
445   _nElems = sid._nElems;
446   if (_nElems > 0) {
447     _elems = new CkArrayIndex[_nElems];
448     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
449   } else _elems = NULL;
450   npes = sid.npes;
451   if (npes > 0) {
452     pelist = new int[npes];
453     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
454   } else pelist = NULL;
455 }
456
457 void CkSectionID::pup(PUP::er &p) {
458     p | _cookie;
459     p(_nElems);
460     if (_nElems > 0) {
461       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
462       for (int i=0; i< _nElems; i++) p | _elems[i];
463       npes = 0;
464       pelist = NULL;
465     } else {
466       // If _nElems is zero, than this section describes processors instead of array elements
467       _elems = NULL;
468       p(npes);
469       if (p.isUnpacking()) pelist = new int[npes];
470       p(pelist, npes);
471     }
472 }
473
474 /**** Tiny random API routines */
475
476 #if CMK_CUDA
477 void CUDACallbackManager(void *fn) {
478   if (fn != NULL) {
479     CkCallback *cb = (CkCallback*) fn;
480     cb->send();
481   }
482 }
483
484 #endif
485
486 void QdCreate(int n) {
487   CpvAccess(_qd)->create(n);
488 }
489
490 void QdProcess(int n) {
491   CpvAccess(_qd)->process(n);
492 }
493
494 extern "C"
495 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
496 {
497   UsrToEnv(msg)->setRef(ref);
498 }
499
500 extern "C"
501 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
502 {
503   return UsrToEnv(msg)->getRef();
504 }
505
506 extern "C"
507 int CkGetSrcPe(void *msg)
508 {
509   return UsrToEnv(msg)->getSrcPe();
510 }
511
512 extern "C"
513 int CkGetSrcNode(void *msg)
514 {
515   return CmiNodeOf(CkGetSrcPe(msg));
516 }
517
518 extern "C"
519 void *CkLocalBranch(CkGroupID gID) {
520   return _localBranch(gID);
521 }
522
523 static
524 void *_ckLocalNodeBranch(CkGroupID groupID) {
525   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
526   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
527   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
528   return retval;
529 }
530
531 extern "C"
532 void *CkLocalNodeBranch(CkGroupID groupID)
533 {
534   void *retval;
535   // we are called in a constructor
536   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
537     return CkpvAccess(_currentNodeGroupObj);
538   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
539   { // Nodegroup hasn't finished being created yet-- schedule...
540     CsdScheduler(0);
541   }
542   return retval;
543 }
544
545 extern "C"
546 void *CkLocalChare(const CkChareID *pCid)
547 {
548         int pe=pCid->onPE;
549         if (pe<0) { //A virtual chare ID
550                 if (pe!=(-(CkMyPe()+1)))
551                         return NULL;//VID block not on this PE
552 #ifdef CMK_CHARE_USE_PTR
553                 VidBlock *v=(VidBlock *)pCid->objPtr;
554 #else
555                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
556 #endif
557                 return v->getLocalChareObj();
558         }
559         else
560         { //An ordinary chare ID
561                 if (pe!=CkMyPe())
562                         return NULL;//Chare not on this PE
563 #ifdef CMK_CHARE_USE_PTR
564                 return pCid->objPtr;
565 #else
566                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
567 #endif
568         }
569 }
570
571 CkpvDeclare(char**,Ck_argv);
572
573 extern "C" char **CkGetArgv(void) {
574         return CkpvAccess(Ck_argv);
575 }
576 extern "C" int CkGetArgc(void) {
577         return CmiGetArgc(CkpvAccess(Ck_argv));
578 }
579
580 /******************** Basic support *****************/
581 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
582 {
583 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
584         CpvAccess(_currentObj) = (Chare *)obj;
585 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
586 #endif
587   //BIGSIM_OOC DEBUGGING
588   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
589   //fflush(stdout);
590 #if CMK_CHARMDEBUG
591   CpdBeforeEp(epIdx, obj, msg);
592 #endif    
593   _entryTable[epIdx]->call(msg, obj);
594 #if CMK_CHARMDEBUG
595   CpdAfterEp(epIdx);
596 #endif
597   if (_entryTable[epIdx]->noKeep)
598   { /* Method doesn't keep/delete the message, so we have to: */
599     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
600   }
601 }
602 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
603 {
604   //BIGSIM_OOC DEBUGGING
605   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
606   //fflush(stdout);
607
608   void *deliverMsg;
609 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
610         CpvAccess(_currentObj) = (Chare *)obj;
611 #endif
612   if (_entryTable[epIdx]->noKeep)
613   { /* Deliver a read-only copy of the message */
614     deliverMsg=(void *)msg;
615   } else
616   { /* Method needs a copy of the message to keep/delete */
617     void *oldMsg=(void *)msg;
618     deliverMsg=CkCopyMsg(&oldMsg);
619 #if CMK_ERROR_CHECKING
620     if (oldMsg!=msg)
621       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
622 #endif
623   }
624 #if CMK_CHARMDEBUG
625   CpdBeforeEp(epIdx, obj, (void*)msg);
626 #endif
627   _entryTable[epIdx]->call(deliverMsg, obj);
628 #if CMK_CHARMDEBUG
629   CpdAfterEp(epIdx);
630 #endif
631 }
632
633 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
634 {
635   register void *msg = EnvToUsr(env);
636   _SET_USED(env, 0);
637   CkDeliverMessageFree(epIdx,msg,obj);
638 }
639
640 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
641 {
642
643 #if CMK_TRACE_ENABLED 
644   if (_entryTable[epIdx]->traceEnabled) {
645     _TRACE_BEGIN_EXECUTE(env, obj);
646     if(_entryTable[epIdx]->appWork)
647         _TRACE_BEGIN_APPWORK();
648     _invokeEntryNoTrace(epIdx,env,obj);
649     if(_entryTable[epIdx]->appWork)
650         _TRACE_END_APPWORK();
651     _TRACE_END_EXECUTE();
652   }
653   else
654 #endif
655     _invokeEntryNoTrace(epIdx,env,obj);
656
657 }
658
659 /********************* Creation ********************/
660
661 extern "C"
662 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
663 {
664   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
665   envelope *env = UsrToEnv(msg);
666   _CHECK_USED(env);
667   if(pCid == 0) {
668     env->setMsgtype(NewChareMsg);
669   } else {
670     pCid->onPE = (-(CkMyPe()+1));
671     //  pCid->magic = _GETIDX(cIdx);
672     pCid->objPtr = (void *) new VidBlock();
673     _MEMCHECK(pCid->objPtr);
674     env->setMsgtype(NewVChareMsg);
675     env->setVidPtr(pCid->objPtr);
676 #ifndef CMK_CHARE_USE_PTR
677     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
678     int idx = CkpvAccess(vidblocks).size()-1;
679     pCid->objPtr = (void *)(CmiIntPtr)idx;
680     env->setVidPtr((void *)(CmiIntPtr)idx);
681 #endif
682   }
683   env->setEpIdx(eIdx);
684   env->setByPe(CkMyPe());
685   env->setSrcPe(CkMyPe());
686   CmiSetHandler(env, _charmHandlerIdx);
687   _TRACE_CREATION_1(env);
688   CpvAccess(_qd)->create();
689   _STATS_RECORD_CREATE_CHARE_1();
690   _SET_USED(env, 1);
691   if(destPE == CK_PE_ANY)
692     env->setForAnyPE(1);
693   else
694     env->setForAnyPE(0);
695   _CldEnqueue(destPE, env, _infoIdx);
696   _TRACE_CREATION_DONE(1);
697 }
698
699 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
700 {
701   register int gIdx = _entryTable[epIdx]->chareIdx;
702   register void *obj = malloc(_chareTable[gIdx]->size);
703   _MEMCHECK(obj);
704   setMemoryTypeChare(obj);
705   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
706   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
707   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
708   CkpvAccess(_groupIDTable)->push_back(groupID);
709   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
710   if(ptrq) {
711     void *pending;
712     while((pending=ptrq->deq())!=0) {
713 #if CMK_BIGSIM_CHARM
714       //In BigSim, CpvAccess(CsdSchedQueue) is not used. _CldEnqueue resets the
715       //handler to converse-level BigSim handler.
716       _CldEnqueue(CkMyPe(), pending, _infoIdx);
717 #else
718       CsdEnqueueGeneral(pending, CQS_QUEUEING_FIFO, 0, 0);
719 #endif
720     }
721     CkpvAccess(_groupTable)->find(groupID).clearPending();
722   }
723   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
724
725   CkpvAccess(_currentGroup) = groupID;
726   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
727
728 #ifndef CMK_CHARE_USE_PTR
729   int callingChareIdx = CkpvAccess(currentChareIdx);
730   CkpvAccess(currentChareIdx) = -1;
731 #endif
732
733   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
734
735 #ifndef CMK_CHARE_USE_PTR
736   CkpvAccess(currentChareIdx) = callingChareIdx;
737 #endif
738
739   _STATS_RECORD_PROCESS_GROUP_1();
740 }
741
742 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
743 {
744   register int gIdx = _entryTable[epIdx]->chareIdx;
745   int objSize=_chareTable[gIdx]->size;
746   register void *obj = malloc(objSize);
747   _MEMCHECK(obj);
748   setMemoryTypeChare(obj);
749   CkpvAccess(_currentGroup) = groupID;
750
751 // Now that the NodeGroup is created, add it to the table.
752 //  NodeGroups can be accessed by multiple processors, so
753 //  this is in the opposite order from groups - invoking the constructor
754 //  before registering it.
755 // User may call CkLocalNodeBranch() inside the nodegroup constructor
756 //  store nodegroup into _currentNodeGroupObj
757   CkpvAccess(_currentNodeGroupObj) = obj;
758
759 #ifndef CMK_CHARE_USE_PTR
760   int callingChareIdx = CkpvAccess(currentChareIdx);
761   CkpvAccess(currentChareIdx) = -1;
762 #endif
763
764   _invokeEntryNoTrace(epIdx,env,obj);
765
766 #ifndef CMK_CHARE_USE_PTR
767   CkpvAccess(currentChareIdx) = callingChareIdx;
768 #endif
769
770   CkpvAccess(_currentNodeGroupObj) = NULL;
771   _STATS_RECORD_PROCESS_NODE_GROUP_1();
772
773   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
774   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
775   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
776   CksvAccess(_nodeGroupIDTable).push_back(groupID);
777
778   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
779   if(ptrq) {
780     void *pending;
781     while((pending=ptrq->deq())!=0) {
782       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
783     }
784     CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
785   }
786   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
787 }
788
789 void _createGroup(CkGroupID groupID, envelope *env)
790 {
791   _CHECK_USED(env);
792   _SET_USED(env, 1);
793   register int epIdx = env->getEpIdx();
794   int gIdx = _entryTable[epIdx]->chareIdx;
795   CkNodeGroupID rednMgr;
796 #if !GROUP_LEVEL_REDUCTION
797   if(_chareTable[gIdx]->isIrr == 0){
798                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
799                 rednMgr = rednMgrProxy;
800 //              rednMgrProxy.setAttachedGroup(groupID);
801   }
802   else
803 #endif
804   {
805         rednMgr.setZero();
806   }
807   env->setGroupNum(groupID);
808   env->setSrcPe(CkMyPe());
809   env->setRednMgr(rednMgr);
810   env->setGroupEpoch(CkpvAccess(_charmEpoch));
811
812   if(CkNumPes()>1) {
813     CkPackMessage(&env);
814     CmiSetHandler(env, _bocHandlerIdx);
815     _numInitMsgs++;
816     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
817     CpvAccess(_qd)->create(CkNumPes()-1);
818     CkUnpackMessage(&env);
819   }
820   _STATS_RECORD_CREATE_GROUP_1();
821   CkCreateLocalGroup(groupID, epIdx, env);
822 }
823
824 void _createNodeGroup(CkGroupID groupID, envelope *env)
825 {
826   _CHECK_USED(env);
827   _SET_USED(env, 1);
828   register int epIdx = env->getEpIdx();
829   env->setGroupNum(groupID);
830   env->setSrcPe(CkMyPe());
831   env->setGroupEpoch(CkpvAccess(_charmEpoch));
832   if(CkNumNodes()>1) {
833     CkPackMessage(&env);
834     CmiSetHandler(env, _bocHandlerIdx);
835     _numInitMsgs++;
836     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
837     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
838     CpvAccess(_qd)->create(CkNumNodes()-1);
839     CkUnpackMessage(&env);
840   }
841   _STATS_RECORD_CREATE_NODE_GROUP_1();
842   CkCreateLocalNodeGroup(groupID, epIdx, env);
843 }
844
845 // new _groupCreate
846
847 static CkGroupID _groupCreate(envelope *env)
848 {
849   register CkGroupID groupNum;
850
851   // check CkMyPe(). if it is 0 then idx is _numGroups++
852   // if not, then something else...
853   if(CkMyPe() == 0)
854      groupNum.idx = CkpvAccess(_numGroups)++;
855   else
856      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
857   _createGroup(groupNum, env);
858   return groupNum;
859 }
860
861 // new _nodeGroupCreate
862 static CkGroupID _nodeGroupCreate(envelope *env)
863 {
864   register CkGroupID groupNum;
865   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
866   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
867           groupNum.idx = CksvAccess(_numNodeGroups)++;
868    else
869           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
870   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
871   _createNodeGroup(groupNum, env);
872   return groupNum;
873 }
874
875 /**** generate the group idx when group is creator pe is not pe0
876  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
877  **** remaining bits contain the group creator processor number and
878  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
879
880 int _getGroupIdx(int numNodes,int myNode,int numGroups)
881 {
882         int idx;
883         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
884         int n = 32 - (x+1);                                     // number of bits remaining for the index
885         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
886                                                                 // then add the next available index
887         // of course this won't work when int is 8 bytes long on T3E
888         //idx |= 0x80000000;                                      // set the most significant bit to 1
889         idx = - idx;
890                                                                 // if int is not 32 bits, wouldn't this be wrong?
891         return idx;
892 }
893
894 extern "C"
895 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
896 {
897   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
898   register envelope *env = UsrToEnv(msg);
899   env->setMsgtype(BocInitMsg);
900   env->setEpIdx(eIdx);
901   env->setSrcPe(CkMyPe());
902   _TRACE_CREATION_N(env, CkNumPes());
903   CkGroupID gid = _groupCreate(env);
904   _TRACE_CREATION_DONE(1);
905   return gid;
906 }
907
908 extern "C"
909 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
910 {
911   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
912   register envelope *env = UsrToEnv(msg);
913   env->setMsgtype(NodeBocInitMsg);
914   env->setEpIdx(eIdx);
915   env->setSrcPe(CkMyPe());
916   _TRACE_CREATION_N(env, CkNumNodes());
917   CkGroupID gid = _nodeGroupCreate(env);
918   _TRACE_CREATION_DONE(1);
919   return gid;
920 }
921
922 static inline void *_allocNewChare(envelope *env, int &idx)
923 {
924   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
925   void *tmp=malloc(_chareTable[chareIdx]->size);
926   _MEMCHECK(tmp);
927 #ifndef CMK_CHARE_USE_PTR
928   CkpvAccess(chare_objs).push_back(tmp);
929   CkpvAccess(chare_types).push_back(chareIdx);
930   idx = CkpvAccess(chare_objs).size()-1;
931 #endif
932   setMemoryTypeChare(tmp);
933   return tmp;
934 }
935
936 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
937 {
938   int idx;
939   register void *obj = _allocNewChare(env, idx);
940 #ifndef CMK_CHARE_USE_PTR
941   //((Chare *)obj)->chareIdx = idx;
942   CkpvAccess(currentChareIdx) = idx;
943 #endif
944   _invokeEntry(env->getEpIdx(),env,obj);
945 }
946
947 void CkCreateLocalChare(int epIdx, envelope *env)
948 {
949   env->setEpIdx(epIdx);
950   _processNewChareMsg(NULL, env);
951 }
952
953 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
954 {
955   int idx;
956   register void *obj = _allocNewChare(env, idx);
957   register CkChareID *pCid = (CkChareID *)
958       _allocMsg(FillVidMsg, sizeof(CkChareID));
959   pCid->onPE = CkMyPe();
960 #ifndef CMK_CHARE_USE_PTR
961   pCid->objPtr = (void*)(CmiIntPtr)idx;
962 #else
963   pCid->objPtr = obj;
964 #endif
965   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
966   register envelope *ret = UsrToEnv(pCid);
967   ret->setVidPtr(env->getVidPtr());
968   register int srcPe = env->getByPe();
969   ret->setSrcPe(CkMyPe());
970   CmiSetHandler(ret, _charmHandlerIdx);
971   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
972 #ifndef CMK_CHARE_USE_PTR
973   // register the remote vidblock for deletion when chare is deleted
974   CkChareID vid;
975   vid.onPE = srcPe;
976   vid.objPtr = env->getVidPtr();
977   CkpvAccess(vmap)[idx] = vid;    
978 #endif
979   CpvAccess(_qd)->create();
980 #ifndef CMK_CHARE_USE_PTR
981   //((Chare *)obj)->chareIdx = idx;
982   CkpvAccess(currentChareIdx) = idx;
983 #endif
984   _invokeEntry(env->getEpIdx(),env,obj);
985 }
986
987 /************** Receive: Chares *************/
988
989 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
990 {
991   register int epIdx = env->getEpIdx();
992   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
993   register void *obj;
994   if (mainIdx != -1)  {           // mainchare
995     CmiAssert(CkMyPe()==0);
996     obj = _mainTable[mainIdx]->getObj();
997   }
998   else {
999 #ifndef CMK_CHARE_USE_PTR
1000     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
1001       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
1002     else
1003       obj = env->getObjPtr();
1004 #else
1005     obj = env->getObjPtr();
1006 #endif
1007   }
1008   _invokeEntry(epIdx,env,obj);
1009 }
1010
1011 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
1012 {
1013   register int epIdx = env->getEpIdx();
1014   register void *obj = env->getObjPtr();
1015   _invokeEntry(epIdx,env,obj);
1016 }
1017
1018 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
1019 {
1020 #ifndef CMK_CHARE_USE_PTR
1021   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1022 #else
1023   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
1024   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
1025 #endif
1026   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
1027   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
1028   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
1029   CmiFree(env);
1030 }
1031
1032 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
1033 {
1034 #ifndef CMK_CHARE_USE_PTR
1035   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1036 #else
1037   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1038   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
1039 #endif
1040   _SET_USED(env, 1);
1041   vptr->send(env);
1042 }
1043
1044 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1045 {
1046 #ifndef CMK_CHARE_USE_PTR
1047   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1048   delete vptr;
1049   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1050 #endif
1051   CmiFree(env);
1052 }
1053
1054 /************** Receive: Groups ****************/
1055
1056 /**
1057  Return a pointer to the local BOC of "groupID".
1058  The message "env" passed in has some known dependency on this groupID
1059  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1060  Therefore, if the return value is NULL, this function buffers the message so that
1061  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1062  The message passed in must have its handlers correctly set so that it can be
1063  scheduled again.
1064 */
1065 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1066 {
1067
1068         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1069         IrrGroup *obj = ck->localBranch(groupID);
1070         if (obj==NULL) { /* groupmember not yet created: stash message */
1071                 ck->getGroupTable()->find(groupID).enqMsg(env);
1072         }
1073         else { /* will be able to process message */
1074                 ck->process();
1075         }
1076         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1077         return obj;
1078 }
1079
1080 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1081 {
1082   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1083 }
1084
1085 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1086 {
1087 #if CMK_LBDB_ON
1088   // if there is a running obj being measured, stop it temporarily
1089   LDObjHandle objHandle;
1090   int objstopped = 0;
1091   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1092   if (the_lbdb->RunningObject(&objHandle)) {
1093     objstopped = 1;
1094     the_lbdb->ObjectStop(objHandle);
1095   }
1096 #endif
1097   _invokeEntry(epIdx,env,obj);
1098 #if CMK_LBDB_ON
1099   if (objstopped) the_lbdb->ObjectStart(objHandle);
1100 #endif
1101   _STATS_RECORD_PROCESS_BRANCH_1();
1102 }
1103
1104 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1105 {
1106   register CkGroupID groupID =  env->getGroupNum();
1107   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1108   if(obj) {
1109     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1110   }
1111 }
1112
1113 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1114 {
1115   env->setMsgtype(ForChareMsg);
1116   env->setObjPtr(obj);
1117   _processForChareMsg(ck,env);
1118   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1119 }
1120
1121 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1122 {
1123   env->setEpIdx(epIdx);
1124   _deliverForNodeBocMsg(ck,env, obj);
1125 }
1126
1127 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1128 {
1129   register CkGroupID groupID = env->getGroupNum();
1130   register void *obj;
1131
1132   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1133   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1134   if(!obj) { // groupmember not yet created
1135 #if CMK_IMMEDIATE_MSG
1136     if (CmiIsImmediate(env)) {
1137       //CmiDelayImmediate();        /* buffer immediate message */
1138       CmiResetImmediate(env);        // note: this may not be SIG IO safe !
1139     }
1140 #endif
1141     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1142     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1143     return;
1144   }
1145   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1146   ck->process();
1147   env->setMsgtype(ForChareMsg);
1148   env->setObjPtr(obj);
1149   _processForChareMsg(ck,env);
1150   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1151 }
1152
1153 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1154 {
1155   register CkGroupID groupID = env->getGroupNum();
1156   register int epIdx = env->getEpIdx();
1157   if (!env->getGroupDep().isZero()) {      // dependence
1158     CkGroupID dep = env->getGroupDep();
1159     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1160     if (obj == NULL) return;
1161   }
1162   else
1163     ck->process();
1164   CkCreateLocalGroup(groupID, epIdx, env);
1165 }
1166
1167 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1168 {
1169   register CkGroupID groupID = env->getGroupNum();
1170   register int epIdx = env->getEpIdx();
1171   CkCreateLocalNodeGroup(groupID, epIdx, env);
1172 }
1173
1174 /************** Receive: Arrays *************/
1175
1176 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1177   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1178   if (mgr) {
1179     _SET_USED(env, 0);
1180     mgr->insertElement((CkMessage *)EnvToUsr(env));
1181   }
1182 }
1183 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1184   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1185   if (mgr) {
1186     _SET_USED(env, 0);
1187     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1188   }
1189 }
1190
1191 //BIGSIM_OOC DEBUGGING
1192 #define TELLMSGTYPE(x) //x
1193
1194 /**
1195  * This is the main converse-level handler used by all of Charm++.
1196  *
1197  * \addtogroup CriticalPathFramework
1198  */
1199 void _processHandler(void *converseMsg,CkCoreState *ck)
1200 {
1201   register envelope *env = (envelope *) converseMsg;
1202
1203   MESSAGE_PHASE_CHECK(env);
1204
1205 //#if CMK_RECORD_REPLAY
1206   if (ck->watcher!=NULL) {
1207     if (!ck->watcher->processMessage(&env,ck)) return;
1208   }
1209 //#endif
1210 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1211         Chare *obj=NULL;
1212         CkObjID sender;
1213         MCount SN;
1214         MlogEntry *entry=NULL;
1215         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
1216            || env->getMsgtype() == ForArrayEltMsg || env->getMsgtype() == ForIDedObjMsg
1217            || env->getMsgtype() == ForChareMsg) {
1218                 sender = env->sender;
1219                 SN = env->SN;
1220                 int result = preProcessReceivedMessage(env,&obj,&entry);
1221                 if(result == 0){
1222                         return;
1223                 }
1224         }
1225 #endif
1226
1227 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1228   //  CkPrintf("START\n");
1229   criticalPath_start(env);
1230 #endif
1231
1232
1233   switch(env->getMsgtype()) {
1234 // Group support
1235     case BocInitMsg :
1236       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1237       // QD processing moved inside _processBocInitMsg because it is conditional
1238       //ck->process(); 
1239       if(env->isPacked()) CkUnpackMessage(&env);
1240       _processBocInitMsg(ck,env);
1241       break;
1242     case NodeBocInitMsg :
1243       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1244       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1245       _processNodeBocInitMsg(ck,env);
1246       break;
1247     case ForBocMsg :
1248       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1249       // QD processing moved inside _processForBocMsg because it is conditional
1250       if(env->isPacked()) CkUnpackMessage(&env);
1251       _processForBocMsg(ck,env);
1252       // stats record moved inside _processForBocMsg because it is conditional
1253       break;
1254     case ForNodeBocMsg :
1255       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1256       // QD processing moved to _processForNodeBocMsg because it is conditional
1257       if(env->isPacked()) CkUnpackMessage(&env);
1258       _processForNodeBocMsg(ck,env);
1259       // stats record moved to _processForNodeBocMsg because it is conditional
1260       break;
1261
1262 // Array support
1263     case ArrayEltInitMsg:
1264       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1265       if(env->isPacked()) CkUnpackMessage(&env);
1266       _processArrayEltInitMsg(ck,env);
1267       break;
1268     case ForIDedObjMsg:
1269     case ForArrayEltMsg:
1270       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1271       if(env->isPacked()) CkUnpackMessage(&env);
1272       _processArrayEltMsg(ck,env);
1273       break;
1274
1275 // Chare support
1276     case NewChareMsg :
1277       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1278       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1279       _processNewChareMsg(ck,env);
1280       _STATS_RECORD_PROCESS_CHARE_1();
1281       break;
1282     case NewVChareMsg :
1283       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1284       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1285       _processNewVChareMsg(ck,env);
1286       _STATS_RECORD_PROCESS_CHARE_1();
1287       break;
1288     case ForChareMsg :
1289       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1290       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1291       _processForPlainChareMsg(ck,env);
1292       _STATS_RECORD_PROCESS_MSG_1();
1293       break;
1294     case ForVidMsg   :
1295       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1296       ck->process();
1297       _processForVidMsg(ck,env);
1298       break;
1299     case FillVidMsg  :
1300       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1301       ck->process();
1302       _processFillVidMsg(ck,env);
1303       break;
1304     case DeleteVidMsg  :
1305       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1306       ck->process();
1307       _processDeleteVidMsg(ck,env);
1308       break;
1309
1310     default:
1311       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1312   }
1313 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1314         if(obj != NULL){
1315                 postProcessReceivedMessage(obj,sender,SN,entry);
1316         }
1317 #endif
1318
1319
1320 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1321   criticalPath_end();
1322   //  CkPrintf("STOP\n");
1323 #endif
1324
1325
1326 }
1327
1328
1329 /******************** Message Send **********************/
1330
1331 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1332              int *queueing, int *priobits, unsigned int **prioptr)
1333 {
1334   register envelope *env = (envelope *)converseMsg;
1335   *pfn = (CldPackFn)CkPackMessage;
1336   *len = env->getTotalsize();
1337   *queueing = env->getQueueing();
1338   *priobits = env->getPriobits();
1339   *prioptr = (unsigned int *) env->getPrioPtr();
1340 }
1341
1342 void CkPackMessage(envelope **pEnv)
1343 {
1344   register envelope *env = *pEnv;
1345   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1346     register void *msg = EnvToUsr(env);
1347     _TRACE_BEGIN_PACK();
1348     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1349     _TRACE_END_PACK();
1350     env=UsrToEnv(msg);
1351     env->setPacked(1);
1352     *pEnv = env;
1353   }
1354 }
1355
1356 void CkUnpackMessage(envelope **pEnv)
1357 {
1358   register envelope *env = *pEnv;
1359   register int msgIdx = env->getMsgIdx();
1360   if(env->isPacked()) {
1361     register void *msg = EnvToUsr(env);
1362     _TRACE_BEGIN_UNPACK();
1363     msg = _msgTable[msgIdx]->unpack(msg);
1364     _TRACE_END_UNPACK();
1365     env=UsrToEnv(msg);
1366     env->setPacked(0);
1367     *pEnv = env;
1368   }
1369 }
1370
1371 //There's no reason for most messages to go through the Cld--
1372 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1373 // Thus these accellerated versions of the Cld calls.
1374 #if CMK_OBJECT_QUEUE_AVAILABLE
1375 static int index_objectQHandler;
1376 #endif
1377 int index_tokenHandler;
1378 int index_skipCldHandler;
1379
1380 void _skipCldHandler(void *converseMsg)
1381 {
1382   register envelope *env = (envelope *)(converseMsg);
1383   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1384 #if CMK_GRID_QUEUE_AVAILABLE
1385   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1386     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1387                        env, env->getQueueing (), env->getPriobits (),
1388                        (unsigned int *) env->getPrioPtr ());
1389   } else {
1390     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1391                        env, env->getQueueing (), env->getPriobits (),
1392                        (unsigned int *) env->getPrioPtr ());
1393   }
1394 #else
1395   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1396         env, env->getQueueing(),env->getPriobits(),
1397         (unsigned int *)env->getPrioPtr());
1398 #endif
1399 }
1400
1401
1402 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1403 // Made non-static to be used by ckmessagelogging
1404 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1405 {
1406 #if CMK_CHARMDEBUG
1407   if (!ConverseDeliver(pe)) {
1408     CmiFree(env);
1409     return;
1410   }
1411 #endif
1412   if(pe == CkMyPe() ){
1413     if(!CmiNodeAlive(CkMyPe())){
1414         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1415 //      return;
1416     }
1417   }
1418   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1419 #if CMK_OBJECT_QUEUE_AVAILABLE
1420     Chare *obj = CkFindObjectPtr(env);
1421     if (obj && obj->CkGetObjQueue().queue()) {
1422       _enqObjQueue(obj, env);
1423     }
1424     else
1425 #endif
1426     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1427         env, env->getQueueing(),env->getPriobits(),
1428         (unsigned int *)env->getPrioPtr());
1429 #if CMK_PERSISTENT_COMM
1430     CmiPersistentOneSend();
1431 #endif
1432   } else {
1433     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1434       CkPackMessage(&env);
1435     int len=env->getTotalsize();
1436     CmiSetXHandler(env,CmiGetHandler(env));
1437 #if CMK_OBJECT_QUEUE_AVAILABLE
1438     CmiSetHandler(env,index_objectQHandler);
1439 #else
1440     CmiSetHandler(env,index_skipCldHandler);
1441 #endif
1442     CmiSetInfo(env,infoFn);
1443     if (pe==CLD_BROADCAST) {
1444 #if CMK_MESSAGE_LOGGING
1445         if(env->flags & CK_FREE_MSG_MLOG)
1446                 CmiSyncBroadcastAndFree(len, (char *)env); 
1447         else
1448                 CmiSyncBroadcast(len, (char *)env);
1449 #else
1450                         CmiSyncBroadcastAndFree(len, (char *)env); 
1451 #endif
1452
1453 }
1454     else if (pe==CLD_BROADCAST_ALL) { 
1455 #if CMK_MESSAGE_LOGGING
1456         if(env->flags & CK_FREE_MSG_MLOG)
1457                 CmiSyncBroadcastAllAndFree(len, (char *)env);
1458         else
1459                 CmiSyncBroadcastAll(len, (char *)env);
1460 #else
1461                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1462 #endif
1463
1464 }
1465     else{
1466 #if CMK_MESSAGE_LOGGING
1467         if(env->flags & CK_FREE_MSG_MLOG)
1468                 CmiSyncSendAndFree(pe, len, (char *)env);
1469         else
1470                 CmiSyncSend(pe, len, (char *)env);
1471 #else
1472                         CmiSyncSendAndFree(pe, len, (char *)env);
1473 #endif
1474
1475                 }
1476   }
1477 }
1478
1479 #if CMK_BIGSIM_CHARM
1480 #   define  _skipCldEnqueue   _CldEnqueue
1481 #endif
1482
1483 // by pass Charm++ priority queue, send as Converse message
1484 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1485 {
1486 #if CMK_CHARMDEBUG
1487   if (!ConverseDeliver(-1)) {
1488     CmiFree(env);
1489     return;
1490   }
1491 #endif
1492   CkPackMessage(&env);
1493   int len=env->getTotalsize();
1494   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1495 }
1496
1497 static void _noCldEnqueue(int pe, envelope *env)
1498 {
1499 /*
1500   if (pe == CkMyPe()) {
1501     CmiHandleMessage(env);
1502   } else
1503 */
1504 #if CMK_CHARMDEBUG
1505   if (!ConverseDeliver(pe)) {
1506     CmiFree(env);
1507     return;
1508   }
1509 #endif
1510   CkPackMessage(&env);
1511   int len=env->getTotalsize();
1512   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1513   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1514   else CmiSyncSendAndFree(pe, len, (char *)env);
1515 }
1516
1517 //static void _noCldNodeEnqueue(int node, envelope *env)
1518 //Made non-static to be used by ckmessagelogging
1519 void _noCldNodeEnqueue(int node, envelope *env)
1520 {
1521 /*
1522   if (node == CkMyNode()) {
1523     CmiHandleMessage(env);
1524   } else {
1525 */
1526 #if CMK_CHARMDEBUG
1527   if (!ConverseDeliver(node)) {
1528     CmiFree(env);
1529     return;
1530   }
1531 #endif
1532   CkPackMessage(&env);
1533   int len=env->getTotalsize();
1534   if (node==CLD_BROADCAST) { 
1535 #if CMK_MESSAGE_LOGGING
1536         if(env->flags & CK_FREE_MSG_MLOG)
1537                 CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1538         else
1539                 CmiSyncNodeBroadcast(len, (char *)env);
1540 #else
1541         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1542 #endif
1543 }
1544   else if (node==CLD_BROADCAST_ALL) { 
1545 #if CMK_MESSAGE_LOGGING
1546         if(env->flags & CK_FREE_MSG_MLOG)
1547                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1548         else
1549                 CmiSyncNodeBroadcastAll(len, (char *)env);
1550 #else
1551                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1552 #endif
1553
1554 }
1555   else {
1556 #if CMK_MESSAGE_LOGGING
1557         if(env->flags & CK_FREE_MSG_MLOG)
1558                 CmiSyncNodeSendAndFree(node, len, (char *)env);
1559         else
1560                 CmiSyncNodeSend(node, len, (char *)env);
1561 #else
1562         CmiSyncNodeSendAndFree(node, len, (char *)env);
1563 #endif
1564   }
1565 }
1566
1567 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1568 {
1569   register envelope *env = UsrToEnv(msg);
1570   _CHECK_USED(env);
1571   _SET_USED(env, 1);
1572 #if CMK_REPLAYSYSTEM
1573   setEventID(env);
1574 #endif
1575   env->setMsgtype(ForChareMsg);
1576   env->setEpIdx(eIdx);
1577   env->setSrcPe(CkMyPe());
1578 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1579   criticalPath_send(env);
1580   automaticallySetMessagePriority(env);
1581 #endif
1582 #if CMK_CHARMDEBUG
1583   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1584 #endif
1585 #if CMK_OBJECT_QUEUE_AVAILABLE
1586   CmiSetHandler(env, index_objectQHandler);
1587 #else
1588   CmiSetHandler(env, _charmHandlerIdx);
1589 #endif
1590   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1591     register int pe = -(pCid->onPE+1);
1592     if(pe==CkMyPe()) {
1593 #ifndef CMK_CHARE_USE_PTR
1594       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1595 #else
1596       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1597 #endif
1598       void *objPtr;
1599       if (NULL!=(objPtr=vblk->getLocalChare()))
1600       { //A ready local chare
1601         env->setObjPtr(objPtr);
1602         return pe;
1603       }
1604       else { //The vidblock is not ready-- forget it
1605         vblk->send(env);
1606         return -1;
1607       }
1608     } else { //Valid vidblock for another PE:
1609       env->setMsgtype(ForVidMsg);
1610       env->setVidPtr(pCid->objPtr);
1611       return pe;
1612     }
1613   }
1614   else {
1615     env->setObjPtr(pCid->objPtr);
1616     return pCid->onPE;
1617   }
1618 }
1619
1620 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1621 {
1622   int destPE = _prepareMsg(eIdx, msg, pCid);
1623   if (destPE != -1) {
1624     register envelope *env = UsrToEnv(msg);
1625 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1626     criticalPath_send(env);
1627     automaticallySetMessagePriority(env);
1628 #endif
1629     CmiBecomeImmediate(env);
1630   }
1631   return destPE;
1632 }
1633
1634 extern "C"
1635 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1636 {
1637   if (opts & CK_MSG_INLINE) {
1638     CkSendMsgInline(entryIdx, msg, pCid, opts);
1639     return;
1640   }
1641 #if CMK_ERROR_CHECKING
1642   if (opts & CK_MSG_IMMEDIATE) {
1643     CmiAbort("Immediate message is not allowed in Chare!");
1644   }
1645 #endif
1646   register envelope *env = UsrToEnv(msg);
1647   int destPE=_prepareMsg(entryIdx,msg,pCid);
1648   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1649   // VidBlock was not yet filled). The problem is that the creation was never
1650   // traced later when the VidBlock was filled. One solution is to trace the
1651   // creation here, the other to trace it in VidBlock->msgDeliver().
1652 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1653   if (destPE!=-1) {
1654     CpvAccess(_qd)->create();
1655   }
1656         sendChareMsg(env,destPE,_infoIdx,pCid);
1657 #else
1658   _TRACE_CREATION_1(env);
1659   if (destPE!=-1) {
1660     CpvAccess(_qd)->create();
1661     if (opts & CK_MSG_SKIP_OR_IMM)
1662       _noCldEnqueue(destPE, env);
1663     else
1664       _CldEnqueue(destPE, env, _infoIdx);
1665   }
1666   _TRACE_CREATION_DONE(1);
1667 #endif
1668 }
1669
1670 extern "C"
1671 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1672 {
1673   if (pCid->onPE==CkMyPe())
1674   {
1675     if(!CmiNodeAlive(CkMyPe())){
1676         return;
1677     }
1678 #if CMK_CHARMDEBUG
1679     //Just in case we need to breakpoint or use the envelope in some way
1680     _prepareMsg(entryIndex,msg,pCid);
1681 #endif
1682                 //Just directly call the chare (skip QD handling & scheduler)
1683     register envelope *env = UsrToEnv(msg);
1684     if (env->isPacked()) CkUnpackMessage(&env);
1685     _STATS_RECORD_PROCESS_MSG_1();
1686     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1687   }
1688   else {
1689     //No way to inline a cross-processor message:
1690     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1691   }
1692 }
1693
1694 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1695 {
1696   register envelope *env = UsrToEnv(msg);
1697   /*#if CMK_ERROR_CHECKING
1698   CkNodeGroupID nodeRedMgr;
1699 #endif
1700   */
1701   _CHECK_USED(env);
1702   _SET_USED(env, 1);
1703 #if CMK_REPLAYSYSTEM
1704   setEventID(env);
1705 #endif
1706   env->setMsgtype(type);
1707   env->setEpIdx(eIdx);
1708   env->setGroupNum(gID);
1709   env->setSrcPe(CkMyPe());
1710   /*
1711 #if CMK_ERROR_CHECKING
1712   nodeRedMgr.setZero();
1713   env->setRednMgr(nodeRedMgr);
1714 #endif
1715   */
1716 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1717   criticalPath_send(env);
1718   automaticallySetMessagePriority(env);
1719 #endif
1720 #if CMK_CHARMDEBUG
1721   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1722 #endif
1723   CmiSetHandler(env, _charmHandlerIdx);
1724   return env;
1725 }
1726
1727 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1728 {
1729   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1730 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1731   criticalPath_send(env);
1732   automaticallySetMessagePriority(env);
1733 #endif
1734   CmiBecomeImmediate(env);
1735   return env;
1736 }
1737
1738 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1739                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1740 {
1741   int numPes;
1742   register envelope *env;
1743     if (opts & CK_MSG_IMMEDIATE) {
1744         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1745     }else
1746     {
1747         env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1748     }
1749
1750 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1751         sendGroupMsg(env,pe,_infoIdx);
1752 #else
1753   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1754   _TRACE_CREATION_N(env, numPes);
1755   if (opts & CK_MSG_SKIP_OR_IMM)
1756     _noCldEnqueue(pe, env);
1757   else
1758     _skipCldEnqueue(pe, env, _infoIdx);
1759   _TRACE_CREATION_DONE(1);
1760 #endif
1761 }
1762
1763 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1764                            int npes, int *pes)
1765 {
1766   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1767   _TRACE_CREATION_MULTICAST(env, npes, pes);
1768   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1769   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1770 }
1771
1772 extern "C"
1773 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1774 {
1775 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1776   if (destPE==CkMyPe())
1777   {
1778     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1779     return;
1780   }
1781   //Can't inline-- send the usual way
1782   register envelope *env = UsrToEnv(msg);
1783   int numPes;
1784   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1785   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1786   _TRACE_CREATION_N(env, numPes);
1787   _noCldEnqueue(destPE, env);
1788   _STATS_RECORD_SEND_BRANCH_1();
1789   CkpvAccess(_coreState)->create();
1790   _TRACE_CREATION_DONE(1);
1791 #else
1792   // no support for immediate message, send inline
1793   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1794 #endif
1795 }
1796
1797 extern "C"
1798 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1799 {
1800   if (destPE==CkMyPe())
1801   {
1802     if(!CmiNodeAlive(CkMyPe())){
1803         return;
1804     }
1805     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1806     if (obj!=NULL)
1807     { //Just directly call the group:
1808 #if CMK_ERROR_CHECKING
1809       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1810 #else
1811       envelope *env=UsrToEnv(msg);
1812 #endif
1813       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1814       return;
1815     }
1816   }
1817   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1818   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1819 }
1820
1821 extern "C"
1822 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1823 {
1824   if (opts & CK_MSG_INLINE) {
1825     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1826     return;
1827   }
1828   if (opts & CK_MSG_IMMEDIATE) {
1829     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1830     return;
1831   }
1832   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1833   _STATS_RECORD_SEND_BRANCH_1();
1834   CkpvAccess(_coreState)->create();
1835 }
1836
1837 extern "C"
1838 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1839 {
1840 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1841   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1842   _TRACE_CREATION_MULTICAST(env, npes, pes);
1843   _noCldEnqueueMulti(npes, pes, env);
1844   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1845 #else
1846   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1847   CpvAccess(_qd)->create(-npes);
1848 #endif
1849   _STATS_RECORD_SEND_BRANCH_N(npes);
1850   CpvAccess(_qd)->create(npes);
1851 }
1852
1853 extern "C"
1854 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1855 {
1856   if (opts & CK_MSG_IMMEDIATE) {
1857     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1858     return;
1859   }
1860     // normal mesg
1861   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1862   _STATS_RECORD_SEND_BRANCH_N(npes);
1863   CpvAccess(_qd)->create(npes);
1864 }
1865
1866 extern "C"
1867 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1868 {
1869   int npes;
1870   int *pes;
1871   if (opts & CK_MSG_IMMEDIATE) {
1872     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1873     return;
1874   }
1875     // normal mesg
1876   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1877   CmiLookupGroup(grp, &npes, &pes);
1878   _TRACE_CREATION_MULTICAST(env, npes, pes);
1879   _CldEnqueueGroup(grp, env, _infoIdx);
1880   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1881   _STATS_RECORD_SEND_BRANCH_N(npes);
1882   CpvAccess(_qd)->create(npes);
1883 }
1884
1885 extern "C"
1886 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1887 {
1888   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1889   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1890   CpvAccess(_qd)->create(CkNumPes());
1891 }
1892
1893 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1894                 int node=CLD_BROADCAST_ALL, int opts=0)
1895 {
1896     int numPes;
1897     register envelope *env;
1898     if (opts & CK_MSG_IMMEDIATE) {
1899         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1900     }else
1901     {
1902         env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1903     }
1904 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1905     sendNodeGroupMsg(env,node,_infoIdx);
1906 #else
1907   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1908   _TRACE_CREATION_N(env, numPes);
1909   if (opts & CK_MSG_SKIP_OR_IMM) {
1910     _noCldNodeEnqueue(node, env);
1911   }
1912   else
1913     _CldNodeEnqueue(node, env, _infoIdx);
1914   _TRACE_CREATION_DONE(1);
1915 #endif
1916 }
1917
1918 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1919                            int npes, int *nodes)
1920 {
1921   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1922   _TRACE_CREATION_N(env, npes);
1923   for (int i=0; i<npes; i++) {
1924     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1925   }
1926   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1927 }
1928
1929 extern "C"
1930 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1931 {
1932 #if CMK_IMMEDIATE_MSG
1933   if (node==CkMyNode())
1934   {
1935     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1936     return;
1937   }
1938   //Can't inline-- send the usual way
1939   register envelope *env = UsrToEnv(msg);
1940   int numPes;
1941   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1942   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1943   _TRACE_CREATION_N(env, numPes);
1944   _noCldNodeEnqueue(node, env);
1945   _STATS_RECORD_SEND_BRANCH_1();
1946   CkpvAccess(_coreState)->create();
1947   _TRACE_CREATION_DONE(1);
1948 #else
1949   // no support for immediate message, send inline
1950   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1951 #endif
1952 }
1953
1954 extern "C"
1955 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1956 {
1957   if (node==CkMyNode())
1958   {
1959     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1960     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1961     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1962     if (obj!=NULL)
1963     { //Just directly call the group:
1964 #if CMK_ERROR_CHECKING
1965       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1966 #else
1967       envelope *env=UsrToEnv(msg);
1968 #endif
1969       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1970       return;
1971     }
1972   }
1973   //Can't inline-- send the usual way
1974   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1975 }
1976
1977 extern "C"
1978 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1979 {
1980   if (opts & CK_MSG_INLINE) {
1981     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1982     return;
1983   }
1984   if (opts & CK_MSG_IMMEDIATE) {
1985     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1986     return;
1987   }
1988   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1989   _STATS_RECORD_SEND_NODE_BRANCH_1();
1990   CkpvAccess(_coreState)->create();
1991 }
1992
1993 extern "C"
1994 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1995 {
1996 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1997   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1998   _noCldEnqueueMulti(npes, nodes, env);
1999 #else
2000   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2001   CpvAccess(_qd)->create(-npes);
2002 #endif
2003   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2004   CpvAccess(_qd)->create(npes);
2005 }
2006
2007 extern "C"
2008 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
2009 {
2010   if (opts & CK_MSG_IMMEDIATE) {
2011     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
2012     return;
2013   }
2014     // normal mesg
2015   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2016   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2017   CpvAccess(_qd)->create(npes);
2018 }
2019
2020 extern "C"
2021 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
2022 {
2023   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
2024   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
2025   CpvAccess(_qd)->create(CkNumNodes());
2026 }
2027
2028 //Needed by delegation manager:
2029 extern "C"
2030 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
2031 { return _prepareMsg(eIdx,msg,pCid); }
2032 extern "C"
2033 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2034 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
2035 extern "C"
2036 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2037 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
2038
2039 void _ckModuleInit(void) {
2040         index_skipCldHandler = CkRegisterHandler(_skipCldHandler);
2041 #if CMK_OBJECT_QUEUE_AVAILABLE
2042         index_objectQHandler = CkRegisterHandler(_ObjectQHandler);
2043 #endif
2044         index_tokenHandler = CkRegisterHandler(_TokenHandler);
2045         CkpvInitialize(TokenPool*, _tokenPool);
2046         CkpvAccess(_tokenPool) = new TokenPool;
2047 }
2048
2049
2050 /************** Send: Arrays *************/
2051
2052 extern void CkArrayManagerInsert(int onPe,void *msg);
2053 //extern void CkArrayManagerDeliver(int onPe,void *msg);
2054
2055 static void _prepareOutgoingArrayMsg(envelope *env,int type)
2056 {
2057   _CHECK_USED(env);
2058   _SET_USED(env, 1);
2059   env->setMsgtype(type);
2060 #if CMK_CHARMDEBUG
2061   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
2062 #endif
2063   CmiSetHandler(env, _charmHandlerIdx);
2064   CpvAccess(_qd)->create();
2065 }
2066
2067 extern "C"
2068 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
2069   register envelope *env = UsrToEnv(msg);
2070   env->setArrayMgr(aID);
2071   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
2072   _CldEnqueue(pe, env, _infoIdx);
2073 }
2074
2075 extern "C"
2076 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2077   register envelope *env = UsrToEnv(msg);
2078   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2079 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2080         sendArrayMsg(env,pe,_infoIdx);
2081 #else
2082   if (opts & CK_MSG_IMMEDIATE)
2083     CmiBecomeImmediate(env);
2084   if (opts & CK_MSG_SKIP_OR_IMM)
2085     _noCldEnqueue(pe, env);
2086   else
2087     _skipCldEnqueue(pe, env, _infoIdx);
2088 #endif
2089 }
2090
2091 class ElementDestroyer : public CkLocIterator {
2092 private:
2093         CkLocMgr *locMgr;
2094 public:
2095         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2096         void addLocation(CkLocation &loc) {
2097           loc.destroyAll();
2098         }
2099 };
2100
2101 void CkDeleteChares() {
2102   int i;
2103   int numGroups = CkpvAccess(_groupIDTable)->size();
2104
2105   // delete all plain chares
2106 #ifndef CMK_CHARE_USE_PTR
2107   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2108         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2109         delete obj;
2110         CkpvAccess(chare_objs)[i] = NULL;
2111   }
2112   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2113         VidBlock *obj = CkpvAccess(vidblocks)[i];
2114         delete obj;
2115         CkpvAccess(vidblocks)[i] = NULL;
2116   }
2117 #endif
2118
2119   // delete all array elements
2120   for(i=0;i<numGroups;i++) {
2121     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2122     if(obj && obj->isLocMgr())  {
2123       CkLocMgr *mgr = (CkLocMgr*)obj;
2124       ElementDestroyer destroyer(mgr);
2125       mgr->iterate(destroyer);
2126     }
2127   }
2128
2129   // delete all groups
2130   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2131   for(i=0;i<numGroups;i++) {
2132     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2133     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2134     if (obj) delete obj;
2135   }
2136   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2137
2138   // delete all node groups
2139   if (CkMyRank() == 0) {
2140     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2141     for(i=0;i<numNodeGroups;i++) {
2142       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2143       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2144       if (obj) delete obj;
2145     }
2146   }
2147 }
2148
2149 #if CMK_BIGSIM_CHARM
2150 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2151                                    int pb,unsigned int *prio);
2152 #endif
2153
2154 //------------------- Message Watcher (record/replay) ----------------
2155
2156 #include "crc32.h"
2157
2158 CkpvDeclare(int, envelopeEventID);
2159 int _recplay_crc = 0;
2160 int _recplay_checksum = 0;
2161 int _recplay_logsize = 1024*1024;
2162
2163 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2164 #define REPLAYDEBUG(args) /* empty */
2165
2166 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2167
2168 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2169 #include "BaseLB.h" /* For LBMigrateMsg message */
2170
2171 #if CMK_REPLAYSYSTEM
2172 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2173
2174     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2175     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2176     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2177     FILE *f=fopen(fName,permissions);
2178     REPLAYDEBUG("openReplayfile "<<fName);
2179     if (f==NULL) {
2180         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2181             CkMyPe(),fName,permissions);
2182         CkAbort("openReplayFile> Could not open replay file");
2183     }
2184     return f;
2185 }
2186
2187 class CkMessageRecorder : public CkMessageWatcher {
2188   char *buffer;
2189   unsigned int curpos;
2190   bool firstOpen;
2191 public:
2192   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2193   ~CkMessageRecorder() {
2194     flushLog(0);
2195     fprintf(f,"-1 -1 -1 ");
2196     fclose(f);
2197     delete[] buffer;
2198 #if 0
2199     FILE *stsfp = fopen("sts", "w");
2200     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2201     traceWriteSTS(stsfp, 0);
2202     fclose(stsfp);
2203 #endif
2204     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2205   }
2206
2207 private:
2208   void flushLog(int verbose=1) {
2209     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2210     fprintf(f, "%s", buffer);
2211     curpos=0;
2212   }
2213   virtual bool process(envelope **envptr,CkCoreState *ck) {
2214     if ((*envptr)->getEvent()) {
2215       bool wasPacked = (*envptr)->isPacked();
2216       if (!wasPacked) CkPackMessage(envptr);
2217       envelope *env = *envptr;
2218       unsigned int crc1=0, crc2=0;
2219       if (_recplay_crc) {
2220         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2221         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2222         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2223       } else if (_recplay_checksum) {
2224         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2225         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2226       }
2227       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2228       if (curpos > _recplay_logsize-128) flushLog();
2229       if (!wasPacked) CkUnpackMessage(envptr);
2230     }
2231     return true;
2232   }
2233   virtual bool process(CthThreadToken *token,CkCoreState *ck) {
2234     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2235     if (curpos > _recplay_logsize-128) flushLog();
2236     return true;
2237   }
2238   
2239   virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2240     FILE *f;
2241     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2242     else f = openReplayFile("ckreplay_",".lb","a");
2243     firstOpen = false;
2244     if (f != NULL) {
2245       PUP::toDisk p(f);
2246       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2247       (*msg)->pup(p);
2248       fclose(f);
2249     }
2250     return true;
2251   }
2252 };
2253
2254 class CkMessageDetailRecorder : public CkMessageWatcher {
2255 public:
2256   CkMessageDetailRecorder(FILE *f_) {
2257     f=f_;
2258     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2259      * The value of 'x' is the pointer size.
2260      */
2261     CmiUInt2 little = sizeof(void*);
2262     fwrite(&little, 2, 1, f);
2263   }
2264   ~CkMessageDetailRecorder() {fclose(f);}
2265 private:
2266   virtual bool process(envelope **envptr, CkCoreState *ck) {
2267     bool wasPacked = (*envptr)->isPacked();
2268     if (!wasPacked) CkPackMessage(envptr);
2269     envelope *env = *envptr;
2270     CmiUInt4 size = env->getTotalsize();
2271     fwrite(&size, 4, 1, f);
2272     fwrite(env, env->getTotalsize(), 1, f);
2273     if (!wasPacked) CkUnpackMessage(envptr);
2274     return true;
2275   }
2276 };
2277
2278 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2279 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2280
2281 class CkMessageReplay : public CkMessageWatcher {
2282   int counter;
2283         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2284         int nextEP;
2285         unsigned int crc1, crc2;
2286         FILE *lbFile;
2287         /// Read the next message we need from the file:
2288         void getNext(void) {
2289           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2290           if (nextSize > 0) {
2291             // We are reading a regular message
2292             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2293               CkAbort("CkMessageReplay> Syntax error reading replay file");
2294             }
2295             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2296           } else if (nextSize == -2) {
2297             // We are reading a special message (right now only thread awaken)
2298             // Nothing to do since we have already read all info
2299             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2300           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2301             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2302             CkAbort("CkMessageReplay> Unrecognized input");
2303           }
2304             /*
2305                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2306                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2307                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2308                 }
2309                 */
2310                 counter++;
2311         }
2312         /// If this is the next message we need, advance and return true.
2313         bool isNext(envelope *env) {
2314                 if (nextPE!=env->getSrcPe()) return false;
2315                 if (nextEvent!=env->getEvent()) return false;
2316                 if (nextSize<0) return false; // not waiting for a regular message
2317 #if 1
2318                 if (nextEP != env->getEpIdx()) {
2319                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2320                         return false;
2321                 }
2322 #endif
2323 #if ! CMK_BIGSIM_CHARM
2324                 if (nextSize!=env->getTotalsize())
2325                 {
2326                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2327                         return false;
2328                 }
2329                 if (_recplay_crc || _recplay_checksum) {
2330                   bool wasPacked = env->isPacked();
2331                   if (!wasPacked) CkPackMessage(&env);
2332                   if (_recplay_crc) {
2333                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2334                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2335                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2336                     if (crcnew1 != crc1) {
2337                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2338                     }
2339                     if (crcnew2 != crc2) {
2340                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2341                     }
2342                   } else if (_recplay_checksum) {
2343             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2344             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2345             if (crcnew1 != crc1) {
2346               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2347             }
2348             if (crcnew2 != crc2) {
2349               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2350             }               
2351                   }
2352                   if (!wasPacked) CkUnpackMessage(&env);
2353                 }
2354 #endif
2355                 return true;
2356         }
2357         bool isNext(CthThreadToken *token) {
2358           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
2359           return false;
2360         }
2361
2362         /// This is a (short) list of messages we aren't yet ready for:
2363         CkQ<envelope *> delayedMessages;
2364         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2365         CkQ<CthThreadToken *> delayedTokens;
2366
2367         /// Try to flush out any delayed messages
2368         void flush(void) {
2369           if (nextSize>0) {
2370                 int len=delayedMessages.length();
2371                 for (int i=0;i<len;i++) {
2372                         envelope *env=delayedMessages.deq();
2373                         if (isNext(env)) { /* this is the next message: process it */
2374                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2375                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2376                                 return;
2377                         }
2378                         else /* Not ready yet-- put it back in the
2379                                 queue */
2380                           {
2381                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2382                                 delayedMessages.enq(env);
2383                           }
2384                 }
2385           } else if (nextSize==-2) {
2386             int len=delayedTokens.length();
2387             for (int i=0;i<len;++i) {
2388               CthThreadToken *token=delayedTokens.deq();
2389               if (isNext(token)) {
2390             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2391 #if ! CMK_BIGSIM_CHARM
2392                 CsdEnqueueLifo((void*)token);
2393 #else
2394                 CthEnqueueBigSimThread(token,0,0,NULL);
2395 #endif
2396                 return;
2397               } else {
2398             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2399                 delayedTokens.enq(token);
2400               }
2401             }
2402           }
2403         }
2404
2405 public:
2406         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2407           counter=0;
2408           f=f_;
2409           getNext();
2410           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2411           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2412         }
2413         ~CkMessageReplay() {fclose(f);}
2414
2415 private:
2416         virtual bool process(envelope **envptr,CkCoreState *ck) {
2417           bool wasPacked = (*envptr)->isPacked();
2418           if (!wasPacked) CkPackMessage(envptr);
2419           envelope *env = *envptr;
2420           //CkAssert(*(int*)env == 0x34567890);
2421           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2422                 if (env->getEvent() == 0) return true;
2423                 if (isNext(env)) { /* This is the message we were expecting */
2424                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2425                         getNext(); /* Advance over this message */
2426                         flush(); /* try to process queued-up stuff */
2427                         if (!wasPacked) CkUnpackMessage(envptr);
2428                         return true;
2429                 }
2430 #if CMK_SMP
2431                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2432                          // try next rank, we can't just buffer the msg and left
2433                          // we need to keep unprocessed msg on the fly
2434                         int nextpe = CkMyPe()+1;
2435                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2436                         nextpe = CkNodeFirst(CkMyNode());
2437                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2438                         return false;
2439                 }
2440 #endif
2441                 else /*!isNext(env) */ {
2442                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2443                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2444                         delayedMessages.enq(env);
2445                         flush();
2446                         return false;
2447                 }
2448         }
2449         virtual bool process(CthThreadToken *token, CkCoreState *ck) {
2450       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2451           if (isNext(token)) {
2452         REPLAYDEBUG("Executing token: "<<token->serialNo)
2453             getNext();
2454             flush();
2455             return true;
2456           } else {
2457         REPLAYDEBUG("Queueing token: "<<token->serialNo
2458             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2459             delayedTokens.enq(token);
2460             return false;
2461           }
2462         }
2463
2464         virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2465           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2466           if (lbFile != NULL) {
2467             int num_moves = 0;
2468         PUP::fromDisk p(lbFile);
2469             p | num_moves;
2470             if (num_moves != (*msg)->n_moves) {
2471               delete *msg;
2472               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2473             }
2474             (*msg)->pup(p);
2475           }
2476           return true;
2477         }
2478 };
2479
2480 class CkMessageDetailReplay : public CkMessageWatcher {
2481   void *getNext() {
2482     CmiUInt4 size; size_t nread;
2483     if ((nread=fread(&size, 4, 1, f)) < 1) {
2484       if (feof(f)) return NULL;
2485       CkPrintf("Broken record file (metadata) got %d\n",nread);
2486       CkAbort("");
2487     }
2488     void *env = CmiAlloc(size);
2489     long tell = ftell(f);
2490     if ((nread=fread(env, size, 1, f)) < 1) {
2491       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2492       CkAbort("");
2493     }
2494     //*(int*)env = 0x34567890; // set first integer as magic
2495     return env;
2496   }
2497 public:
2498   double starttime;
2499   CkMessageDetailReplay(FILE *f_) {
2500     f=f_;
2501     starttime=CkWallTimer();
2502     /* This must match what CkMessageDetailRecorder did */
2503     CmiUInt2 little;
2504     fread(&little, 2, 1, f);
2505     if (little != sizeof(void*)) {
2506       CkAbort("Replaying on a different architecture from which recording was done!");
2507     }
2508
2509     CsdEnqueue(getNext());
2510
2511     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2512   }
2513   virtual bool process(envelope **env,CkCoreState *ck) {
2514     void *msg = getNext();
2515     if (msg != NULL) CsdEnqueue(msg);
2516     return true;
2517   }
2518 };
2519
2520 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2521 #if ! CMK_BIGSIM_CHARM
2522   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2523 #endif
2524   CkMessageReplay *replay = (CkMessageReplay*)rep;
2525   //CmiStartQD(CkMessageReplayQuiescence, replay);
2526 }
2527
2528 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2529   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2530   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2531   ConverseExit();
2532 }
2533 #endif
2534
2535 static bool CpdExecuteThreadResume(CthThreadToken *token) {
2536   CkCoreState *ck = CkpvAccess(_coreState);
2537   if (ck->watcher!=NULL) {
2538     return ck->watcher->processThread(token,ck);
2539   }
2540   return true;
2541 }
2542
2543 CpvCExtern(int, CthResumeNormalThreadIdx);
2544 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2545 {
2546   CthThread t = token->thread;
2547
2548   if(t == NULL){
2549     free(token);
2550     return;
2551   }
2552 #if CMK_TRACE_ENABLED
2553 #if ! CMK_TRACE_IN_CHARM
2554   if(CpvAccess(traceOn))
2555     CthTraceResume(t);
2556 /*    if(CpvAccess(_traceCoreOn)) 
2557             resumeTraceCore();*/
2558 #endif
2559 #endif
2560   
2561   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2562   if (CpdExecuteThreadResume(token)) {
2563     CthResume(t);
2564   }
2565 }
2566
2567 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2568   CkCoreState *ck = CkpvAccess(_coreState);
2569   if (ck->watcher!=NULL) {
2570     ck->watcher->processLBMessage(msg, ck);
2571   }
2572 }
2573
2574 #if CMK_BIGSIM_CHARM
2575 CpvExtern(int      , CthResumeBigSimThreadIdx);
2576 #endif
2577
2578 #include "ckliststring.h"
2579 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2580     CmiArgGroup("Charm++","Record/Replay");
2581     bool forceReplay = false;
2582     char *procs = NULL;
2583     _replaySystem = 0;
2584     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2585       if(CmiMyRank() == 0) _recplay_crc = 1;
2586     }
2587     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2588       if(CmiMyRank() == 0) _recplay_checksum = 1;
2589     }
2590     int tmplogsize;
2591     if(CmiGetArgIntDesc(argv,"+recplay-logsize",&tmplogsize,"Specify the size of the buffer used by the message recorder"))
2592       {
2593         if(CmiMyRank() == 0) _recplay_logsize = tmplogsize;
2594       }
2595     REPLAYDEBUG("CkMessageWatcherInit ");
2596     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2597 #if CMK_REPLAYSYSTEM
2598         CkListString list(procs);
2599         if (list.includes(CkMyPe())) {
2600           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2601           CpdSetInitializeMemory(1);
2602           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2603         }
2604 #else
2605         CkAbort("Option `+record-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
2606 #endif
2607     }
2608     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2609 #if CMK_REPLAYSYSTEM
2610       if (CkMyPe() == 0) {
2611         CmiPrintf("Charm++> record mode.\n");
2612         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2613           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2614           _recplay_crc = _recplay_checksum = 0;
2615         }
2616       }
2617       CpdSetInitializeMemory(1);
2618       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2619       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2620 #else
2621       CkAbort("Option `+record' requires that record-replay support be enabled at configure time (--enable-replay)");
2622 #endif
2623     }
2624         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2625 #if CMK_REPLAYSYSTEM
2626             forceReplay = true;
2627             CpdSetInitializeMemory(1);
2628             // Set the parameters of the processor
2629 #if CMK_SHARED_VARS_UNAVAILABLE
2630             _Cmi_mype = atoi(procs);
2631             while (procs[0]!='/') procs++;
2632             procs++;
2633             _Cmi_numpes = atoi(procs);
2634 #else
2635             CkAbort("+replay-detail available only for non-SMP build");
2636 #endif
2637             _replaySystem = 1;
2638             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2639 #else
2640           CkAbort("Option `+replay-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
2641 #endif
2642         }
2643         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2644 #if CMK_REPLAYSYSTEM
2645           if (CkMyPe() == 0)  {
2646             CmiPrintf("Charm++> replay mode.\n");
2647             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2648               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2649               _recplay_crc = _recplay_checksum = 0;
2650             }
2651           }
2652           CpdSetInitializeMemory(1);
2653 #if ! CMK_BIGSIM_CHARM
2654           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2655 #else
2656           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2657 #endif
2658           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2659 #else
2660           CkAbort("Option `+replay' requires that record-replay support be enabled at configure time (--enable-replay)");
2661 #endif
2662         }
2663         if (_recplay_crc && _recplay_checksum) {
2664           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2665         }
2666 }
2667
2668 extern "C"
2669 int CkMessageToEpIdx(void *msg) {
2670         envelope *env=UsrToEnv(msg);
2671         int ep=env->getEpIdx();
2672         if (ep==CkIndex_CkArray::recvBroadcast(0))
2673                 return env->getsetArrayBcastEp();
2674         else
2675                 return ep;
2676 }
2677
2678 extern "C"
2679 int getCharmEnvelopeSize() {
2680   return sizeof(envelope);
2681 }
2682
2683 /// Best-effort guess at whether @arg msg points at a charm envelope
2684 extern "C"
2685 int isCharmEnvelope(void *msg) {
2686     envelope *e = (envelope *)msg;
2687     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
2688     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
2689     if (e->getTotalsize() < sizeof(envelope)) return 0;
2690     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
2691 #if CMK_SMP
2692     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
2693 #else
2694     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()) return 0;
2695 #endif
2696     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
2697     return 1;
2698 }
2699
2700
2701 #include "CkMarshall.def.h"
2702