16f451144772353a3d771aa8599342ad1d690894
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #ifndef CMK_CHARE_USE_PTR
29 #include <map>
30 CkpvDeclare(CkVec<void *>, chare_objs);
31 CkpvDeclare(CkVec<int>, chare_types);
32 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
33
34 typedef std::map<int, CkChareID>  Vidblockmap;
35 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
36 CkpvDeclare(int, currentChareIdx);
37 #endif
38
39
40 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
41
42 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
43
44 int CMessage_CkMessage::__idx=-1;
45 int CMessage_CkArgMsg::__idx=0;
46 int CkIndex_Chare::__idx;
47 int CkIndex_Group::__idx;
48 int CkIndex_ArrayBase::__idx=-1;
49
50 extern int _defaultObjectQ;
51
52 void _initChareTables()
53 {
54 #ifndef CMK_CHARE_USE_PTR
55           /* chare and vidblock table */
56   CkpvInitialize(CkVec<void *>, chare_objs);
57   CkpvInitialize(CkVec<int>, chare_types);
58   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
59   CkpvInitialize(Vidblockmap, vmap);
60   CkpvInitialize(int, currentChareIdx);
61   CkpvAccess(currentChareIdx) = -1;
62 #endif
63 }
64
65 //Charm++ virtual functions: declaring these here results in a smaller executable
66 Chare::Chare(void) {
67   thishandle.onPE=CkMyPe();
68   thishandle.objPtr=this;
69 #ifndef CMK_CHARE_USE_PTR
70      // for plain chare, objPtr is actually the index to chare obj table
71   if (CkpvAccess(currentChareIdx) >= 0) {
72     thishandle.objPtr=(void*)CkpvAccess(currentChareIdx);
73   }
74   chareIdx = CkpvAccess(currentChareIdx);
75 #endif
76 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
77   mlogData = new ChareMlogData();
78   mlogData->objID.type = TypeChare;
79   mlogData->objID.data.chare.id = thishandle;
80 #endif
81 #if CMK_OBJECT_QUEUE_AVAILABLE
82   if (_defaultObjectQ)  CkEnableObjQ();
83 #endif
84 }
85
86 Chare::Chare(CkMigrateMessage* m) {
87   thishandle.onPE=CkMyPe();
88   thishandle.objPtr=this;
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 }
149
150 int Chare::ckGetChareType() const {
151   return -3;
152 }
153 char *Chare::ckDebugChareName(void) {
154   char buf[100];
155   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
156   return strdup(buf);
157 }
158 int Chare::ckDebugChareID(char *str, int limit) {
159   // pure chares for now do not have a valid ID
160   str[0] = 0;
161   return 1;
162 }
163 void Chare::ckDebugPup(PUP::er &p) {
164   pup(p);
165 }
166
167 /// This method is called before starting a [threaded] entry method.
168 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
169   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
170   traceAddThreadListeners(th, UsrToEnv(msg));
171 }
172
173 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
174   p.comment("Bytes");
175   int ts=UsrToEnv(msg)->getTotalsize();
176   int msgLen=ts-sizeof(envelope);
177   if (msgLen>0)
178     p((char*)msg,msgLen);
179 }
180
181 IrrGroup::IrrGroup(void) {
182   thisgroup = CkpvAccess(_currentGroup);
183 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
184         mlogData->objID.type = TypeGroup;
185         mlogData->objID.data.group.id = thisgroup;
186         mlogData->objID.data.group.onPE = CkMyPe();
187 #endif
188 }
189
190 IrrGroup::~IrrGroup() {
191   // remove the object pointer
192   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
193   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
194   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
195 }
196
197 void IrrGroup::pup(PUP::er &p)
198 {
199   Chare::pup(p);
200   p|thisgroup;
201 }
202
203 int IrrGroup::ckGetChareType() const {
204   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
205 }
206
207 int IrrGroup::ckDebugChareID(char *str, int limit) {
208   if (limit<5) return -1;
209   str[0] = 1;
210   *((int*)&str[1]) = thisgroup.idx;
211   return 5;
212 }
213
214 char *IrrGroup::ckDebugChareName() {
215   return strdup(_chareTable[ckGetChareType()]->name);
216 }
217
218 void IrrGroup::ckJustMigrated(void)
219 {
220 }
221
222 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
223   /* FIXME: **CW** not entirely sure what we should do here yet */
224 }
225
226 void Group::CkAddThreadListeners(CthThread th, void *msg) {
227   Chare::CkAddThreadListeners(th, msg);
228   CthSetThreadID(th, thisgroup.idx, 0, 0);
229 }
230
231 void Group::pup(PUP::er &p)
232 {
233   CkReductionMgr::pup(p);
234   reductionInfo.pup(p);
235 }
236
237 /**** Delegation Manager Group */
238 CkDelegateMgr::~CkDelegateMgr() { }
239
240 //Default delegator implementation: do not delegate-- send directly
241 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
242   { CkSendMsg(ep,m,c); }
243 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
244   { CkSendMsgBranch(ep,m,onPE,g); }
245 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
246   { CkBroadcastMsgBranch(ep,m,g); }
247 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
248   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
249 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
250   { CkSendMsgNodeBranch(ep,m,onNode,g); }
251 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
252   { CkBroadcastMsgNodeBranch(ep,m,g); }
253 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
254   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
255 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
256 {
257         CProxyElement_ArrayBase ap(a,idx);
258         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
259 }
260 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
261 {
262         CProxyElement_ArrayBase ap(a,idx);
263         ap.ckSend((CkArrayMessage *)m,ep);
264 }
265 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
266 {
267         CProxy_ArrayBase ap(a);
268         ap.ckBroadcast((CkArrayMessage *)m,ep);
269 }
270
271 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
272 {
273         CmiAbort("ArraySectionSend is not implemented!\n");
274 /*
275         CProxyElement_ArrayBase ap(a,idx);
276         ap.ckSend((CkArrayMessage *)m,ep);
277 */
278 }
279
280 /*** Proxy <-> delegator communication */
281 CkDelegateData::~CkDelegateData() {}
282
283 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
284   return pd; // default implementation ignores pup call
285 }
286
287 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
288    this tricky manual reference counting business... */
289
290 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
291         if (dPtr) dPtr->ref();
292         ckUndelegate();
293         delegatedMgr = dTo;
294         delegatedPtr = dPtr;
295         delegatedGroupId = delegatedMgr->CkGetGroupID();
296         isNodeGroup = delegatedMgr->isNodeGroup();
297 }
298 void CProxy::ckUndelegate(void) {
299         delegatedMgr=NULL;
300         delegatedGroupId.setZero();
301         if (delegatedPtr) delegatedPtr->unref();
302         delegatedPtr=NULL;
303 }
304
305 /// Copy constructor
306 CProxy::CProxy(const CProxy &src)
307   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
308    isNodeGroup(src.isNodeGroup) {
309     delegatedPtr = NULL;
310     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
311         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
312     }
313 }
314
315 /// Assignment operator
316 CProxy& CProxy::operator=(const CProxy &src) {
317         CkDelegateData *oldPtr=delegatedPtr;
318         ckUndelegate();
319         delegatedMgr=src.delegatedMgr;
320         delegatedGroupId = src.delegatedGroupId; 
321         isNodeGroup = src.isNodeGroup;
322
323         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
324             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
325         else
326             delegatedPtr = NULL;
327
328         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
329         if (oldPtr) oldPtr->unref();
330         return *this;
331 }
332
333 void CProxy::pup(PUP::er &p) {
334   if (!p.isUnpacking()) {
335     if (ckDelegatedTo() != NULL) {
336       delegatedGroupId = delegatedMgr->CkGetGroupID();
337       isNodeGroup = delegatedMgr->isNodeGroup();
338     }
339   }
340   p|delegatedGroupId;
341   if (!delegatedGroupId.isZero()) {
342     p|isNodeGroup;
343     if (p.isUnpacking()) {
344       delegatedMgr = ckDelegatedTo(); 
345     }
346
347     // if delegated manager has not been created, construct a dummy
348     // object on which to call DelegatePointerPup
349     if (delegatedMgr == NULL) {
350
351       int migCtor; 
352       if (isNodeGroup) {
353         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
354         migCtor = 
355           CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getmigCtor();
356         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
357       }
358       else  {
359         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
360         migCtor = 
361           CkpvAccess(_groupTable)->find(delegatedGroupId).getmigCtor();
362         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
363       } 
364
365       // create a dummy object for calling DelegatePointerPup
366       int objId = _entryTable[migCtor]->chareIdx; 
367       int objSize = _chareTable[objId]->size; 
368       void *obj = malloc(objSize); 
369       _entryTable[migCtor]->call(NULL, obj); 
370       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
371         ->DelegatePointerPup(p, delegatedPtr);           
372       free(obj);
373
374     }
375     else {
376
377       // delegated manager has been created, so we can use it
378       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
379
380     }
381
382     if (p.isUnpacking() && delegatedPtr) {
383       delegatedPtr->ref();
384     }
385   }
386 }
387
388 /**** Array sections */
389 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
390 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
391   _cookie.get_aid() = aid;      \
392   _cookie.get_pe() = CkMyPe();  \
393   _elems = new CkArrayIndex[nElems];    \
394   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
395   pelist = NULL;        \
396   npes  = 0;    \
397 }
398
399 CKSECTIONID_CONSTRUCTOR_DEF(1D)
400 CKSECTIONID_CONSTRUCTOR_DEF(2D)
401 CKSECTIONID_CONSTRUCTOR_DEF(3D)
402 CKSECTIONID_CONSTRUCTOR_DEF(4D)
403 CKSECTIONID_CONSTRUCTOR_DEF(5D)
404 CKSECTIONID_CONSTRUCTOR_DEF(6D)
405 CKSECTIONID_CONSTRUCTOR_DEF(Max)
406
407 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
408   pelist = new int[npes];
409   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
410   _cookie.get_aid() = gid;
411 }
412
413 CkSectionID::CkSectionID(const CkSectionID &sid) {
414   int i;
415   _cookie = sid._cookie;
416   _nElems = sid._nElems;
417   if (_nElems > 0) {
418     _elems = new CkArrayIndex[_nElems];
419     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
420   } else _elems = NULL;
421   npes = sid.npes;
422   if (npes > 0) {
423     pelist = new int[npes];
424     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
425   } else pelist = NULL;
426 }
427
428 void CkSectionID::operator=(const CkSectionID &sid) {
429   int i;
430   _cookie = sid._cookie;
431   _nElems = sid._nElems;
432   if (_nElems > 0) {
433     _elems = new CkArrayIndex[_nElems];
434     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
435   } else _elems = NULL;
436   npes = sid.npes;
437   if (npes > 0) {
438     pelist = new int[npes];
439     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
440   } else pelist = NULL;
441 }
442
443 void CkSectionID::pup(PUP::er &p) {
444     p | _cookie;
445     p(_nElems);
446     if (_nElems > 0) {
447       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
448       for (int i=0; i< _nElems; i++) p | _elems[i];
449       npes = 0;
450       pelist = NULL;
451     } else {
452       // If _nElems is zero, than this section describes processors instead of array elements
453       _elems = NULL;
454       p(npes);
455       if (p.isUnpacking()) pelist = new int[npes];
456       p(pelist, npes);
457     }
458 }
459
460 /**** Tiny random API routines */
461
462 #ifdef CMK_CUDA
463 void CUDACallbackManager(void *fn) {
464   if (fn != NULL) {
465     CkCallback *cb = (CkCallback*) fn;
466     cb->send();
467   }
468 }
469
470 #endif
471
472 extern "C"
473 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
474 {
475   UsrToEnv(msg)->setRef(ref);
476 }
477
478 extern "C"
479 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
480 {
481   return UsrToEnv(msg)->getRef();
482 }
483
484 extern "C"
485 int CkGetSrcPe(void *msg)
486 {
487   return UsrToEnv(msg)->getSrcPe();
488 }
489
490 extern "C"
491 int CkGetSrcNode(void *msg)
492 {
493   return CmiNodeOf(CkGetSrcPe(msg));
494 }
495
496 extern "C"
497 void *CkLocalBranch(CkGroupID gID) {
498   return _localBranch(gID);
499 }
500
501 static
502 void *_ckLocalNodeBranch(CkGroupID groupID) {
503   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
504   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
505   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
506   return retval;
507 }
508
509 extern "C"
510 void *CkLocalNodeBranch(CkGroupID groupID)
511 {
512   void *retval;
513   // we are called in a constructor
514   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
515     return CkpvAccess(_currentNodeGroupObj);
516   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
517   { // Nodegroup hasn't finished being created yet-- schedule...
518     CsdScheduler(0);
519   }
520   return retval;
521 }
522
523 extern "C"
524 void *CkLocalChare(const CkChareID *pCid)
525 {
526         int pe=pCid->onPE;
527         if (pe<0) { //A virtual chare ID
528                 if (pe!=(-(CkMyPe()+1)))
529                         return NULL;//VID block not on this PE
530 #ifdef CMK_CHARE_USE_PTR
531                 VidBlock *v=(VidBlock *)pCid->objPtr;
532 #else
533                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
534 #endif
535                 return v->getLocalChareObj();
536         }
537         else
538         { //An ordinary chare ID
539                 if (pe!=CkMyPe())
540                         return NULL;//Chare not on this PE
541 #ifdef CMK_CHARE_USE_PTR
542                 return pCid->objPtr;
543 #else
544                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
545 #endif
546         }
547 };
548
549 CkpvDeclare(char**,Ck_argv);
550
551 extern "C" char **CkGetArgv(void) {
552         return CkpvAccess(Ck_argv);
553 }
554 extern "C" int CkGetArgc(void) {
555         return CmiGetArgc(CkpvAccess(Ck_argv));
556 }
557
558 /******************** Basic support *****************/
559 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
560 {
561 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
562         CpvAccess(_currentObj) = (Chare *)obj;
563 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
564 #endif
565   //BIGSIM_OOC DEBUGGING
566   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
567   //fflush(stdout);
568 #if CMK_CHARMDEBUG
569   CpdBeforeEp(epIdx, obj, msg);
570 #endif    
571   _entryTable[epIdx]->call(msg, obj);
572 #if CMK_CHARMDEBUG
573   CpdAfterEp(epIdx);
574 #endif
575   if (_entryTable[epIdx]->noKeep)
576   { /* Method doesn't keep/delete the message, so we have to: */
577     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
578   }
579 }
580 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
581 {
582   //BIGSIM_OOC DEBUGGING
583   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
584   //fflush(stdout);
585
586   void *deliverMsg;
587 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
588         CpvAccess(_currentObj) = (Chare *)obj;
589 #endif
590   if (_entryTable[epIdx]->noKeep)
591   { /* Deliver a read-only copy of the message */
592     deliverMsg=(void *)msg;
593   } else
594   { /* Method needs a copy of the message to keep/delete */
595     void *oldMsg=(void *)msg;
596     deliverMsg=CkCopyMsg(&oldMsg);
597 #if CMK_ERROR_CHECKING
598     if (oldMsg!=msg)
599       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
600 #endif
601   }
602 #if CMK_CHARMDEBUG
603   CpdBeforeEp(epIdx, obj, (void*)msg);
604 #endif
605   _entryTable[epIdx]->call(deliverMsg, obj);
606 #if CMK_CHARMDEBUG
607   CpdAfterEp(epIdx);
608 #endif
609 }
610
611 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
612 {
613   register void *msg = EnvToUsr(env);
614   _SET_USED(env, 0);
615   CkDeliverMessageFree(epIdx,msg,obj);
616 }
617
618 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
619 {
620
621 #if CMK_TRACE_ENABLED 
622   if (_entryTable[epIdx]->traceEnabled) {
623     _TRACE_BEGIN_EXECUTE(env);
624     _invokeEntryNoTrace(epIdx,env,obj);
625     _TRACE_END_EXECUTE();
626   }
627   else
628 #endif
629     _invokeEntryNoTrace(epIdx,env,obj);
630
631 }
632
633 /********************* Creation ********************/
634
635 extern "C"
636 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
637 {
638   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
639   envelope *env = UsrToEnv(msg);
640   _CHECK_USED(env);
641   if(pCid == 0) {
642     env->setMsgtype(NewChareMsg);
643   } else {
644     pCid->onPE = (-(CkMyPe()+1));
645     //  pCid->magic = _GETIDX(cIdx);
646     pCid->objPtr = (void *) new VidBlock();
647     _MEMCHECK(pCid->objPtr);
648     env->setMsgtype(NewVChareMsg);
649     env->setVidPtr(pCid->objPtr);
650 #ifndef CMK_CHARE_USE_PTR
651     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
652     int idx = CkpvAccess(vidblocks).size()-1;
653     pCid->objPtr = (void *)idx;
654     env->setVidPtr((void *)idx);
655 #endif
656   }
657   env->setEpIdx(eIdx);
658   env->setSrcPe(CkMyPe());
659   CmiSetHandler(env, _charmHandlerIdx);
660   _TRACE_CREATION_1(env);
661   CpvAccess(_qd)->create();
662   _STATS_RECORD_CREATE_CHARE_1();
663   _SET_USED(env, 1);
664   if(destPE == CK_PE_ANY)
665     env->setForAnyPE(1);
666   else
667     env->setForAnyPE(0);
668   _CldEnqueue(destPE, env, _infoIdx);
669   _TRACE_CREATION_DONE(1);
670 }
671
672 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
673 {
674   register int gIdx = _entryTable[epIdx]->chareIdx;
675   register void *obj = malloc(_chareTable[gIdx]->size);
676   _MEMCHECK(obj);
677   setMemoryTypeChare(obj);
678   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
679   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
680   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
681   CkpvAccess(_groupIDTable)->push_back(groupID);
682   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
683   if(ptrq) {
684     void *pending;
685     while((pending=ptrq->deq())!=0)
686       _CldEnqueue(CkMyPe(), pending, _infoIdx);
687 //    delete ptrq;
688       CkpvAccess(_groupTable)->find(groupID).clearPending();
689   }
690   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
691
692   CkpvAccess(_currentGroup) = groupID;
693   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
694 #ifndef CMK_CHARE_USE_PTR
695   //((Chare *)obj)->chareIdx = -1;
696   CkpvAccess(currentChareIdx) = -1;
697 #endif
698   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
699   _STATS_RECORD_PROCESS_GROUP_1();
700 }
701
702 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
703 {
704   register int gIdx = _entryTable[epIdx]->chareIdx;
705   int objSize=_chareTable[gIdx]->size;
706   register void *obj = malloc(objSize);
707   _MEMCHECK(obj);
708   setMemoryTypeChare(obj);
709   CkpvAccess(_currentGroup) = groupID;
710
711 // Now that the NodeGroup is created, add it to the table.
712 //  NodeGroups can be accessed by multiple processors, so
713 //  this is in the opposite order from groups - invoking the constructor
714 //  before registering it.
715 // User may call CkLocalNodeBranch() inside the nodegroup constructor
716 //  store nodegroup into _currentNodeGroupObj
717   CkpvAccess(_currentNodeGroupObj) = obj;
718 #ifndef CMK_CHARE_USE_PTR
719   //((Chare *)obj)->chareIdx = -1;
720   CkpvAccess(currentChareIdx) = -1;
721 #endif
722   _invokeEntryNoTrace(epIdx,env,obj);
723   CkpvAccess(_currentNodeGroupObj) = NULL;
724   _STATS_RECORD_PROCESS_NODE_GROUP_1();
725
726   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
727   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
728   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
729   CksvAccess(_nodeGroupIDTable).push_back(groupID);
730
731   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
732   if(ptrq) {
733     void *pending;
734     while((pending=ptrq->deq())!=0)
735       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
736 //    delete ptrq;
737       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
738   }
739   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
740 }
741
742 void _createGroup(CkGroupID groupID, envelope *env)
743 {
744   _CHECK_USED(env);
745   _SET_USED(env, 1);
746   register int epIdx = env->getEpIdx();
747   int gIdx = _entryTable[epIdx]->chareIdx;
748   CkNodeGroupID rednMgr;
749   if(_chareTable[gIdx]->isIrr == 0){
750                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
751                 rednMgr = rednMgrProxy;
752 //              rednMgrProxy.setAttachedGroup(groupID);
753   }else{
754         rednMgr.setZero();
755   }
756   env->setGroupNum(groupID);
757   env->setSrcPe(CkMyPe());
758   env->setRednMgr(rednMgr);
759   env->setGroupEpoch(CkpvAccess(_charmEpoch));
760
761   if(CkNumPes()>1) {
762     CkPackMessage(&env);
763     CmiSetHandler(env, _bocHandlerIdx);
764     _numInitMsgs++;
765     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
766     CpvAccess(_qd)->create(CkNumPes()-1);
767     CkUnpackMessage(&env);
768   }
769   _STATS_RECORD_CREATE_GROUP_1();
770   CkCreateLocalGroup(groupID, epIdx, env);
771 }
772
773 void _createNodeGroup(CkGroupID groupID, envelope *env)
774 {
775   _CHECK_USED(env);
776   _SET_USED(env, 1);
777   register int epIdx = env->getEpIdx();
778   env->setGroupNum(groupID);
779   env->setSrcPe(CkMyPe());
780   env->setGroupEpoch(CkpvAccess(_charmEpoch));
781   if(CkNumNodes()>1) {
782     CkPackMessage(&env);
783     CmiSetHandler(env, _bocHandlerIdx);
784     _numInitMsgs++;
785     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
786     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
787     CpvAccess(_qd)->create(CkNumNodes()-1);
788     CkUnpackMessage(&env);
789   }
790   _STATS_RECORD_CREATE_NODE_GROUP_1();
791   CkCreateLocalNodeGroup(groupID, epIdx, env);
792 }
793
794 // new _groupCreate
795
796 static CkGroupID _groupCreate(envelope *env)
797 {
798   register CkGroupID groupNum;
799
800   // check CkMyPe(). if it is 0 then idx is _numGroups++
801   // if not, then something else...
802   if(CkMyPe() == 0)
803      groupNum.idx = CkpvAccess(_numGroups)++;
804   else
805      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
806   _createGroup(groupNum, env);
807   return groupNum;
808 }
809
810 // new _nodeGroupCreate
811 static CkGroupID _nodeGroupCreate(envelope *env)
812 {
813   register CkGroupID groupNum;
814   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
815   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
816           groupNum.idx = CksvAccess(_numNodeGroups)++;
817    else
818           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
819   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
820   _createNodeGroup(groupNum, env);
821   return groupNum;
822 }
823
824 /**** generate the group idx when group is creator pe is not pe0
825  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
826  **** remaining bits contain the group creator processor number and
827  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
828
829 int _getGroupIdx(int numNodes,int myNode,int numGroups)
830 {
831         int idx;
832         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
833         int n = 32 - (x+1);                                     // number of bits remaining for the index
834         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
835                                                                 // then add the next available index
836         // of course this won't work when int is 8 bytes long on T3E
837         //idx |= 0x80000000;                                      // set the most significant bit to 1
838         idx = - idx;
839                                                                 // if int is not 32 bits, wouldn't this be wrong?
840         return idx;
841 }
842
843 extern "C"
844 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
845 {
846   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
847   register envelope *env = UsrToEnv(msg);
848   env->setMsgtype(BocInitMsg);
849   env->setEpIdx(eIdx);
850   env->setSrcPe(CkMyPe());
851   _TRACE_CREATION_N(env, CkNumPes());
852   CkGroupID gid = _groupCreate(env);
853   _TRACE_CREATION_DONE(1);
854   return gid;
855 }
856
857 extern "C"
858 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
859 {
860   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
861   register envelope *env = UsrToEnv(msg);
862   env->setMsgtype(NodeBocInitMsg);
863   env->setEpIdx(eIdx);
864   env->setSrcPe(CkMyPe());
865   _TRACE_CREATION_N(env, CkNumNodes());
866   CkGroupID gid = _nodeGroupCreate(env);
867   _TRACE_CREATION_DONE(1);
868   return gid;
869 }
870
871 static inline void *_allocNewChare(envelope *env, int &idx)
872 {
873   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
874   void *tmp=malloc(_chareTable[chareIdx]->size);
875   _MEMCHECK(tmp);
876 #ifndef CMK_CHARE_USE_PTR
877   CkpvAccess(chare_objs).push_back(tmp);
878   CkpvAccess(chare_types).push_back(chareIdx);
879   idx = CkpvAccess(chare_objs).size()-1;
880 #endif
881   setMemoryTypeChare(tmp);
882   return tmp;
883 }
884
885 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
886 {
887   int idx;
888   register void *obj = _allocNewChare(env, idx);
889 #ifndef CMK_CHARE_USE_PTR
890   //((Chare *)obj)->chareIdx = idx;
891   CkpvAccess(currentChareIdx) = idx;
892 #endif
893   _invokeEntry(env->getEpIdx(),env,obj);
894 }
895
896 void CkCreateLocalChare(int epIdx, envelope *env)
897 {
898   env->setEpIdx(epIdx);
899   _processNewChareMsg(NULL, env);
900 }
901
902 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
903 {
904   int idx;
905   register void *obj = _allocNewChare(env, idx);
906   register CkChareID *pCid = (CkChareID *)
907       _allocMsg(FillVidMsg, sizeof(CkChareID));
908   pCid->onPE = CkMyPe();
909 #ifndef CMK_CHARE_USE_PTR
910   pCid->objPtr = (void*)idx;
911 #else
912   pCid->objPtr = obj;
913 #endif
914   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
915   register envelope *ret = UsrToEnv(pCid);
916   ret->setVidPtr(env->getVidPtr());
917   register int srcPe = env->getSrcPe();
918   ret->setSrcPe(CkMyPe());
919   CmiSetHandler(ret, _charmHandlerIdx);
920   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
921 #ifndef CMK_CHARE_USE_PTR
922   // register the remote vidblock for deletion when chare is deleted
923   CkChareID vid;
924   vid.onPE = srcPe;
925   vid.objPtr = env->getVidPtr();
926   CkpvAccess(vmap)[idx] = vid;    
927 #endif
928   CpvAccess(_qd)->create();
929 #ifndef CMK_CHARE_USE_PTR
930   //((Chare *)obj)->chareIdx = idx;
931   CkpvAccess(currentChareIdx) = idx;
932 #endif
933   _invokeEntry(env->getEpIdx(),env,obj);
934 }
935
936 /************** Receive: Chares *************/
937
938 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
939 {
940   register int epIdx = env->getEpIdx();
941   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
942   register void *obj;
943   if (mainIdx != -1)  {           // mainchare
944     CmiAssert(CkMyPe()==0);
945     obj = _mainTable[mainIdx]->getObj();
946   }
947   else {
948 #ifndef CMK_CHARE_USE_PTR
949     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
950       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
951     else
952       obj = env->getObjPtr();
953 #else
954     obj = env->getObjPtr();
955 #endif
956   }
957   _invokeEntry(epIdx,env,obj);
958 }
959
960 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
961 {
962   register int epIdx = env->getEpIdx();
963   register void *obj = env->getObjPtr();
964   _invokeEntry(epIdx,env,obj);
965 }
966
967 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
968 {
969 #ifndef CMK_CHARE_USE_PTR
970   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
971 #else
972   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
973   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
974 #endif
975   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
976   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
977   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
978   CmiFree(env);
979 }
980
981 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
982 {
983 #ifndef CMK_CHARE_USE_PTR
984   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
985 #else
986   VidBlock *vptr = (VidBlock *) env->getVidPtr();
987   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
988 #endif
989   _SET_USED(env, 1);
990   vptr->send(env);
991 }
992
993 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
994 {
995 #ifndef CMK_CHARE_USE_PTR
996   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
997   delete vptr;
998   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
999 #endif
1000   CmiFree(env);
1001 }
1002
1003 /************** Receive: Groups ****************/
1004
1005 /**
1006  Return a pointer to the local BOC of "groupID".
1007  The message "env" passed in has some known dependency on this groupID
1008  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1009  Therefore, if the return value is NULL, this function buffers the massage so that
1010  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1011  The message passed in must have its handlers correctly set so that it can be
1012  scheduled again.
1013 */
1014 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1015 {
1016
1017         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1018         IrrGroup *obj = ck->localBranch(groupID);
1019         if (obj==NULL) { /* groupmember not yet created: stash message */
1020                 ck->getGroupTable()->find(groupID).enqMsg(env);
1021         }
1022         else { /* will be able to process message */
1023                 ck->process();
1024         }
1025         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1026         return obj;
1027 }
1028
1029 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1030 {
1031 #if CMK_LBDB_ON
1032   // if there is a running obj being measured, stop it temporarily
1033   LDObjHandle objHandle;
1034   int objstopped = 0;
1035   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1036   if (the_lbdb->RunningObject(&objHandle)) {
1037     objstopped = 1;
1038     the_lbdb->ObjectStop(objHandle);
1039   }
1040 #endif
1041   _invokeEntry(epIdx,env,obj);
1042 #if CMK_LBDB_ON
1043   if (objstopped) the_lbdb->ObjectStart(objHandle);
1044 #endif
1045   _STATS_RECORD_PROCESS_BRANCH_1();
1046 }
1047
1048 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1049 {
1050   register CkGroupID groupID =  env->getGroupNum();
1051   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1052   if(obj) {
1053     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1054   }
1055 }
1056
1057 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1058 {
1059   env->setMsgtype(ForChareMsg);
1060   env->setObjPtr(obj);
1061   _processForChareMsg(ck,env);
1062   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1063 }
1064
1065 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1066 {
1067   env->setEpIdx(epIdx);
1068   _deliverForNodeBocMsg(ck,env, obj);
1069 }
1070
1071 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1072 {
1073   register CkGroupID groupID = env->getGroupNum();
1074   register void *obj;
1075
1076   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1077   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1078   if(!obj) { // groupmember not yet created
1079 #if CMK_IMMEDIATE_MSG
1080     if (CmiIsImmediate(env))     // buffer immediate message
1081       CmiDelayImmediate();
1082     else
1083 #endif
1084     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1085     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1086     return;
1087   }
1088   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1089 #if CMK_IMMEDIATE_MSG
1090   if (!CmiIsImmediate(env))
1091 #endif
1092   ck->process();
1093   env->setMsgtype(ForChareMsg);
1094   env->setObjPtr(obj);
1095   _processForChareMsg(ck,env);
1096   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1097 }
1098
1099 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1100 {
1101   register CkGroupID groupID = env->getGroupNum();
1102   register int epIdx = env->getEpIdx();
1103   if (!env->getGroupDep().isZero()) {      // dependence
1104     CkGroupID dep = env->getGroupDep();
1105     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1106     if (obj == NULL) return;
1107   }
1108   else
1109     ck->process();
1110   CkCreateLocalGroup(groupID, epIdx, env);
1111 }
1112
1113 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1114 {
1115   register CkGroupID groupID = env->getGroupNum();
1116   register int epIdx = env->getEpIdx();
1117   CkCreateLocalNodeGroup(groupID, epIdx, env);
1118 }
1119
1120 /************** Receive: Arrays *************/
1121
1122 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1123   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1124   if (mgr) {
1125     _SET_USED(env, 0);
1126     mgr->insertElement((CkMessage *)EnvToUsr(env));
1127   }
1128 }
1129 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1130   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1131   if (mgr) {
1132     _SET_USED(env, 0);
1133     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1134   }
1135 }
1136
1137 //BIGSIM_OOC DEBUGGING
1138 #define TELLMSGTYPE(x) //x
1139
1140 /**
1141  * This is the main converse-level handler used by all of Charm++.
1142  *
1143  * \addtogroup CriticalPathFramework
1144  */
1145 void _processHandler(void *converseMsg,CkCoreState *ck)
1146 {
1147   register envelope *env = (envelope *) converseMsg;
1148
1149   MESSAGE_PHASE_CHECK(env);
1150
1151 //#if CMK_RECORD_REPLAY
1152   if (ck->watcher!=NULL) {
1153     if (!ck->watcher->processMessage(&env,ck)) return;
1154   }
1155 //#endif
1156 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1157         Chare *obj=NULL;
1158         CkObjID sender;
1159         MCount SN;
1160         MlogEntry *entry=NULL;
1161         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1162         env->getMsgtype() == ForArrayEltMsg){
1163                 sender = env->sender;
1164                 SN = env->SN;
1165                 int result = preProcessReceivedMessage(env,&obj,&entry);
1166                 if(result == 0){
1167                         return;
1168                 }
1169         }
1170 #endif
1171
1172 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1173   //  CkPrintf("START\n");
1174   criticalPath_start(env);
1175 #endif
1176
1177
1178   switch(env->getMsgtype()) {
1179 // Group support
1180     case BocInitMsg :
1181       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1182       // QD processing moved inside _processBocInitMsg because it is conditional
1183       //ck->process(); 
1184       if(env->isPacked()) CkUnpackMessage(&env);
1185       _processBocInitMsg(ck,env);
1186       break;
1187     case NodeBocInitMsg :
1188       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1189       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1190       _processNodeBocInitMsg(ck,env);
1191       break;
1192     case ForBocMsg :
1193       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1194       // QD processing moved inside _processForBocMsg because it is conditional
1195       if(env->isPacked()) CkUnpackMessage(&env);
1196       _processForBocMsg(ck,env);
1197       // stats record moved inside _processForBocMsg because it is conditional
1198       break;
1199     case ForNodeBocMsg :
1200       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1201       // QD processing moved to _processForNodeBocMsg because it is conditional
1202       if(env->isPacked()) CkUnpackMessage(&env);
1203       _processForNodeBocMsg(ck,env);
1204       // stats record moved to _processForNodeBocMsg because it is conditional
1205       break;
1206
1207 // Array support
1208     case ArrayEltInitMsg:
1209       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1210       if(env->isPacked()) CkUnpackMessage(&env);
1211       _processArrayEltInitMsg(ck,env);
1212       break;
1213     case ForArrayEltMsg:
1214       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1215       if(env->isPacked()) CkUnpackMessage(&env);
1216       _processArrayEltMsg(ck,env);
1217       break;
1218
1219 // Chare support
1220     case NewChareMsg :
1221       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1222       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1223       _processNewChareMsg(ck,env);
1224       _STATS_RECORD_PROCESS_CHARE_1();
1225       break;
1226     case NewVChareMsg :
1227       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1228       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1229       _processNewVChareMsg(ck,env);
1230       _STATS_RECORD_PROCESS_CHARE_1();
1231       break;
1232     case ForChareMsg :
1233       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1234       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1235       _processForPlainChareMsg(ck,env);
1236       _STATS_RECORD_PROCESS_MSG_1();
1237       break;
1238     case ForVidMsg   :
1239       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1240       ck->process();
1241       _processForVidMsg(ck,env);
1242       break;
1243     case FillVidMsg  :
1244       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1245       ck->process();
1246       _processFillVidMsg(ck,env);
1247       break;
1248     case DeleteVidMsg  :
1249       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1250       ck->process();
1251       _processDeleteVidMsg(ck,env);
1252       break;
1253
1254     default:
1255       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1256   }
1257 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1258         if(obj != NULL){
1259                 postProcessReceivedMessage(obj,sender,SN,entry);
1260         }
1261 #endif
1262
1263
1264 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1265   criticalPath_end();
1266   //  CkPrintf("STOP\n");
1267 #endif
1268
1269
1270 }
1271
1272
1273 /******************** Message Send **********************/
1274
1275 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1276              int *queueing, int *priobits, unsigned int **prioptr)
1277 {
1278   register envelope *env = (envelope *)converseMsg;
1279   *pfn = (CldPackFn)CkPackMessage;
1280   *len = env->getTotalsize();
1281   *queueing = env->getQueueing();
1282   *priobits = env->getPriobits();
1283   *prioptr = (unsigned int *) env->getPrioPtr();
1284 }
1285
1286 void CkPackMessage(envelope **pEnv)
1287 {
1288   register envelope *env = *pEnv;
1289   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1290     register void *msg = EnvToUsr(env);
1291     _TRACE_BEGIN_PACK();
1292     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1293     _TRACE_END_PACK();
1294     env=UsrToEnv(msg);
1295     env->setPacked(1);
1296     *pEnv = env;
1297   }
1298 }
1299
1300 void CkUnpackMessage(envelope **pEnv)
1301 {
1302   register envelope *env = *pEnv;
1303   register int msgIdx = env->getMsgIdx();
1304   if(env->isPacked()) {
1305     register void *msg = EnvToUsr(env);
1306     _TRACE_BEGIN_UNPACK();
1307     msg = _msgTable[msgIdx]->unpack(msg);
1308     _TRACE_END_UNPACK();
1309     env=UsrToEnv(msg);
1310     env->setPacked(0);
1311     *pEnv = env;
1312   }
1313 }
1314
1315 //There's no reason for most messages to go through the Cld--
1316 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1317 // Thus these accellerated versions of the Cld calls.
1318 #if CMK_OBJECT_QUEUE_AVAILABLE
1319 static int index_objectQHandler;
1320 #endif
1321 int index_tokenHandler;
1322 int index_skipCldHandler;
1323
1324 void _skipCldHandler(void *converseMsg)
1325 {
1326   register envelope *env = (envelope *)(converseMsg);
1327   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1328 #if CMK_GRID_QUEUE_AVAILABLE
1329   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1330     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1331                        env, env->getQueueing (), env->getPriobits (),
1332                        (unsigned int *) env->getPrioPtr ());
1333   } else {
1334     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1335                        env, env->getQueueing (), env->getPriobits (),
1336                        (unsigned int *) env->getPrioPtr ());
1337   }
1338 #else
1339   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1340         env, env->getQueueing(),env->getPriobits(),
1341         (unsigned int *)env->getPrioPtr());
1342 #endif
1343 }
1344
1345
1346 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1347 // Made non-static to be used by ckmessagelogging
1348 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1349 {
1350 #if CMK_CHARMDEBUG
1351   if (!ConverseDeliver(pe)) {
1352     CmiFree(env);
1353     return;
1354   }
1355 #endif
1356   if(pe == CkMyPe() ){
1357     if(!CmiNodeAlive(CkMyPe())){
1358         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1359 //      return;
1360     }
1361   }
1362   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1363 #if CMK_OBJECT_QUEUE_AVAILABLE
1364     Chare *obj = CkFindObjectPtr(env);
1365     if (obj && obj->CkGetObjQueue().queue()) {
1366       _enqObjQueue(obj, env);
1367     }
1368     else
1369 #endif
1370     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1371         env, env->getQueueing(),env->getPriobits(),
1372         (unsigned int *)env->getPrioPtr());
1373   } else {
1374     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1375       CkPackMessage(&env);
1376     int len=env->getTotalsize();
1377     CmiSetXHandler(env,CmiGetHandler(env));
1378 #if CMK_OBJECT_QUEUE_AVAILABLE
1379     CmiSetHandler(env,index_objectQHandler);
1380 #else
1381     CmiSetHandler(env,index_skipCldHandler);
1382 #endif
1383     CmiSetInfo(env,infoFn);
1384     if (pe==CLD_BROADCAST) {
1385 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1386                         CmiSyncBroadcast(len, (char *)env);
1387 #else
1388                         CmiSyncBroadcastAndFree(len, (char *)env); 
1389 #endif
1390
1391 }
1392     else if (pe==CLD_BROADCAST_ALL) { 
1393 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1394                         CmiSyncBroadcastAll(len, (char *)env);
1395 #else
1396                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1397 #endif
1398
1399 }
1400     else{
1401 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1402                         CmiSyncSend(pe, len, (char *)env);
1403 #else
1404                         CmiSyncSendAndFree(pe, len, (char *)env);
1405 #endif
1406
1407                 }
1408   }
1409 }
1410
1411 #if CMK_BLUEGENE_CHARM
1412 #   define  _skipCldEnqueue   _CldEnqueue
1413 #endif
1414
1415 // by pass Charm++ priority queue, send as Converse message
1416 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1417 {
1418 #if CMK_CHARMDEBUG
1419   if (!ConverseDeliver(-1)) {
1420     CmiFree(env);
1421     return;
1422   }
1423 #endif
1424   CkPackMessage(&env);
1425   int len=env->getTotalsize();
1426   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1427 }
1428
1429 static void _noCldEnqueue(int pe, envelope *env)
1430 {
1431 /*
1432   if (pe == CkMyPe()) {
1433     CmiHandleMessage(env);
1434   } else
1435 */
1436 #if CMK_CHARMDEBUG
1437   if (!ConverseDeliver(pe)) {
1438     CmiFree(env);
1439     return;
1440   }
1441 #endif
1442   CkPackMessage(&env);
1443   int len=env->getTotalsize();
1444   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1445   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1446   else CmiSyncSendAndFree(pe, len, (char *)env);
1447 }
1448
1449 //static void _noCldNodeEnqueue(int node, envelope *env)
1450 //Made non-static to be used by ckmessagelogging
1451 void _noCldNodeEnqueue(int node, envelope *env)
1452 {
1453 /*
1454   if (node == CkMyNode()) {
1455     CmiHandleMessage(env);
1456   } else {
1457 */
1458 #if CMK_CHARMDEBUG
1459   if (!ConverseDeliver(node)) {
1460     CmiFree(env);
1461     return;
1462   }
1463 #endif
1464   CkPackMessage(&env);
1465   int len=env->getTotalsize();
1466   if (node==CLD_BROADCAST) { 
1467 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1468         CmiSyncNodeBroadcast(len, (char *)env);
1469 #else
1470         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1471 #endif
1472 }
1473   else if (node==CLD_BROADCAST_ALL) { 
1474 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1475                 CmiSyncNodeBroadcastAll(len, (char *)env);
1476 #else
1477                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1478 #endif
1479
1480 }
1481   else {
1482 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1483         CmiSyncNodeSend(node, len, (char *)env);
1484 #else
1485         CmiSyncNodeSendAndFree(node, len, (char *)env);
1486 #endif
1487   }
1488 }
1489
1490 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1491 {
1492   register envelope *env = UsrToEnv(msg);
1493   _CHECK_USED(env);
1494   _SET_USED(env, 1);
1495 #if CMK_REPLAYSYSTEM
1496   setEventID(env);
1497 #endif
1498   env->setMsgtype(ForChareMsg);
1499   env->setEpIdx(eIdx);
1500   env->setSrcPe(CkMyPe());
1501 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1502   criticalPath_send(env);
1503   automaticallySetMessagePriority(env);
1504 #endif
1505 #if CMK_CHARMDEBUG
1506   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1507 #endif
1508 #if CMK_OBJECT_QUEUE_AVAILABLE
1509   CmiSetHandler(env, index_objectQHandler);
1510 #else
1511   CmiSetHandler(env, _charmHandlerIdx);
1512 #endif
1513   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1514     register int pe = -(pCid->onPE+1);
1515     if(pe==CkMyPe()) {
1516 #ifndef CMK_CHARE_USE_PTR
1517       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1518 #else
1519       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1520 #endif
1521       void *objPtr;
1522       if (NULL!=(objPtr=vblk->getLocalChare()))
1523       { //A ready local chare
1524         env->setObjPtr(objPtr);
1525         return pe;
1526       }
1527       else { //The vidblock is not ready-- forget it
1528         vblk->send(env);
1529         return -1;
1530       }
1531     } else { //Valid vidblock for another PE:
1532       env->setMsgtype(ForVidMsg);
1533       env->setVidPtr(pCid->objPtr);
1534       return pe;
1535     }
1536   }
1537   else {
1538     env->setObjPtr(pCid->objPtr);
1539     return pCid->onPE;
1540   }
1541 }
1542
1543 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1544 {
1545   int destPE = _prepareMsg(eIdx, msg, pCid);
1546   if (destPE != -1) {
1547     register envelope *env = UsrToEnv(msg);
1548 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1549     criticalPath_send(env);
1550     automaticallySetMessagePriority(env);
1551 #endif
1552     CmiBecomeImmediate(env);
1553   }
1554   return destPE;
1555 }
1556
1557 extern "C"
1558 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1559 {
1560   if (opts & CK_MSG_INLINE) {
1561     CkSendMsgInline(entryIdx, msg, pCid, opts);
1562     return;
1563   }
1564 #if CMK_ERROR_CHECKING
1565   if (opts & CK_MSG_IMMEDIATE) {
1566     CmiAbort("Immediate message is not allowed in Chare!");
1567   }
1568 #endif
1569   register envelope *env = UsrToEnv(msg);
1570   int destPE=_prepareMsg(entryIdx,msg,pCid);
1571   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1572   // VidBlock was not yet filled). The problem is that the creation was never
1573   // traced later when the VidBlock was filled. One solution is to trace the
1574   // creation here, the other to trace it in VidBlock->msgDeliver().
1575   _TRACE_CREATION_1(env);
1576   if (destPE!=-1) {
1577     CpvAccess(_qd)->create();
1578     if (opts & CK_MSG_SKIP_OR_IMM)
1579       _noCldEnqueue(destPE, env);
1580     else
1581       _CldEnqueue(destPE, env, _infoIdx);
1582   }
1583   _TRACE_CREATION_DONE(1);
1584 }
1585
1586 extern "C"
1587 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1588 {
1589   if (pCid->onPE==CkMyPe())
1590   {
1591     if(!CmiNodeAlive(CkMyPe())){
1592         return;
1593     }
1594 #if CMK_CHARMDEBUG
1595     //Just in case we need to breakpoint or use the envelope in some way
1596     _prepareMsg(entryIndex,msg,pCid);
1597 #endif
1598                 //Just directly call the chare (skip QD handling & scheduler)
1599     register envelope *env = UsrToEnv(msg);
1600     if (env->isPacked()) CkUnpackMessage(&env);
1601     _STATS_RECORD_PROCESS_MSG_1();
1602     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1603   }
1604   else {
1605     //No way to inline a cross-processor message:
1606     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1607   }
1608 }
1609
1610 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1611 {
1612   register envelope *env = UsrToEnv(msg);
1613 #if CMK_ERROR_CHECKING
1614   CkNodeGroupID nodeRedMgr;
1615 #endif
1616   _CHECK_USED(env);
1617   _SET_USED(env, 1);
1618 #if CMK_REPLAYSYSTEM
1619   setEventID(env);
1620 #endif
1621   env->setMsgtype(type);
1622   env->setEpIdx(eIdx);
1623   env->setGroupNum(gID);
1624   env->setSrcPe(CkMyPe());
1625 #if CMK_ERROR_CHECKING
1626   nodeRedMgr.setZero();
1627   env->setRednMgr(nodeRedMgr);
1628 #endif
1629 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1630   criticalPath_send(env);
1631   automaticallySetMessagePriority(env);
1632 #endif
1633 #if CMK_CHARMDEBUG
1634   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1635 #endif
1636   CmiSetHandler(env, _charmHandlerIdx);
1637   return env;
1638 }
1639
1640 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1641 {
1642   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1643 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1644   criticalPath_send(env);
1645   automaticallySetMessagePriority(env);
1646 #endif
1647   CmiBecomeImmediate(env);
1648   return env;
1649 }
1650
1651 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1652                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1653 {
1654   int numPes;
1655   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1656 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1657   sendTicketGroupRequest(env,pe,_infoIdx);
1658 #else
1659   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1660   _TRACE_CREATION_N(env, numPes);
1661   if (opts & CK_MSG_SKIP_OR_IMM)
1662     _noCldEnqueue(pe, env);
1663   else
1664     _skipCldEnqueue(pe, env, _infoIdx);
1665   _TRACE_CREATION_DONE(1);
1666 #endif
1667 }
1668
1669 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1670                            int npes, int *pes)
1671 {
1672   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1673   _TRACE_CREATION_MULTICAST(env, npes, pes);
1674   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1675   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1676 }
1677
1678 extern "C"
1679 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1680 {
1681 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1682   if (destPE==CkMyPe())
1683   {
1684     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1685     return;
1686   }
1687   //Can't inline-- send the usual way
1688   register envelope *env = UsrToEnv(msg);
1689   int numPes;
1690   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1691   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1692   _TRACE_CREATION_N(env, numPes);
1693   _noCldEnqueue(destPE, env);
1694   _STATS_RECORD_SEND_BRANCH_1();
1695   CkpvAccess(_coreState)->create();
1696   _TRACE_CREATION_DONE(1);
1697 #else
1698   // no support for immediate message, send inline
1699   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1700 #endif
1701 }
1702
1703 extern "C"
1704 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1705 {
1706   if (destPE==CkMyPe())
1707   {
1708     if(!CmiNodeAlive(CkMyPe())){
1709         return;
1710     }
1711     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1712     if (obj!=NULL)
1713     { //Just directly call the group:
1714 #if CMK_ERROR_CHECKING
1715       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1716 #else
1717       envelope *env=UsrToEnv(msg);
1718 #endif
1719       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1720       return;
1721     }
1722   }
1723   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1724   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1725 }
1726
1727 extern "C"
1728 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1729 {
1730   if (opts & CK_MSG_INLINE) {
1731     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1732     return;
1733   }
1734   if (opts & CK_MSG_IMMEDIATE) {
1735     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1736     return;
1737   }
1738   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1739   _STATS_RECORD_SEND_BRANCH_1();
1740   CkpvAccess(_coreState)->create();
1741 }
1742
1743 extern "C"
1744 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1745 {
1746 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1747   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1748   _TRACE_CREATION_MULTICAST(env, npes, pes);
1749   _noCldEnqueueMulti(npes, pes, env);
1750   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1751 #else
1752   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1753   CpvAccess(_qd)->create(-npes);
1754 #endif
1755   _STATS_RECORD_SEND_BRANCH_N(npes);
1756   CpvAccess(_qd)->create(npes);
1757 }
1758
1759 extern "C"
1760 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1761 {
1762   if (opts & CK_MSG_IMMEDIATE) {
1763     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1764     return;
1765   }
1766     // normal mesg
1767   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1768   _STATS_RECORD_SEND_BRANCH_N(npes);
1769   CpvAccess(_qd)->create(npes);
1770 }
1771
1772 extern "C"
1773 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1774 {
1775   int npes;
1776   int *pes;
1777   if (opts & CK_MSG_IMMEDIATE) {
1778     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1779     return;
1780   }
1781     // normal mesg
1782   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1783   CmiLookupGroup(grp, &npes, &pes);
1784   _TRACE_CREATION_MULTICAST(env, npes, pes);
1785   _CldEnqueueGroup(grp, env, _infoIdx);
1786   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1787   _STATS_RECORD_SEND_BRANCH_N(npes);
1788   CpvAccess(_qd)->create(npes);
1789 }
1790
1791 extern "C"
1792 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1793 {
1794   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1795   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1796   CpvAccess(_qd)->create(CkNumPes());
1797 }
1798
1799 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1800                 int node=CLD_BROADCAST_ALL, int opts=0)
1801 {
1802   int numPes;
1803   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1804 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1805         sendTicketNodeGroupRequest(env,node,_infoIdx);
1806 #else
1807   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1808   _TRACE_CREATION_N(env, numPes);
1809   if (opts & CK_MSG_SKIP_OR_IMM) {
1810     _noCldNodeEnqueue(node, env);
1811     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1812       CkpvAccess(_coreState)->create(-numPes);
1813     }
1814   }
1815   else
1816     _CldNodeEnqueue(node, env, _infoIdx);
1817   _TRACE_CREATION_DONE(1);
1818 #endif
1819 }
1820
1821 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1822                            int npes, int *nodes)
1823 {
1824   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1825   _TRACE_CREATION_N(env, npes);
1826   for (int i=0; i<npes; i++) {
1827     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1828   }
1829   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1830 }
1831
1832 extern "C"
1833 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1834 {
1835 #if CMK_IMMEDIATE_MSG
1836   if (node==CkMyNode())
1837   {
1838     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1839     return;
1840   }
1841   //Can't inline-- send the usual way
1842   register envelope *env = UsrToEnv(msg);
1843   int numPes;
1844   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1845   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1846   _TRACE_CREATION_N(env, numPes);
1847   _noCldNodeEnqueue(node, env);
1848   _STATS_RECORD_SEND_BRANCH_1();
1849   /* immeidate message is invisible to QD */
1850 //  CkpvAccess(_coreState)->create();
1851   _TRACE_CREATION_DONE(1);
1852 #else
1853   // no support for immediate message, send inline
1854   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1855 #endif
1856 }
1857
1858 extern "C"
1859 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1860 {
1861   if (node==CkMyNode())
1862   {
1863     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1864     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1865     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1866     if (obj!=NULL)
1867     { //Just directly call the group:
1868 #if CMK_ERROR_CHECKING
1869       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1870 #else
1871       envelope *env=UsrToEnv(msg);
1872 #endif
1873       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1874       return;
1875     }
1876   }
1877   //Can't inline-- send the usual way
1878   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1879 }
1880
1881 extern "C"
1882 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1883 {
1884   if (opts & CK_MSG_INLINE) {
1885     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1886     return;
1887   }
1888   if (opts & CK_MSG_IMMEDIATE) {
1889     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1890     return;
1891   }
1892   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1893   _STATS_RECORD_SEND_NODE_BRANCH_1();
1894   CkpvAccess(_coreState)->create();
1895 }
1896
1897 extern "C"
1898 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1899 {
1900 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1901   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1902   _noCldEnqueueMulti(npes, nodes, env);
1903 #else
1904   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1905   CpvAccess(_qd)->create(-npes);
1906 #endif
1907   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1908   CpvAccess(_qd)->create(npes);
1909 }
1910
1911 extern "C"
1912 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1913 {
1914   if (opts & CK_MSG_IMMEDIATE) {
1915     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1916     return;
1917   }
1918     // normal mesg
1919   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1920   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1921   CpvAccess(_qd)->create(npes);
1922 }
1923
1924 extern "C"
1925 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1926 {
1927   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1928   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1929   CpvAccess(_qd)->create(CkNumNodes());
1930 }
1931
1932 //Needed by delegation manager:
1933 extern "C"
1934 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1935 { return _prepareMsg(eIdx,msg,pCid); }
1936 extern "C"
1937 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1938 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1939 extern "C"
1940 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1941 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1942
1943 void _ckModuleInit(void) {
1944         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1945 #if CMK_OBJECT_QUEUE_AVAILABLE
1946         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1947 #endif
1948         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1949         CkpvInitialize(TokenPool*, _tokenPool);
1950         CkpvAccess(_tokenPool) = new TokenPool;
1951 }
1952
1953
1954 /************** Send: Arrays *************/
1955
1956 extern void CkArrayManagerInsert(int onPe,void *msg);
1957 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1958
1959 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1960 {
1961   _CHECK_USED(env);
1962   _SET_USED(env, 1);
1963   env->setMsgtype(type);
1964 #if CMK_CHARMDEBUG
1965   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1966 #endif
1967   CmiSetHandler(env, _charmHandlerIdx);
1968   CpvAccess(_qd)->create();
1969 }
1970
1971 extern "C"
1972 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1973   register envelope *env = UsrToEnv(msg);
1974   env->getsetArrayMgr()=aID;
1975   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1976   _CldEnqueue(pe, env, _infoIdx);
1977 }
1978
1979 extern "C"
1980 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1981   register envelope *env = UsrToEnv(msg);
1982   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1983 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1984    sendTicketArrayRequest(env,pe,_infoIdx);
1985 #else
1986   if (opts & CK_MSG_IMMEDIATE)
1987     CmiBecomeImmediate(env);
1988   if (opts & CK_MSG_SKIP_OR_IMM)
1989     _noCldEnqueue(pe, env);
1990   else
1991     _skipCldEnqueue(pe, env, _infoIdx);
1992 #endif
1993 }
1994
1995 class ElementDestroyer : public CkLocIterator {
1996 private:
1997         CkLocMgr *locMgr;
1998 public:
1999         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2000         void addLocation(CkLocation &loc) {
2001           loc.destroyAll();
2002         }
2003 };
2004
2005 void CkDeleteChares() {
2006   int i;
2007   int numGroups = CkpvAccess(_groupIDTable)->size();
2008
2009   // delete all plain chares
2010 #ifndef CMK_CHARE_USE_PTR
2011   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2012         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2013         delete obj;
2014         CkpvAccess(chare_objs)[i] = NULL;
2015   }
2016   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2017         VidBlock *obj = CkpvAccess(vidblocks)[i];
2018         delete obj;
2019         CkpvAccess(vidblocks)[i] = NULL;
2020   }
2021 #endif
2022
2023   // delete all array elements
2024   for(i=0;i<numGroups;i++) {
2025     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2026     if(obj && obj->isLocMgr())  {
2027       CkLocMgr *mgr = (CkLocMgr*)obj;
2028       ElementDestroyer destroyer(mgr);
2029       mgr->iterate(destroyer);
2030     }
2031   }
2032
2033   // delete all groups
2034   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2035   for(i=0;i<numGroups;i++) {
2036     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2037     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2038     if (obj) delete obj;
2039   }
2040   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2041
2042   // delete all node groups
2043   if (CkMyRank() == 0) {
2044     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2045     for(i=0;i<numNodeGroups;i++) {
2046       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2047       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2048       if (obj) delete obj;
2049     }
2050   }
2051 }
2052
2053 //------------------- Message Watcher (record/replay) ----------------
2054
2055 #include "crc32.h"
2056
2057 CkpvDeclare(int, envelopeEventID);
2058 int _recplay_crc = 0;
2059 int _recplay_checksum = 0;
2060 int _recplay_logsize = 1024*1024;
2061
2062 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2063 #define REPLAYDEBUG(args) /* empty */
2064
2065 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2066
2067 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2068
2069 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2070
2071     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2072     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2073     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2074     FILE *f=fopen(fName,permissions);
2075     REPLAYDEBUG("openReplayfile "<<fName);
2076     if (f==NULL) {
2077         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2078             CkMyPe(),fName,permissions);
2079         CkAbort("openReplayFile> Could not open replay file");
2080     }
2081     return f;
2082 }
2083
2084 #include "BaseLB.h" /* For LBMigrateMsg message */
2085
2086 class CkMessageRecorder : public CkMessageWatcher {
2087   char *buffer;
2088   unsigned int curpos;
2089   bool firstOpen;
2090 public:
2091   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2092   ~CkMessageRecorder() {
2093     flushLog(0);
2094     fprintf(f,"-1 -1 -1 ");
2095     fclose(f);
2096     delete[] buffer;
2097 #if 0
2098     FILE *stsfp = fopen("sts", "w");
2099     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2100     traceWriteSTS(stsfp, 0);
2101     fclose(stsfp);
2102 #endif
2103     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2104   }
2105
2106 private:
2107   void flushLog(int verbose=1) {
2108     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2109     fprintf(f, "%s", buffer);
2110     curpos=0;
2111   }
2112   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2113     if ((*envptr)->getEvent()) {
2114       bool wasPacked = (*envptr)->isPacked();
2115       if (!wasPacked) CkPackMessage(envptr);
2116       envelope *env = *envptr;
2117       unsigned int crc1=0, crc2=0;
2118       if (_recplay_crc) {
2119         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2120         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2121         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2122       } else if (_recplay_checksum) {
2123         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2124         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2125       }
2126       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2127       if (curpos > _recplay_logsize-128) flushLog();
2128       if (!wasPacked) CkUnpackMessage(envptr);
2129     }
2130     return CmiTrue;
2131   }
2132   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2133     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2134     if (curpos > _recplay_logsize-128) flushLog();
2135     return CmiTrue;
2136   }
2137   
2138   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2139     FILE *f;
2140     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2141     else f = openReplayFile("ckreplay_",".lb","a");
2142     firstOpen = false;
2143     if (f != NULL) {
2144       PUP::toDisk p(f);
2145       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2146       (*msg)->pup(p);
2147       fclose(f);
2148     }
2149     return CmiTrue;
2150   }
2151 };
2152
2153 class CkMessageDetailRecorder : public CkMessageWatcher {
2154 public:
2155   CkMessageDetailRecorder(FILE *f_) {
2156     f=f_;
2157     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2158      * The value of 'x' is the pointer size.
2159      */
2160     CmiUInt2 little = sizeof(void*);
2161     fwrite(&little, 2, 1, f);
2162   }
2163   ~CkMessageDetailRecorder() {fclose(f);}
2164 private:
2165   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2166     bool wasPacked = (*envptr)->isPacked();
2167     if (!wasPacked) CkPackMessage(envptr);
2168     envelope *env = *envptr;
2169     CmiUInt4 size = env->getTotalsize();
2170     fwrite(&size, 4, 1, f);
2171     fwrite(env, env->getTotalsize(), 1, f);
2172     if (!wasPacked) CkUnpackMessage(envptr);
2173     return CmiTrue;
2174   }
2175 };
2176
2177 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2178 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2179
2180 #if CMK_BLUEGENE_CHARM
2181 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2182                                    int pb,unsigned int *prio);
2183 #endif
2184
2185 class CkMessageReplay : public CkMessageWatcher {
2186   int counter;
2187         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2188         int nextEP;
2189         unsigned int crc1, crc2;
2190         FILE *lbFile;
2191         /// Read the next message we need from the file:
2192         void getNext(void) {
2193           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2194           if (nextSize > 0) {
2195             // We are reading a regular message
2196             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2197               CkAbort("CkMessageReplay> Syntax error reading replay file");
2198             }
2199             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2200           } else if (nextSize == -2) {
2201             // We are reading a special message (right now only thread awaken)
2202             // Nothing to do since we have already read all info
2203             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2204           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2205             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2206             CkAbort("CkMessageReplay> Unrecognized input");
2207           }
2208             /*
2209                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2210                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2211                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2212                 }
2213                 */
2214                 counter++;
2215         }
2216         /// If this is the next message we need, advance and return CmiTrue.
2217         CmiBool isNext(envelope *env) {
2218                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2219                 if (nextEvent!=env->getEvent()) return CmiFalse;
2220                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2221 #if 1
2222                 if (nextEP != env->getEpIdx()) {
2223                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2224                         return CmiFalse;
2225                 }
2226 #endif
2227 #if ! CMK_BLUEGENE_CHARM
2228                 if (nextSize!=env->getTotalsize())
2229                 {
2230                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2231                         return CmiFalse;
2232                 }
2233                 if (_recplay_crc || _recplay_checksum) {
2234                   bool wasPacked = env->isPacked();
2235                   if (!wasPacked) CkPackMessage(&env);
2236                   if (_recplay_crc) {
2237                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2238                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2239                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2240                     if (crcnew1 != crc1) {
2241                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2242                     }
2243                     if (crcnew2 != crc2) {
2244                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2245                     }
2246                   } else if (_recplay_checksum) {
2247             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2248             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2249             if (crcnew1 != crc1) {
2250               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2251             }
2252             if (crcnew2 != crc2) {
2253               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2254             }               
2255                   }
2256                   if (!wasPacked) CkUnpackMessage(&env);
2257                 }
2258 #endif
2259                 return CmiTrue;
2260         }
2261         CmiBool isNext(CthThreadToken *token) {
2262           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2263           return CmiFalse;
2264         }
2265
2266         /// This is a (short) list of messages we aren't yet ready for:
2267         CkQ<envelope *> delayedMessages;
2268         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2269         CkQ<CthThreadToken *> delayedTokens;
2270
2271         /// Try to flush out any delayed messages
2272         void flush(void) {
2273           if (nextSize>0) {
2274                 int len=delayedMessages.length();
2275                 for (int i=0;i<len;i++) {
2276                         envelope *env=delayedMessages.deq();
2277                         if (isNext(env)) { /* this is the next message: process it */
2278                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2279                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2280                                 return;
2281                         }
2282                         else /* Not ready yet-- put it back in the
2283                                 queue */
2284                           {
2285                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2286                                 delayedMessages.enq(env);
2287                           }
2288                 }
2289           } else if (nextSize==-2) {
2290             int len=delayedTokens.length();
2291             for (int i=0;i<len;++i) {
2292               CthThreadToken *token=delayedTokens.deq();
2293               if (isNext(token)) {
2294             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2295 #if ! CMK_BLUEGENE_CHARM
2296                 CsdEnqueueLifo((void*)token);
2297 #else
2298                 CthEnqueueBigSimThread(token,0,0,NULL);
2299 #endif
2300                 return;
2301               } else {
2302             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2303                 delayedTokens.enq(token);
2304               }
2305             }
2306           }
2307         }
2308
2309 public:
2310         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2311           counter=0;
2312           f=f_;
2313           getNext();
2314           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2315           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2316         }
2317         ~CkMessageReplay() {fclose(f);}
2318
2319 private:
2320         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2321           bool wasPacked = (*envptr)->isPacked();
2322           if (!wasPacked) CkPackMessage(envptr);
2323           envelope *env = *envptr;
2324           //CkAssert(*(int*)env == 0x34567890);
2325           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2326                 if (env->getEvent() == 0) return CmiTrue;
2327                 if (isNext(env)) { /* This is the message we were expecting */
2328                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2329                         getNext(); /* Advance over this message */
2330                         flush(); /* try to process queued-up stuff */
2331                         if (!wasPacked) CkUnpackMessage(envptr);
2332                         return CmiTrue;
2333                 }
2334 #if CMK_SMP
2335                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2336                          // try next rank, we can't just buffer the msg and left
2337                          // we need to keep unprocessed msg on the fly
2338                         int nextpe = CkMyPe()+1;
2339                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2340                         nextpe = CkNodeFirst(CkMyNode());
2341                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2342                         return CmiFalse;
2343                 }
2344 #endif
2345                 else /*!isNext(env) */ {
2346                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2347                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2348                         delayedMessages.enq(env);
2349                         flush();
2350                         return CmiFalse;
2351                 }
2352         }
2353         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2354       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2355           if (isNext(token)) {
2356         REPLAYDEBUG("Executing token: "<<token->serialNo)
2357             getNext();
2358             flush();
2359             return CmiTrue;
2360           } else {
2361         REPLAYDEBUG("Queueing token: "<<token->serialNo
2362             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2363             delayedTokens.enq(token);
2364             return CmiFalse;
2365           }
2366         }
2367
2368         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2369           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2370           if (lbFile != NULL) {
2371             int num_moves;
2372         PUP::fromDisk p(lbFile);
2373             p | num_moves;
2374             if (num_moves != (*msg)->n_moves) {
2375               delete *msg;
2376               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2377             }
2378             (*msg)->pup(p);
2379           }
2380           return CmiTrue;
2381         }
2382 };
2383
2384 class CkMessageDetailReplay : public CkMessageWatcher {
2385   void *getNext() {
2386     CmiUInt4 size; size_t nread;
2387     if ((nread=fread(&size, 4, 1, f)) < 1) {
2388       if (feof(f)) return NULL;
2389       CkPrintf("Broken record file (metadata) got %d\n",nread);
2390       CkAbort("");
2391     }
2392     void *env = CmiAlloc(size);
2393     long tell = ftell(f);
2394     if ((nread=fread(env, size, 1, f)) < 1) {
2395       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2396       CkAbort("");
2397     }
2398     //*(int*)env = 0x34567890; // set first integer as magic
2399     return env;
2400   }
2401 public:
2402   double starttime;
2403   CkMessageDetailReplay(FILE *f_) {
2404     f=f_;
2405     starttime=CkWallTimer();
2406     /* This must match what CkMessageDetailRecorder did */
2407     CmiUInt2 little;
2408     fread(&little, 2, 1, f);
2409     if (little != sizeof(void*)) {
2410       CkAbort("Replaying on a different architecture from which recording was done!");
2411     }
2412
2413     CsdEnqueue(getNext());
2414
2415     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2416   }
2417   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2418     void *msg = getNext();
2419     if (msg != NULL) CsdEnqueue(msg);
2420     return CmiTrue;
2421   }
2422 };
2423
2424 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2425 #if ! CMK_BLUEGENE_CHARM
2426   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2427 #endif
2428   CkMessageReplay *replay = (CkMessageReplay*)rep;
2429   //CmiStartQD(CkMessageReplayQuiescence, replay);
2430 }
2431
2432 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2433   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2434   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2435   ConverseExit();
2436 }
2437
2438 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2439   CkCoreState *ck = CkpvAccess(_coreState);
2440   if (ck->watcher!=NULL) {
2441     return ck->watcher->processThread(token,ck);
2442   }
2443   return CmiTrue;
2444 }
2445
2446 CpvCExtern(int, CthResumeNormalThreadIdx);
2447 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2448 {
2449   CthThread t = token->thread;
2450
2451   if(t == NULL){
2452     free(token);
2453     return;
2454   }
2455 #if CMK_TRACE_ENABLED
2456 #if ! CMK_TRACE_IN_CHARM
2457   if(CpvAccess(traceOn))
2458     CthTraceResume(t);
2459 /*    if(CpvAccess(_traceCoreOn)) 
2460             resumeTraceCore();*/
2461 #endif
2462 #endif
2463   
2464   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2465   if (CpdExecuteThreadResume(token)) {
2466     CthResume(t);
2467   }
2468 }
2469
2470 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2471   CkCoreState *ck = CkpvAccess(_coreState);
2472   if (ck->watcher!=NULL) {
2473     ck->watcher->processLBMessage(msg, ck);
2474   }
2475 }
2476
2477 #if CMK_BLUEGENE_CHARM
2478 CpvExtern(int      , CthResumeBigSimThreadIdx);
2479 #endif
2480
2481 #include "ckliststring.h"
2482 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2483     CmiArgGroup("Charm++","Record/Replay");
2484     CmiBool forceReplay = CmiFalse;
2485     char *procs = NULL;
2486     _replaySystem = 0;
2487     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2488       _recplay_crc = 1;
2489     }
2490     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2491       _recplay_checksum = 1;
2492     }
2493     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2494     REPLAYDEBUG("CkMessageWatcherInit ");
2495     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2496         CkListString list(procs);
2497         if (list.includes(CkMyPe())) {
2498           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2499           CpdSetInitializeMemory(1);
2500           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2501         }
2502     }
2503     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2504       if (CkMyPe() == 0) {
2505         CmiPrintf("Charm++> record mode.\n");
2506         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2507           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2508           _recplay_crc = _recplay_checksum = 0;
2509         }
2510       }
2511       CpdSetInitializeMemory(1);
2512       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2513       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2514     }
2515         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2516             forceReplay = CmiTrue;
2517             CpdSetInitializeMemory(1);
2518             // Set the parameters of the processor
2519 #if CMK_SHARED_VARS_UNAVAILABLE
2520             _Cmi_mype = atoi(procs);
2521             while (procs[0]!='/') procs++;
2522             procs++;
2523             _Cmi_numpes = atoi(procs);
2524 #else
2525             CkAbort("+replay-detail available only for non-SMP build");
2526 #endif
2527             _replaySystem = 1;
2528             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2529         }
2530         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2531           if (CkMyPe() == 0)  {
2532             CmiPrintf("Charm++> replay mode.\n");
2533             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2534               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2535               _recplay_crc = _recplay_checksum = 0;
2536             }
2537           }
2538           CpdSetInitializeMemory(1);
2539 #if ! CMK_BLUEGENE_CHARM
2540           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2541 #else
2542           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2543 #endif
2544           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2545         }
2546         if (_recplay_crc && _recplay_checksum) {
2547           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2548         }
2549 }
2550
2551 extern "C"
2552 int CkMessageToEpIdx(void *msg) {
2553         envelope *env=UsrToEnv(msg);
2554         int ep=env->getEpIdx();
2555         if (ep==CkIndex_CkArray::recvBroadcast(0))
2556                 return env->getsetArrayBcastEp();
2557         else
2558                 return ep;
2559 }
2560
2561 extern "C"
2562 int getCharmEnvelopeSize() {
2563   return sizeof(envelope);
2564 }
2565
2566
2567 #include "CkMarshall.def.h"
2568