fixed a bug in calling ckLocal for a local chare.
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #ifndef CMK_CHARE_USE_PTR
29 #include <map>
30 CkpvDeclare(CkVec<void *>, chare_objs);
31 CkpvDeclare(CkVec<int>, chare_types);
32 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
33
34 typedef std::map<int, CkChareID>  Vidblockmap;
35 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
36 CkpvDeclare(int, currentChareIdx);
37 #endif
38
39
40 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
41
42 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
43
44 int CMessage_CkMessage::__idx=-1;
45 int CMessage_CkArgMsg::__idx=0;
46 int CkIndex_Chare::__idx;
47 int CkIndex_Group::__idx;
48 int CkIndex_ArrayBase::__idx=-1;
49
50 extern int _defaultObjectQ;
51
52 void _initChareTables()
53 {
54 #ifndef CMK_CHARE_USE_PTR
55           /* chare and vidblock table */
56   CkpvInitialize(CkVec<void *>, chare_objs);
57   CkpvInitialize(CkVec<int>, chare_types);
58   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
59   CkpvInitialize(Vidblockmap, vmap);
60   CkpvInitialize(int, currentChareIdx);
61   CkpvAccess(currentChareIdx) = -1;
62 #endif
63 }
64
65 //Charm++ virtual functions: declaring these here results in a smaller executable
66 Chare::Chare(void) {
67   thishandle.onPE=CkMyPe();
68   thishandle.objPtr=this;
69 #ifndef CMK_CHARE_USE_PTR
70      // for plain chare, objPtr is actually the index to chare obj table
71   if (CkpvAccess(currentChareIdx) >= 0) {
72     thishandle.objPtr=(void*)CkpvAccess(currentChareIdx);
73   }
74   chareIdx = CkpvAccess(currentChareIdx);
75 #endif
76 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
77   mlogData = new ChareMlogData();
78   mlogData->objID.type = TypeChare;
79   mlogData->objID.data.chare.id = thishandle;
80 #endif
81 #if CMK_OBJECT_QUEUE_AVAILABLE
82   if (_defaultObjectQ)  CkEnableObjQ();
83 #endif
84 }
85
86 Chare::Chare(CkMigrateMessage* m) {
87   thishandle.onPE=CkMyPe();
88   thishandle.objPtr=this;
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 }
149
150 int Chare::ckGetChareType() const {
151   return -3;
152 }
153 char *Chare::ckDebugChareName(void) {
154   char buf[100];
155   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
156   return strdup(buf);
157 }
158 int Chare::ckDebugChareID(char *str, int limit) {
159   // pure chares for now do not have a valid ID
160   str[0] = 0;
161   return 1;
162 }
163 void Chare::ckDebugPup(PUP::er &p) {
164   pup(p);
165 }
166
167 /// This method is called before starting a [threaded] entry method.
168 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
169   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
170   traceAddThreadListeners(th, UsrToEnv(msg));
171 }
172
173
174 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
175   p.comment("Bytes");
176   int ts=UsrToEnv(msg)->getTotalsize();
177   int msgLen=ts-sizeof(envelope);
178   if (msgLen>0)
179     p((char*)msg,msgLen);
180 }
181
182 IrrGroup::IrrGroup(void) {
183   thisgroup = CkpvAccess(_currentGroup);
184 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
185         mlogData->objID.type = TypeGroup;
186         mlogData->objID.data.group.id = thisgroup;
187         mlogData->objID.data.group.onPE = CkMyPe();
188 #endif
189 }
190
191 IrrGroup::~IrrGroup() {
192   // remove the object pointer
193   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
194   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
195   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
196 }
197
198 void IrrGroup::pup(PUP::er &p)
199 {
200   Chare::pup(p);
201   p|thisgroup;
202 }
203
204 int IrrGroup::ckGetChareType() const {
205   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
206 }
207
208 int IrrGroup::ckDebugChareID(char *str, int limit) {
209   if (limit<5) return -1;
210   str[0] = 1;
211   *((int*)&str[1]) = thisgroup.idx;
212   return 5;
213 }
214
215 char *IrrGroup::ckDebugChareName() {
216   return strdup(_chareTable[ckGetChareType()]->name);
217 }
218
219 void IrrGroup::ckJustMigrated(void)
220 {
221 }
222
223 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
224   /* FIXME: **CW** not entirely sure what we should do here yet */
225 }
226
227 void Group::CkAddThreadListeners(CthThread th, void *msg) {
228   Chare::CkAddThreadListeners(th, msg);
229   CthSetThreadID(th, thisgroup.idx, 0, 0);
230 }
231
232 void Group::pup(PUP::er &p)
233 {
234   CkReductionMgr::pup(p);
235   reductionInfo.pup(p);
236 }
237
238 /**** Delegation Manager Group */
239 CkDelegateMgr::~CkDelegateMgr() { }
240
241 //Default delegator implementation: do not delegate-- send directly
242 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
243   { CkSendMsg(ep,m,c); }
244 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
245   { CkSendMsgBranch(ep,m,onPE,g); }
246 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
247   { CkBroadcastMsgBranch(ep,m,g); }
248 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
249   { CkSendMsgBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
250 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
251   { CkSendMsgNodeBranch(ep,m,onNode,g); }
252 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
253   { CkBroadcastMsgNodeBranch(ep,m,g); }
254 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
255   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
256 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
257 {
258         CProxyElement_ArrayBase ap(a,idx);
259         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
260 }
261 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
262 {
263         CProxyElement_ArrayBase ap(a,idx);
264         ap.ckSend((CkArrayMessage *)m,ep);
265 }
266 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
267 {
268         CProxy_ArrayBase ap(a);
269         ap.ckBroadcast((CkArrayMessage *)m,ep);
270 }
271
272 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
273 {
274         CmiAbort("ArraySectionSend is not implemented!\n");
275 /*
276         CProxyElement_ArrayBase ap(a,idx);
277         ap.ckSend((CkArrayMessage *)m,ep);
278 */
279 }
280
281 /*** Proxy <-> delegator communication */
282 CkDelegateData::~CkDelegateData() {}
283
284 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
285   return pd; // default implementation ignores pup call
286 }
287
288 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
289    this tricky manual reference counting business... */
290
291 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
292         if (dPtr) dPtr->ref();
293         ckUndelegate();
294         delegatedMgr = dTo;
295         delegatedPtr = dPtr;
296 }
297 void CProxy::ckUndelegate(void) {
298         delegatedMgr=NULL;
299         if (delegatedPtr) delegatedPtr->unref();
300         delegatedPtr=NULL;
301 }
302
303 /// Copy constructor
304 CProxy::CProxy(const CProxy &src)
305     :delegatedMgr(src.delegatedMgr)
306 {
307     delegatedPtr = NULL;
308     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
309         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
310 }
311
312 /// Assignment operator
313 CProxy& CProxy::operator=(const CProxy &src) {
314         CkDelegateData *oldPtr=delegatedPtr;
315         ckUndelegate();
316         delegatedMgr=src.delegatedMgr;
317
318         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
319             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
320         else
321             delegatedPtr = NULL;
322
323         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
324         if (oldPtr) oldPtr->unref();
325         return *this;
326 }
327
328 void CProxy::pup(PUP::er &p) {
329       CkGroupID delegatedTo;
330       delegatedTo.setZero();
331       int isNodeGroup = 0;
332       if (!p.isUnpacking()) {
333         if (delegatedMgr) {
334           delegatedTo = delegatedMgr->CkGetGroupID();
335           isNodeGroup = delegatedMgr->isNodeGroup();
336         }
337       }
338       p|delegatedTo;
339       if (!delegatedTo.isZero()) {
340         p|isNodeGroup;
341         if (p.isUnpacking()) {
342           if (isNodeGroup)
343                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
344           else
345                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
346         }
347
348         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
349         if (p.isUnpacking() && delegatedPtr)
350             delegatedPtr->ref();
351       }
352 }
353
354 /**** Array sections */
355 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
356 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
357   _cookie.aid = aid;    \
358   _cookie.get_pe() = CkMyPe();  \
359   _elems = new CkArrayIndex[nElems];    \
360   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
361   pelist = NULL;        \
362   npes  = 0;    \
363 }
364
365 CKSECTIONID_CONSTRUCTOR_DEF(1D)
366 CKSECTIONID_CONSTRUCTOR_DEF(2D)
367 CKSECTIONID_CONSTRUCTOR_DEF(3D)
368 CKSECTIONID_CONSTRUCTOR_DEF(4D)
369 CKSECTIONID_CONSTRUCTOR_DEF(5D)
370 CKSECTIONID_CONSTRUCTOR_DEF(6D)
371 CKSECTIONID_CONSTRUCTOR_DEF(Max)
372
373 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
374   pelist = new int[npes];
375   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
376   _cookie.aid = gid;
377 }
378
379 CkSectionID::CkSectionID(const CkSectionID &sid) {
380   int i;
381   _cookie = sid._cookie;
382   _nElems = sid._nElems;
383   if (_nElems > 0) {
384     _elems = new CkArrayIndex[_nElems];
385     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
386   } else _elems = NULL;
387   npes = sid.npes;
388   if (npes > 0) {
389     pelist = new int[npes];
390     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
391   } else pelist = NULL;
392 }
393
394 void CkSectionID::operator=(const CkSectionID &sid) {
395   int i;
396   _cookie = sid._cookie;
397   _nElems = sid._nElems;
398   if (_nElems > 0) {
399     _elems = new CkArrayIndex[_nElems];
400     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
401   } else _elems = NULL;
402   npes = sid.npes;
403   if (npes > 0) {
404     pelist = new int[npes];
405     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
406   } else pelist = NULL;
407 }
408
409 void CkSectionID::pup(PUP::er &p) {
410     p | _cookie;
411     p(_nElems);
412     if (_nElems > 0) {
413       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
414       for (int i=0; i< _nElems; i++) p | _elems[i];
415       npes = 0;
416       pelist = NULL;
417     } else {
418       // If _nElems is zero, than this section describes processors instead of array elements
419       _elems = NULL;
420       p(npes);
421       if (p.isUnpacking()) pelist = new int[npes];
422       p(pelist, npes);
423     }
424 }
425
426 /**** Tiny random API routines */
427
428 #ifdef CMK_CUDA
429 void CUDACallbackManager(void *fn) {
430   if (fn != NULL) {
431     CkCallback *cb = (CkCallback*) fn;
432     cb->send();
433   }
434 }
435
436 #endif
437
438 extern "C"
439 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
440 {
441   UsrToEnv(msg)->setRef(ref);
442 }
443
444 extern "C"
445 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
446 {
447   return UsrToEnv(msg)->getRef();
448 }
449
450 extern "C"
451 int CkGetSrcPe(void *msg)
452 {
453   return UsrToEnv(msg)->getSrcPe();
454 }
455
456 extern "C"
457 int CkGetSrcNode(void *msg)
458 {
459   return CmiNodeOf(CkGetSrcPe(msg));
460 }
461
462 extern "C"
463 void *CkLocalBranch(CkGroupID gID) {
464   return _localBranch(gID);
465 }
466
467 static
468 void *_ckLocalNodeBranch(CkGroupID groupID) {
469   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
470   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
471   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
472   return retval;
473 }
474
475 extern "C"
476 void *CkLocalNodeBranch(CkGroupID groupID)
477 {
478   void *retval;
479   // we are called in a constructor
480   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
481     return CkpvAccess(_currentNodeGroupObj);
482   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
483   { // Nodegroup hasn't finished being created yet-- schedule...
484     CsdScheduler(0);
485   }
486   return retval;
487 }
488
489 extern "C"
490 void *CkLocalChare(const CkChareID *pCid)
491 {
492         int pe=pCid->onPE;
493         if (pe<0) { //A virtual chare ID
494                 if (pe!=(-(CkMyPe()+1)))
495                         return NULL;//VID block not on this PE
496 #ifdef CMK_CHARE_USE_PTR
497                 VidBlock *v=(VidBlock *)pCid->objPtr;
498 #else
499                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
500 #endif
501                 return v->getLocalChareObj();
502         }
503         else
504         { //An ordinary chare ID
505                 if (pe!=CkMyPe())
506                         return NULL;//Chare not on this PE
507 #ifdef CMK_CHARE_USE_PTR
508                 return pCid->objPtr;
509 #else
510                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
511 #endif
512         }
513 };
514
515 CkpvDeclare(char**,Ck_argv);
516
517 extern "C" char **CkGetArgv(void) {
518         return CkpvAccess(Ck_argv);
519 }
520 extern "C" int CkGetArgc(void) {
521         return CmiGetArgc(CkpvAccess(Ck_argv));
522 }
523
524 /******************** Basic support *****************/
525 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
526 {
527 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
528         CpvAccess(_currentObj) = (Chare *)obj;
529 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
530 #endif
531   //BIGSIM_OOC DEBUGGING
532   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
533   //fflush(stdout);
534 #if CMK_CHARMDEBUG
535   CpdBeforeEp(epIdx, obj, msg);
536 #endif
537   _entryTable[epIdx]->call(msg, obj);
538 #if CMK_CHARMDEBUG
539   CpdAfterEp(epIdx);
540 #endif
541   if (_entryTable[epIdx]->noKeep)
542   { /* Method doesn't keep/delete the message, so we have to: */
543     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
544   }
545 }
546 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
547 {
548   //BIGSIM_OOC DEBUGGING
549   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
550   //fflush(stdout);
551
552   void *deliverMsg;
553 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
554         CpvAccess(_currentObj) = (Chare *)obj;
555 #endif
556   if (_entryTable[epIdx]->noKeep)
557   { /* Deliver a read-only copy of the message */
558     deliverMsg=(void *)msg;
559   } else
560   { /* Method needs a copy of the message to keep/delete */
561     void *oldMsg=(void *)msg;
562     deliverMsg=CkCopyMsg(&oldMsg);
563 #if CMK_ERROR_CHECKING
564     if (oldMsg!=msg)
565       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
566 #endif
567   }
568 #if CMK_CHARMDEBUG
569   CpdBeforeEp(epIdx, obj, (void*)msg);
570 #endif
571   _entryTable[epIdx]->call(deliverMsg, obj);
572 #if CMK_CHARMDEBUG
573   CpdAfterEp(epIdx);
574 #endif
575 }
576
577 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
578 {
579   register void *msg = EnvToUsr(env);
580   _SET_USED(env, 0);
581   CkDeliverMessageFree(epIdx,msg,obj);
582 }
583
584 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
585 {
586
587 #if CMK_TRACE_ENABLED 
588   if (_entryTable[epIdx]->traceEnabled) {
589     _TRACE_BEGIN_EXECUTE(env);
590     _invokeEntryNoTrace(epIdx,env,obj);
591     _TRACE_END_EXECUTE();
592   }
593   else
594 #endif
595     _invokeEntryNoTrace(epIdx,env,obj);
596
597 }
598
599 /********************* Creation ********************/
600
601 extern "C"
602 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
603 {
604   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
605   envelope *env = UsrToEnv(msg);
606   _CHECK_USED(env);
607   if(pCid == 0) {
608     env->setMsgtype(NewChareMsg);
609   } else {
610     pCid->onPE = (-(CkMyPe()+1));
611     //  pCid->magic = _GETIDX(cIdx);
612     pCid->objPtr = (void *) new VidBlock();
613     _MEMCHECK(pCid->objPtr);
614     env->setMsgtype(NewVChareMsg);
615     env->setVidPtr(pCid->objPtr);
616 #ifndef CMK_CHARE_USE_PTR
617     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
618     int idx = CkpvAccess(vidblocks).size()-1;
619     pCid->objPtr = (void *)idx;
620     env->setVidPtr((void *)idx);
621 #endif
622   }
623   env->setEpIdx(eIdx);
624   env->setSrcPe(CkMyPe());
625   CmiSetHandler(env, _charmHandlerIdx);
626   _TRACE_CREATION_1(env);
627   CpvAccess(_qd)->create();
628   _STATS_RECORD_CREATE_CHARE_1();
629   _SET_USED(env, 1);
630   if(destPE == CK_PE_ANY)
631     env->setForAnyPE(1);
632   else
633     env->setForAnyPE(0);
634   _CldEnqueue(destPE, env, _infoIdx);
635   _TRACE_CREATION_DONE(1);
636 }
637
638 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
639 {
640   register int gIdx = _entryTable[epIdx]->chareIdx;
641   register void *obj = malloc(_chareTable[gIdx]->size);
642   _MEMCHECK(obj);
643   setMemoryTypeChare(obj);
644   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
645   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
646   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
647   CkpvAccess(_groupIDTable)->push_back(groupID);
648   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
649   if(ptrq) {
650     void *pending;
651     while((pending=ptrq->deq())!=0)
652       _CldEnqueue(CkMyPe(), pending, _infoIdx);
653 //    delete ptrq;
654       CkpvAccess(_groupTable)->find(groupID).clearPending();
655   }
656   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
657
658   CkpvAccess(_currentGroup) = groupID;
659   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
660 #ifndef CMK_CHARE_USE_PTR
661   //((Chare *)obj)->chareIdx = -1;
662   CkpvAccess(currentChareIdx) = -1;
663 #endif
664   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
665   _STATS_RECORD_PROCESS_GROUP_1();
666 }
667
668 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
669 {
670   register int gIdx = _entryTable[epIdx]->chareIdx;
671   int objSize=_chareTable[gIdx]->size;
672   register void *obj = malloc(objSize);
673   _MEMCHECK(obj);
674   setMemoryTypeChare(obj);
675   CkpvAccess(_currentGroup) = groupID;
676
677 // Now that the NodeGroup is created, add it to the table.
678 //  NodeGroups can be accessed by multiple processors, so
679 //  this is in the opposite order from groups - invoking the constructor
680 //  before registering it.
681 // User may call CkLocalNodeBranch() inside the nodegroup constructor
682 //  store nodegroup into _currentNodeGroupObj
683   CkpvAccess(_currentNodeGroupObj) = obj;
684 #ifndef CMK_CHARE_USE_PTR
685   //((Chare *)obj)->chareIdx = -1;
686   CkpvAccess(currentChareIdx) = -1;
687 #endif
688   _invokeEntryNoTrace(epIdx,env,obj);
689   CkpvAccess(_currentNodeGroupObj) = NULL;
690   _STATS_RECORD_PROCESS_NODE_GROUP_1();
691
692   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
693   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
694   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
695   CksvAccess(_nodeGroupIDTable).push_back(groupID);
696
697   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
698   if(ptrq) {
699     void *pending;
700     while((pending=ptrq->deq())!=0)
701       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
702 //    delete ptrq;
703       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
704   }
705   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
706 }
707
708 void _createGroup(CkGroupID groupID, envelope *env)
709 {
710   _CHECK_USED(env);
711   _SET_USED(env, 1);
712   register int epIdx = env->getEpIdx();
713   int gIdx = _entryTable[epIdx]->chareIdx;
714   CkNodeGroupID rednMgr;
715   if(_chareTable[gIdx]->isIrr == 0){
716                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
717                 rednMgr = rednMgrProxy;
718 //              rednMgrProxy.setAttachedGroup(groupID);
719   }else{
720         rednMgr.setZero();
721   }
722   env->setGroupNum(groupID);
723   env->setSrcPe(CkMyPe());
724   env->setRednMgr(rednMgr);
725   env->setGroupEpoch(CkpvAccess(_charmEpoch));
726
727   if(CkNumPes()>1) {
728     CkPackMessage(&env);
729     CmiSetHandler(env, _bocHandlerIdx);
730     _numInitMsgs++;
731     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
732     CpvAccess(_qd)->create(CkNumPes()-1);
733     CkUnpackMessage(&env);
734   }
735   _STATS_RECORD_CREATE_GROUP_1();
736   CkCreateLocalGroup(groupID, epIdx, env);
737 }
738
739 void _createNodeGroup(CkGroupID groupID, envelope *env)
740 {
741   _CHECK_USED(env);
742   _SET_USED(env, 1);
743   register int epIdx = env->getEpIdx();
744   env->setGroupNum(groupID);
745   env->setSrcPe(CkMyPe());
746   env->setGroupEpoch(CkpvAccess(_charmEpoch));
747   if(CkNumNodes()>1) {
748     CkPackMessage(&env);
749     CmiSetHandler(env, _bocHandlerIdx);
750     _numInitMsgs++;
751     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
752     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
753     CpvAccess(_qd)->create(CkNumNodes()-1);
754     CkUnpackMessage(&env);
755   }
756   _STATS_RECORD_CREATE_NODE_GROUP_1();
757   CkCreateLocalNodeGroup(groupID, epIdx, env);
758 }
759
760 // new _groupCreate
761
762 static CkGroupID _groupCreate(envelope *env)
763 {
764   register CkGroupID groupNum;
765
766   // check CkMyPe(). if it is 0 then idx is _numGroups++
767   // if not, then something else...
768   if(CkMyPe() == 0)
769      groupNum.idx = CkpvAccess(_numGroups)++;
770   else
771      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
772   _createGroup(groupNum, env);
773   return groupNum;
774 }
775
776 // new _nodeGroupCreate
777 static CkGroupID _nodeGroupCreate(envelope *env)
778 {
779   register CkGroupID groupNum;
780   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
781   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
782           groupNum.idx = CksvAccess(_numNodeGroups)++;
783    else
784           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
785   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
786   _createNodeGroup(groupNum, env);
787   return groupNum;
788 }
789
790 /**** generate the group idx when group is creator pe is not pe0
791  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
792  **** remaining bits contain the group creator processor number and
793  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
794
795 int _getGroupIdx(int numNodes,int myNode,int numGroups)
796 {
797         int idx;
798         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
799         int n = 32 - (x+1);                                     // number of bits remaining for the index
800         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
801                                                                 // then add the next available index
802         // of course this won't work when int is 8 bytes long on T3E
803         //idx |= 0x80000000;                                      // set the most significant bit to 1
804         idx = - idx;
805                                                                 // if int is not 32 bits, wouldn't this be wrong?
806         return idx;
807 }
808
809 extern "C"
810 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
811 {
812   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
813   register envelope *env = UsrToEnv(msg);
814   env->setMsgtype(BocInitMsg);
815   env->setEpIdx(eIdx);
816   env->setSrcPe(CkMyPe());
817   _TRACE_CREATION_N(env, CkNumPes());
818   CkGroupID gid = _groupCreate(env);
819   _TRACE_CREATION_DONE(1);
820   return gid;
821 }
822
823 extern "C"
824 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
825 {
826   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
827   register envelope *env = UsrToEnv(msg);
828   env->setMsgtype(NodeBocInitMsg);
829   env->setEpIdx(eIdx);
830   env->setSrcPe(CkMyPe());
831   _TRACE_CREATION_N(env, CkNumNodes());
832   CkGroupID gid = _nodeGroupCreate(env);
833   _TRACE_CREATION_DONE(1);
834   return gid;
835 }
836
837 static inline void *_allocNewChare(envelope *env, int &idx)
838 {
839   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
840   void *tmp=malloc(_chareTable[chareIdx]->size);
841   _MEMCHECK(tmp);
842 #ifndef CMK_CHARE_USE_PTR
843   CkpvAccess(chare_objs).push_back(tmp);
844   CkpvAccess(chare_types).push_back(chareIdx);
845   idx = CkpvAccess(chare_objs).size()-1;
846 #endif
847   setMemoryTypeChare(tmp);
848   return tmp;
849 }
850
851 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
852 {
853   int idx;
854   register void *obj = _allocNewChare(env, idx);
855 #ifndef CMK_CHARE_USE_PTR
856   //((Chare *)obj)->chareIdx = idx;
857   CkpvAccess(currentChareIdx) = idx;
858 #endif
859   _invokeEntry(env->getEpIdx(),env,obj);
860 }
861
862 void CkCreateLocalChare(int epIdx, envelope *env)
863 {
864   env->setEpIdx(epIdx);
865   _processNewChareMsg(NULL, env);
866 }
867
868 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
869 {
870   int idx;
871   register void *obj = _allocNewChare(env, idx);
872   register CkChareID *pCid = (CkChareID *)
873       _allocMsg(FillVidMsg, sizeof(CkChareID));
874   pCid->onPE = CkMyPe();
875 #ifndef CMK_CHARE_USE_PTR
876   pCid->objPtr = (void*)idx;
877 #else
878   pCid->objPtr = obj;
879 #endif
880   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
881   register envelope *ret = UsrToEnv(pCid);
882   ret->setVidPtr(env->getVidPtr());
883   register int srcPe = env->getSrcPe();
884   ret->setSrcPe(CkMyPe());
885   CmiSetHandler(ret, _charmHandlerIdx);
886   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
887 #ifndef CMK_CHARE_USE_PTR
888   // register the remote vidblock for deletion when chare is deleted
889   CkChareID vid;
890   vid.onPE = srcPe;
891   vid.objPtr = env->getVidPtr();
892   CkpvAccess(vmap)[idx] = vid;    
893 #endif
894   CpvAccess(_qd)->create();
895 #ifndef CMK_CHARE_USE_PTR
896   //((Chare *)obj)->chareIdx = idx;
897   CkpvAccess(currentChareIdx) = idx;
898 #endif
899   _invokeEntry(env->getEpIdx(),env,obj);
900 }
901
902 /************** Receive: Chares *************/
903
904 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
905 {
906   register int epIdx = env->getEpIdx();
907   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
908   register void *obj;
909   if (mainIdx != -1)  {           // mainchare
910     CmiAssert(CkMyPe()==0);
911     obj = _mainTable[mainIdx]->getObj();
912   }
913   else {
914 #ifndef CMK_CHARE_USE_PTR
915     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
916       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
917     else
918       obj = env->getObjPtr();
919 #else
920     obj = env->getObjPtr();
921 #endif
922   }
923   _invokeEntry(epIdx,env,obj);
924 }
925
926 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
927 {
928   register int epIdx = env->getEpIdx();
929   register void *obj = env->getObjPtr();
930   _invokeEntry(epIdx,env,obj);
931 }
932
933 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
934 {
935 #ifndef CMK_CHARE_USE_PTR
936   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
937 #else
938   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
939   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
940 #endif
941   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
942   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
943   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
944   CmiFree(env);
945 }
946
947 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
948 {
949 #ifndef CMK_CHARE_USE_PTR
950   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
951 #else
952   VidBlock *vptr = (VidBlock *) env->getVidPtr();
953   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
954 #endif
955   _SET_USED(env, 1);
956   vptr->send(env);
957 }
958
959 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
960 {
961 #ifndef CMK_CHARE_USE_PTR
962   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
963   delete vptr;
964   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
965 #endif
966   CmiFree(env);
967 }
968
969 /************** Receive: Groups ****************/
970
971 /**
972  Return a pointer to the local BOC of "groupID".
973  The message "env" passed in has some known dependency on this groupID
974  (either it is to be delivered to this BOC, or it depends on this BOC being there).
975  Therefore, if the return value is NULL, this function buffers the massage so that
976  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
977  The message passed in must have its handlers correctly set so that it can be
978  scheduled again.
979 */
980 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
981 {
982
983         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
984         IrrGroup *obj = ck->localBranch(groupID);
985         if (obj==NULL) { /* groupmember not yet created: stash message */
986                 ck->getGroupTable()->find(groupID).enqMsg(env);
987         }
988         else { /* will be able to process message */
989                 ck->process();
990         }
991         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
992         return obj;
993 }
994
995 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
996 {
997 #if CMK_LBDB_ON
998   // if there is a running obj being measured, stop it temporarily
999   LDObjHandle objHandle;
1000   int objstopped = 0;
1001   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1002   if (the_lbdb->RunningObject(&objHandle)) {
1003     objstopped = 1;
1004     the_lbdb->ObjectStop(objHandle);
1005   }
1006 #endif
1007   _invokeEntry(epIdx,env,obj);
1008 #if CMK_LBDB_ON
1009   if (objstopped) the_lbdb->ObjectStart(objHandle);
1010 #endif
1011   _STATS_RECORD_PROCESS_BRANCH_1();
1012 }
1013
1014 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1015 {
1016   register CkGroupID groupID =  env->getGroupNum();
1017   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1018   if(obj) {
1019     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1020   }
1021 }
1022
1023 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1024 {
1025   env->setMsgtype(ForChareMsg);
1026   env->setObjPtr(obj);
1027   _processForChareMsg(ck,env);
1028   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1029 }
1030
1031 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1032 {
1033   env->setEpIdx(epIdx);
1034   _deliverForNodeBocMsg(ck,env, obj);
1035 }
1036
1037 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1038 {
1039   register CkGroupID groupID = env->getGroupNum();
1040   register void *obj;
1041
1042   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1043   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1044   if(!obj) { // groupmember not yet created
1045 #if CMK_IMMEDIATE_MSG
1046     if (CmiIsImmediate(env))     // buffer immediate message
1047       CmiDelayImmediate();
1048     else
1049 #endif
1050     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1051     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1052     return;
1053   }
1054   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1055 #if CMK_IMMEDIATE_MSG
1056   if (!CmiIsImmediate(env))
1057 #endif
1058   ck->process();
1059   env->setMsgtype(ForChareMsg);
1060   env->setObjPtr(obj);
1061   _processForChareMsg(ck,env);
1062   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1063 }
1064
1065 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1066 {
1067   register CkGroupID groupID = env->getGroupNum();
1068   register int epIdx = env->getEpIdx();
1069   if (!env->getGroupDep().isZero()) {      // dependence
1070     CkGroupID dep = env->getGroupDep();
1071     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1072     if (obj == NULL) return;
1073   }
1074   else
1075     ck->process();
1076   CkCreateLocalGroup(groupID, epIdx, env);
1077 }
1078
1079 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1080 {
1081   register CkGroupID groupID = env->getGroupNum();
1082   register int epIdx = env->getEpIdx();
1083   CkCreateLocalNodeGroup(groupID, epIdx, env);
1084 }
1085
1086 /************** Receive: Arrays *************/
1087
1088 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1089   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1090   if (mgr) {
1091     _SET_USED(env, 0);
1092     mgr->insertElement((CkMessage *)EnvToUsr(env));
1093   }
1094 }
1095 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1096   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1097   if (mgr) {
1098     _SET_USED(env, 0);
1099     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1100   }
1101 }
1102
1103 //BIGSIM_OOC DEBUGGING
1104 #define TELLMSGTYPE(x) //x
1105
1106 /**
1107  * This is the main converse-level handler used by all of Charm++.
1108  *
1109  * \addtogroup CriticalPathFramework
1110  */
1111 void _processHandler(void *converseMsg,CkCoreState *ck)
1112 {
1113   register envelope *env = (envelope *) converseMsg;
1114
1115   MESSAGE_PHASE_CHECK(env);
1116
1117 //#if CMK_RECORD_REPLAY
1118   if (ck->watcher!=NULL) {
1119     if (!ck->watcher->processMessage(&env,ck)) return;
1120   }
1121 //#endif
1122 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1123         Chare *obj=NULL;
1124         CkObjID sender;
1125         MCount SN;
1126         MlogEntry *entry=NULL;
1127         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1128         env->getMsgtype() == ForArrayEltMsg){
1129                 sender = env->sender;
1130                 SN = env->SN;
1131                 int result = preProcessReceivedMessage(env,&obj,&entry);
1132                 if(result == 0){
1133                         return;
1134                 }
1135         }
1136 #endif
1137
1138 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1139   //  CkPrintf("START\n");
1140   criticalPath_start(env);
1141 #endif
1142
1143
1144   switch(env->getMsgtype()) {
1145 // Group support
1146     case BocInitMsg :
1147       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1148       // QD processing moved inside _processBocInitMsg because it is conditional
1149       //ck->process(); 
1150       if(env->isPacked()) CkUnpackMessage(&env);
1151       _processBocInitMsg(ck,env);
1152       break;
1153     case NodeBocInitMsg :
1154       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1155       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1156       _processNodeBocInitMsg(ck,env);
1157       break;
1158     case ForBocMsg :
1159       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1160       // QD processing moved inside _processForBocMsg because it is conditional
1161       if(env->isPacked()) CkUnpackMessage(&env);
1162       _processForBocMsg(ck,env);
1163       // stats record moved inside _processForBocMsg because it is conditional
1164       break;
1165     case ForNodeBocMsg :
1166       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1167       // QD processing moved to _processForNodeBocMsg because it is conditional
1168       if(env->isPacked()) CkUnpackMessage(&env);
1169       _processForNodeBocMsg(ck,env);
1170       // stats record moved to _processForNodeBocMsg because it is conditional
1171       break;
1172
1173 // Array support
1174     case ArrayEltInitMsg:
1175       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1176       if(env->isPacked()) CkUnpackMessage(&env);
1177       _processArrayEltInitMsg(ck,env);
1178       break;
1179     case ForArrayEltMsg:
1180       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1181       if(env->isPacked()) CkUnpackMessage(&env);
1182       _processArrayEltMsg(ck,env);
1183       break;
1184
1185 // Chare support
1186     case NewChareMsg :
1187       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1188       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1189       _processNewChareMsg(ck,env);
1190       _STATS_RECORD_PROCESS_CHARE_1();
1191       break;
1192     case NewVChareMsg :
1193       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1194       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1195       _processNewVChareMsg(ck,env);
1196       _STATS_RECORD_PROCESS_CHARE_1();
1197       break;
1198     case ForChareMsg :
1199       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1200       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1201       _processForPlainChareMsg(ck,env);
1202       _STATS_RECORD_PROCESS_MSG_1();
1203       break;
1204     case ForVidMsg   :
1205       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1206       ck->process();
1207       _processForVidMsg(ck,env);
1208       break;
1209     case FillVidMsg  :
1210       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1211       ck->process();
1212       _processFillVidMsg(ck,env);
1213       break;
1214     case DeleteVidMsg  :
1215       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1216       ck->process();
1217       _processDeleteVidMsg(ck,env);
1218       break;
1219
1220     default:
1221       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1222   }
1223 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1224         if(obj != NULL){
1225                 postProcessReceivedMessage(obj,sender,SN,entry);
1226         }
1227 #endif
1228
1229
1230 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1231   criticalPath_end();
1232   //  CkPrintf("STOP\n");
1233 #endif
1234
1235
1236 }
1237
1238
1239 /******************** Message Send **********************/
1240
1241 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1242              int *queueing, int *priobits, unsigned int **prioptr)
1243 {
1244   register envelope *env = (envelope *)converseMsg;
1245   *pfn = (CldPackFn)CkPackMessage;
1246   *len = env->getTotalsize();
1247   *queueing = env->getQueueing();
1248   *priobits = env->getPriobits();
1249   *prioptr = (unsigned int *) env->getPrioPtr();
1250 }
1251
1252 void CkPackMessage(envelope **pEnv)
1253 {
1254   register envelope *env = *pEnv;
1255   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1256     register void *msg = EnvToUsr(env);
1257     _TRACE_BEGIN_PACK();
1258     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1259     _TRACE_END_PACK();
1260     env=UsrToEnv(msg);
1261     env->setPacked(1);
1262     *pEnv = env;
1263   }
1264 }
1265
1266 void CkUnpackMessage(envelope **pEnv)
1267 {
1268   register envelope *env = *pEnv;
1269   register int msgIdx = env->getMsgIdx();
1270   if(env->isPacked()) {
1271     register void *msg = EnvToUsr(env);
1272     _TRACE_BEGIN_UNPACK();
1273     msg = _msgTable[msgIdx]->unpack(msg);
1274     _TRACE_END_UNPACK();
1275     env=UsrToEnv(msg);
1276     env->setPacked(0);
1277     *pEnv = env;
1278   }
1279 }
1280
1281 //There's no reason for most messages to go through the Cld--
1282 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1283 // Thus these accellerated versions of the Cld calls.
1284
1285 static int index_objectQHandler;
1286 int index_tokenHandler;
1287 int index_skipCldHandler;
1288
1289 void _skipCldHandler(void *converseMsg)
1290 {
1291   register envelope *env = (envelope *)(converseMsg);
1292   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1293 #if CMK_GRID_QUEUE_AVAILABLE
1294   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1295     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1296                        env, env->getQueueing (), env->getPriobits (),
1297                        (unsigned int *) env->getPrioPtr ());
1298   } else {
1299     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1300                        env, env->getQueueing (), env->getPriobits (),
1301                        (unsigned int *) env->getPrioPtr ());
1302   }
1303 #else
1304   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1305         env, env->getQueueing(),env->getPriobits(),
1306         (unsigned int *)env->getPrioPtr());
1307 #endif
1308 }
1309
1310
1311 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1312 // Made non-static to be used by ckmessagelogging
1313 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1314 {
1315 #if CMK_CHARMDEBUG
1316   if (!ConverseDeliver(pe)) {
1317     CmiFree(env);
1318     return;
1319   }
1320 #endif
1321   if(pe == CkMyPe() ){
1322     if(!CmiNodeAlive(CkMyPe())){
1323         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1324 //      return;
1325     }
1326   }
1327   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1328 #if CMK_OBJECT_QUEUE_AVAILABLE
1329     Chare *obj = CkFindObjectPtr(env);
1330     if (obj && obj->CkGetObjQueue().queue()) {
1331       _enqObjQueue(obj, env);
1332     }
1333     else
1334 #endif
1335     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1336         env, env->getQueueing(),env->getPriobits(),
1337         (unsigned int *)env->getPrioPtr());
1338   } else {
1339     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1340       CkPackMessage(&env);
1341     int len=env->getTotalsize();
1342     CmiSetXHandler(env,CmiGetHandler(env));
1343 #if CMK_OBJECT_QUEUE_AVAILABLE
1344     CmiSetHandler(env,index_objectQHandler);
1345 #else
1346     CmiSetHandler(env,index_skipCldHandler);
1347 #endif
1348     CmiSetInfo(env,infoFn);
1349     if (pe==CLD_BROADCAST) {
1350 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1351                         CmiSyncBroadcast(len, (char *)env);
1352 #else
1353                         CmiSyncBroadcastAndFree(len, (char *)env); 
1354 #endif
1355
1356 }
1357     else if (pe==CLD_BROADCAST_ALL) { 
1358 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1359                         CmiSyncBroadcastAll(len, (char *)env);
1360 #else
1361                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1362 #endif
1363
1364 }
1365     else{
1366 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1367                         CmiSyncSend(pe, len, (char *)env);
1368 #else
1369                         CmiSyncSendAndFree(pe, len, (char *)env);
1370 #endif
1371
1372                 }
1373   }
1374 }
1375
1376 #if CMK_BLUEGENE_CHARM
1377 #   define  _skipCldEnqueue   _CldEnqueue
1378 #endif
1379
1380 // by pass Charm++ priority queue, send as Converse message
1381 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1382 {
1383 #if CMK_CHARMDEBUG
1384   if (!ConverseDeliver(-1)) {
1385     CmiFree(env);
1386     return;
1387   }
1388 #endif
1389   CkPackMessage(&env);
1390   int len=env->getTotalsize();
1391   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1392 }
1393
1394 static void _noCldEnqueue(int pe, envelope *env)
1395 {
1396 /*
1397   if (pe == CkMyPe()) {
1398     CmiHandleMessage(env);
1399   } else
1400 */
1401 #if CMK_CHARMDEBUG
1402   if (!ConverseDeliver(pe)) {
1403     CmiFree(env);
1404     return;
1405   }
1406 #endif
1407   CkPackMessage(&env);
1408   int len=env->getTotalsize();
1409   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1410   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1411   else CmiSyncSendAndFree(pe, len, (char *)env);
1412 }
1413
1414 //static void _noCldNodeEnqueue(int node, envelope *env)
1415 //Made non-static to be used by ckmessagelogging
1416 void _noCldNodeEnqueue(int node, envelope *env)
1417 {
1418 /*
1419   if (node == CkMyNode()) {
1420     CmiHandleMessage(env);
1421   } else {
1422 */
1423 #if CMK_CHARMDEBUG
1424   if (!ConverseDeliver(node)) {
1425     CmiFree(env);
1426     return;
1427   }
1428 #endif
1429   CkPackMessage(&env);
1430   int len=env->getTotalsize();
1431   if (node==CLD_BROADCAST) { 
1432 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1433         CmiSyncNodeBroadcast(len, (char *)env);
1434 #else
1435         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1436 #endif
1437 }
1438   else if (node==CLD_BROADCAST_ALL) { 
1439 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1440                 CmiSyncNodeBroadcastAll(len, (char *)env);
1441 #else
1442                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1443 #endif
1444
1445 }
1446   else {
1447 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1448         CmiSyncNodeSend(node, len, (char *)env);
1449 #else
1450         CmiSyncNodeSendAndFree(node, len, (char *)env);
1451 #endif
1452   }
1453 }
1454
1455 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1456 {
1457   register envelope *env = UsrToEnv(msg);
1458   _CHECK_USED(env);
1459   _SET_USED(env, 1);
1460 #if CMK_REPLAYSYSTEM
1461   setEventID(env);
1462 #endif
1463   env->setMsgtype(ForChareMsg);
1464   env->setEpIdx(eIdx);
1465   env->setSrcPe(CkMyPe());
1466 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1467   criticalPath_send(env);
1468   automaticallySetMessagePriority(env);
1469 #endif
1470 #if CMK_CHARMDEBUG
1471   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1472 #endif
1473 #if CMK_OBJECT_QUEUE_AVAILABLE
1474   CmiSetHandler(env, index_objectQHandler);
1475 #else
1476   CmiSetHandler(env, _charmHandlerIdx);
1477 #endif
1478   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1479     register int pe = -(pCid->onPE+1);
1480     if(pe==CkMyPe()) {
1481 #ifndef CMK_CHARE_USE_PTR
1482       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1483 #else
1484       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1485 #endif
1486       void *objPtr;
1487       if (NULL!=(objPtr=vblk->getLocalChare()))
1488       { //A ready local chare
1489         env->setObjPtr(objPtr);
1490         return pe;
1491       }
1492       else { //The vidblock is not ready-- forget it
1493         vblk->send(env);
1494         return -1;
1495       }
1496     } else { //Valid vidblock for another PE:
1497       env->setMsgtype(ForVidMsg);
1498       env->setVidPtr(pCid->objPtr);
1499       return pe;
1500     }
1501   }
1502   else {
1503     env->setObjPtr(pCid->objPtr);
1504     return pCid->onPE;
1505   }
1506 }
1507
1508 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1509 {
1510   int destPE = _prepareMsg(eIdx, msg, pCid);
1511   if (destPE != -1) {
1512     register envelope *env = UsrToEnv(msg);
1513 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1514     criticalPath_send(env);
1515     automaticallySetMessagePriority(env);
1516 #endif
1517     CmiBecomeImmediate(env);
1518   }
1519   return destPE;
1520 }
1521
1522 extern "C"
1523 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1524 {
1525   if (opts & CK_MSG_INLINE) {
1526     CkSendMsgInline(entryIdx, msg, pCid, opts);
1527     return;
1528   }
1529 #if CMK_ERROR_CHECKING
1530   if (opts & CK_MSG_IMMEDIATE) {
1531     CmiAbort("Immediate message is not allowed in Chare!");
1532   }
1533 #endif
1534   register envelope *env = UsrToEnv(msg);
1535   int destPE=_prepareMsg(entryIdx,msg,pCid);
1536   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1537   // VidBlock was not yet filled). The problem is that the creation was never
1538   // traced later when the VidBlock was filled. One solution is to trace the
1539   // creation here, the other to trace it in VidBlock->msgDeliver().
1540   _TRACE_CREATION_1(env);
1541   if (destPE!=-1) {
1542     CpvAccess(_qd)->create();
1543     if (opts & CK_MSG_SKIP_OR_IMM)
1544       _noCldEnqueue(destPE, env);
1545     else
1546       _CldEnqueue(destPE, env, _infoIdx);
1547   }
1548   _TRACE_CREATION_DONE(1);
1549 }
1550
1551 extern "C"
1552 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1553 {
1554   if (pCid->onPE==CkMyPe())
1555   {
1556     if(!CmiNodeAlive(CkMyPe())){
1557         return;
1558     }
1559 #if CMK_CHARMDEBUG
1560     //Just in case we need to breakpoint or use the envelope in some way
1561     _prepareMsg(entryIndex,msg,pCid);
1562 #endif
1563                 //Just directly call the chare (skip QD handling & scheduler)
1564     register envelope *env = UsrToEnv(msg);
1565     if (env->isPacked()) CkUnpackMessage(&env);
1566     _STATS_RECORD_PROCESS_MSG_1();
1567     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1568   }
1569   else {
1570     //No way to inline a cross-processor message:
1571     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1572   }
1573 }
1574
1575 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1576 {
1577   register envelope *env = UsrToEnv(msg);
1578   CkNodeGroupID nodeRedMgr;
1579   _CHECK_USED(env);
1580   _SET_USED(env, 1);
1581 #if CMK_REPLAYSYSTEM
1582   setEventID(env);
1583 #endif
1584   env->setMsgtype(type);
1585   env->setEpIdx(eIdx);
1586   env->setGroupNum(gID);
1587   env->setSrcPe(CkMyPe());
1588 #if CMK_ERROR_CHECKING
1589   nodeRedMgr.setZero();
1590   env->setRednMgr(nodeRedMgr);
1591 #endif
1592 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1593   criticalPath_send(env);
1594   automaticallySetMessagePriority(env);
1595 #endif
1596 #if CMK_CHARMDEBUG
1597   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1598 #endif
1599   CmiSetHandler(env, _charmHandlerIdx);
1600   return env;
1601 }
1602
1603 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1604 {
1605   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1606 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1607   criticalPath_send(env);
1608   automaticallySetMessagePriority(env);
1609 #endif
1610   CmiBecomeImmediate(env);
1611   return env;
1612 }
1613
1614 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1615                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1616 {
1617   int numPes;
1618   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1619 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1620   sendTicketGroupRequest(env,pe,_infoIdx);
1621 #else
1622   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1623   _TRACE_CREATION_N(env, numPes);
1624   if (opts & CK_MSG_SKIP_OR_IMM)
1625     _noCldEnqueue(pe, env);
1626   else
1627     _skipCldEnqueue(pe, env, _infoIdx);
1628   _TRACE_CREATION_DONE(1);
1629 #endif
1630 }
1631
1632 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1633                            int npes, int *pes)
1634 {
1635   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1636   _TRACE_CREATION_MULTICAST(env, npes, pes);
1637   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1638   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1639 }
1640
1641 extern "C"
1642 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1643 {
1644 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1645   if (destPE==CkMyPe())
1646   {
1647     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1648     return;
1649   }
1650   //Can't inline-- send the usual way
1651   register envelope *env = UsrToEnv(msg);
1652   int numPes;
1653   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1654   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1655   _TRACE_CREATION_N(env, numPes);
1656   _noCldEnqueue(destPE, env);
1657   _STATS_RECORD_SEND_BRANCH_1();
1658   CkpvAccess(_coreState)->create();
1659   _TRACE_CREATION_DONE(1);
1660 #else
1661   // no support for immediate message, send inline
1662   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1663 #endif
1664 }
1665
1666 extern "C"
1667 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1668 {
1669   if (destPE==CkMyPe())
1670   {
1671     if(!CmiNodeAlive(CkMyPe())){
1672         return;
1673     }
1674     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1675     if (obj!=NULL)
1676     { //Just directly call the group:
1677 #if CMK_ERROR_CHECKING
1678       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1679 #else
1680       envelope *env=UsrToEnv(msg);
1681 #endif
1682       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1683       return;
1684     }
1685   }
1686   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1687   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1688 }
1689
1690 extern "C"
1691 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1692 {
1693   if (opts & CK_MSG_INLINE) {
1694     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1695     return;
1696   }
1697   if (opts & CK_MSG_IMMEDIATE) {
1698     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1699     return;
1700   }
1701   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1702   _STATS_RECORD_SEND_BRANCH_1();
1703   CkpvAccess(_coreState)->create();
1704 }
1705
1706 extern "C"
1707 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1708 {
1709 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1710   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1711   _TRACE_CREATION_MULTICAST(env, npes, pes);
1712   _noCldEnqueueMulti(npes, pes, env);
1713   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1714 #else
1715   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1716   CpvAccess(_qd)->create(-npes);
1717 #endif
1718   _STATS_RECORD_SEND_BRANCH_N(npes);
1719   CpvAccess(_qd)->create(npes);
1720 }
1721
1722 extern "C"
1723 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1724 {
1725   if (opts & CK_MSG_IMMEDIATE) {
1726     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1727     return;
1728   }
1729     // normal mesg
1730   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1731   _STATS_RECORD_SEND_BRANCH_N(npes);
1732   CpvAccess(_qd)->create(npes);
1733 }
1734
1735 extern "C"
1736 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1737 {
1738   int npes;
1739   int *pes;
1740   if (opts & CK_MSG_IMMEDIATE) {
1741     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1742     return;
1743   }
1744     // normal mesg
1745   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1746   CmiLookupGroup(grp, &npes, &pes);
1747   _TRACE_CREATION_MULTICAST(env, npes, pes);
1748   _CldEnqueueGroup(grp, env, _infoIdx);
1749   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1750   _STATS_RECORD_SEND_BRANCH_N(npes);
1751   CpvAccess(_qd)->create(npes);
1752 }
1753
1754 extern "C"
1755 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1756 {
1757   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1758   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1759   CpvAccess(_qd)->create(CkNumPes());
1760 }
1761
1762 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1763                 int node=CLD_BROADCAST_ALL, int opts=0)
1764 {
1765   int numPes;
1766   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1767 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1768         sendTicketNodeGroupRequest(env,node,_infoIdx);
1769 #else
1770   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1771   _TRACE_CREATION_N(env, numPes);
1772   if (opts & CK_MSG_SKIP_OR_IMM) {
1773     _noCldNodeEnqueue(node, env);
1774     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1775       CkpvAccess(_coreState)->create(-numPes);
1776     }
1777   }
1778   else
1779     _CldNodeEnqueue(node, env, _infoIdx);
1780   _TRACE_CREATION_DONE(1);
1781 #endif
1782 }
1783
1784 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1785                            int npes, int *nodes)
1786 {
1787   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1788   _TRACE_CREATION_N(env, npes);
1789   for (int i=0; i<npes; i++) {
1790     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1791   }
1792   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1793 }
1794
1795 extern "C"
1796 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1797 {
1798 #if CMK_IMMEDIATE_MSG
1799   if (node==CkMyNode())
1800   {
1801     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1802     return;
1803   }
1804   //Can't inline-- send the usual way
1805   register envelope *env = UsrToEnv(msg);
1806   int numPes;
1807   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1808   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1809   _TRACE_CREATION_N(env, numPes);
1810   _noCldNodeEnqueue(node, env);
1811   _STATS_RECORD_SEND_BRANCH_1();
1812   /* immeidate message is invisible to QD */
1813 //  CkpvAccess(_coreState)->create();
1814   _TRACE_CREATION_DONE(1);
1815 #else
1816   // no support for immediate message, send inline
1817   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1818 #endif
1819 }
1820
1821 extern "C"
1822 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1823 {
1824   if (node==CkMyNode())
1825   {
1826     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1827     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1828     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1829     if (obj!=NULL)
1830     { //Just directly call the group:
1831 #if CMK_ERROR_CHECKING
1832       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1833 #else
1834       envelope *env=UsrToEnv(msg);
1835 #endif
1836       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1837       return;
1838     }
1839   }
1840   //Can't inline-- send the usual way
1841   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1842 }
1843
1844 extern "C"
1845 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1846 {
1847   if (opts & CK_MSG_INLINE) {
1848     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1849     return;
1850   }
1851   if (opts & CK_MSG_IMMEDIATE) {
1852     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1853     return;
1854   }
1855   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1856   _STATS_RECORD_SEND_NODE_BRANCH_1();
1857   CkpvAccess(_coreState)->create();
1858 }
1859
1860 extern "C"
1861 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1862 {
1863 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1864   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1865   _noCldEnqueueMulti(npes, nodes, env);
1866 #else
1867   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1868   CpvAccess(_qd)->create(-npes);
1869 #endif
1870   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1871   CpvAccess(_qd)->create(npes);
1872 }
1873
1874 extern "C"
1875 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1876 {
1877   if (opts & CK_MSG_IMMEDIATE) {
1878     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1879     return;
1880   }
1881     // normal mesg
1882   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1883   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1884   CpvAccess(_qd)->create(npes);
1885 }
1886
1887 extern "C"
1888 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1889 {
1890   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1891   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1892   CpvAccess(_qd)->create(CkNumNodes());
1893 }
1894
1895 //Needed by delegation manager:
1896 extern "C"
1897 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1898 { return _prepareMsg(eIdx,msg,pCid); }
1899 extern "C"
1900 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1901 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1902 extern "C"
1903 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1904 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1905
1906 void _ckModuleInit(void) {
1907         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1908         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1909         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1910         CkpvInitialize(TokenPool*, _tokenPool);
1911         CkpvAccess(_tokenPool) = new TokenPool;
1912 }
1913
1914
1915 /************** Send: Arrays *************/
1916
1917 extern void CkArrayManagerInsert(int onPe,void *msg);
1918 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1919
1920 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1921 {
1922   _CHECK_USED(env);
1923   _SET_USED(env, 1);
1924   env->setMsgtype(type);
1925 #if CMK_CHARMDEBUG
1926   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1927 #endif
1928   CmiSetHandler(env, _charmHandlerIdx);
1929   CpvAccess(_qd)->create();
1930 }
1931
1932 extern "C"
1933 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1934   register envelope *env = UsrToEnv(msg);
1935   env->getsetArrayMgr()=aID;
1936   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1937   _CldEnqueue(pe, env, _infoIdx);
1938 }
1939
1940 extern "C"
1941 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1942   register envelope *env = UsrToEnv(msg);
1943   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1944 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1945    sendTicketArrayRequest(env,pe,_infoIdx);
1946 #else
1947   if (opts & CK_MSG_IMMEDIATE)
1948     CmiBecomeImmediate(env);
1949   if (opts & CK_MSG_SKIP_OR_IMM)
1950     _noCldEnqueue(pe, env);
1951   else
1952     _skipCldEnqueue(pe, env, _infoIdx);
1953 #endif
1954 }
1955
1956 class ElementDestroyer : public CkLocIterator {
1957 private:
1958         CkLocMgr *locMgr;
1959 public:
1960         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1961         void addLocation(CkLocation &loc) {
1962           loc.destroyAll();
1963         }
1964 };
1965
1966 void CkDeleteChares() {
1967   int i;
1968   int numGroups = CkpvAccess(_groupIDTable)->size();
1969
1970   // delete all plain chares
1971 #ifndef CMK_CHARE_USE_PTR
1972   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
1973         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
1974         delete obj;
1975         CkpvAccess(chare_objs)[i] = NULL;
1976   }
1977   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
1978         VidBlock *obj = CkpvAccess(vidblocks)[i];
1979         delete obj;
1980         CkpvAccess(vidblocks)[i] = NULL;
1981   }
1982 #endif
1983
1984   // delete all array elements
1985   for(i=0;i<numGroups;i++) {
1986     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1987     if(obj && obj->isLocMgr())  {
1988       CkLocMgr *mgr = (CkLocMgr*)obj;
1989       ElementDestroyer destroyer(mgr);
1990       mgr->iterate(destroyer);
1991     }
1992   }
1993
1994   // delete all groups
1995   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1996   for(i=0;i<numGroups;i++) {
1997     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1998     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1999     if (obj) delete obj;
2000   }
2001   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2002
2003   // delete all node groups
2004   if (CkMyRank() == 0) {
2005     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2006     for(i=0;i<numNodeGroups;i++) {
2007       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2008       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2009       if (obj) delete obj;
2010     }
2011   }
2012 }
2013
2014 //------------------- Message Watcher (record/replay) ----------------
2015
2016 #include "crc32.h"
2017
2018 CkpvDeclare(int, envelopeEventID);
2019 int _recplay_crc = 0;
2020 int _recplay_checksum = 0;
2021 int _recplay_logsize = 1024*1024;
2022
2023 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2024 #define REPLAYDEBUG(args) /* empty */
2025
2026 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2027
2028 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2029
2030 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2031
2032     int i;
2033     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2034     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2035     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2036     FILE *f=fopen(fName,permissions);
2037     REPLAYDEBUG("openReplayfile "<<fName);
2038     if (f==NULL) {
2039         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2040             CkMyPe(),fName,permissions);
2041         CkAbort("openReplayFile> Could not open replay file");
2042     }
2043     return f;
2044 }
2045
2046 #include "BaseLB.h" /* For LBMigrateMsg message */
2047
2048 class CkMessageRecorder : public CkMessageWatcher {
2049   char *buffer;
2050   unsigned int curpos;
2051   bool firstOpen;
2052 public:
2053   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2054   ~CkMessageRecorder() {
2055     flushLog(0);
2056     fprintf(f,"-1 -1 -1 ");
2057     fclose(f);
2058     delete[] buffer;
2059 #if 0
2060     FILE *stsfp = fopen("sts", "w");
2061     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2062     traceWriteSTS(stsfp, 0);
2063     fclose(stsfp);
2064 #endif
2065     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2066   }
2067
2068 private:
2069   void flushLog(int verbose=1) {
2070     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2071     fprintf(f, "%s", buffer);
2072     curpos=0;
2073   }
2074   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2075     if ((*envptr)->getEvent()) {
2076       char tmp[128];
2077       bool wasPacked = (*envptr)->isPacked();
2078       if (!wasPacked) CkPackMessage(envptr);
2079       envelope *env = *envptr;
2080       unsigned int crc1=0, crc2=0;
2081       if (_recplay_crc) {
2082         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2083         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2084         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2085       } else if (_recplay_checksum) {
2086         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2087         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2088       }
2089       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2090       if (curpos > _recplay_logsize-128) flushLog();
2091       if (!wasPacked) CkUnpackMessage(envptr);
2092     }
2093     return CmiTrue;
2094   }
2095   virtual CmiBool process(CthThreadToken *token,CkCoreState *ck) {
2096     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2097     if (curpos > _recplay_logsize-128) flushLog();
2098     return CmiTrue;
2099   }
2100   
2101   virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2102     FILE *f;
2103     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2104     else f = openReplayFile("ckreplay_",".lb","a");
2105     firstOpen = false;
2106     if (f != NULL) {
2107       PUP::toDisk p(f);
2108       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2109       (*msg)->pup(p);
2110       fclose(f);
2111     }
2112     return CmiTrue;
2113   }
2114 };
2115
2116 class CkMessageDetailRecorder : public CkMessageWatcher {
2117 public:
2118   CkMessageDetailRecorder(FILE *f_) {
2119     f=f_;
2120     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2121      * The value of 'x' is the pointer size.
2122      */
2123     CmiUInt2 little = sizeof(void*);
2124     fwrite(&little, 2, 1, f);
2125   }
2126   ~CkMessageDetailRecorder() {fclose(f);}
2127 private:
2128   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2129     bool wasPacked = (*envptr)->isPacked();
2130     if (!wasPacked) CkPackMessage(envptr);
2131     envelope *env = *envptr;
2132     CmiUInt4 size = env->getTotalsize();
2133     fwrite(&size, 4, 1, f);
2134     fwrite(env, env->getTotalsize(), 1, f);
2135     if (!wasPacked) CkUnpackMessage(envptr);
2136     return CmiTrue;
2137   }
2138 };
2139
2140 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2141 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2142
2143 #if CMK_BLUEGENE_CHARM
2144 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2145                                    int pb,unsigned int *prio);
2146 #endif
2147
2148 class CkMessageReplay : public CkMessageWatcher {
2149   int counter;
2150         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2151         int nextEP;
2152         unsigned int crc1, crc2;
2153         FILE *lbFile;
2154         /// Read the next message we need from the file:
2155         void getNext(void) {
2156           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2157           if (nextSize > 0) {
2158             // We are reading a regular message
2159             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2160               CkAbort("CkMessageReplay> Syntax error reading replay file");
2161             }
2162             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2163           } else if (nextSize == -2) {
2164             // We are reading a special message (right now only thread awaken)
2165             // Nothing to do since we have already read all info
2166             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2167           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2168             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2169             CkAbort("CkMessageReplay> Unrecognized input");
2170           }
2171             /*
2172                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2173                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2174                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2175                 }
2176                 */
2177                 counter++;
2178         }
2179         /// If this is the next message we need, advance and return CmiTrue.
2180         CmiBool isNext(envelope *env) {
2181                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2182                 if (nextEvent!=env->getEvent()) return CmiFalse;
2183                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2184 #if 1
2185                 if (nextEP != env->getEpIdx()) {
2186                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2187                         return CmiFalse;
2188                 }
2189 #endif
2190 #if ! CMK_BLUEGENE_CHARM
2191                 if (nextSize!=env->getTotalsize())
2192                 {
2193                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2194                         return CmiFalse;
2195                 }
2196                 if (_recplay_crc || _recplay_checksum) {
2197                   bool wasPacked = env->isPacked();
2198                   if (!wasPacked) CkPackMessage(&env);
2199                   if (_recplay_crc) {
2200                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2201                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2202                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2203                     if (crcnew1 != crc1) {
2204                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2205                     }
2206                     if (crcnew2 != crc2) {
2207                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2208                     }
2209                   } else if (_recplay_checksum) {
2210             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2211             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2212             if (crcnew1 != crc1) {
2213               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2214             }
2215             if (crcnew2 != crc2) {
2216               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2217             }               
2218                   }
2219                   if (!wasPacked) CkUnpackMessage(&env);
2220                 }
2221 #endif
2222                 return CmiTrue;
2223         }
2224         CmiBool isNext(CthThreadToken *token) {
2225           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2226           return CmiFalse;
2227         }
2228
2229         /// This is a (short) list of messages we aren't yet ready for:
2230         CkQ<envelope *> delayedMessages;
2231         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2232         CkQ<CthThreadToken *> delayedTokens;
2233
2234         /// Try to flush out any delayed messages
2235         void flush(void) {
2236           if (nextSize>0) {
2237                 int len=delayedMessages.length();
2238                 for (int i=0;i<len;i++) {
2239                         envelope *env=delayedMessages.deq();
2240                         if (isNext(env)) { /* this is the next message: process it */
2241                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2242                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2243                                 return;
2244                         }
2245                         else /* Not ready yet-- put it back in the
2246                                 queue */
2247                           {
2248                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2249                                 delayedMessages.enq(env);
2250                           }
2251                 }
2252           } else if (nextSize==-2) {
2253             int len=delayedTokens.length();
2254             for (int i=0;i<len;++i) {
2255               CthThreadToken *token=delayedTokens.deq();
2256               if (isNext(token)) {
2257             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2258 #if ! CMK_BLUEGENE_CHARM
2259                 CsdEnqueueLifo((void*)token);
2260 #else
2261                 CthEnqueueBigSimThread(token,0,0,NULL);
2262 #endif
2263                 return;
2264               } else {
2265             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2266                 delayedTokens.enq(token);
2267               }
2268             }
2269           }
2270         }
2271
2272 public:
2273         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2274           counter=0;
2275           f=f_;
2276           getNext();
2277           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2278           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2279         }
2280         ~CkMessageReplay() {fclose(f);}
2281
2282 private:
2283         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2284           bool wasPacked = (*envptr)->isPacked();
2285           if (!wasPacked) CkPackMessage(envptr);
2286           envelope *env = *envptr;
2287           //CkAssert(*(int*)env == 0x34567890);
2288           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2289                 if (env->getEvent() == 0) return CmiTrue;
2290                 if (isNext(env)) { /* This is the message we were expecting */
2291                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2292                         getNext(); /* Advance over this message */
2293                         flush(); /* try to process queued-up stuff */
2294                         if (!wasPacked) CkUnpackMessage(envptr);
2295                         return CmiTrue;
2296                 }
2297 #if CMK_SMP
2298                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2299                          // try next rank, we can't just buffer the msg and left
2300                          // we need to keep unprocessed msg on the fly
2301                         int nextpe = CkMyPe()+1;
2302                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2303                         nextpe = CkNodeFirst(CkMyNode());
2304                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2305                         return CmiFalse;
2306                 }
2307 #endif
2308                 else /*!isNext(env) */ {
2309                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2310                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2311                         delayedMessages.enq(env);
2312                         flush();
2313                         return CmiFalse;
2314                 }
2315         }
2316         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {
2317       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2318           if (isNext(token)) {
2319         REPLAYDEBUG("Executing token: "<<token->serialNo)
2320             getNext();
2321             flush();
2322             return CmiTrue;
2323           } else {
2324         REPLAYDEBUG("Queueing token: "<<token->serialNo
2325             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2326             delayedTokens.enq(token);
2327             return CmiFalse;
2328           }
2329         }
2330
2331         virtual CmiBool process(LBMigrateMsg **msg,CkCoreState *ck) {
2332           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2333           if (lbFile != NULL) {
2334             int num_moves;
2335         PUP::fromDisk p(lbFile);
2336             p | num_moves;
2337             if (num_moves != (*msg)->n_moves) {
2338               delete *msg;
2339               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2340             }
2341             (*msg)->pup(p);
2342           }
2343           return CmiTrue;
2344         }
2345 };
2346
2347 class CkMessageDetailReplay : public CkMessageWatcher {
2348   void *getNext() {
2349     CmiUInt4 size; size_t nread;
2350     if ((nread=fread(&size, 4, 1, f)) < 1) {
2351       if (feof(f)) return NULL;
2352       CkPrintf("Broken record file (metadata) got %d\n",nread);
2353       CkAbort("");
2354     }
2355     void *env = CmiAlloc(size);
2356     long tell = ftell(f);
2357     if ((nread=fread(env, size, 1, f)) < 1) {
2358       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2359       CkAbort("");
2360     }
2361     //*(int*)env = 0x34567890; // set first integer as magic
2362     return env;
2363   }
2364 public:
2365   double starttime;
2366   CkMessageDetailReplay(FILE *f_) {
2367     f=f_;
2368     starttime=CkWallTimer();
2369     /* This must match what CkMessageDetailRecorder did */
2370     CmiUInt2 little;
2371     fread(&little, 2, 1, f);
2372     if (little != sizeof(void*)) {
2373       CkAbort("Replaying on a different architecture from which recording was done!");
2374     }
2375
2376     CsdEnqueue(getNext());
2377
2378     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2379   }
2380   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2381     void *msg = getNext();
2382     if (msg != NULL) CsdEnqueue(msg);
2383     return CmiTrue;
2384   }
2385 };
2386
2387 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2388 #if ! CMK_BLUEGENE_CHARM
2389   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2390 #endif
2391   CkMessageReplay *replay = (CkMessageReplay*)rep;
2392   //CmiStartQD(CkMessageReplayQuiescence, replay);
2393 }
2394
2395 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2396   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2397   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2398   ConverseExit();
2399 }
2400
2401 static CmiBool CpdExecuteThreadResume(CthThreadToken *token) {
2402   CkCoreState *ck = CkpvAccess(_coreState);
2403   if (ck->watcher!=NULL) {
2404     return ck->watcher->processThread(token,ck);
2405   }
2406   return CmiTrue;
2407 }
2408
2409 CpvCExtern(int, CthResumeNormalThreadIdx);
2410 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2411 {
2412   CthThread t = token->thread;
2413
2414   if(t == NULL){
2415     free(token);
2416     return;
2417   }
2418 #if CMK_TRACE_ENABLED
2419 #if ! CMK_TRACE_IN_CHARM
2420   if(CpvAccess(traceOn))
2421     CthTraceResume(t);
2422 /*    if(CpvAccess(_traceCoreOn)) 
2423             resumeTraceCore();*/
2424 #endif
2425 #endif
2426   
2427   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2428   if (CpdExecuteThreadResume(token)) {
2429     CthResume(t);
2430   }
2431 }
2432
2433 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2434   CkCoreState *ck = CkpvAccess(_coreState);
2435   if (ck->watcher!=NULL) {
2436     ck->watcher->processLBMessage(msg, ck);
2437   }
2438 }
2439
2440 #if CMK_BLUEGENE_CHARM
2441 CpvExtern(int      , CthResumeBigSimThreadIdx);
2442 #endif
2443
2444 #include "ckliststring.h"
2445 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2446     CmiArgGroup("Charm++","Record/Replay");
2447     CmiBool forceReplay = CmiFalse;
2448     char *procs = NULL;
2449     _replaySystem = 0;
2450     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2451       _recplay_crc = 1;
2452     }
2453     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2454       _recplay_checksum = 1;
2455     }
2456     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2457     REPLAYDEBUG("CkMessageWatcherInit ");
2458     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2459         CkListString list(procs);
2460         if (list.includes(CkMyPe())) {
2461           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2462           CpdSetInitializeMemory(1);
2463           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2464         }
2465     }
2466     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2467       if (CkMyPe() == 0) {
2468         CmiPrintf("Charm++> record mode.\n");
2469         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2470           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2471           _recplay_crc = _recplay_checksum = 0;
2472         }
2473       }
2474       CpdSetInitializeMemory(1);
2475       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2476       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2477     }
2478         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2479             forceReplay = CmiTrue;
2480             CpdSetInitializeMemory(1);
2481             // Set the parameters of the processor
2482 #if CMK_SHARED_VARS_UNAVAILABLE
2483             _Cmi_mype = atoi(procs);
2484             while (procs[0]!='/') procs++;
2485             procs++;
2486             _Cmi_numpes = atoi(procs);
2487 #else
2488             CkAbort("+replay-detail available only for non-SMP build");
2489 #endif
2490             _replaySystem = 1;
2491             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2492         }
2493         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2494           if (CkMyPe() == 0)  {
2495             CmiPrintf("Charm++> replay mode.\n");
2496             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2497               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2498               _recplay_crc = _recplay_checksum = 0;
2499             }
2500           }
2501           CpdSetInitializeMemory(1);
2502 #if ! CMK_BLUEGENE_CHARM
2503           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2504 #else
2505           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2506 #endif
2507           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2508         }
2509         if (_recplay_crc && _recplay_checksum) {
2510           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2511         }
2512 }
2513
2514 extern "C"
2515 int CkMessageToEpIdx(void *msg) {
2516         envelope *env=UsrToEnv(msg);
2517         int ep=env->getEpIdx();
2518         if (ep==CkIndex_CkArray::recvBroadcast(0))
2519                 return env->getsetArrayBcastEp();
2520         else
2521                 return ep;
2522 }
2523
2524 extern "C"
2525 int getCharmEnvelopeSize() {
2526   return sizeof(envelope);
2527 }
2528
2529
2530 #include "CkMarshall.def.h"
2531