netlrts: replace some CMK_SHARED_VARS_UNAVAILABLE with not CMK_SMP
[charm.git] / src / ck-core / ck.C
1 /**
2 \addtogroup Ck
3
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
12
13 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
14 #include "pathHistory.h"
15 void automaticallySetMessagePriority(envelope *env); // in control point framework.
16 #endif
17
18 #if CMK_LBDB_ON
19 #include "LBDatabase.h"
20 #endif // CMK_LBDB_ON
21
22 #ifndef CMK_CHARE_USE_PTR
23 #include <map>
24 CkpvDeclare(CkVec<void *>, chare_objs);
25 CkpvDeclare(CkVec<int>, chare_types);
26 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
27
28 typedef std::map<int, CkChareID>  Vidblockmap;
29 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
30 CkpvDeclare(int, currentChareIdx);
31 #endif
32
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 void _initChareTables()
47 {
48 #ifndef CMK_CHARE_USE_PTR
49           /* chare and vidblock table */
50   CkpvInitialize(CkVec<void *>, chare_objs);
51   CkpvInitialize(CkVec<int>, chare_types);
52   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
53   CkpvInitialize(Vidblockmap, vmap);
54   CkpvInitialize(int, currentChareIdx);
55   CkpvAccess(currentChareIdx) = -1;
56 #endif
57 }
58
59 //Charm++ virtual functions: declaring these here results in a smaller executable
60 Chare::Chare(void) {
61   thishandle.onPE=CkMyPe();
62   thishandle.objPtr=this;
63 #if CMK_ERROR_CHECKING
64   magic = CHARE_MAGIC;
65 #endif
66 #ifndef CMK_CHARE_USE_PTR
67      // for plain chare, objPtr is actually the index to chare obj table
68   if (CkpvAccess(currentChareIdx) >= 0) {
69     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
70   }
71   chareIdx = CkpvAccess(currentChareIdx);
72 #endif
73 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
74   mlogData = new ChareMlogData();
75   mlogData->objID.type = TypeChare;
76   mlogData->objID.data.chare.id = thishandle;
77 #endif
78 #if CMK_OBJECT_QUEUE_AVAILABLE
79   if (_defaultObjectQ)  CkEnableObjQ();
80 #endif
81 }
82
83 Chare::Chare(CkMigrateMessage* m) {
84   thishandle.onPE=CkMyPe();
85   thishandle.objPtr=this;
86 #if CMK_ERROR_CHECKING
87   magic = 0;
88 #endif
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 #if CMK_ERROR_CHECKING
149   p(magic);
150 #endif
151 }
152
153 int Chare::ckGetChareType() const {
154   return -3;
155 }
156 char *Chare::ckDebugChareName(void) {
157   char buf[100];
158   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
159   return strdup(buf);
160 }
161 int Chare::ckDebugChareID(char *str, int limit) {
162   // pure chares for now do not have a valid ID
163   str[0] = 0;
164   return 1;
165 }
166 void Chare::ckDebugPup(PUP::er &p) {
167   pup(p);
168 }
169
170 /// This method is called before starting a [threaded] entry method.
171 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
172   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
173   traceAddThreadListeners(th, UsrToEnv(msg));
174 }
175
176 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
177   p.comment("Bytes");
178   int ts=UsrToEnv(msg)->getTotalsize();
179   int msgLen=ts-sizeof(envelope);
180   if (msgLen>0)
181     p((char*)msg,msgLen);
182 }
183
184 IrrGroup::IrrGroup(void) {
185   thisgroup = CkpvAccess(_currentGroup);
186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
187         mlogData->objID.type = TypeGroup;
188         mlogData->objID.data.group.id = thisgroup;
189         mlogData->objID.data.group.onPE = CkMyPe();
190 #endif
191 }
192
193 IrrGroup::~IrrGroup() {
194   // remove the object pointer
195   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
196   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
197   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
198 }
199
200 void IrrGroup::pup(PUP::er &p)
201 {
202   Chare::pup(p);
203   p|thisgroup;
204 }
205
206 int IrrGroup::ckGetChareType() const {
207   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
208 }
209
210 int IrrGroup::ckDebugChareID(char *str, int limit) {
211   if (limit<5) return -1;
212   str[0] = 1;
213   *((int*)&str[1]) = thisgroup.idx;
214   return 5;
215 }
216
217 char *IrrGroup::ckDebugChareName() {
218   return strdup(_chareTable[ckGetChareType()]->name);
219 }
220
221 void IrrGroup::ckJustMigrated(void)
222 {
223 }
224
225 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
226   /* FIXME: **CW** not entirely sure what we should do here yet */
227 }
228
229 void Group::CkAddThreadListeners(CthThread th, void *msg) {
230   Chare::CkAddThreadListeners(th, msg);
231   CthSetThreadID(th, thisgroup.idx, 0, 0);
232 }
233
234 void Group::pup(PUP::er &p)
235 {
236   CkReductionMgr::pup(p);
237   reductionInfo.pup(p);
238 }
239
240 /**** Delegation Manager Group */
241 CkDelegateMgr::~CkDelegateMgr() { }
242
243 //Default delegator implementation: do not delegate-- send directly
244 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
245   { CkSendMsg(ep,m,c); }
246 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
247   { CkSendMsgBranch(ep,m,onPE,g); }
248 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
249   { CkBroadcastMsgBranch(ep,m,g); }
250 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
251   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
252 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
253   { CkSendMsgNodeBranch(ep,m,onNode,g); }
254 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
255   { CkBroadcastMsgNodeBranch(ep,m,g); }
256 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
257   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
258 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
259 {
260         CProxyElement_ArrayBase ap(a,idx);
261         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
262 }
263 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
264 {
265         CProxyElement_ArrayBase ap(a,idx);
266         ap.ckSend((CkArrayMessage *)m,ep);
267 }
268 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
269 {
270         CProxy_ArrayBase ap(a);
271         ap.ckBroadcast((CkArrayMessage *)m,ep);
272 }
273
274 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
275 {
276         CmiAbort("ArraySectionSend is not implemented!\n");
277 /*
278         CProxyElement_ArrayBase ap(a,idx);
279         ap.ckSend((CkArrayMessage *)m,ep);
280 */
281 }
282
283 /*** Proxy <-> delegator communication */
284 CkDelegateData::~CkDelegateData() {}
285
286 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
287   return pd; // default implementation ignores pup call
288 }
289
290 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
291    this tricky manual reference counting business... */
292
293 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
294         if (dPtr) dPtr->ref();
295         ckUndelegate();
296         delegatedMgr = dTo;
297         delegatedPtr = dPtr;
298         delegatedGroupId = delegatedMgr->CkGetGroupID();
299         isNodeGroup = delegatedMgr->isNodeGroup();
300 }
301 void CProxy::ckUndelegate(void) {
302         delegatedMgr=NULL;
303         delegatedGroupId.setZero();
304         if (delegatedPtr) delegatedPtr->unref();
305         delegatedPtr=NULL;
306 }
307
308 /// Copy constructor
309 CProxy::CProxy(const CProxy &src)
310   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
311    isNodeGroup(src.isNodeGroup) {
312     delegatedPtr = NULL;
313     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
314         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
315     }
316 }
317
318 /// Assignment operator
319 CProxy& CProxy::operator=(const CProxy &src) {
320         CkDelegateData *oldPtr=delegatedPtr;
321         ckUndelegate();
322         delegatedMgr=src.delegatedMgr;
323         delegatedGroupId = src.delegatedGroupId; 
324         isNodeGroup = src.isNodeGroup;
325
326         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
327             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
328         else
329             delegatedPtr = NULL;
330
331         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
332         if (oldPtr) oldPtr->unref();
333         return *this;
334 }
335
336 void CProxy::pup(PUP::er &p) {
337   if (!p.isUnpacking()) {
338     if (ckDelegatedTo() != NULL) {
339       delegatedGroupId = delegatedMgr->CkGetGroupID();
340       isNodeGroup = delegatedMgr->isNodeGroup();
341     }
342   }
343   p|delegatedGroupId;
344   if (!delegatedGroupId.isZero()) {
345     p|isNodeGroup;
346     if (p.isUnpacking()) {
347       delegatedMgr = ckDelegatedTo(); 
348     }
349
350     int migCtor = 0, cIdx; 
351     if (!p.isUnpacking()) {
352       if (isNodeGroup) {
353         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
354         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
355         migCtor = _chareTable[cIdx]->migCtor; 
356         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
357       }
358       else  {
359         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
360         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
361         migCtor = _chareTable[cIdx]->migCtor; 
362         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
363       }         
364     }
365
366     p|migCtor;
367
368     // if delegated manager has not been created, construct a dummy
369     // object on which to call DelegatePointerPup
370     if (delegatedMgr == NULL) {
371
372       // create a dummy object for calling DelegatePointerPup
373       int objId = _entryTable[migCtor]->chareIdx; 
374       int objSize = _chareTable[objId]->size; 
375       void *obj = malloc(objSize); 
376       _entryTable[migCtor]->call(NULL, obj); 
377       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
378         ->DelegatePointerPup(p, delegatedPtr);           
379       free(obj);
380
381     }
382     else {
383
384       // delegated manager has been created, so we can use it
385       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
386
387     }
388
389     if (p.isUnpacking() && delegatedPtr) {
390       delegatedPtr->ref();
391     }
392   }
393 }
394
395 /**** Array sections */
396 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
397 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
398   _cookie.get_aid() = aid;      \
399   _cookie.get_pe() = CkMyPe();  \
400   _elems = new CkArrayIndex[nElems];    \
401   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
402   pelist = NULL;        \
403   npes  = 0;    \
404 }
405
406 CKSECTIONID_CONSTRUCTOR_DEF(1D)
407 CKSECTIONID_CONSTRUCTOR_DEF(2D)
408 CKSECTIONID_CONSTRUCTOR_DEF(3D)
409 CKSECTIONID_CONSTRUCTOR_DEF(4D)
410 CKSECTIONID_CONSTRUCTOR_DEF(5D)
411 CKSECTIONID_CONSTRUCTOR_DEF(6D)
412 CKSECTIONID_CONSTRUCTOR_DEF(Max)
413
414 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
415   pelist = new int[npes];
416   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
417   _cookie.get_aid() = gid;
418 }
419
420 CkSectionID::CkSectionID(const CkSectionID &sid) {
421   int i;
422   _cookie = sid._cookie;
423   _nElems = sid._nElems;
424   if (_nElems > 0) {
425     _elems = new CkArrayIndex[_nElems];
426     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
427   } else _elems = NULL;
428   npes = sid.npes;
429   if (npes > 0) {
430     pelist = new int[npes];
431     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
432   } else pelist = NULL;
433 }
434
435 void CkSectionID::operator=(const CkSectionID &sid) {
436   int i;
437   _cookie = sid._cookie;
438   _nElems = sid._nElems;
439   if (_nElems > 0) {
440     _elems = new CkArrayIndex[_nElems];
441     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
442   } else _elems = NULL;
443   npes = sid.npes;
444   if (npes > 0) {
445     pelist = new int[npes];
446     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
447   } else pelist = NULL;
448 }
449
450 void CkSectionID::pup(PUP::er &p) {
451     p | _cookie;
452     p(_nElems);
453     if (_nElems > 0) {
454       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
455       for (int i=0; i< _nElems; i++) p | _elems[i];
456       npes = 0;
457       pelist = NULL;
458     } else {
459       // If _nElems is zero, than this section describes processors instead of array elements
460       _elems = NULL;
461       p(npes);
462       if (p.isUnpacking()) pelist = new int[npes];
463       p(pelist, npes);
464     }
465 }
466
467 /**** Tiny random API routines */
468
469 #ifdef CMK_CUDA
470 void CUDACallbackManager(void *fn) {
471   if (fn != NULL) {
472     CkCallback *cb = (CkCallback*) fn;
473     cb->send();
474   }
475 }
476
477 #endif
478
479 void QdCreate(int n) {
480   CpvAccess(_qd)->create(n);
481 }
482
483 void QdProcess(int n) {
484   CpvAccess(_qd)->process(n);
485 }
486
487 extern "C"
488 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
489 {
490   UsrToEnv(msg)->setRef(ref);
491 }
492
493 extern "C"
494 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
495 {
496   return UsrToEnv(msg)->getRef();
497 }
498
499 extern "C"
500 int CkGetSrcPe(void *msg)
501 {
502   return UsrToEnv(msg)->getSrcPe();
503 }
504
505 extern "C"
506 int CkGetSrcNode(void *msg)
507 {
508   return CmiNodeOf(CkGetSrcPe(msg));
509 }
510
511 extern "C"
512 void *CkLocalBranch(CkGroupID gID) {
513   return _localBranch(gID);
514 }
515
516 static
517 void *_ckLocalNodeBranch(CkGroupID groupID) {
518   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
519   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
520   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
521   return retval;
522 }
523
524 extern "C"
525 void *CkLocalNodeBranch(CkGroupID groupID)
526 {
527   void *retval;
528   // we are called in a constructor
529   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
530     return CkpvAccess(_currentNodeGroupObj);
531   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
532   { // Nodegroup hasn't finished being created yet-- schedule...
533     CsdScheduler(0);
534   }
535   return retval;
536 }
537
538 extern "C"
539 void *CkLocalChare(const CkChareID *pCid)
540 {
541         int pe=pCid->onPE;
542         if (pe<0) { //A virtual chare ID
543                 if (pe!=(-(CkMyPe()+1)))
544                         return NULL;//VID block not on this PE
545 #ifdef CMK_CHARE_USE_PTR
546                 VidBlock *v=(VidBlock *)pCid->objPtr;
547 #else
548                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
549 #endif
550                 return v->getLocalChareObj();
551         }
552         else
553         { //An ordinary chare ID
554                 if (pe!=CkMyPe())
555                         return NULL;//Chare not on this PE
556 #ifdef CMK_CHARE_USE_PTR
557                 return pCid->objPtr;
558 #else
559                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
560 #endif
561         }
562 }
563
564 CkpvDeclare(char**,Ck_argv);
565
566 extern "C" char **CkGetArgv(void) {
567         return CkpvAccess(Ck_argv);
568 }
569 extern "C" int CkGetArgc(void) {
570         return CmiGetArgc(CkpvAccess(Ck_argv));
571 }
572
573 /******************** Basic support *****************/
574 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
575 {
576 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
577         CpvAccess(_currentObj) = (Chare *)obj;
578 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
579 #endif
580   //BIGSIM_OOC DEBUGGING
581   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
582   //fflush(stdout);
583 #if CMK_CHARMDEBUG
584   CpdBeforeEp(epIdx, obj, msg);
585 #endif    
586   _entryTable[epIdx]->call(msg, obj);
587 #if CMK_CHARMDEBUG
588   CpdAfterEp(epIdx);
589 #endif
590   if (_entryTable[epIdx]->noKeep)
591   { /* Method doesn't keep/delete the message, so we have to: */
592     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
593   }
594 }
595 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
596 {
597   //BIGSIM_OOC DEBUGGING
598   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
599   //fflush(stdout);
600
601   void *deliverMsg;
602 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
603         CpvAccess(_currentObj) = (Chare *)obj;
604 #endif
605   if (_entryTable[epIdx]->noKeep)
606   { /* Deliver a read-only copy of the message */
607     deliverMsg=(void *)msg;
608   } else
609   { /* Method needs a copy of the message to keep/delete */
610     void *oldMsg=(void *)msg;
611     deliverMsg=CkCopyMsg(&oldMsg);
612 #if CMK_ERROR_CHECKING
613     if (oldMsg!=msg)
614       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
615 #endif
616   }
617 #if CMK_CHARMDEBUG
618   CpdBeforeEp(epIdx, obj, (void*)msg);
619 #endif
620   _entryTable[epIdx]->call(deliverMsg, obj);
621 #if CMK_CHARMDEBUG
622   CpdAfterEp(epIdx);
623 #endif
624 }
625
626 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
627 {
628   register void *msg = EnvToUsr(env);
629   _SET_USED(env, 0);
630   CkDeliverMessageFree(epIdx,msg,obj);
631 }
632
633 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
634 {
635
636 #if CMK_TRACE_ENABLED 
637   if (_entryTable[epIdx]->traceEnabled) {
638     _TRACE_BEGIN_EXECUTE(env);
639     _invokeEntryNoTrace(epIdx,env,obj);
640     _TRACE_END_EXECUTE();
641   }
642   else
643 #endif
644     _invokeEntryNoTrace(epIdx,env,obj);
645
646 }
647
648 /********************* Creation ********************/
649
650 extern "C"
651 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
652 {
653   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
654   envelope *env = UsrToEnv(msg);
655   _CHECK_USED(env);
656   if(pCid == 0) {
657     env->setMsgtype(NewChareMsg);
658   } else {
659     pCid->onPE = (-(CkMyPe()+1));
660     //  pCid->magic = _GETIDX(cIdx);
661     pCid->objPtr = (void *) new VidBlock();
662     _MEMCHECK(pCid->objPtr);
663     env->setMsgtype(NewVChareMsg);
664     env->setVidPtr(pCid->objPtr);
665 #ifndef CMK_CHARE_USE_PTR
666     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
667     int idx = CkpvAccess(vidblocks).size()-1;
668     pCid->objPtr = (void *)(CmiIntPtr)idx;
669     env->setVidPtr((void *)(CmiIntPtr)idx);
670 #endif
671   }
672   env->setEpIdx(eIdx);
673   env->setByPe(CkMyPe());
674   env->setSrcPe(CkMyPe());
675   CmiSetHandler(env, _charmHandlerIdx);
676   _TRACE_CREATION_1(env);
677   CpvAccess(_qd)->create();
678   _STATS_RECORD_CREATE_CHARE_1();
679   _SET_USED(env, 1);
680   if(destPE == CK_PE_ANY)
681     env->setForAnyPE(1);
682   else
683     env->setForAnyPE(0);
684   _CldEnqueue(destPE, env, _infoIdx);
685   _TRACE_CREATION_DONE(1);
686 }
687
688 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
689 {
690   register int gIdx = _entryTable[epIdx]->chareIdx;
691   register void *obj = malloc(_chareTable[gIdx]->size);
692   _MEMCHECK(obj);
693   setMemoryTypeChare(obj);
694   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
695   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
696   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
697   CkpvAccess(_groupIDTable)->push_back(groupID);
698   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
699   if(ptrq) {
700     void *pending;
701     while((pending=ptrq->deq())!=0)
702       _CldEnqueue(CkMyPe(), pending, _infoIdx);
703 //    delete ptrq;
704       CkpvAccess(_groupTable)->find(groupID).clearPending();
705   }
706   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
707
708   CkpvAccess(_currentGroup) = groupID;
709   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
710 #ifndef CMK_CHARE_USE_PTR
711   //((Chare *)obj)->chareIdx = -1;
712   CkpvAccess(currentChareIdx) = -1;
713 #endif
714   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
715   _STATS_RECORD_PROCESS_GROUP_1();
716 }
717
718 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
719 {
720   register int gIdx = _entryTable[epIdx]->chareIdx;
721   int objSize=_chareTable[gIdx]->size;
722   register void *obj = malloc(objSize);
723   _MEMCHECK(obj);
724   setMemoryTypeChare(obj);
725   CkpvAccess(_currentGroup) = groupID;
726
727 // Now that the NodeGroup is created, add it to the table.
728 //  NodeGroups can be accessed by multiple processors, so
729 //  this is in the opposite order from groups - invoking the constructor
730 //  before registering it.
731 // User may call CkLocalNodeBranch() inside the nodegroup constructor
732 //  store nodegroup into _currentNodeGroupObj
733   CkpvAccess(_currentNodeGroupObj) = obj;
734 #ifndef CMK_CHARE_USE_PTR
735   //((Chare *)obj)->chareIdx = -1;
736   CkpvAccess(currentChareIdx) = -1;
737 #endif
738   _invokeEntryNoTrace(epIdx,env,obj);
739   CkpvAccess(_currentNodeGroupObj) = NULL;
740   _STATS_RECORD_PROCESS_NODE_GROUP_1();
741
742   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
743   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
744   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
745   CksvAccess(_nodeGroupIDTable).push_back(groupID);
746
747   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
748   if(ptrq) {
749     void *pending;
750     while((pending=ptrq->deq())!=0)
751       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
752 //    delete ptrq;
753       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
754   }
755   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
756 }
757
758 void _createGroup(CkGroupID groupID, envelope *env)
759 {
760   _CHECK_USED(env);
761   _SET_USED(env, 1);
762   register int epIdx = env->getEpIdx();
763   int gIdx = _entryTable[epIdx]->chareIdx;
764   CkNodeGroupID rednMgr;
765   if(_chareTable[gIdx]->isIrr == 0){
766                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
767                 rednMgr = rednMgrProxy;
768 //              rednMgrProxy.setAttachedGroup(groupID);
769   }else{
770         rednMgr.setZero();
771   }
772   env->setInitGroupNum(groupID);
773   env->setSrcPe(CkMyPe());
774   env->setRednMgr(rednMgr);
775   env->setGroupEpoch(CkpvAccess(_charmEpoch));
776
777   if(CkNumPes()>1) {
778     CkPackMessage(&env);
779     CmiSetHandler(env, _bocHandlerIdx);
780     _numInitMsgs++;
781     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
782     CpvAccess(_qd)->create(CkNumPes()-1);
783     CkUnpackMessage(&env);
784   }
785   _STATS_RECORD_CREATE_GROUP_1();
786   CkCreateLocalGroup(groupID, epIdx, env);
787 }
788
789 void _createNodeGroup(CkGroupID groupID, envelope *env)
790 {
791   _CHECK_USED(env);
792   _SET_USED(env, 1);
793   register int epIdx = env->getEpIdx();
794   env->setInitGroupNum(groupID);
795   env->setSrcPe(CkMyPe());
796   env->setGroupEpoch(CkpvAccess(_charmEpoch));
797   if(CkNumNodes()>1) {
798     CkPackMessage(&env);
799     CmiSetHandler(env, _bocHandlerIdx);
800     _numInitMsgs++;
801     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
802     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
803     CpvAccess(_qd)->create(CkNumNodes()-1);
804     CkUnpackMessage(&env);
805   }
806   _STATS_RECORD_CREATE_NODE_GROUP_1();
807   CkCreateLocalNodeGroup(groupID, epIdx, env);
808 }
809
810 // new _groupCreate
811
812 static CkGroupID _groupCreate(envelope *env)
813 {
814   register CkGroupID groupNum;
815
816   // check CkMyPe(). if it is 0 then idx is _numGroups++
817   // if not, then something else...
818   if(CkMyPe() == 0)
819      groupNum.idx = CkpvAccess(_numGroups)++;
820   else
821      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
822   _createGroup(groupNum, env);
823   return groupNum;
824 }
825
826 // new _nodeGroupCreate
827 static CkGroupID _nodeGroupCreate(envelope *env)
828 {
829   register CkGroupID groupNum;
830   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
831   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
832           groupNum.idx = CksvAccess(_numNodeGroups)++;
833    else
834           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
835   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
836   _createNodeGroup(groupNum, env);
837   return groupNum;
838 }
839
840 /**** generate the group idx when group is creator pe is not pe0
841  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
842  **** remaining bits contain the group creator processor number and
843  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
844
845 int _getGroupIdx(int numNodes,int myNode,int numGroups)
846 {
847         int idx;
848         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
849         int n = 32 - (x+1);                                     // number of bits remaining for the index
850         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
851                                                                 // then add the next available index
852         // of course this won't work when int is 8 bytes long on T3E
853         //idx |= 0x80000000;                                      // set the most significant bit to 1
854         idx = - idx;
855                                                                 // if int is not 32 bits, wouldn't this be wrong?
856         return idx;
857 }
858
859 extern "C"
860 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
861 {
862   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
863   register envelope *env = UsrToEnv(msg);
864   env->setMsgtype(BocInitMsg);
865   env->setEpIdx(eIdx);
866   env->setSrcPe(CkMyPe());
867   _TRACE_CREATION_N(env, CkNumPes());
868   CkGroupID gid = _groupCreate(env);
869   _TRACE_CREATION_DONE(1);
870   return gid;
871 }
872
873 extern "C"
874 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
875 {
876   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
877   register envelope *env = UsrToEnv(msg);
878   env->setMsgtype(NodeBocInitMsg);
879   env->setEpIdx(eIdx);
880   env->setSrcPe(CkMyPe());
881   _TRACE_CREATION_N(env, CkNumNodes());
882   CkGroupID gid = _nodeGroupCreate(env);
883   _TRACE_CREATION_DONE(1);
884   return gid;
885 }
886
887 static inline void *_allocNewChare(envelope *env, int &idx)
888 {
889   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
890   void *tmp=malloc(_chareTable[chareIdx]->size);
891   _MEMCHECK(tmp);
892 #ifndef CMK_CHARE_USE_PTR
893   CkpvAccess(chare_objs).push_back(tmp);
894   CkpvAccess(chare_types).push_back(chareIdx);
895   idx = CkpvAccess(chare_objs).size()-1;
896 #endif
897   setMemoryTypeChare(tmp);
898   return tmp;
899 }
900
901 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
902 {
903   int idx;
904   register void *obj = _allocNewChare(env, idx);
905 #ifndef CMK_CHARE_USE_PTR
906   //((Chare *)obj)->chareIdx = idx;
907   CkpvAccess(currentChareIdx) = idx;
908 #endif
909   _invokeEntry(env->getEpIdx(),env,obj);
910 }
911
912 void CkCreateLocalChare(int epIdx, envelope *env)
913 {
914   env->setEpIdx(epIdx);
915   _processNewChareMsg(NULL, env);
916 }
917
918 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
919 {
920   int idx;
921   register void *obj = _allocNewChare(env, idx);
922   register CkChareID *pCid = (CkChareID *)
923       _allocMsg(FillVidMsg, sizeof(CkChareID));
924   pCid->onPE = CkMyPe();
925 #ifndef CMK_CHARE_USE_PTR
926   pCid->objPtr = (void*)(CmiIntPtr)idx;
927 #else
928   pCid->objPtr = obj;
929 #endif
930   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
931   register envelope *ret = UsrToEnv(pCid);
932   ret->setVidPtr(env->getVidPtr());
933   register int srcPe = env->getByPe();
934   ret->setSrcPe(CkMyPe());
935   CmiSetHandler(ret, _charmHandlerIdx);
936   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
937 #ifndef CMK_CHARE_USE_PTR
938   // register the remote vidblock for deletion when chare is deleted
939   CkChareID vid;
940   vid.onPE = srcPe;
941   vid.objPtr = env->getVidPtr();
942   CkpvAccess(vmap)[idx] = vid;    
943 #endif
944   CpvAccess(_qd)->create();
945 #ifndef CMK_CHARE_USE_PTR
946   //((Chare *)obj)->chareIdx = idx;
947   CkpvAccess(currentChareIdx) = idx;
948 #endif
949   _invokeEntry(env->getEpIdx(),env,obj);
950 }
951
952 /************** Receive: Chares *************/
953
954 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
955 {
956   register int epIdx = env->getEpIdx();
957   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
958   register void *obj;
959   if (mainIdx != -1)  {           // mainchare
960     CmiAssert(CkMyPe()==0);
961     obj = _mainTable[mainIdx]->getObj();
962   }
963   else {
964 #ifndef CMK_CHARE_USE_PTR
965     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
966       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
967     else
968       obj = env->getObjPtr();
969 #else
970     obj = env->getObjPtr();
971 #endif
972   }
973   _invokeEntry(epIdx,env,obj);
974 }
975
976 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
977 {
978   register int epIdx = env->getEpIdx();
979   register void *obj = env->getObjPtr();
980   _invokeEntry(epIdx,env,obj);
981 }
982
983 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
984 {
985 #ifndef CMK_CHARE_USE_PTR
986   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
987 #else
988   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
989   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
990 #endif
991   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
992   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
993   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
994   CmiFree(env);
995 }
996
997 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
998 {
999 #ifndef CMK_CHARE_USE_PTR
1000   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1001 #else
1002   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1003   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
1004 #endif
1005   _SET_USED(env, 1);
1006   vptr->send(env);
1007 }
1008
1009 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1010 {
1011 #ifndef CMK_CHARE_USE_PTR
1012   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1013   delete vptr;
1014   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1015 #endif
1016   CmiFree(env);
1017 }
1018
1019 /************** Receive: Groups ****************/
1020
1021 /**
1022  Return a pointer to the local BOC of "groupID".
1023  The message "env" passed in has some known dependency on this groupID
1024  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1025  Therefore, if the return value is NULL, this function buffers the massage so that
1026  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1027  The message passed in must have its handlers correctly set so that it can be
1028  scheduled again.
1029 */
1030 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1031 {
1032
1033         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1034         IrrGroup *obj = ck->localBranch(groupID);
1035         if (obj==NULL) { /* groupmember not yet created: stash message */
1036                 ck->getGroupTable()->find(groupID).enqMsg(env);
1037         }
1038         else { /* will be able to process message */
1039                 ck->process();
1040         }
1041         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1042         return obj;
1043 }
1044
1045 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1046 {
1047   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1048 }
1049
1050 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1051 {
1052 #if CMK_LBDB_ON
1053   // if there is a running obj being measured, stop it temporarily
1054   LDObjHandle objHandle;
1055   int objstopped = 0;
1056   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1057   if (the_lbdb->RunningObject(&objHandle)) {
1058     objstopped = 1;
1059     the_lbdb->ObjectStop(objHandle);
1060   }
1061 #endif
1062   _invokeEntry(epIdx,env,obj);
1063 #if CMK_LBDB_ON
1064   if (objstopped) the_lbdb->ObjectStart(objHandle);
1065 #endif
1066   _STATS_RECORD_PROCESS_BRANCH_1();
1067 }
1068
1069 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1070 {
1071   register CkGroupID groupID =  env->getGroupNum();
1072   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1073   if(obj) {
1074     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1075   }
1076 }
1077
1078 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1079 {
1080   env->setMsgtype(ForChareMsg);
1081   env->setObjPtr(obj);
1082   _processForChareMsg(ck,env);
1083   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1084 }
1085
1086 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1087 {
1088   env->setEpIdx(epIdx);
1089   _deliverForNodeBocMsg(ck,env, obj);
1090 }
1091
1092 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1093 {
1094   register CkGroupID groupID = env->getGroupNum();
1095   register void *obj;
1096
1097   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1098   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1099   if(!obj) { // groupmember not yet created
1100 #if CMK_IMMEDIATE_MSG
1101     if (CmiIsImmediate(env)) {
1102       //CmiDelayImmediate();        /* buffer immediate message */
1103       CmiResetImmediate(env);        // note: this may not be SIG IO safe !
1104     }
1105 #endif
1106     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1107     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1108     return;
1109   }
1110   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1111   ck->process();
1112   env->setMsgtype(ForChareMsg);
1113   env->setObjPtr(obj);
1114   _processForChareMsg(ck,env);
1115   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1116 }
1117
1118 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1119 {
1120   register CkGroupID groupID = env->getInitGroupNum();
1121   register int epIdx = env->getEpIdx();
1122   if (!env->getGroupDep().isZero()) {      // dependence
1123     CkGroupID dep = env->getGroupDep();
1124     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1125     if (obj == NULL) return;
1126   }
1127   else
1128     ck->process();
1129   CkCreateLocalGroup(groupID, epIdx, env);
1130 }
1131
1132 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1133 {
1134   register CkGroupID groupID = env->getInitGroupNum();
1135   register int epIdx = env->getEpIdx();
1136   CkCreateLocalNodeGroup(groupID, epIdx, env);
1137 }
1138
1139 /************** Receive: Arrays *************/
1140
1141 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1142   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1143   if (mgr) {
1144     _SET_USED(env, 0);
1145     mgr->insertElement((CkMessage *)EnvToUsr(env));
1146   }
1147 }
1148 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1149   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1150   if (mgr) {
1151     _SET_USED(env, 0);
1152     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1153   }
1154 }
1155
1156 //BIGSIM_OOC DEBUGGING
1157 #define TELLMSGTYPE(x) //x
1158
1159 /**
1160  * This is the main converse-level handler used by all of Charm++.
1161  *
1162  * \addtogroup CriticalPathFramework
1163  */
1164 void _processHandler(void *converseMsg,CkCoreState *ck)
1165 {
1166   register envelope *env = (envelope *) converseMsg;
1167
1168   MESSAGE_PHASE_CHECK(env);
1169
1170 //#if CMK_RECORD_REPLAY
1171   if (ck->watcher!=NULL) {
1172     if (!ck->watcher->processMessage(&env,ck)) return;
1173   }
1174 //#endif
1175 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1176         Chare *obj=NULL;
1177         CkObjID sender;
1178         MCount SN;
1179         MlogEntry *entry=NULL;
1180         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
1181            || env->getMsgtype() == ForArrayEltMsg || env->getMsgtype() == ForIDedObjMsg
1182            || env->getMsgtype() == ForChareMsg) {
1183                 sender = env->sender;
1184                 SN = env->SN;
1185                 int result = preProcessReceivedMessage(env,&obj,&entry);
1186                 if(result == 0){
1187                         return;
1188                 }
1189         }
1190 #endif
1191
1192 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1193   //  CkPrintf("START\n");
1194   criticalPath_start(env);
1195 #endif
1196
1197
1198   switch(env->getMsgtype()) {
1199 // Group support
1200     case BocInitMsg :
1201       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1202       // QD processing moved inside _processBocInitMsg because it is conditional
1203       //ck->process(); 
1204       if(env->isPacked()) CkUnpackMessage(&env);
1205       _processBocInitMsg(ck,env);
1206       break;
1207     case NodeBocInitMsg :
1208       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1209       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1210       _processNodeBocInitMsg(ck,env);
1211       break;
1212     case ForBocMsg :
1213       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1214       // QD processing moved inside _processForBocMsg because it is conditional
1215       if(env->isPacked()) CkUnpackMessage(&env);
1216       _processForBocMsg(ck,env);
1217       // stats record moved inside _processForBocMsg because it is conditional
1218       break;
1219     case ForNodeBocMsg :
1220       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1221       // QD processing moved to _processForNodeBocMsg because it is conditional
1222       if(env->isPacked()) CkUnpackMessage(&env);
1223       _processForNodeBocMsg(ck,env);
1224       // stats record moved to _processForNodeBocMsg because it is conditional
1225       break;
1226
1227 // Array support
1228     case ArrayEltInitMsg:
1229       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1230       if(env->isPacked()) CkUnpackMessage(&env);
1231       _processArrayEltInitMsg(ck,env);
1232       break;
1233     case ForIDedObjMsg:
1234     case ForArrayEltMsg:
1235       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1236       if(env->isPacked()) CkUnpackMessage(&env);
1237       _processArrayEltMsg(ck,env);
1238       break;
1239
1240 // Chare support
1241     case NewChareMsg :
1242       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1243       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1244       _processNewChareMsg(ck,env);
1245       _STATS_RECORD_PROCESS_CHARE_1();
1246       break;
1247     case NewVChareMsg :
1248       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1249       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1250       _processNewVChareMsg(ck,env);
1251       _STATS_RECORD_PROCESS_CHARE_1();
1252       break;
1253     case ForChareMsg :
1254       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1255       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1256       _processForPlainChareMsg(ck,env);
1257       _STATS_RECORD_PROCESS_MSG_1();
1258       break;
1259     case ForVidMsg   :
1260       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1261       ck->process();
1262       _processForVidMsg(ck,env);
1263       break;
1264     case FillVidMsg  :
1265       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1266       ck->process();
1267       _processFillVidMsg(ck,env);
1268       break;
1269     case DeleteVidMsg  :
1270       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1271       ck->process();
1272       _processDeleteVidMsg(ck,env);
1273       break;
1274
1275     default:
1276       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1277   }
1278 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1279         if(obj != NULL){
1280                 postProcessReceivedMessage(obj,sender,SN,entry);
1281         }
1282 #endif
1283
1284
1285 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1286   criticalPath_end();
1287   //  CkPrintf("STOP\n");
1288 #endif
1289
1290
1291 }
1292
1293
1294 /******************** Message Send **********************/
1295
1296 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1297              int *queueing, int *priobits, unsigned int **prioptr)
1298 {
1299   register envelope *env = (envelope *)converseMsg;
1300   *pfn = (CldPackFn)CkPackMessage;
1301   *len = env->getTotalsize();
1302   *queueing = env->getQueueing();
1303   *priobits = env->getPriobits();
1304   *prioptr = (unsigned int *) env->getPrioPtr();
1305 }
1306
1307 void CkPackMessage(envelope **pEnv)
1308 {
1309   register envelope *env = *pEnv;
1310   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1311     register void *msg = EnvToUsr(env);
1312     _TRACE_BEGIN_PACK();
1313     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1314     _TRACE_END_PACK();
1315     env=UsrToEnv(msg);
1316     env->setPacked(1);
1317     *pEnv = env;
1318   }
1319 }
1320
1321 void CkUnpackMessage(envelope **pEnv)
1322 {
1323   register envelope *env = *pEnv;
1324   register int msgIdx = env->getMsgIdx();
1325   if(env->isPacked()) {
1326     register void *msg = EnvToUsr(env);
1327     _TRACE_BEGIN_UNPACK();
1328     msg = _msgTable[msgIdx]->unpack(msg);
1329     _TRACE_END_UNPACK();
1330     env=UsrToEnv(msg);
1331     env->setPacked(0);
1332     *pEnv = env;
1333   }
1334 }
1335
1336 //There's no reason for most messages to go through the Cld--
1337 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1338 // Thus these accellerated versions of the Cld calls.
1339 #if CMK_OBJECT_QUEUE_AVAILABLE
1340 static int index_objectQHandler;
1341 #endif
1342 int index_tokenHandler;
1343 int index_skipCldHandler;
1344
1345 void _skipCldHandler(void *converseMsg)
1346 {
1347   register envelope *env = (envelope *)(converseMsg);
1348   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1349 #if CMK_GRID_QUEUE_AVAILABLE
1350   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1351     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1352                        env, env->getQueueing (), env->getPriobits (),
1353                        (unsigned int *) env->getPrioPtr ());
1354   } else {
1355     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1356                        env, env->getQueueing (), env->getPriobits (),
1357                        (unsigned int *) env->getPrioPtr ());
1358   }
1359 #else
1360   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1361         env, env->getQueueing(),env->getPriobits(),
1362         (unsigned int *)env->getPrioPtr());
1363 #endif
1364 }
1365
1366
1367 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1368 // Made non-static to be used by ckmessagelogging
1369 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1370 {
1371 #if CMK_CHARMDEBUG
1372   if (!ConverseDeliver(pe)) {
1373     CmiFree(env);
1374     return;
1375   }
1376 #endif
1377   if(pe == CkMyPe() ){
1378     if(!CmiNodeAlive(CkMyPe())){
1379         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1380 //      return;
1381     }
1382   }
1383   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1384 #if CMK_OBJECT_QUEUE_AVAILABLE
1385     Chare *obj = CkFindObjectPtr(env);
1386     if (obj && obj->CkGetObjQueue().queue()) {
1387       _enqObjQueue(obj, env);
1388     }
1389     else
1390 #endif
1391     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1392         env, env->getQueueing(),env->getPriobits(),
1393         (unsigned int *)env->getPrioPtr());
1394 #if CMK_PERSISTENT_COMM
1395     CmiPersistentOneSend();
1396 #endif
1397   } else {
1398     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1399       CkPackMessage(&env);
1400     int len=env->getTotalsize();
1401     CmiSetXHandler(env,CmiGetHandler(env));
1402 #if CMK_OBJECT_QUEUE_AVAILABLE
1403     CmiSetHandler(env,index_objectQHandler);
1404 #else
1405     CmiSetHandler(env,index_skipCldHandler);
1406 #endif
1407     CmiSetInfo(env,infoFn);
1408     if (pe==CLD_BROADCAST) {
1409 #if CMK_MESSAGE_LOGGING
1410         if(env->flags & CK_FREE_MSG_MLOG)
1411                 CmiSyncBroadcastAndFree(len, (char *)env); 
1412         else
1413                 CmiSyncBroadcast(len, (char *)env);
1414 #else
1415                         CmiSyncBroadcastAndFree(len, (char *)env); 
1416 #endif
1417
1418 }
1419     else if (pe==CLD_BROADCAST_ALL) { 
1420 #if CMK_MESSAGE_LOGGING
1421         if(env->flags & CK_FREE_MSG_MLOG)
1422                 CmiSyncBroadcastAllAndFree(len, (char *)env);
1423         else
1424                 CmiSyncBroadcastAll(len, (char *)env);
1425 #else
1426                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1427 #endif
1428
1429 }
1430     else{
1431 #if CMK_MESSAGE_LOGGING
1432         if(env->flags & CK_FREE_MSG_MLOG)
1433                 CmiSyncSendAndFree(pe, len, (char *)env);
1434         else
1435                 CmiSyncSend(pe, len, (char *)env);
1436 #else
1437                         CmiSyncSendAndFree(pe, len, (char *)env);
1438 #endif
1439
1440                 }
1441   }
1442 }
1443
1444 #if CMK_BIGSIM_CHARM
1445 #   define  _skipCldEnqueue   _CldEnqueue
1446 #endif
1447
1448 // by pass Charm++ priority queue, send as Converse message
1449 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1450 {
1451 #if CMK_CHARMDEBUG
1452   if (!ConverseDeliver(-1)) {
1453     CmiFree(env);
1454     return;
1455   }
1456 #endif
1457   CkPackMessage(&env);
1458   int len=env->getTotalsize();
1459   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1460 }
1461
1462 static void _noCldEnqueue(int pe, envelope *env)
1463 {
1464 /*
1465   if (pe == CkMyPe()) {
1466     CmiHandleMessage(env);
1467   } else
1468 */
1469 #if CMK_CHARMDEBUG
1470   if (!ConverseDeliver(pe)) {
1471     CmiFree(env);
1472     return;
1473   }
1474 #endif
1475   CkPackMessage(&env);
1476   int len=env->getTotalsize();
1477   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1478   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1479   else CmiSyncSendAndFree(pe, len, (char *)env);
1480 }
1481
1482 //static void _noCldNodeEnqueue(int node, envelope *env)
1483 //Made non-static to be used by ckmessagelogging
1484 void _noCldNodeEnqueue(int node, envelope *env)
1485 {
1486 /*
1487   if (node == CkMyNode()) {
1488     CmiHandleMessage(env);
1489   } else {
1490 */
1491 #if CMK_CHARMDEBUG
1492   if (!ConverseDeliver(node)) {
1493     CmiFree(env);
1494     return;
1495   }
1496 #endif
1497   CkPackMessage(&env);
1498   int len=env->getTotalsize();
1499   if (node==CLD_BROADCAST) { 
1500 #if CMK_MESSAGE_LOGGING
1501         if(env->flags & CK_FREE_MSG_MLOG)
1502                 CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1503         else
1504                 CmiSyncNodeBroadcast(len, (char *)env);
1505 #else
1506         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1507 #endif
1508 }
1509   else if (node==CLD_BROADCAST_ALL) { 
1510 #if CMK_MESSAGE_LOGGING
1511         if(env->flags & CK_FREE_MSG_MLOG)
1512                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1513         else
1514                 CmiSyncNodeBroadcastAll(len, (char *)env);
1515 #else
1516                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1517 #endif
1518
1519 }
1520   else {
1521 #if CMK_MESSAGE_LOGGING
1522         if(env->flags & CK_FREE_MSG_MLOG)
1523                 CmiSyncNodeSendAndFree(node, len, (char *)env);
1524         else
1525                 CmiSyncNodeSend(node, len, (char *)env);
1526 #else
1527         CmiSyncNodeSendAndFree(node, len, (char *)env);
1528 #endif
1529   }
1530 }
1531
1532 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1533 {
1534   register envelope *env = UsrToEnv(msg);
1535   _CHECK_USED(env);
1536   _SET_USED(env, 1);
1537 #if CMK_REPLAYSYSTEM
1538   setEventID(env);
1539 #endif
1540   env->setMsgtype(ForChareMsg);
1541   env->setEpIdx(eIdx);
1542   env->setSrcPe(CkMyPe());
1543 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1544   criticalPath_send(env);
1545   automaticallySetMessagePriority(env);
1546 #endif
1547 #if CMK_CHARMDEBUG
1548   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1549 #endif
1550 #if CMK_OBJECT_QUEUE_AVAILABLE
1551   CmiSetHandler(env, index_objectQHandler);
1552 #else
1553   CmiSetHandler(env, _charmHandlerIdx);
1554 #endif
1555   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1556     register int pe = -(pCid->onPE+1);
1557     if(pe==CkMyPe()) {
1558 #ifndef CMK_CHARE_USE_PTR
1559       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1560 #else
1561       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1562 #endif
1563       void *objPtr;
1564       if (NULL!=(objPtr=vblk->getLocalChare()))
1565       { //A ready local chare
1566         env->setObjPtr(objPtr);
1567         return pe;
1568       }
1569       else { //The vidblock is not ready-- forget it
1570         vblk->send(env);
1571         return -1;
1572       }
1573     } else { //Valid vidblock for another PE:
1574       env->setMsgtype(ForVidMsg);
1575       env->setVidPtr(pCid->objPtr);
1576       return pe;
1577     }
1578   }
1579   else {
1580     env->setObjPtr(pCid->objPtr);
1581     return pCid->onPE;
1582   }
1583 }
1584
1585 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1586 {
1587   int destPE = _prepareMsg(eIdx, msg, pCid);
1588   if (destPE != -1) {
1589     register envelope *env = UsrToEnv(msg);
1590 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1591     criticalPath_send(env);
1592     automaticallySetMessagePriority(env);
1593 #endif
1594     CmiBecomeImmediate(env);
1595   }
1596   return destPE;
1597 }
1598
1599 extern "C"
1600 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1601 {
1602   if (opts & CK_MSG_INLINE) {
1603     CkSendMsgInline(entryIdx, msg, pCid, opts);
1604     return;
1605   }
1606 #if CMK_ERROR_CHECKING
1607   if (opts & CK_MSG_IMMEDIATE) {
1608     CmiAbort("Immediate message is not allowed in Chare!");
1609   }
1610 #endif
1611   register envelope *env = UsrToEnv(msg);
1612   int destPE=_prepareMsg(entryIdx,msg,pCid);
1613   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1614   // VidBlock was not yet filled). The problem is that the creation was never
1615   // traced later when the VidBlock was filled. One solution is to trace the
1616   // creation here, the other to trace it in VidBlock->msgDeliver().
1617 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1618   if (destPE!=-1) {
1619     CpvAccess(_qd)->create();
1620   }
1621         sendChareMsg(env,destPE,_infoIdx,pCid);
1622 #else
1623   _TRACE_CREATION_1(env);
1624   if (destPE!=-1) {
1625     CpvAccess(_qd)->create();
1626     if (opts & CK_MSG_SKIP_OR_IMM)
1627       _noCldEnqueue(destPE, env);
1628     else
1629       _CldEnqueue(destPE, env, _infoIdx);
1630   }
1631   _TRACE_CREATION_DONE(1);
1632 #endif
1633 }
1634
1635 extern "C"
1636 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1637 {
1638   if (pCid->onPE==CkMyPe())
1639   {
1640     if(!CmiNodeAlive(CkMyPe())){
1641         return;
1642     }
1643 #if CMK_CHARMDEBUG
1644     //Just in case we need to breakpoint or use the envelope in some way
1645     _prepareMsg(entryIndex,msg,pCid);
1646 #endif
1647                 //Just directly call the chare (skip QD handling & scheduler)
1648     register envelope *env = UsrToEnv(msg);
1649     if (env->isPacked()) CkUnpackMessage(&env);
1650     _STATS_RECORD_PROCESS_MSG_1();
1651     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1652   }
1653   else {
1654     //No way to inline a cross-processor message:
1655     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1656   }
1657 }
1658
1659 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1660 {
1661   register envelope *env = UsrToEnv(msg);
1662 /*
1663 #if CMK_ERROR_CHECKING
1664   CkNodeGroupID nodeRedMgr;
1665 #endif
1666 */
1667   _CHECK_USED(env);
1668   _SET_USED(env, 1);
1669 #if CMK_REPLAYSYSTEM
1670   setEventID(env);
1671 #endif
1672   env->setMsgtype(type);
1673   env->setEpIdx(eIdx);
1674   env->setGroupNum(gID);
1675   env->setSrcPe(CkMyPe());
1676 /*
1677 #if CMK_ERROR_CHECKING
1678   nodeRedMgr.setZero();
1679   env->setRednMgr(nodeRedMgr);
1680 #endif
1681 */
1682 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1683   criticalPath_send(env);
1684   automaticallySetMessagePriority(env);
1685 #endif
1686 #if CMK_CHARMDEBUG
1687   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1688 #endif
1689   CmiSetHandler(env, _charmHandlerIdx);
1690   return env;
1691 }
1692
1693 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1694 {
1695   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1696 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1697   criticalPath_send(env);
1698   automaticallySetMessagePriority(env);
1699 #endif
1700   CmiBecomeImmediate(env);
1701   return env;
1702 }
1703
1704 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1705                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1706 {
1707   int numPes;
1708   register envelope *env;
1709     if (opts & CK_MSG_IMMEDIATE) {
1710         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1711     }else
1712     {
1713         env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1714     }
1715
1716 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1717         sendGroupMsg(env,pe,_infoIdx);
1718 #else
1719   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1720   _TRACE_CREATION_N(env, numPes);
1721   if (opts & CK_MSG_SKIP_OR_IMM)
1722     _noCldEnqueue(pe, env);
1723   else
1724     _skipCldEnqueue(pe, env, _infoIdx);
1725   _TRACE_CREATION_DONE(1);
1726 #endif
1727 }
1728
1729 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1730                            int npes, int *pes)
1731 {
1732   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1733   _TRACE_CREATION_MULTICAST(env, npes, pes);
1734   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1735   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1736 }
1737
1738 extern "C"
1739 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1740 {
1741 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1742   if (destPE==CkMyPe())
1743   {
1744     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1745     return;
1746   }
1747   //Can't inline-- send the usual way
1748   register envelope *env = UsrToEnv(msg);
1749   int numPes;
1750   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1751   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1752   _TRACE_CREATION_N(env, numPes);
1753   _noCldEnqueue(destPE, env);
1754   _STATS_RECORD_SEND_BRANCH_1();
1755   CkpvAccess(_coreState)->create();
1756   _TRACE_CREATION_DONE(1);
1757 #else
1758   // no support for immediate message, send inline
1759   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1760 #endif
1761 }
1762
1763 extern "C"
1764 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1765 {
1766   if (destPE==CkMyPe())
1767   {
1768     if(!CmiNodeAlive(CkMyPe())){
1769         return;
1770     }
1771     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1772     if (obj!=NULL)
1773     { //Just directly call the group:
1774 #if CMK_ERROR_CHECKING
1775       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1776 #else
1777       envelope *env=UsrToEnv(msg);
1778 #endif
1779       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1780       return;
1781     }
1782   }
1783   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1784   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1785 }
1786
1787 extern "C"
1788 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1789 {
1790   if (opts & CK_MSG_INLINE) {
1791     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1792     return;
1793   }
1794   if (opts & CK_MSG_IMMEDIATE) {
1795     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1796     return;
1797   }
1798   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1799   _STATS_RECORD_SEND_BRANCH_1();
1800   CkpvAccess(_coreState)->create();
1801 }
1802
1803 extern "C"
1804 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1805 {
1806 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1807   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1808   _TRACE_CREATION_MULTICAST(env, npes, pes);
1809   _noCldEnqueueMulti(npes, pes, env);
1810   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1811 #else
1812   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1813   CpvAccess(_qd)->create(-npes);
1814 #endif
1815   _STATS_RECORD_SEND_BRANCH_N(npes);
1816   CpvAccess(_qd)->create(npes);
1817 }
1818
1819 extern "C"
1820 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1821 {
1822   if (opts & CK_MSG_IMMEDIATE) {
1823     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1824     return;
1825   }
1826     // normal mesg
1827   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1828   _STATS_RECORD_SEND_BRANCH_N(npes);
1829   CpvAccess(_qd)->create(npes);
1830 }
1831
1832 extern "C"
1833 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1834 {
1835   int npes;
1836   int *pes;
1837   if (opts & CK_MSG_IMMEDIATE) {
1838     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1839     return;
1840   }
1841     // normal mesg
1842   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1843   CmiLookupGroup(grp, &npes, &pes);
1844   _TRACE_CREATION_MULTICAST(env, npes, pes);
1845   _CldEnqueueGroup(grp, env, _infoIdx);
1846   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1847   _STATS_RECORD_SEND_BRANCH_N(npes);
1848   CpvAccess(_qd)->create(npes);
1849 }
1850
1851 extern "C"
1852 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1853 {
1854   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1855   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1856   CpvAccess(_qd)->create(CkNumPes());
1857 }
1858
1859 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1860                 int node=CLD_BROADCAST_ALL, int opts=0)
1861 {
1862     int numPes;
1863     register envelope *env;
1864     if (opts & CK_MSG_IMMEDIATE) {
1865         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1866     }else
1867     {
1868         env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1869     }
1870 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1871     sendNodeGroupMsg(env,node,_infoIdx);
1872 #else
1873   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1874   _TRACE_CREATION_N(env, numPes);
1875   if (opts & CK_MSG_SKIP_OR_IMM) {
1876     _noCldNodeEnqueue(node, env);
1877   }
1878   else
1879     _CldNodeEnqueue(node, env, _infoIdx);
1880   _TRACE_CREATION_DONE(1);
1881 #endif
1882 }
1883
1884 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1885                            int npes, int *nodes)
1886 {
1887   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1888   _TRACE_CREATION_N(env, npes);
1889   for (int i=0; i<npes; i++) {
1890     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1891   }
1892   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1893 }
1894
1895 extern "C"
1896 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1897 {
1898 #if CMK_IMMEDIATE_MSG
1899   if (node==CkMyNode())
1900   {
1901     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1902     return;
1903   }
1904   //Can't inline-- send the usual way
1905   register envelope *env = UsrToEnv(msg);
1906   int numPes;
1907   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1908   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1909   _TRACE_CREATION_N(env, numPes);
1910   _noCldNodeEnqueue(node, env);
1911   _STATS_RECORD_SEND_BRANCH_1();
1912   CkpvAccess(_coreState)->create();
1913   _TRACE_CREATION_DONE(1);
1914 #else
1915   // no support for immediate message, send inline
1916   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1917 #endif
1918 }
1919
1920 extern "C"
1921 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1922 {
1923   if (node==CkMyNode())
1924   {
1925     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1926     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1927     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1928     if (obj!=NULL)
1929     { //Just directly call the group:
1930 #if CMK_ERROR_CHECKING
1931       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1932 #else
1933       envelope *env=UsrToEnv(msg);
1934 #endif
1935       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1936       return;
1937     }
1938   }
1939   //Can't inline-- send the usual way
1940   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1941 }
1942
1943 extern "C"
1944 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1945 {
1946   if (opts & CK_MSG_INLINE) {
1947     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1948     return;
1949   }
1950   if (opts & CK_MSG_IMMEDIATE) {
1951     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1952     return;
1953   }
1954   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1955   _STATS_RECORD_SEND_NODE_BRANCH_1();
1956   CkpvAccess(_coreState)->create();
1957 }
1958
1959 extern "C"
1960 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1961 {
1962 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1963   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1964   _noCldEnqueueMulti(npes, nodes, env);
1965 #else
1966   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1967   CpvAccess(_qd)->create(-npes);
1968 #endif
1969   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1970   CpvAccess(_qd)->create(npes);
1971 }
1972
1973 extern "C"
1974 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1975 {
1976   if (opts & CK_MSG_IMMEDIATE) {
1977     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1978     return;
1979   }
1980     // normal mesg
1981   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1982   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1983   CpvAccess(_qd)->create(npes);
1984 }
1985
1986 extern "C"
1987 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1988 {
1989   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1990   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1991   CpvAccess(_qd)->create(CkNumNodes());
1992 }
1993
1994 //Needed by delegation manager:
1995 extern "C"
1996 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1997 { return _prepareMsg(eIdx,msg,pCid); }
1998 extern "C"
1999 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2000 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
2001 extern "C"
2002 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2003 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
2004
2005 void _ckModuleInit(void) {
2006         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
2007 #if CMK_OBJECT_QUEUE_AVAILABLE
2008         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
2009 #endif
2010         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
2011         CkpvInitialize(TokenPool*, _tokenPool);
2012         CkpvAccess(_tokenPool) = new TokenPool;
2013 }
2014
2015
2016 /************** Send: Arrays *************/
2017
2018 extern void CkArrayManagerInsert(int onPe,void *msg);
2019 //extern void CkArrayManagerDeliver(int onPe,void *msg);
2020
2021 static void _prepareOutgoingArrayMsg(envelope *env,int type)
2022 {
2023   _CHECK_USED(env);
2024   _SET_USED(env, 1);
2025   env->setMsgtype(type);
2026 #if CMK_CHARMDEBUG
2027   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
2028 #endif
2029   CmiSetHandler(env, _charmHandlerIdx);
2030   CpvAccess(_qd)->create();
2031 }
2032
2033 extern "C"
2034 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
2035   register envelope *env = UsrToEnv(msg);
2036   env->setArrayMgr(aID);
2037   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
2038   _CldEnqueue(pe, env, _infoIdx);
2039 }
2040
2041 extern "C"
2042 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2043   register envelope *env = UsrToEnv(msg);
2044   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2045 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2046         sendArrayMsg(env,pe,_infoIdx);
2047 #else
2048   if (opts & CK_MSG_IMMEDIATE)
2049     CmiBecomeImmediate(env);
2050   if (opts & CK_MSG_SKIP_OR_IMM)
2051     _noCldEnqueue(pe, env);
2052   else
2053     _skipCldEnqueue(pe, env, _infoIdx);
2054 #endif
2055 }
2056
2057 class ElementDestroyer : public CkLocIterator {
2058 private:
2059         CkLocMgr *locMgr;
2060 public:
2061         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2062         void addLocation(CkLocation &loc) {
2063           loc.destroyAll();
2064         }
2065 };
2066
2067 void CkDeleteChares() {
2068   int i;
2069   int numGroups = CkpvAccess(_groupIDTable)->size();
2070
2071   // delete all plain chares
2072 #ifndef CMK_CHARE_USE_PTR
2073   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2074         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2075         delete obj;
2076         CkpvAccess(chare_objs)[i] = NULL;
2077   }
2078   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2079         VidBlock *obj = CkpvAccess(vidblocks)[i];
2080         delete obj;
2081         CkpvAccess(vidblocks)[i] = NULL;
2082   }
2083 #endif
2084
2085   // delete all array elements
2086   for(i=0;i<numGroups;i++) {
2087     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2088     if(obj && obj->isLocMgr())  {
2089       CkLocMgr *mgr = (CkLocMgr*)obj;
2090       ElementDestroyer destroyer(mgr);
2091       mgr->iterate(destroyer);
2092     }
2093   }
2094
2095   // delete all groups
2096   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2097   for(i=0;i<numGroups;i++) {
2098     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2099     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2100     if (obj) delete obj;
2101   }
2102   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2103
2104   // delete all node groups
2105   if (CkMyRank() == 0) {
2106     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2107     for(i=0;i<numNodeGroups;i++) {
2108       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2109       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2110       if (obj) delete obj;
2111     }
2112   }
2113 }
2114
2115 //------------------- Message Watcher (record/replay) ----------------
2116
2117 #include "crc32.h"
2118
2119 CkpvDeclare(int, envelopeEventID);
2120 int _recplay_crc = 0;
2121 int _recplay_checksum = 0;
2122 int _recplay_logsize = 1024*1024;
2123
2124 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2125 #define REPLAYDEBUG(args) /* empty */
2126
2127 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2128
2129 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2130
2131 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2132
2133     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2134     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2135     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2136     FILE *f=fopen(fName,permissions);
2137     REPLAYDEBUG("openReplayfile "<<fName);
2138     if (f==NULL) {
2139         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2140             CkMyPe(),fName,permissions);
2141         CkAbort("openReplayFile> Could not open replay file");
2142     }
2143     return f;
2144 }
2145
2146 #include "BaseLB.h" /* For LBMigrateMsg message */
2147
2148 class CkMessageRecorder : public CkMessageWatcher {
2149   char *buffer;
2150   unsigned int curpos;
2151   bool firstOpen;
2152 public:
2153   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2154   ~CkMessageRecorder() {
2155     flushLog(0);
2156     fprintf(f,"-1 -1 -1 ");
2157     fclose(f);
2158     delete[] buffer;
2159 #if 0
2160     FILE *stsfp = fopen("sts", "w");
2161     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2162     traceWriteSTS(stsfp, 0);
2163     fclose(stsfp);
2164 #endif
2165     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2166   }
2167
2168 private:
2169   void flushLog(int verbose=1) {
2170     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2171     fprintf(f, "%s", buffer);
2172     curpos=0;
2173   }
2174   virtual bool process(envelope **envptr,CkCoreState *ck) {
2175     if ((*envptr)->getEvent()) {
2176       bool wasPacked = (*envptr)->isPacked();
2177       if (!wasPacked) CkPackMessage(envptr);
2178       envelope *env = *envptr;
2179       unsigned int crc1=0, crc2=0;
2180       if (_recplay_crc) {
2181         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2182         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2183         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2184       } else if (_recplay_checksum) {
2185         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2186         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2187       }
2188       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2189       if (curpos > _recplay_logsize-128) flushLog();
2190       if (!wasPacked) CkUnpackMessage(envptr);
2191     }
2192     return true;
2193   }
2194   virtual bool process(CthThreadToken *token,CkCoreState *ck) {
2195     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2196     if (curpos > _recplay_logsize-128) flushLog();
2197     return true;
2198   }
2199   
2200   virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2201     FILE *f;
2202     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2203     else f = openReplayFile("ckreplay_",".lb","a");
2204     firstOpen = false;
2205     if (f != NULL) {
2206       PUP::toDisk p(f);
2207       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2208       (*msg)->pup(p);
2209       fclose(f);
2210     }
2211     return true;
2212   }
2213 };
2214
2215 class CkMessageDetailRecorder : public CkMessageWatcher {
2216 public:
2217   CkMessageDetailRecorder(FILE *f_) {
2218     f=f_;
2219     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2220      * The value of 'x' is the pointer size.
2221      */
2222     CmiUInt2 little = sizeof(void*);
2223     fwrite(&little, 2, 1, f);
2224   }
2225   ~CkMessageDetailRecorder() {fclose(f);}
2226 private:
2227   virtual bool process(envelope **envptr, CkCoreState *ck) {
2228     bool wasPacked = (*envptr)->isPacked();
2229     if (!wasPacked) CkPackMessage(envptr);
2230     envelope *env = *envptr;
2231     CmiUInt4 size = env->getTotalsize();
2232     fwrite(&size, 4, 1, f);
2233     fwrite(env, env->getTotalsize(), 1, f);
2234     if (!wasPacked) CkUnpackMessage(envptr);
2235     return true;
2236   }
2237 };
2238
2239 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2240 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2241
2242 #if CMK_BIGSIM_CHARM
2243 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2244                                    int pb,unsigned int *prio);
2245 #endif
2246
2247 class CkMessageReplay : public CkMessageWatcher {
2248   int counter;
2249         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2250         int nextEP;
2251         unsigned int crc1, crc2;
2252         FILE *lbFile;
2253         /// Read the next message we need from the file:
2254         void getNext(void) {
2255           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2256           if (nextSize > 0) {
2257             // We are reading a regular message
2258             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2259               CkAbort("CkMessageReplay> Syntax error reading replay file");
2260             }
2261             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2262           } else if (nextSize == -2) {
2263             // We are reading a special message (right now only thread awaken)
2264             // Nothing to do since we have already read all info
2265             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2266           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2267             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2268             CkAbort("CkMessageReplay> Unrecognized input");
2269           }
2270             /*
2271                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2272                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2273                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2274                 }
2275                 */
2276                 counter++;
2277         }
2278         /// If this is the next message we need, advance and return true.
2279         bool isNext(envelope *env) {
2280                 if (nextPE!=env->getSrcPe()) return false;
2281                 if (nextEvent!=env->getEvent()) return false;
2282                 if (nextSize<0) return false; // not waiting for a regular message
2283 #if 1
2284                 if (nextEP != env->getEpIdx()) {
2285                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2286                         return false;
2287                 }
2288 #endif
2289 #if ! CMK_BIGSIM_CHARM
2290                 if (nextSize!=env->getTotalsize())
2291                 {
2292                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2293                         return false;
2294                 }
2295                 if (_recplay_crc || _recplay_checksum) {
2296                   bool wasPacked = env->isPacked();
2297                   if (!wasPacked) CkPackMessage(&env);
2298                   if (_recplay_crc) {
2299                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2300                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2301                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2302                     if (crcnew1 != crc1) {
2303                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2304                     }
2305                     if (crcnew2 != crc2) {
2306                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2307                     }
2308                   } else if (_recplay_checksum) {
2309             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2310             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2311             if (crcnew1 != crc1) {
2312               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2313             }
2314             if (crcnew2 != crc2) {
2315               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2316             }               
2317                   }
2318                   if (!wasPacked) CkUnpackMessage(&env);
2319                 }
2320 #endif
2321                 return true;
2322         }
2323         bool isNext(CthThreadToken *token) {
2324           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
2325           return false;
2326         }
2327
2328         /// This is a (short) list of messages we aren't yet ready for:
2329         CkQ<envelope *> delayedMessages;
2330         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2331         CkQ<CthThreadToken *> delayedTokens;
2332
2333         /// Try to flush out any delayed messages
2334         void flush(void) {
2335           if (nextSize>0) {
2336                 int len=delayedMessages.length();
2337                 for (int i=0;i<len;i++) {
2338                         envelope *env=delayedMessages.deq();
2339                         if (isNext(env)) { /* this is the next message: process it */
2340                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2341                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2342                                 return;
2343                         }
2344                         else /* Not ready yet-- put it back in the
2345                                 queue */
2346                           {
2347                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2348                                 delayedMessages.enq(env);
2349                           }
2350                 }
2351           } else if (nextSize==-2) {
2352             int len=delayedTokens.length();
2353             for (int i=0;i<len;++i) {
2354               CthThreadToken *token=delayedTokens.deq();
2355               if (isNext(token)) {
2356             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2357 #if ! CMK_BIGSIM_CHARM
2358                 CsdEnqueueLifo((void*)token);
2359 #else
2360                 CthEnqueueBigSimThread(token,0,0,NULL);
2361 #endif
2362                 return;
2363               } else {
2364             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2365                 delayedTokens.enq(token);
2366               }
2367             }
2368           }
2369         }
2370
2371 public:
2372         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2373           counter=0;
2374           f=f_;
2375           getNext();
2376           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2377           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2378         }
2379         ~CkMessageReplay() {fclose(f);}
2380
2381 private:
2382         virtual bool process(envelope **envptr,CkCoreState *ck) {
2383           bool wasPacked = (*envptr)->isPacked();
2384           if (!wasPacked) CkPackMessage(envptr);
2385           envelope *env = *envptr;
2386           //CkAssert(*(int*)env == 0x34567890);
2387           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2388                 if (env->getEvent() == 0) return true;
2389                 if (isNext(env)) { /* This is the message we were expecting */
2390                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2391                         getNext(); /* Advance over this message */
2392                         flush(); /* try to process queued-up stuff */
2393                         if (!wasPacked) CkUnpackMessage(envptr);
2394                         return true;
2395                 }
2396 #if CMK_SMP
2397                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2398                          // try next rank, we can't just buffer the msg and left
2399                          // we need to keep unprocessed msg on the fly
2400                         int nextpe = CkMyPe()+1;
2401                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2402                         nextpe = CkNodeFirst(CkMyNode());
2403                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2404                         return false;
2405                 }
2406 #endif
2407                 else /*!isNext(env) */ {
2408                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2409                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2410                         delayedMessages.enq(env);
2411                         flush();
2412                         return false;
2413                 }
2414         }
2415         virtual bool process(CthThreadToken *token, CkCoreState *ck) {
2416       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2417           if (isNext(token)) {
2418         REPLAYDEBUG("Executing token: "<<token->serialNo)
2419             getNext();
2420             flush();
2421             return true;
2422           } else {
2423         REPLAYDEBUG("Queueing token: "<<token->serialNo
2424             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2425             delayedTokens.enq(token);
2426             return false;
2427           }
2428         }
2429
2430         virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2431           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2432           if (lbFile != NULL) {
2433             int num_moves = 0;
2434         PUP::fromDisk p(lbFile);
2435             p | num_moves;
2436             if (num_moves != (*msg)->n_moves) {
2437               delete *msg;
2438               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2439             }
2440             (*msg)->pup(p);
2441           }
2442           return true;
2443         }
2444 };
2445
2446 class CkMessageDetailReplay : public CkMessageWatcher {
2447   void *getNext() {
2448     CmiUInt4 size; size_t nread;
2449     if ((nread=fread(&size, 4, 1, f)) < 1) {
2450       if (feof(f)) return NULL;
2451       CkPrintf("Broken record file (metadata) got %d\n",nread);
2452       CkAbort("");
2453     }
2454     void *env = CmiAlloc(size);
2455     long tell = ftell(f);
2456     if ((nread=fread(env, size, 1, f)) < 1) {
2457       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2458       CkAbort("");
2459     }
2460     //*(int*)env = 0x34567890; // set first integer as magic
2461     return env;
2462   }
2463 public:
2464   double starttime;
2465   CkMessageDetailReplay(FILE *f_) {
2466     f=f_;
2467     starttime=CkWallTimer();
2468     /* This must match what CkMessageDetailRecorder did */
2469     CmiUInt2 little;
2470     fread(&little, 2, 1, f);
2471     if (little != sizeof(void*)) {
2472       CkAbort("Replaying on a different architecture from which recording was done!");
2473     }
2474
2475     CsdEnqueue(getNext());
2476
2477     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2478   }
2479   virtual bool process(envelope **env,CkCoreState *ck) {
2480     void *msg = getNext();
2481     if (msg != NULL) CsdEnqueue(msg);
2482     return true;
2483   }
2484 };
2485
2486 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2487 #if ! CMK_BIGSIM_CHARM
2488   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2489 #endif
2490   CkMessageReplay *replay = (CkMessageReplay*)rep;
2491   //CmiStartQD(CkMessageReplayQuiescence, replay);
2492 }
2493
2494 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2495   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2496   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2497   ConverseExit();
2498 }
2499
2500 static bool CpdExecuteThreadResume(CthThreadToken *token) {
2501   CkCoreState *ck = CkpvAccess(_coreState);
2502   if (ck->watcher!=NULL) {
2503     return ck->watcher->processThread(token,ck);
2504   }
2505   return true;
2506 }
2507
2508 CpvCExtern(int, CthResumeNormalThreadIdx);
2509 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2510 {
2511   CthThread t = token->thread;
2512
2513   if(t == NULL){
2514     free(token);
2515     return;
2516   }
2517 #if CMK_TRACE_ENABLED
2518 #if ! CMK_TRACE_IN_CHARM
2519   if(CpvAccess(traceOn))
2520     CthTraceResume(t);
2521 /*    if(CpvAccess(_traceCoreOn)) 
2522             resumeTraceCore();*/
2523 #endif
2524 #endif
2525   
2526   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2527   if (CpdExecuteThreadResume(token)) {
2528     CthResume(t);
2529   }
2530 }
2531
2532 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2533   CkCoreState *ck = CkpvAccess(_coreState);
2534   if (ck->watcher!=NULL) {
2535     ck->watcher->processLBMessage(msg, ck);
2536   }
2537 }
2538
2539 #if CMK_BIGSIM_CHARM
2540 CpvExtern(int      , CthResumeBigSimThreadIdx);
2541 #endif
2542
2543 #include "ckliststring.h"
2544 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2545     CmiArgGroup("Charm++","Record/Replay");
2546     bool forceReplay = false;
2547     char *procs = NULL;
2548     _replaySystem = 0;
2549     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2550       _recplay_crc = 1;
2551     }
2552     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2553       _recplay_checksum = 1;
2554     }
2555     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2556     REPLAYDEBUG("CkMessageWatcherInit ");
2557     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2558         CkListString list(procs);
2559         if (list.includes(CkMyPe())) {
2560           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2561           CpdSetInitializeMemory(1);
2562           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2563         }
2564     }
2565     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2566       if (CkMyPe() == 0) {
2567         CmiPrintf("Charm++> record mode.\n");
2568         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2569           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2570           _recplay_crc = _recplay_checksum = 0;
2571         }
2572       }
2573       CpdSetInitializeMemory(1);
2574       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2575       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2576     }
2577         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2578             forceReplay = true;
2579             CpdSetInitializeMemory(1);
2580             // Set the parameters of the processor
2581 #if !CMK_SMP
2582             _Cmi_mype = atoi(procs);
2583             while (procs[0]!='/') procs++;
2584             procs++;
2585             _Cmi_numpes = atoi(procs);
2586 #else
2587             CkAbort("+replay-detail available only for non-SMP build");
2588 #endif
2589             _replaySystem = 1;
2590             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2591         }
2592         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2593           if (CkMyPe() == 0)  {
2594             CmiPrintf("Charm++> replay mode.\n");
2595             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2596               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2597               _recplay_crc = _recplay_checksum = 0;
2598             }
2599           }
2600           CpdSetInitializeMemory(1);
2601 #if ! CMK_BIGSIM_CHARM
2602           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2603 #else
2604           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2605 #endif
2606           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2607         }
2608         if (_recplay_crc && _recplay_checksum) {
2609           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2610         }
2611 }
2612
2613 extern "C"
2614 int CkMessageToEpIdx(void *msg) {
2615         envelope *env=UsrToEnv(msg);
2616         int ep=env->getEpIdx();
2617         if (ep==CkIndex_CkArray::recvBroadcast(0))
2618                 return env->getsetArrayBcastEp();
2619         else
2620                 return ep;
2621 }
2622
2623 extern "C"
2624 int getCharmEnvelopeSize() {
2625   return sizeof(envelope);
2626 }
2627
2628 /// Best-effort guess at whether @arg msg points at a charm envelope
2629 extern "C"
2630 int isCharmEnvelope(void *msg) {
2631     envelope *e = (envelope *)msg;
2632     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
2633     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
2634     if (e->getTotalsize() < sizeof(envelope)) return 0;
2635     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
2636 #if CMK_SMP
2637     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
2638 #else
2639     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()) return 0;
2640 #endif
2641     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
2642     return 1;
2643 }
2644
2645
2646 #include "CkMarshall.def.h"
2647