Fix to the readonly delegated proxy workaround.
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #ifndef CMK_CHARE_USE_PTR
29 #include <map>
30 CkpvDeclare(CkVec<void *>, chare_objs);
31 CkpvDeclare(CkVec<int>, chare_types);
32 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
33
34 typedef std::map<int, CkChareID>  Vidblockmap;
35 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
36 CkpvDeclare(int, currentChareIdx);
37 #endif
38
39
40 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
41
42 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
43
44 int CMessage_CkMessage::__idx=-1;
45 int CMessage_CkArgMsg::__idx=0;
46 int CkIndex_Chare::__idx;
47 int CkIndex_Group::__idx;
48 int CkIndex_ArrayBase::__idx=-1;
49
50 extern int _defaultObjectQ;
51
52 void _initChareTables()
53 {
54 #ifndef CMK_CHARE_USE_PTR
55           /* chare and vidblock table */
56   CkpvInitialize(CkVec<void *>, chare_objs);
57   CkpvInitialize(CkVec<int>, chare_types);
58   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
59   CkpvInitialize(Vidblockmap, vmap);
60   CkpvInitialize(int, currentChareIdx);
61   CkpvAccess(currentChareIdx) = -1;
62 #endif
63 }
64
65 //Charm++ virtual functions: declaring these here results in a smaller executable
66 Chare::Chare(void) {
67   thishandle.onPE=CkMyPe();
68   thishandle.objPtr=this;
69 #ifndef CMK_CHARE_USE_PTR
70      // for plain chare, objPtr is actually the index to chare obj table
71   if (CkpvAccess(currentChareIdx) >= 0) {
72     thishandle.objPtr=(void*)CkpvAccess(currentChareIdx);
73   }
74   chareIdx = CkpvAccess(currentChareIdx);
75 #endif
76 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
77   mlogData = new ChareMlogData();
78   mlogData->objID.type = TypeChare;
79   mlogData->objID.data.chare.id = thishandle;
80 #endif
81 #if CMK_OBJECT_QUEUE_AVAILABLE
82   if (_defaultObjectQ)  CkEnableObjQ();
83 #endif
84 }
85
86 Chare::Chare(CkMigrateMessage* m) {
87   thishandle.onPE=CkMyPe();
88   thishandle.objPtr=this;
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 }
149
150 int Chare::ckGetChareType() const {
151   return -3;
152 }
153 char *Chare::ckDebugChareName(void) {
154   char buf[100];
155   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
156   return strdup(buf);
157 }
158 int Chare::ckDebugChareID(char *str, int limit) {
159   // pure chares for now do not have a valid ID
160   str[0] = 0;
161   return 1;
162 }
163 void Chare::ckDebugPup(PUP::er &p) {
164   pup(p);
165 }
166
167 /// This method is called before starting a [threaded] entry method.
168 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
169   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
170   traceAddThreadListeners(th, UsrToEnv(msg));
171 }
172
173 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
174   p.comment("Bytes");
175   int ts=UsrToEnv(msg)->getTotalsize();
176   int msgLen=ts-sizeof(envelope);
177   if (msgLen>0)
178     p((char*)msg,msgLen);
179 }
180
181 IrrGroup::IrrGroup(void) {
182   thisgroup = CkpvAccess(_currentGroup);
183 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
184         mlogData->objID.type = TypeGroup;
185         mlogData->objID.data.group.id = thisgroup;
186         mlogData->objID.data.group.onPE = CkMyPe();
187 #endif
188 }
189
190 IrrGroup::~IrrGroup() {
191   // remove the object pointer
192   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
193   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
194   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
195 }
196
197 void IrrGroup::pup(PUP::er &p)
198 {
199   Chare::pup(p);
200   p|thisgroup;
201 }
202
203 int IrrGroup::ckGetChareType() const {
204   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
205 }
206
207 int IrrGroup::ckDebugChareID(char *str, int limit) {
208   if (limit<5) return -1;
209   str[0] = 1;
210   *((int*)&str[1]) = thisgroup.idx;
211   return 5;
212 }
213
214 char *IrrGroup::ckDebugChareName() {
215   return strdup(_chareTable[ckGetChareType()]->name);
216 }
217
218 void IrrGroup::ckJustMigrated(void)
219 {
220 }
221
222 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
223   /* FIXME: **CW** not entirely sure what we should do here yet */
224 }
225
226 void Group::CkAddThreadListeners(CthThread th, void *msg) {
227   Chare::CkAddThreadListeners(th, msg);
228   CthSetThreadID(th, thisgroup.idx, 0, 0);
229 }
230
231 void Group::pup(PUP::er &p)
232 {
233   CkReductionMgr::pup(p);
234   reductionInfo.pup(p);
235 }
236
237 /**** Delegation Manager Group */
238 CkDelegateMgr::~CkDelegateMgr() { }
239
240 //Default delegator implementation: do not delegate-- send directly
241 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
242   { CkSendMsg(ep,m,c); }
243 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
244   { CkSendMsgBranch(ep,m,onPE,g); }
245 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
246   { CkBroadcastMsgBranch(ep,m,g); }
247 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
248   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
249 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
250   { CkSendMsgNodeBranch(ep,m,onNode,g); }
251 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
252   { CkBroadcastMsgNodeBranch(ep,m,g); }
253 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
254   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
255 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
256 {
257         CProxyElement_ArrayBase ap(a,idx);
258         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
259 }
260 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
261 {
262         CProxyElement_ArrayBase ap(a,idx);
263         ap.ckSend((CkArrayMessage *)m,ep);
264 }
265 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
266 {
267         CProxy_ArrayBase ap(a);
268         ap.ckBroadcast((CkArrayMessage *)m,ep);
269 }
270
271 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
272 {
273         CmiAbort("ArraySectionSend is not implemented!\n");
274 /*
275         CProxyElement_ArrayBase ap(a,idx);
276         ap.ckSend((CkArrayMessage *)m,ep);
277 */
278 }
279
280 /*** Proxy <-> delegator communication */
281 CkDelegateData::~CkDelegateData() {}
282
283 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
284   return pd; // default implementation ignores pup call
285 }
286
287 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
288    this tricky manual reference counting business... */
289
290 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
291         if (dPtr) dPtr->ref();
292         ckUndelegate();
293         delegatedMgr = dTo;
294         delegatedPtr = dPtr;
295         delegatedGroupId = delegatedMgr->CkGetGroupID();
296         isNodeGroup = delegatedMgr->isNodeGroup();
297 }
298 void CProxy::ckUndelegate(void) {
299         delegatedMgr=NULL;
300         delegatedGroupId.setZero();
301         if (delegatedPtr) delegatedPtr->unref();
302         delegatedPtr=NULL;
303 }
304
305 /// Copy constructor
306 CProxy::CProxy(const CProxy &src)
307   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
308    isNodeGroup(src.isNodeGroup) {
309     delegatedPtr = NULL;
310     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
311         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
312     }
313 }
314
315 /// Assignment operator
316 CProxy& CProxy::operator=(const CProxy &src) {
317         CkDelegateData *oldPtr=delegatedPtr;
318         ckUndelegate();
319         delegatedMgr=src.delegatedMgr;
320         delegatedGroupId = src.delegatedGroupId; 
321         isNodeGroup = src.isNodeGroup;
322
323         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
324             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
325         else
326             delegatedPtr = NULL;
327
328         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
329         if (oldPtr) oldPtr->unref();
330         return *this;
331 }
332
333 void CProxy::pup(PUP::er &p) {
334   if (!p.isUnpacking()) {
335     if (ckDelegatedTo() != NULL) {
336       delegatedGroupId = delegatedMgr->CkGetGroupID();
337       isNodeGroup = delegatedMgr->isNodeGroup();
338     }
339   }
340   p|delegatedGroupId;
341   if (!delegatedGroupId.isZero()) {
342     p|isNodeGroup;
343     if (p.isUnpacking()) {
344       delegatedMgr = ckDelegatedTo(); 
345     }
346
347     int migCtor, cIdx; 
348     if (!p.isUnpacking()) {
349       if (isNodeGroup) {
350         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
351         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
352         migCtor = _chareTable[cIdx]->migCtor; 
353         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
354       }
355       else  {
356         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
357         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
358         migCtor = _chareTable[cIdx]->migCtor; 
359         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
360       }         
361     }
362
363     p|migCtor;
364
365     // if delegated manager has not been created, construct a dummy
366     // object on which to call DelegatePointerPup
367     if (delegatedMgr == NULL) {
368
369       // create a dummy object for calling DelegatePointerPup
370       int objId = _entryTable[migCtor]->chareIdx; 
371       int objSize = _chareTable[objId]->size; 
372       void *obj = malloc(objSize); 
373       _entryTable[migCtor]->call(NULL, obj); 
374       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
375         ->DelegatePointerPup(p, delegatedPtr);           
376       free(obj);
377
378     }
379     else {
380
381       // delegated manager has been created, so we can use it
382       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
383
384     }
385
386     if (p.isUnpacking() && delegatedPtr) {
387       delegatedPtr->ref();
388     }
389   }
390 }
391
392 /**** Array sections */
393 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
394 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
395   _cookie.get_aid() = aid;      \
396   _cookie.get_pe() = CkMyPe();  \
397   _elems = new CkArrayIndex[nElems];    \
398   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
399   pelist = NULL;        \
400   npes  = 0;    \
401 }
402
403 CKSECTIONID_CONSTRUCTOR_DEF(1D)
404 CKSECTIONID_CONSTRUCTOR_DEF(2D)
405 CKSECTIONID_CONSTRUCTOR_DEF(3D)
406 CKSECTIONID_CONSTRUCTOR_DEF(4D)
407 CKSECTIONID_CONSTRUCTOR_DEF(5D)
408 CKSECTIONID_CONSTRUCTOR_DEF(6D)
409 CKSECTIONID_CONSTRUCTOR_DEF(Max)
410
411 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
412   pelist = new int[npes];
413   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
414   _cookie.get_aid() = gid;
415 }
416
417 CkSectionID::CkSectionID(const CkSectionID &sid) {
418   int i;
419   _cookie = sid._cookie;
420   _nElems = sid._nElems;
421   if (_nElems > 0) {
422     _elems = new CkArrayIndex[_nElems];
423     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
424   } else _elems = NULL;
425   npes = sid.npes;
426   if (npes > 0) {
427     pelist = new int[npes];
428     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
429   } else pelist = NULL;
430 }
431
432 void CkSectionID::operator=(const CkSectionID &sid) {
433   int i;
434   _cookie = sid._cookie;
435   _nElems = sid._nElems;
436   if (_nElems > 0) {
437     _elems = new CkArrayIndex[_nElems];
438     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
439   } else _elems = NULL;
440   npes = sid.npes;
441   if (npes > 0) {
442     pelist = new int[npes];
443     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
444   } else pelist = NULL;
445 }
446
447 void CkSectionID::pup(PUP::er &p) {
448     p | _cookie;
449     p(_nElems);
450     if (_nElems > 0) {
451       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
452       for (int i=0; i< _nElems; i++) p | _elems[i];
453       npes = 0;
454       pelist = NULL;
455     } else {
456       // If _nElems is zero, than this section describes processors instead of array elements
457       _elems = NULL;
458       p(npes);
459       if (p.isUnpacking()) pelist = new int[npes];
460       p(pelist, npes);
461     }
462 }
463
464 /**** Tiny random API routines */
465
466 #ifdef CMK_CUDA
467 void CUDACallbackManager(void *fn) {
468   if (fn != NULL) {
469     CkCallback *cb = (CkCallback*) fn;
470     cb->send();
471   }
472 }
473
474 #endif
475
476 extern "C"
477 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
478 {
479   UsrToEnv(msg)->setRef(ref);
480 }
481
482 extern "C"
483 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
484 {
485   return UsrToEnv(msg)->getRef();
486 }
487
488 extern "C"
489 int CkGetSrcPe(void *msg)
490 {
491   return UsrToEnv(msg)->getSrcPe();
492 }
493
494 extern "C"
495 int CkGetSrcNode(void *msg)
496 {
497   return CmiNodeOf(CkGetSrcPe(msg));
498 }
499
500 extern "C"
501 void *CkLocalBranch(CkGroupID gID) {
502   return _localBranch(gID);
503 }
504
505 static
506 void *_ckLocalNodeBranch(CkGroupID groupID) {
507   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
508   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
509   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
510   return retval;
511 }
512
513 extern "C"
514 void *CkLocalNodeBranch(CkGroupID groupID)
515 {
516   void *retval;
517   // we are called in a constructor
518   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
519     return CkpvAccess(_currentNodeGroupObj);
520   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
521   { // Nodegroup hasn't finished being created yet-- schedule...
522     CsdScheduler(0);
523   }
524   return retval;
525 }
526
527 extern "C"
528 void *CkLocalChare(const CkChareID *pCid)
529 {
530         int pe=pCid->onPE;
531         if (pe<0) { //A virtual chare ID
532                 if (pe!=(-(CkMyPe()+1)))
533                         return NULL;//VID block not on this PE
534 #ifdef CMK_CHARE_USE_PTR
535                 VidBlock *v=(VidBlock *)pCid->objPtr;
536 #else
537                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
538 #endif
539                 return v->getLocalChareObj();
540         }
541         else
542         { //An ordinary chare ID
543                 if (pe!=CkMyPe())
544                         return NULL;//Chare not on this PE
545 #ifdef CMK_CHARE_USE_PTR
546                 return pCid->objPtr;
547 #else
548                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
549 #endif
550         }
551 };
552
553 CkpvDeclare(char**,Ck_argv);
554
555 extern "C" char **CkGetArgv(void) {
556         return CkpvAccess(Ck_argv);
557 }
558 extern "C" int CkGetArgc(void) {
559         return CmiGetArgc(CkpvAccess(Ck_argv));
560 }
561
562 /******************** Basic support *****************/
563 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
564 {
565 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
566         CpvAccess(_currentObj) = (Chare *)obj;
567 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
568 #endif
569   //BIGSIM_OOC DEBUGGING
570   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
571   //fflush(stdout);
572 #if CMK_CHARMDEBUG
573   CpdBeforeEp(epIdx, obj, msg);
574 #endif    
575   _entryTable[epIdx]->call(msg, obj);
576 #if CMK_CHARMDEBUG
577   CpdAfterEp(epIdx);
578 #endif
579   if (_entryTable[epIdx]->noKeep)
580   { /* Method doesn't keep/delete the message, so we have to: */
581     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
582   }
583 }
584 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
585 {
586   //BIGSIM_OOC DEBUGGING
587   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
588   //fflush(stdout);
589
590   void *deliverMsg;
591 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
592         CpvAccess(_currentObj) = (Chare *)obj;
593 #endif
594   if (_entryTable[epIdx]->noKeep)
595   { /* Deliver a read-only copy of the message */
596     deliverMsg=(void *)msg;
597   } else
598   { /* Method needs a copy of the message to keep/delete */
599     void *oldMsg=(void *)msg;
600     deliverMsg=CkCopyMsg(&oldMsg);
601 #if CMK_ERROR_CHECKING
602     if (oldMsg!=msg)
603       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
604 #endif
605   }
606 #if CMK_CHARMDEBUG
607   CpdBeforeEp(epIdx, obj, (void*)msg);
608 #endif
609   _entryTable[epIdx]->call(deliverMsg, obj);
610 #if CMK_CHARMDEBUG
611   CpdAfterEp(epIdx);
612 #endif
613 }
614
615 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
616 {
617   register void *msg = EnvToUsr(env);
618   _SET_USED(env, 0);
619   CkDeliverMessageFree(epIdx,msg,obj);
620 }
621
622 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
623 {
624
625 #if CMK_TRACE_ENABLED 
626   if (_entryTable[epIdx]->traceEnabled) {
627     _TRACE_BEGIN_EXECUTE(env);
628     _invokeEntryNoTrace(epIdx,env,obj);
629     _TRACE_END_EXECUTE();
630   }
631   else
632 #endif
633     _invokeEntryNoTrace(epIdx,env,obj);
634
635 }
636
637 /********************* Creation ********************/
638
639 extern "C"
640 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
641 {
642   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
643   envelope *env = UsrToEnv(msg);
644   _CHECK_USED(env);
645   if(pCid == 0) {
646     env->setMsgtype(NewChareMsg);
647   } else {
648     pCid->onPE = (-(CkMyPe()+1));
649     //  pCid->magic = _GETIDX(cIdx);
650     pCid->objPtr = (void *) new VidBlock();
651     _MEMCHECK(pCid->objPtr);
652     env->setMsgtype(NewVChareMsg);
653     env->setVidPtr(pCid->objPtr);
654 #ifndef CMK_CHARE_USE_PTR
655     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
656     int idx = CkpvAccess(vidblocks).size()-1;
657     pCid->objPtr = (void *)idx;
658     env->setVidPtr((void *)idx);
659 #endif
660   }
661   env->setEpIdx(eIdx);
662   env->setSrcPe(CkMyPe());
663   CmiSetHandler(env, _charmHandlerIdx);
664   _TRACE_CREATION_1(env);
665   CpvAccess(_qd)->create();
666   _STATS_RECORD_CREATE_CHARE_1();
667   _SET_USED(env, 1);
668   if(destPE == CK_PE_ANY)
669     env->setForAnyPE(1);
670   else
671     env->setForAnyPE(0);
672   _CldEnqueue(destPE, env, _infoIdx);
673   _TRACE_CREATION_DONE(1);
674 }
675
676 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
677 {
678   register int gIdx = _entryTable[epIdx]->chareIdx;
679   register void *obj = malloc(_chareTable[gIdx]->size);
680   _MEMCHECK(obj);
681   setMemoryTypeChare(obj);
682   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
683   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
684   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
685   CkpvAccess(_groupIDTable)->push_back(groupID);
686   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
687   if(ptrq) {
688     void *pending;
689     while((pending=ptrq->deq())!=0)
690       _CldEnqueue(CkMyPe(), pending, _infoIdx);
691 //    delete ptrq;
692       CkpvAccess(_groupTable)->find(groupID).clearPending();
693   }
694   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
695
696   CkpvAccess(_currentGroup) = groupID;
697   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
698 #ifndef CMK_CHARE_USE_PTR
699   //((Chare *)obj)->chareIdx = -1;
700   CkpvAccess(currentChareIdx) = -1;
701 #endif
702   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
703   _STATS_RECORD_PROCESS_GROUP_1();
704 }
705
706 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
707 {
708   register int gIdx = _entryTable[epIdx]->chareIdx;
709   int objSize=_chareTable[gIdx]->size;
710   register void *obj = malloc(objSize);
711   _MEMCHECK(obj);
712   setMemoryTypeChare(obj);
713   CkpvAccess(_currentGroup) = groupID;
714
715 // Now that the NodeGroup is created, add it to the table.
716 //  NodeGroups can be accessed by multiple processors, so
717 //  this is in the opposite order from groups - invoking the constructor
718 //  before registering it.
719 // User may call CkLocalNodeBranch() inside the nodegroup constructor
720 //  store nodegroup into _currentNodeGroupObj
721   CkpvAccess(_currentNodeGroupObj) = obj;
722 #ifndef CMK_CHARE_USE_PTR
723   //((Chare *)obj)->chareIdx = -1;
724   CkpvAccess(currentChareIdx) = -1;
725 #endif
726   _invokeEntryNoTrace(epIdx,env,obj);
727   CkpvAccess(_currentNodeGroupObj) = NULL;
728   _STATS_RECORD_PROCESS_NODE_GROUP_1();
729
730   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
731   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
732   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
733   CksvAccess(_nodeGroupIDTable).push_back(groupID);
734
735   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
736   if(ptrq) {
737     void *pending;
738     while((pending=ptrq->deq())!=0)
739       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
740 //    delete ptrq;
741       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
742   }
743   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
744 }
745
746 void _createGroup(CkGroupID groupID, envelope *env)
747 {
748   _CHECK_USED(env);
749   _SET_USED(env, 1);
750   register int epIdx = env->getEpIdx();
751   int gIdx = _entryTable[epIdx]->chareIdx;
752   CkNodeGroupID rednMgr;
753   if(_chareTable[gIdx]->isIrr == 0){
754                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
755                 rednMgr = rednMgrProxy;
756 //              rednMgrProxy.setAttachedGroup(groupID);
757   }else{
758         rednMgr.setZero();
759   }
760   env->setGroupNum(groupID);
761   env->setSrcPe(CkMyPe());
762   env->setRednMgr(rednMgr);
763   env->setGroupEpoch(CkpvAccess(_charmEpoch));
764
765   if(CkNumPes()>1) {
766     CkPackMessage(&env);
767     CmiSetHandler(env, _bocHandlerIdx);
768     _numInitMsgs++;
769     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
770     CpvAccess(_qd)->create(CkNumPes()-1);
771     CkUnpackMessage(&env);
772   }
773   _STATS_RECORD_CREATE_GROUP_1();
774   CkCreateLocalGroup(groupID, epIdx, env);
775 }
776
777 void _createNodeGroup(CkGroupID groupID, envelope *env)
778 {
779   _CHECK_USED(env);
780   _SET_USED(env, 1);
781   register int epIdx = env->getEpIdx();
782   env->setGroupNum(groupID);
783   env->setSrcPe(CkMyPe());
784   env->setGroupEpoch(CkpvAccess(_charmEpoch));
785   if(CkNumNodes()>1) {
786     CkPackMessage(&env);
787     CmiSetHandler(env, _bocHandlerIdx);
788     _numInitMsgs++;
789     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
790     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
791     CpvAccess(_qd)->create(CkNumNodes()-1);
792     CkUnpackMessage(&env);
793   }
794   _STATS_RECORD_CREATE_NODE_GROUP_1();
795   CkCreateLocalNodeGroup(groupID, epIdx, env);
796 }
797
798 // new _groupCreate
799
800 static CkGroupID _groupCreate(envelope *env)
801 {
802   register CkGroupID groupNum;
803
804   // check CkMyPe(). if it is 0 then idx is _numGroups++
805   // if not, then something else...
806   if(CkMyPe() == 0)
807      groupNum.idx = CkpvAccess(_numGroups)++;
808   else
809      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
810   _createGroup(groupNum, env);
811   return groupNum;
812 }
813
814 // new _nodeGroupCreate
815 static CkGroupID _nodeGroupCreate(envelope *env)
816 {
817   register CkGroupID groupNum;
818   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
819   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
820           groupNum.idx = CksvAccess(_numNodeGroups)++;
821    else
822           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
823   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
824   _createNodeGroup(groupNum, env);
825   return groupNum;
826 }
827
828 /**** generate the group idx when group is creator pe is not pe0
829  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
830  **** remaining bits contain the group creator processor number and
831  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
832
833 int _getGroupIdx(int numNodes,int myNode,int numGroups)
834 {
835         int idx;
836         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
837         int n = 32 - (x+1);                                     // number of bits remaining for the index
838         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
839                                                                 // then add the next available index
840         // of course this won't work when int is 8 bytes long on T3E
841         //idx |= 0x80000000;                                      // set the most significant bit to 1
842         idx = - idx;
843                                                                 // if int is not 32 bits, wouldn't this be wrong?
844         return idx;
845 }
846
847 extern "C"
848 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
849 {
850   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
851   register envelope *env = UsrToEnv(msg);
852   env->setMsgtype(BocInitMsg);
853   env->setEpIdx(eIdx);
854   env->setSrcPe(CkMyPe());
855   _TRACE_CREATION_N(env, CkNumPes());
856   CkGroupID gid = _groupCreate(env);
857   _TRACE_CREATION_DONE(1);
858   return gid;
859 }
860
861 extern "C"
862 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
863 {
864   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
865   register envelope *env = UsrToEnv(msg);
866   env->setMsgtype(NodeBocInitMsg);
867   env->setEpIdx(eIdx);
868   env->setSrcPe(CkMyPe());
869   _TRACE_CREATION_N(env, CkNumNodes());
870   CkGroupID gid = _nodeGroupCreate(env);
871   _TRACE_CREATION_DONE(1);
872   return gid;
873 }
874
875 static inline void *_allocNewChare(envelope *env, int &idx)
876 {
877   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
878   void *tmp=malloc(_chareTable[chareIdx]->size);
879   _MEMCHECK(tmp);
880 #ifndef CMK_CHARE_USE_PTR
881   CkpvAccess(chare_objs).push_back(tmp);
882   CkpvAccess(chare_types).push_back(chareIdx);
883   idx = CkpvAccess(chare_objs).size()-1;
884 #endif
885   setMemoryTypeChare(tmp);
886   return tmp;
887 }
888
889 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
890 {
891   int idx;
892   register void *obj = _allocNewChare(env, idx);
893 #ifndef CMK_CHARE_USE_PTR
894   //((Chare *)obj)->chareIdx = idx;
895   CkpvAccess(currentChareIdx) = idx;
896 #endif
897   _invokeEntry(env->getEpIdx(),env,obj);
898 }
899
900 void CkCreateLocalChare(int epIdx, envelope *env)
901 {
902   env->setEpIdx(epIdx);
903   _processNewChareMsg(NULL, env);
904 }
905
906 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
907 {
908   int idx;
909   register void *obj = _allocNewChare(env, idx);
910   register CkChareID *pCid = (CkChareID *)
911       _allocMsg(FillVidMsg, sizeof(CkChareID));
912   pCid->onPE = CkMyPe();
913 #ifndef CMK_CHARE_USE_PTR
914   pCid->objPtr = (void*)idx;
915 #else
916   pCid->objPtr = obj;
917 #endif
918   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
919   register envelope *ret = UsrToEnv(pCid);
920   ret->setVidPtr(env->getVidPtr());
921   register int srcPe = env->getSrcPe();
922   ret->setSrcPe(CkMyPe());
923   CmiSetHandler(ret, _charmHandlerIdx);
924   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
925 #ifndef CMK_CHARE_USE_PTR
926   // register the remote vidblock for deletion when chare is deleted
927   CkChareID vid;
928   vid.onPE = srcPe;
929   vid.objPtr = env->getVidPtr();
930   CkpvAccess(vmap)[idx] = vid;    
931 #endif
932   CpvAccess(_qd)->create();
933 #ifndef CMK_CHARE_USE_PTR
934   //((Chare *)obj)->chareIdx = idx;
935   CkpvAccess(currentChareIdx) = idx;
936 #endif
937   _invokeEntry(env->getEpIdx(),env,obj);
938 }
939
940 /************** Receive: Chares *************/
941
942 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
943 {
944   register int epIdx = env->getEpIdx();
945   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
946   register void *obj;
947   if (mainIdx != -1)  {           // mainchare
948     CmiAssert(CkMyPe()==0);
949     obj = _mainTable[mainIdx]->getObj();
950   }
951   else {
952 #ifndef CMK_CHARE_USE_PTR
953     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
954       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
955     else
956       obj = env->getObjPtr();
957 #else
958     obj = env->getObjPtr();
959 #endif
960   }
961   _invokeEntry(epIdx,env,obj);
962 }
963
964 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
965 {
966   register int epIdx = env->getEpIdx();
967   register void *obj = env->getObjPtr();
968   _invokeEntry(epIdx,env,obj);
969 }
970
971 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
972 {
973 #ifndef CMK_CHARE_USE_PTR
974   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
975 #else
976   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
977   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
978 #endif
979   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
980   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
981   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
982   CmiFree(env);
983 }
984
985 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
986 {
987 #ifndef CMK_CHARE_USE_PTR
988   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
989 #else
990   VidBlock *vptr = (VidBlock *) env->getVidPtr();
991   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
992 #endif
993   _SET_USED(env, 1);
994   vptr->send(env);
995 }
996
997 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
998 {
999 #ifndef CMK_CHARE_USE_PTR
1000   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1001   delete vptr;
1002   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1003 #endif
1004   CmiFree(env);
1005 }
1006
1007 /************** Receive: Groups ****************/
1008
1009 /**
1010  Return a pointer to the local BOC of "groupID".
1011  The message "env" passed in has some known dependency on this groupID
1012  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1013  Therefore, if the return value is NULL, this function buffers the massage so that
1014  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1015  The message passed in must have its handlers correctly set so that it can be
1016  scheduled again.
1017 */
1018 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1019 {
1020
1021         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1022         IrrGroup *obj = ck->localBranch(groupID);
1023         if (obj==NULL) { /* groupmember not yet created: stash message */
1024                 ck->getGroupTable()->find(groupID).enqMsg(env);
1025         }
1026         else { /* will be able to process message */
1027                 ck->process();
1028         }
1029         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1030         return obj;
1031 }
1032
1033 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1034 {
1035 #if CMK_LBDB_ON
1036   // if there is a running obj being measured, stop it temporarily
1037   LDObjHandle objHandle;
1038   int objstopped = 0;
1039   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1040   if (the_lbdb->RunningObject(&objHandle)) {
1041     objstopped = 1;
1042     the_lbdb->ObjectStop(objHandle);
1043   }
1044 #endif
1045   _invokeEntry(epIdx,env,obj);
1046 #if CMK_LBDB_ON
1047   if (objstopped) the_lbdb->ObjectStart(objHandle);
1048 #endif
1049   _STATS_RECORD_PROCESS_BRANCH_1();
1050 }
1051
1052 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1053 {
1054   register CkGroupID groupID =  env->getGroupNum();
1055   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1056   if(obj) {
1057     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1058   }
1059 }
1060
1061 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1062 {
1063   env->setMsgtype(ForChareMsg);
1064   env->setObjPtr(obj);
1065   _processForChareMsg(ck,env);
1066   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1067 }
1068
1069 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1070 {
1071   env->setEpIdx(epIdx);
1072   _deliverForNodeBocMsg(ck,env, obj);
1073 }
1074
1075 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1076 {
1077   register CkGroupID groupID = env->getGroupNum();
1078   register void *obj;
1079
1080   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1081   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1082   if(!obj) { // groupmember not yet created
1083 #if CMK_IMMEDIATE_MSG
1084     if (CmiIsImmediate(env))     // buffer immediate message
1085       CmiDelayImmediate();
1086     else
1087 #endif
1088     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1089     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1090     return;
1091   }
1092   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1093 #if CMK_IMMEDIATE_MSG
1094   if (!CmiIsImmediate(env))
1095 #endif
1096   ck->process();
1097   env->setMsgtype(ForChareMsg);
1098   env->setObjPtr(obj);
1099   _processForChareMsg(ck,env);
1100   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1101 }
1102
1103 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1104 {
1105   register CkGroupID groupID = env->getGroupNum();
1106   register int epIdx = env->getEpIdx();
1107   if (!env->getGroupDep().isZero()) {      // dependence
1108     CkGroupID dep = env->getGroupDep();
1109     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1110     if (obj == NULL) return;
1111   }
1112   else
1113     ck->process();
1114   CkCreateLocalGroup(groupID, epIdx, env);
1115 }
1116
1117 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1118 {
1119   register CkGroupID groupID = env->getGroupNum();
1120   register int epIdx = env->getEpIdx();
1121   CkCreateLocalNodeGroup(groupID, epIdx, env);
1122 }
1123
1124 /************** Receive: Arrays *************/
1125
1126 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1127   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1128   if (mgr) {
1129     _SET_USED(env, 0);
1130     mgr->insertElement((CkMessage *)EnvToUsr(env));
1131   }
1132 }
1133 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1134   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1135   if (mgr) {
1136     _SET_USED(env, 0);
1137     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1138   }
1139 }
1140
1141 //BIGSIM_OOC DEBUGGING
1142 #define TELLMSGTYPE(x) //x
1143
1144 /**
1145  * This is the main converse-level handler used by all of Charm++.
1146  *
1147  * \addtogroup CriticalPathFramework
1148  */
1149 void _processHandler(void *converseMsg,CkCoreState *ck)
1150 {
1151   register envelope *env = (envelope *) converseMsg;
1152
1153   MESSAGE_PHASE_CHECK(env);
1154
1155 //#if CMK_RECORD_REPLAY
1156   if (ck->watcher!=NULL) {
1157     if (!ck->watcher->processMessage(&env,ck)) return;
1158   }
1159 //#endif
1160 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1161         Chare *obj=NULL;
1162         CkObjID sender;
1163         MCount SN;
1164         MlogEntry *entry=NULL;
1165         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1166         env->getMsgtype() == ForArrayEltMsg){
1167                 sender = env->sender;
1168                 SN = env->SN;
1169                 int result = preProcessReceivedMessage(env,&obj,&entry);
1170                 if(result == 0){
1171                         return;
1172                 }
1173         }
1174 #endif
1175
1176 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1177   //  CkPrintf("START\n");
1178   criticalPath_start(env);
1179 #endif
1180
1181
1182   switch(env->getMsgtype()) {
1183 // Group support
1184     case BocInitMsg :
1185       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1186       // QD processing moved inside _processBocInitMsg because it is conditional
1187       //ck->process(); 
1188       if(env->isPacked()) CkUnpackMessage(&env);
1189       _processBocInitMsg(ck,env);
1190       break;
1191     case NodeBocInitMsg :
1192       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1193       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1194       _processNodeBocInitMsg(ck,env);
1195       break;
1196     case ForBocMsg :
1197       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1198       // QD processing moved inside _processForBocMsg because it is conditional
1199       if(env->isPacked()) CkUnpackMessage(&env);
1200       _processForBocMsg(ck,env);
1201       // stats record moved inside _processForBocMsg because it is conditional
1202       break;
1203     case ForNodeBocMsg :
1204       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1205       // QD processing moved to _processForNodeBocMsg because it is conditional
1206       if(env->isPacked()) CkUnpackMessage(&env);
1207       _processForNodeBocMsg(ck,env);
1208       // stats record moved to _processForNodeBocMsg because it is conditional
1209       break;
1210
1211 // Array support
1212     case ArrayEltInitMsg:
1213       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1214       if(env->isPacked()) CkUnpackMessage(&env);
1215       _processArrayEltInitMsg(ck,env);
1216       break;
1217     case ForArrayEltMsg:
1218       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1219       if(env->isPacked()) CkUnpackMessage(&env);
1220       _processArrayEltMsg(ck,env);
1221       break;
1222
1223 // Chare support
1224     case NewChareMsg :
1225       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1226       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1227       _processNewChareMsg(ck,env);
1228       _STATS_RECORD_PROCESS_CHARE_1();
1229       break;
1230     case NewVChareMsg :
1231       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1232       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1233       _processNewVChareMsg(ck,env);
1234       _STATS_RECORD_PROCESS_CHARE_1();
1235       break;
1236     case ForChareMsg :
1237       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1238       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1239       _processForPlainChareMsg(ck,env);
1240       _STATS_RECORD_PROCESS_MSG_1();
1241       break;
1242     case ForVidMsg   :
1243       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1244       ck->process();
1245       _processForVidMsg(ck,env);
1246       break;
1247     case FillVidMsg  :
1248       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1249       ck->process();
1250       _processFillVidMsg(ck,env);
1251       break;
1252     case DeleteVidMsg  :
1253       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1254       ck->process();
1255       _processDeleteVidMsg(ck,env);
1256       break;
1257
1258     default:
1259       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1260   }
1261 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1262         if(obj != NULL){
1263                 postProcessReceivedMessage(obj,sender,SN,entry);
1264         }
1265 #endif
1266
1267
1268 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1269   criticalPath_end();
1270   //  CkPrintf("STOP\n");
1271 #endif
1272
1273
1274 }
1275
1276
1277 /******************** Message Send **********************/
1278
1279 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1280              int *queueing, int *priobits, unsigned int **prioptr)
1281 {
1282   register envelope *env = (envelope *)converseMsg;
1283   *pfn = (CldPackFn)CkPackMessage;
1284   *len = env->getTotalsize();
1285   *queueing = env->getQueueing();
1286   *priobits = env->getPriobits();
1287   *prioptr = (unsigned int *) env->getPrioPtr();
1288 }
1289
1290 void CkPackMessage(envelope **pEnv)
1291 {
1292   register envelope *env = *pEnv;
1293   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1294     register void *msg = EnvToUsr(env);
1295     _TRACE_BEGIN_PACK();
1296     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1297     _TRACE_END_PACK();
1298     env=UsrToEnv(msg);
1299     env->setPacked(1);
1300     *pEnv = env;
1301   }
1302 }
1303
1304 void CkUnpackMessage(envelope **pEnv)
1305 {
1306   register envelope *env = *pEnv;
1307   register int msgIdx = env->getMsgIdx();
1308   if(env->isPacked()) {
1309     register void *msg = EnvToUsr(env);
1310     _TRACE_BEGIN_UNPACK();
1311     msg = _msgTable[msgIdx]->unpack(msg);
1312     _TRACE_END_UNPACK();
1313     env=UsrToEnv(msg);
1314     env->setPacked(0);
1315     *pEnv = env;
1316   }
1317 }
1318
1319 //There's no reason for most messages to go through the Cld--
1320 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1321 // Thus these accellerated versions of the Cld calls.
1322 #if CMK_OBJECT_QUEUE_AVAILABLE
1323 static int index_objectQHandler;
1324 #endif
1325 int index_tokenHandler;
1326 int index_skipCldHandler;
1327
1328 void _skipCldHandler(void *converseMsg)
1329 {
1330   register envelope *env = (envelope *)(converseMsg);
1331   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1332 #if CMK_GRID_QUEUE_AVAILABLE
1333   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1334     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1335                        env, env->getQueueing (), env->getPriobits (),
1336                        (unsigned int *) env->getPrioPtr ());
1337   } else {
1338     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1339                        env, env->getQueueing (), env->getPriobits (),
1340                        (unsigned int *) env->getPrioPtr ());
1341   }
1342 #else
1343   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1344         env, env->getQueueing(),env->getPriobits(),
1345         (unsigned int *)env->getPrioPtr());
1346 #endif
1347 }
1348
1349
1350 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1351 // Made non-static to be used by ckmessagelogging
1352 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1353 {
1354 #if CMK_CHARMDEBUG
1355   if (!ConverseDeliver(pe)) {
1356     CmiFree(env);
1357     return;
1358   }
1359 #endif
1360   if(pe == CkMyPe() ){
1361     if(!CmiNodeAlive(CkMyPe())){
1362         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1363 //      return;
1364     }
1365   }
1366   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1367 #if CMK_OBJECT_QUEUE_AVAILABLE
1368     Chare *obj = CkFindObjectPtr(env);
1369     if (obj && obj->CkGetObjQueue().queue()) {
1370       _enqObjQueue(obj, env);
1371     }
1372     else
1373 #endif
1374     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1375         env, env->getQueueing(),env->getPriobits(),
1376         (unsigned int *)env->getPrioPtr());
1377   } else {
1378     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1379       CkPackMessage(&env);
1380     int len=env->getTotalsize();
1381     CmiSetXHandler(env,CmiGetHandler(env));
1382 #if CMK_OBJECT_QUEUE_AVAILABLE
1383     CmiSetHandler(env,index_objectQHandler);
1384 #else
1385     CmiSetHandler(env,index_skipCldHandler);
1386 #endif
1387     CmiSetInfo(env,infoFn);
1388     if (pe==CLD_BROADCAST) {
1389 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1390                         CmiSyncBroadcast(len, (char *)env);
1391 #else
1392                         CmiSyncBroadcastAndFree(len, (char *)env); 
1393 #endif
1394
1395 }
1396     else if (pe==CLD_BROADCAST_ALL) { 
1397 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1398                         CmiSyncBroadcastAll(len, (char *)env);
1399 #else
1400                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1401 #endif
1402
1403 }
1404     else{
1405 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1406                         CmiSyncSend(pe, len, (char *)env);
1407 #else
1408                         CmiSyncSendAndFree(pe, len, (char *)env);
1409 #endif
1410
1411                 }
1412   }
1413 }
1414
1415 #if CMK_BLUEGENE_CHARM
1416 #   define  _skipCldEnqueue   _CldEnqueue
1417 #endif
1418
1419 // by pass Charm++ priority queue, send as Converse message
1420 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1421 {
1422 #if CMK_CHARMDEBUG
1423   if (!ConverseDeliver(-1)) {
1424     CmiFree(env);
1425     return;
1426   }
1427 #endif
1428   CkPackMessage(&env);
1429   int len=env->getTotalsize();
1430   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1431 }
1432
1433 static void _noCldEnqueue(int pe, envelope *env)
1434 {
1435 /*
1436   if (pe == CkMyPe()) {
1437     CmiHandleMessage(env);
1438   } else
1439 */
1440 #if CMK_CHARMDEBUG
1441   if (!ConverseDeliver(pe)) {
1442     CmiFree(env);
1443     return;
1444   }
1445 #endif
1446   CkPackMessage(&env);
1447   int len=env->getTotalsize();
1448   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1449   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1450   else CmiSyncSendAndFree(pe, len, (char *)env);
1451 }
1452
1453 //static void _noCldNodeEnqueue(int node, envelope *env)
1454 //Made non-static to be used by ckmessagelogging
1455 void _noCldNodeEnqueue(int node, envelope *env)
1456 {
1457 /*
1458   if (node == CkMyNode()) {
1459     CmiHandleMessage(env);
1460   } else {
1461 */
1462 #if CMK_CHARMDEBUG
1463   if (!ConverseDeliver(node)) {
1464     CmiFree(env);
1465     return;
1466   }
1467 #endif
1468   CkPackMessage(&env);
1469   int len=env->getTotalsize();
1470   if (node==CLD_BROADCAST) { 
1471 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1472         CmiSyncNodeBroadcast(len, (char *)env);
1473 #else
1474         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1475 #endif
1476 }
1477   else if (node==CLD_BROADCAST_ALL) { 
1478 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1479                 CmiSyncNodeBroadcastAll(len, (char *)env);
1480 #else
1481                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1482 #endif
1483
1484 }
1485   else {
1486 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1487         CmiSyncNodeSend(node, len, (char *)env);
1488 #else
1489         CmiSyncNodeSendAndFree(node, len, (char *)env);
1490 #endif
1491   }
1492 }
1493
1494 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1495 {
1496   register envelope *env = UsrToEnv(msg);
1497   _CHECK_USED(env);
1498   _SET_USED(env, 1);
1499 #if CMK_REPLAYSYSTEM
1500   setEventID(env);
1501 #endif
1502   env->setMsgtype(ForChareMsg);
1503   env->setEpIdx(eIdx);
1504   env->setSrcPe(CkMyPe());
1505 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1506   criticalPath_send(env);
1507   automaticallySetMessagePriority(env);
1508 #endif
1509 #if CMK_CHARMDEBUG
1510   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1511 #endif
1512 #if CMK_OBJECT_QUEUE_AVAILABLE
1513   CmiSetHandler(env, index_objectQHandler);
1514 #else
1515   CmiSetHandler(env, _charmHandlerIdx);
1516 #endif
1517   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1518     register int pe = -(pCid->onPE+1);
1519     if(pe==CkMyPe()) {
1520 #ifndef CMK_CHARE_USE_PTR
1521       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1522 #else
1523       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1524 #endif
1525       void *objPtr;
1526       if (NULL!=(objPtr=vblk->getLocalChare()))
1527       { //A ready local chare
1528         env->setObjPtr(objPtr);
1529         return pe;
1530       }
1531       else { //The vidblock is not ready-- forget it
1532         vblk->send(env);
1533         return -1;
1534       }
1535     } else { //Valid vidblock for another PE:
1536       env->setMsgtype(ForVidMsg);
1537       env->setVidPtr(pCid->objPtr);
1538       return pe;
1539     }
1540   }
1541   else {
1542     env->setObjPtr(pCid->objPtr);
1543     return pCid->onPE;
1544   }
1545 }
1546
1547 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1548 {
1549   int destPE = _prepareMsg(eIdx, msg, pCid);
1550   if (destPE != -1) {
1551     register envelope *env = UsrToEnv(msg);
1552 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1553     criticalPath_send(env);
1554     automaticallySetMessagePriority(env);
1555 #endif
1556     CmiBecomeImmediate(env);
1557   }
1558   return destPE;
1559 }
1560
1561 extern "C"
1562 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1563 {
1564   if (opts & CK_MSG_INLINE) {
1565     CkSendMsgInline(entryIdx, msg, pCid, opts);
1566     return;
1567   }
1568 #if CMK_ERROR_CHECKING
1569   if (opts & CK_MSG_IMMEDIATE) {
1570     CmiAbort("Immediate message is not allowed in Chare!");
1571   }
1572 #endif
1573   register envelope *env = UsrToEnv(msg);
1574   int destPE=_prepareMsg(entryIdx,msg,pCid);
1575   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1576   // VidBlock was not yet filled). The problem is that the creation was never
1577   // traced later when the VidBlock was filled. One solution is to trace the
1578   // creation here, the other to trace it in VidBlock->msgDeliver().
1579   _TRACE_CREATION_1(env);
1580   if (destPE!=-1) {
1581     CpvAccess(_qd)->create();
1582     if (opts & CK_MSG_SKIP_OR_IMM)
1583       _noCldEnqueue(destPE, env);
1584     else
1585       _CldEnqueue(destPE, env, _infoIdx);
1586   }
1587   _TRACE_CREATION_DONE(1);
1588 }
1589
1590 extern "C"
1591 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1592 {
1593   if (pCid->onPE==CkMyPe())
1594   {
1595     if(!CmiNodeAlive(CkMyPe())){
1596         return;
1597     }
1598 #if CMK_CHARMDEBUG
1599     //Just in case we need to breakpoint or use the envelope in some way
1600     _prepareMsg(entryIndex,msg,pCid);
1601 #endif
1602                 //Just directly call the chare (skip QD handling & scheduler)
1603     register envelope *env = UsrToEnv(msg);
1604     if (env->isPacked()) CkUnpackMessage(&env);
1605     _STATS_RECORD_PROCESS_MSG_1();
1606     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1607   }
1608   else {
1609     //No way to inline a cross-processor message:
1610     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1611   }
1612 }
1613
1614 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1615 {
1616   register envelope *env = UsrToEnv(msg);
1617 #if CMK_ERROR_CHECKING
1618   CkNodeGroupID nodeRedMgr;
1619 #endif
1620   _CHECK_USED(env);
1621   _SET_USED(env, 1);
1622 #if CMK_REPLAYSYSTEM
1623   setEventID(env);
1624 #endif
1625   env->setMsgtype(type);
1626   env->setEpIdx(eIdx);
1627   env->setGroupNum(gID);
1628   env->setSrcPe(CkMyPe());
1629 #if CMK_ERROR_CHECKING
1630   nodeRedMgr.setZero();
1631   env->setRednMgr(nodeRedMgr);
1632 #endif
1633 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1634   criticalPath_send(env);
1635   automaticallySetMessagePriority(env);
1636 #endif
1637 #if CMK_CHARMDEBUG
1638   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1639 #endif
1640   CmiSetHandler(env, _charmHandlerIdx);
1641   return env;
1642 }
1643
1644 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1645 {
1646   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1647 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1648   criticalPath_send(env);
1649   automaticallySetMessagePriority(env);
1650 #endif
1651   CmiBecomeImmediate(env);
1652   return env;
1653 }
1654
1655 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1656                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1657 {
1658   int numPes;
1659   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1660 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1661   sendTicketGroupRequest(env,pe,_infoIdx);
1662 #else
1663   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1664   _TRACE_CREATION_N(env, numPes);
1665   if (opts & CK_MSG_SKIP_OR_IMM)
1666     _noCldEnqueue(pe, env);
1667   else
1668     _skipCldEnqueue(pe, env, _infoIdx);
1669   _TRACE_CREATION_DONE(1);
1670 #endif
1671 }
1672
1673 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1674                            int npes, int *pes)
1675 {
1676   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1677   _TRACE_CREATION_MULTICAST(env, npes, pes);
1678   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1679   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1680 }
1681
1682 extern "C"
1683 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1684 {
1685 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1686   if (destPE==CkMyPe())
1687   {
1688     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1689     return;
1690   }
1691   //Can't inline-- send the usual way
1692   register envelope *env = UsrToEnv(msg);
1693   int numPes;
1694   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1695   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1696   _TRACE_CREATION_N(env, numPes);
1697   _noCldEnqueue(destPE, env);
1698   _STATS_RECORD_SEND_BRANCH_1();
1699   CkpvAccess(_coreState)->create();
1700   _TRACE_CREATION_DONE(1);
1701 #else
1702   // no support for immediate message, send inline
1703   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1704 #endif
1705 }
1706
1707 extern "C"
1708 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1709 {
1710   if (destPE==CkMyPe())
1711   {
1712     if(!CmiNodeAlive(CkMyPe())){
1713         return;
1714     }
1715     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1716     if (obj!=NULL)
1717     { //Just directly call the group:
1718 #if CMK_ERROR_CHECKING
1719       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1720 #else
1721       envelope *env=UsrToEnv(msg);
1722 #endif
1723       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1724       return;
1725     }
1726   }
1727   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1728   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1729 }
1730
1731 extern "C"
1732 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1733 {
1734   if (opts & CK_MSG_INLINE) {
1735     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1736     return;
1737   }
1738   if (opts & CK_MSG_IMMEDIATE) {
1739     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1740     return;
1741   }
1742   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1743   _STATS_RECORD_SEND_BRANCH_1();
1744   CkpvAccess(_coreState)->create();
1745 }
1746
1747 extern "C"
1748 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1749 {
1750 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1751   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1752   _TRACE_CREATION_MULTICAST(env, npes, pes);
1753   _noCldEnqueueMulti(npes, pes, env);
1754   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1755 #else
1756   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1757   CpvAccess(_qd)->create(-npes);
1758 #endif
1759   _STATS_RECORD_SEND_BRANCH_N(npes);
1760   CpvAccess(_qd)->create(npes);
1761 }
1762
1763 extern "C"
1764 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1765 {
1766   if (opts & CK_MSG_IMMEDIATE) {
1767     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1768     return;
1769   }
1770     // normal mesg
1771   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1772   _STATS_RECORD_SEND_BRANCH_N(npes);
1773   CpvAccess(_qd)->create(npes);
1774 }
1775
1776 extern "C"
1777 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1778 {
1779   int npes;
1780   int *pes;
1781   if (opts & CK_MSG_IMMEDIATE) {
1782     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1783     return;
1784   }
1785     // normal mesg
1786   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1787   CmiLookupGroup(grp, &npes, &pes);
1788   _TRACE_CREATION_MULTICAST(env, npes, pes);
1789   _CldEnqueueGroup(grp, env, _infoIdx);
1790   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1791   _STATS_RECORD_SEND_BRANCH_N(npes);
1792   CpvAccess(_qd)->create(npes);
1793 }
1794
1795 extern "C"
1796 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1797 {
1798   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1799   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1800   CpvAccess(_qd)->create(CkNumPes());
1801 }
1802
1803 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1804                 int node=CLD_BROADCAST_ALL, int opts=0)
1805 {
1806   int numPes;
1807   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1808 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1809         sendTicketNodeGroupRequest(env,node,_infoIdx);
1810 #else
1811   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1812   _TRACE_CREATION_N(env, numPes);
1813   if (opts & CK_MSG_SKIP_OR_IMM) {
1814     _noCldNodeEnqueue(node, env);
1815     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1816       CkpvAccess(_coreState)->create(-numPes);
1817     }
1818   }
1819   else
1820     _CldNodeEnqueue(node, env, _infoIdx);
1821   _TRACE_CREATION_DONE(1);
1822 #endif
1823 }
1824
1825 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1826                            int npes, int *nodes)
1827 {
1828   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1829   _TRACE_CREATION_N(env, npes);
1830   for (int i=0; i<npes; i++) {
1831     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1832   }
1833   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1834 }
1835
1836 extern "C"
1837 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1838 {
1839 #if CMK_IMMEDIATE_MSG
1840   if (node==CkMyNode())
1841   {
1842     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1843     return;
1844   }
1845   //Can't inline-- send the usual way
1846   register envelope *env = UsrToEnv(msg);
1847   int numPes;
1848   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1849   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1850   _TRACE_CREATION_N(env, numPes);
1851   _noCldNodeEnqueue(node, env);
1852   _STATS_RECORD_SEND_BRANCH_1();
1853   /* immeidate message is invisible to QD */
1854 //  CkpvAccess(_coreState)->create();
1855   _TRACE_CREATION_DONE(1);
1856 #else
1857   // no support for immediate message, send inline
1858   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1859 #endif
1860 }
1861
1862 extern "C"
1863 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1864 {
1865   if (node==CkMyNode())
1866   {
1867     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1868     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1869     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1870     if (obj!=NULL)
1871     { //Just directly call the group:
1872 #if CMK_ERROR_CHECKING
1873       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1874 #else
1875       envelope *env=UsrToEnv(msg);
1876 #endif
1877       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1878       return;
1879     }
1880   }
1881   //Can't inline-- send the usual way
1882   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1883 }
1884
1885 extern "C"
1886 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1887 {
1888   if (opts & CK_MSG_INLINE) {
1889     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1890     return;
1891   }
1892   if (opts & CK_MSG_IMMEDIATE) {
1893     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1894     return;
1895   }
1896   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1897   _STATS_RECORD_SEND_NODE_BRANCH_1();
1898   CkpvAccess(_coreState)->create();
1899 }
1900
1901 extern "C"
1902 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1903 {
1904 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1905   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1906   _noCldEnqueueMulti(npes, nodes, env);
1907 #else
1908   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1909   CpvAccess(_qd)->create(-npes);
1910 #endif
1911   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1912   CpvAccess(_qd)->create(npes);
1913 }
1914
1915 extern "C"
1916 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1917 {
1918   if (opts & CK_MSG_IMMEDIATE) {
1919     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1920     return;
1921   }
1922     // normal mesg
1923   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1924   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1925   CpvAccess(_qd)->create(npes);
1926 }
1927
1928 extern "C"
1929 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1930 {
1931   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1932   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1933   CpvAccess(_qd)->create(CkNumNodes());
1934 }
1935
1936 //Needed by delegation manager:
1937 extern "C"
1938 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1939 { return _prepareMsg(eIdx,msg,pCid); }
1940 extern "C"
1941 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1942 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1943 extern "C"
1944 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1945 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1946
1947 void _ckModuleInit(void) {
1948         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1949 #if CMK_OBJECT_QUEUE_AVAILABLE
1950         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1951 #endif
1952         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1953         CkpvInitialize(TokenPool*, _tokenPool);
1954         CkpvAccess(_tokenPool) = new TokenPool;
1955 }
1956
1957
1958 /************** Send: Arrays *************/
1959
1960 extern void CkArrayManagerInsert(int onPe,void *msg);
1961 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1962
1963 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1964 {
1965   _CHECK_USED(env);
1966   _SET_USED(env, 1);
1967   env->setMsgtype(type);
1968 #if CMK_CHARMDEBUG
1969   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1970 #endif
1971   CmiSetHandler(env, _charmHandlerIdx);
1972   CpvAccess(_qd)->create();
1973 }
1974
1975 extern "C"
1976 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1977   register envelope *env = UsrToEnv(msg);
1978   env->getsetArrayMgr()=aID;
1979   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1980   _CldEnqueue(pe, env, _infoIdx);
1981 }
1982
1983 extern "C"
1984 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1985   register envelope *env = UsrToEnv(msg);
1986   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1987 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1988    sendTicketArrayRequest(env,pe,_infoIdx);
1989 #else
1990   if (opts & CK_MSG_IMMEDIATE)
1991     CmiBecomeImmediate(env);
1992   if (opts & CK_MSG_SKIP_OR_IMM)
1993     _noCldEnqueue(pe, env);
1994   else
1995     _skipCldEnqueue(pe, env, _infoIdx);
1996 #endif
1997 }
1998
1999 class ElementDestroyer : public CkLocIterator {
2000 private:
2001         CkLocMgr *locMgr;
2002 public:
2003         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2004         void addLocation(CkLocation &loc) {
2005           loc.destroyAll();
2006         }
2007 };
2008
2009 void CkDeleteChares() {
2010   int i;
2011   int numGroups = CkpvAccess(_groupIDTable)->size();
2012
2013   // delete all plain chares
2014 #ifndef CMK_CHARE_USE_PTR
2015   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2016         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2017         delete obj;
2018         CkpvAccess(chare_objs)[i] = NULL;
2019   }
2020   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2021         VidBlock *obj = CkpvAccess(vidblocks)[i];
2022         delete obj;
2023         CkpvAccess(vidblocks)[i] = NULL;
2024   }
2025 #endif
2026
2027   // delete all array elements
2028   for(i=0;i<numGroups;i++) {
2029     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2030     if(obj && obj->isLocMgr())  {
2031       CkLocMgr *mgr = (CkLocMgr*)obj;
2032       ElementDestroyer destroyer(mgr);
2033       mgr->iterate(destroyer);
2034     }
2035   }
2036
2037   // delete all groups
2038   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2039   for(i=0;i<numGroups;i++) {
2040     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2041     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2042     if (obj) delete obj;
2043   }
2044   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2045
2046   // delete all node groups
2047   if (CkMyRank() == 0) {
2048     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2049     for(i=0;i<numNodeGroups;i++) {
2050       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2051       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2052       if (obj) delete obj;
2053     }
2054   }
2055 }
2056
2057 //------------------- Message Watcher (record/replay) ----------------
2058
2059 #include "crc32.h"
2060
2061 CkpvDeclare(int, envelopeEventID);
2062 int _recplay_crc = 0;
2063 int _recplay_checksum = 0;
2064 int _recplay_logsize = 1024*1024;
2065
2066 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2067 #define REPLAYDEBUG(args) /* empty */
2068
2069 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2070
2071 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2072
2073 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2074
2075     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2076     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2077     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2078     FILE *f=fopen(fName,permissions);
2079     REPLAYDEBUG("openReplayfile "<<fName);
2080     if (f==NULL) {
2081         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2082             CkMyPe(),fName,permissions);
2083         CkAbort("openReplayFile> Could not open replay file");
2084     }
2085     return f;
2086 }
2087
2088 #include "BaseLB.h" /* For LBMigrateMsg message */
2089
2090 class CkMessageRecorder : public CkMessageWatcher {
2091   char *buffer;
2092   unsigned int curpos;
2093   bool firstOpen;
2094 public:
2095   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2096   ~CkMessageRecorder() {
2097     flushLog(0);
2098     fprintf(f,"-1 -1 -1 ");
2099     fclose(f);
2100     delete[] buffer;
2101 #if 0
2102     FILE *stsfp = fopen("sts", "w");
2103     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2104     traceWriteSTS(stsfp, 0);
2105     fclose(stsfp);
2106 #endif
2107     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2108   }
2109
2110 private:
2111   void flushLog(int verbose=1) {
2112     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2113     fprintf(f, "%s", buffer);
2114     curpos=0;
2115   }
2116   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2117     if ((*envptr)->getEvent()) {
2118       bool wasPacked = (*envptr)->isPacked();
2119       if (!wasPacked) CkPackMessage(envptr);
2120       envelope *env = *envptr;
2121       unsigned int crc1=0, crc2=0;
2122       if (_recplay_crc) {
2123         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2124         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2125         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2126       } else if (_recplay_checksum) {
2127         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2128         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2129       }
2130       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2131       if (curpos > _recplay_logsize-128) flushLog();
2132       if (!wasPacked) CkUnpackMessage(envptr);
2133     }
2134     return CmiTrue;
2135   }
2136   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2137     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2138     if (curpos > _recplay_logsize-128) flushLog();
2139     return CmiTrue;
2140   }
2141   
2142   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2143     FILE *f;
2144     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2145     else f = openReplayFile("ckreplay_",".lb","a");
2146     firstOpen = false;
2147     if (f != NULL) {
2148       PUP::toDisk p(f);
2149       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2150       (*msg)->pup(p);
2151       fclose(f);
2152     }
2153     return CmiTrue;
2154   }
2155 };
2156
2157 class CkMessageDetailRecorder : public CkMessageWatcher {
2158 public:
2159   CkMessageDetailRecorder(FILE *f_) {
2160     f=f_;
2161     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2162      * The value of 'x' is the pointer size.
2163      */
2164     CmiUInt2 little = sizeof(void*);
2165     fwrite(&little, 2, 1, f);
2166   }
2167   ~CkMessageDetailRecorder() {fclose(f);}
2168 private:
2169   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2170     bool wasPacked = (*envptr)->isPacked();
2171     if (!wasPacked) CkPackMessage(envptr);
2172     envelope *env = *envptr;
2173     CmiUInt4 size = env->getTotalsize();
2174     fwrite(&size, 4, 1, f);
2175     fwrite(env, env->getTotalsize(), 1, f);
2176     if (!wasPacked) CkUnpackMessage(envptr);
2177     return CmiTrue;
2178   }
2179 };
2180
2181 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2182 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2183
2184 #if CMK_BLUEGENE_CHARM
2185 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2186                                    int pb,unsigned int *prio);
2187 #endif
2188
2189 class CkMessageReplay : public CkMessageWatcher {
2190   int counter;
2191         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2192         int nextEP;
2193         unsigned int crc1, crc2;
2194         FILE *lbFile;
2195         /// Read the next message we need from the file:
2196         void getNext(void) {
2197           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2198           if (nextSize > 0) {
2199             // We are reading a regular message
2200             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2201               CkAbort("CkMessageReplay> Syntax error reading replay file");
2202             }
2203             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2204           } else if (nextSize == -2) {
2205             // We are reading a special message (right now only thread awaken)
2206             // Nothing to do since we have already read all info
2207             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2208           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2209             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2210             CkAbort("CkMessageReplay> Unrecognized input");
2211           }
2212             /*
2213                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2214                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2215                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2216                 }
2217                 */
2218                 counter++;
2219         }
2220         /// If this is the next message we need, advance and return CmiTrue.
2221         CmiBool isNext(envelope *env) {
2222                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2223                 if (nextEvent!=env->getEvent()) return CmiFalse;
2224                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2225 #if 1
2226                 if (nextEP != env->getEpIdx()) {
2227                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2228                         return CmiFalse;
2229                 }
2230 #endif
2231 #if ! CMK_BLUEGENE_CHARM
2232                 if (nextSize!=env->getTotalsize())
2233                 {
2234                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2235                         return CmiFalse;
2236                 }
2237                 if (_recplay_crc || _recplay_checksum) {
2238                   bool wasPacked = env->isPacked();
2239                   if (!wasPacked) CkPackMessage(&env);
2240                   if (_recplay_crc) {
2241                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2242                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2243                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2244                     if (crcnew1 != crc1) {
2245                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2246                     }
2247                     if (crcnew2 != crc2) {
2248                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2249                     }
2250                   } else if (_recplay_checksum) {
2251             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2252             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2253             if (crcnew1 != crc1) {
2254               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2255             }
2256             if (crcnew2 != crc2) {
2257               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2258             }               
2259                   }
2260                   if (!wasPacked) CkUnpackMessage(&env);
2261                 }
2262 #endif
2263                 return CmiTrue;
2264         }
2265         CmiBool isNext(CthThreadToken *token) {
2266           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2267           return CmiFalse;
2268         }
2269
2270         /// This is a (short) list of messages we aren't yet ready for:
2271         CkQ<envelope *> delayedMessages;
2272         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2273         CkQ<CthThreadToken *> delayedTokens;
2274
2275         /// Try to flush out any delayed messages
2276         void flush(void) {
2277           if (nextSize>0) {
2278                 int len=delayedMessages.length();
2279                 for (int i=0;i<len;i++) {
2280                         envelope *env=delayedMessages.deq();
2281                         if (isNext(env)) { /* this is the next message: process it */
2282                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2283                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2284                                 return;
2285                         }
2286                         else /* Not ready yet-- put it back in the
2287                                 queue */
2288                           {
2289                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2290                                 delayedMessages.enq(env);
2291                           }
2292                 }
2293           } else if (nextSize==-2) {
2294             int len=delayedTokens.length();
2295             for (int i=0;i<len;++i) {
2296               CthThreadToken *token=delayedTokens.deq();
2297               if (isNext(token)) {
2298             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2299 #if ! CMK_BLUEGENE_CHARM
2300                 CsdEnqueueLifo((void*)token);
2301 #else
2302                 CthEnqueueBigSimThread(token,0,0,NULL);
2303 #endif
2304                 return;
2305               } else {
2306             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2307                 delayedTokens.enq(token);
2308               }
2309             }
2310           }
2311         }
2312
2313 public:
2314         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2315           counter=0;
2316           f=f_;
2317           getNext();
2318           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2319           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2320         }
2321         ~CkMessageReplay() {fclose(f);}
2322
2323 private:
2324         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2325           bool wasPacked = (*envptr)->isPacked();
2326           if (!wasPacked) CkPackMessage(envptr);
2327           envelope *env = *envptr;
2328           //CkAssert(*(int*)env == 0x34567890);
2329           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2330                 if (env->getEvent() == 0) return CmiTrue;
2331                 if (isNext(env)) { /* This is the message we were expecting */
2332                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2333                         getNext(); /* Advance over this message */
2334                         flush(); /* try to process queued-up stuff */
2335                         if (!wasPacked) CkUnpackMessage(envptr);
2336                         return CmiTrue;
2337                 }
2338 #if CMK_SMP
2339                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2340                          // try next rank, we can't just buffer the msg and left
2341                          // we need to keep unprocessed msg on the fly
2342                         int nextpe = CkMyPe()+1;
2343                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2344                         nextpe = CkNodeFirst(CkMyNode());
2345                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2346                         return CmiFalse;
2347                 }
2348 #endif
2349                 else /*!isNext(env) */ {
2350                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2351                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2352                         delayedMessages.enq(env);
2353                         flush();
2354                         return CmiFalse;
2355                 }
2356         }
2357         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2358       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2359           if (isNext(token)) {
2360         REPLAYDEBUG("Executing token: "<<token->serialNo)
2361             getNext();
2362             flush();
2363             return CmiTrue;
2364           } else {
2365         REPLAYDEBUG("Queueing token: "<<token->serialNo
2366             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2367             delayedTokens.enq(token);
2368             return CmiFalse;
2369           }
2370         }
2371
2372         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2373           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2374           if (lbFile != NULL) {
2375             int num_moves;
2376         PUP::fromDisk p(lbFile);
2377             p | num_moves;
2378             if (num_moves != (*msg)->n_moves) {
2379               delete *msg;
2380               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2381             }
2382             (*msg)->pup(p);
2383           }
2384           return CmiTrue;
2385         }
2386 };
2387
2388 class CkMessageDetailReplay : public CkMessageWatcher {
2389   void *getNext() {
2390     CmiUInt4 size; size_t nread;
2391     if ((nread=fread(&size, 4, 1, f)) < 1) {
2392       if (feof(f)) return NULL;
2393       CkPrintf("Broken record file (metadata) got %d\n",nread);
2394       CkAbort("");
2395     }
2396     void *env = CmiAlloc(size);
2397     long tell = ftell(f);
2398     if ((nread=fread(env, size, 1, f)) < 1) {
2399       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2400       CkAbort("");
2401     }
2402     //*(int*)env = 0x34567890; // set first integer as magic
2403     return env;
2404   }
2405 public:
2406   double starttime;
2407   CkMessageDetailReplay(FILE *f_) {
2408     f=f_;
2409     starttime=CkWallTimer();
2410     /* This must match what CkMessageDetailRecorder did */
2411     CmiUInt2 little;
2412     fread(&little, 2, 1, f);
2413     if (little != sizeof(void*)) {
2414       CkAbort("Replaying on a different architecture from which recording was done!");
2415     }
2416
2417     CsdEnqueue(getNext());
2418
2419     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2420   }
2421   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2422     void *msg = getNext();
2423     if (msg != NULL) CsdEnqueue(msg);
2424     return CmiTrue;
2425   }
2426 };
2427
2428 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2429 #if ! CMK_BLUEGENE_CHARM
2430   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2431 #endif
2432   CkMessageReplay *replay = (CkMessageReplay*)rep;
2433   //CmiStartQD(CkMessageReplayQuiescence, replay);
2434 }
2435
2436 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2437   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2438   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2439   ConverseExit();
2440 }
2441
2442 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2443   CkCoreState *ck = CkpvAccess(_coreState);
2444   if (ck->watcher!=NULL) {
2445     return ck->watcher->processThread(token,ck);
2446   }
2447   return CmiTrue;
2448 }
2449
2450 CpvCExtern(int, CthResumeNormalThreadIdx);
2451 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2452 {
2453   CthThread t = token->thread;
2454
2455   if(t == NULL){
2456     free(token);
2457     return;
2458   }
2459 #if CMK_TRACE_ENABLED
2460 #if ! CMK_TRACE_IN_CHARM
2461   if(CpvAccess(traceOn))
2462     CthTraceResume(t);
2463 /*    if(CpvAccess(_traceCoreOn)) 
2464             resumeTraceCore();*/
2465 #endif
2466 #endif
2467   
2468   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2469   if (CpdExecuteThreadResume(token)) {
2470     CthResume(t);
2471   }
2472 }
2473
2474 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2475   CkCoreState *ck = CkpvAccess(_coreState);
2476   if (ck->watcher!=NULL) {
2477     ck->watcher->processLBMessage(msg, ck);
2478   }
2479 }
2480
2481 #if CMK_BLUEGENE_CHARM
2482 CpvExtern(int      , CthResumeBigSimThreadIdx);
2483 #endif
2484
2485 #include "ckliststring.h"
2486 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2487     CmiArgGroup("Charm++","Record/Replay");
2488     CmiBool forceReplay = CmiFalse;
2489     char *procs = NULL;
2490     _replaySystem = 0;
2491     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2492       _recplay_crc = 1;
2493     }
2494     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2495       _recplay_checksum = 1;
2496     }
2497     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2498     REPLAYDEBUG("CkMessageWatcherInit ");
2499     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2500         CkListString list(procs);
2501         if (list.includes(CkMyPe())) {
2502           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2503           CpdSetInitializeMemory(1);
2504           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2505         }
2506     }
2507     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2508       if (CkMyPe() == 0) {
2509         CmiPrintf("Charm++> record mode.\n");
2510         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2511           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2512           _recplay_crc = _recplay_checksum = 0;
2513         }
2514       }
2515       CpdSetInitializeMemory(1);
2516       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2517       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2518     }
2519         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2520             forceReplay = CmiTrue;
2521             CpdSetInitializeMemory(1);
2522             // Set the parameters of the processor
2523 #if CMK_SHARED_VARS_UNAVAILABLE
2524             _Cmi_mype = atoi(procs);
2525             while (procs[0]!='/') procs++;
2526             procs++;
2527             _Cmi_numpes = atoi(procs);
2528 #else
2529             CkAbort("+replay-detail available only for non-SMP build");
2530 #endif
2531             _replaySystem = 1;
2532             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2533         }
2534         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2535           if (CkMyPe() == 0)  {
2536             CmiPrintf("Charm++> replay mode.\n");
2537             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2538               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2539               _recplay_crc = _recplay_checksum = 0;
2540             }
2541           }
2542           CpdSetInitializeMemory(1);
2543 #if ! CMK_BLUEGENE_CHARM
2544           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2545 #else
2546           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2547 #endif
2548           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2549         }
2550         if (_recplay_crc && _recplay_checksum) {
2551           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2552         }
2553 }
2554
2555 extern "C"
2556 int CkMessageToEpIdx(void *msg) {
2557         envelope *env=UsrToEnv(msg);
2558         int ep=env->getEpIdx();
2559         if (ep==CkIndex_CkArray::recvBroadcast(0))
2560                 return env->getsetArrayBcastEp();
2561         else
2562                 return ep;
2563 }
2564
2565 extern "C"
2566 int getCharmEnvelopeSize() {
2567   return sizeof(envelope);
2568 }
2569
2570
2571 #include "CkMarshall.def.h"
2572