commit a lot of unwanted changes.
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #ifndef CMK_CHARE_USE_PTR
29 #include <map>
30 CkpvDeclare(CkVec<void *>, chare_objs);
31 CkpvDeclare(CkVec<int>, chare_types);
32 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
33
34 typedef std::map<int, CkChareID>  Vidblockmap;
35 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
36 CkpvDeclare(int, currentChareIdx);
37 #endif
38
39
40 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
41
42 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
43
44 int CMessage_CkMessage::__idx=-1;
45 int CMessage_CkArgMsg::__idx=0;
46 int CkIndex_Chare::__idx;
47 int CkIndex_Group::__idx;
48 int CkIndex_ArrayBase::__idx=-1;
49
50 extern int _defaultObjectQ;
51
52 void _initChareTables()
53 {
54 #ifndef CMK_CHARE_USE_PTR
55           /* chare and vidblock table */
56   CkpvInitialize(CkVec<void *>, chare_objs);
57   CkpvInitialize(CkVec<int>, chare_types);
58   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
59   CkpvInitialize(Vidblockmap, vmap);
60   CkpvInitialize(int, currentChareIdx);
61   CkpvAccess(currentChareIdx) = -1;
62 #endif
63 }
64
65 //Charm++ virtual functions: declaring these here results in a smaller executable
66 Chare::Chare(void) {
67   thishandle.onPE=CkMyPe();
68   thishandle.objPtr=this;
69 #ifndef CMK_CHARE_USE_PTR
70      // for plain chare, objPtr is actually the index to chare obj table
71   if (CpvAccess(currentChareIdx) >= 0) {
72     thishandle.objPtr=(void*)CpvAccess(currentChareIdx);
73   }
74   chareIdx = CkpvAccess(currentChareIdx);
75 #endif
76 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
77   mlogData = new ChareMlogData();
78   mlogData->objID.type = TypeChare;
79   mlogData->objID.data.chare.id = thishandle;
80 #endif
81 #if CMK_OBJECT_QUEUE_AVAILABLE
82   if (_defaultObjectQ)  CkEnableObjQ();
83 #endif
84 }
85
86 Chare::Chare(CkMigrateMessage* m) {
87   thishandle.onPE=CkMyPe();
88   thishandle.objPtr=this;
89
90 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
91         mlogData = NULL;
92 #endif
93
94 #if CMK_OBJECT_QUEUE_AVAILABLE
95   if (_defaultObjectQ)  CkEnableObjQ();
96 #endif
97 }
98
99 void Chare::CkEnableObjQ()
100 {
101 #if CMK_OBJECT_QUEUE_AVAILABLE
102   objQ.create();
103 #endif
104 }
105
106 Chare::~Chare() {
107 #ifndef CMK_CHARE_USE_PTR
108 /*
109   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
110 */
111   if (chareIdx != -1)
112   {
113     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
114     CkpvAccess(chare_objs)[chareIdx] = NULL;
115     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
116     if (iter != CkpvAccess(vmap).end()) {
117       register CkChareID *pCid = (CkChareID *)
118         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
119       int srcPe = iter->second.onPE;
120       *pCid = iter->second;
121       register envelope *ret = UsrToEnv(pCid);
122       ret->setVidPtr(iter->second.objPtr);
123       ret->setSrcPe(CkMyPe());
124       CmiSetHandler(ret, _charmHandlerIdx);
125       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
126       CpvAccess(_qd)->create();
127       CkpvAccess(vmap).erase(iter);
128     }
129   }
130 #endif
131 }
132
133 void Chare::pup(PUP::er &p)
134 {
135   p(thishandle.onPE);
136   thishandle.objPtr=(void *)this;
137 #ifndef CMK_CHARE_USE_PTR
138   p(chareIdx);
139   if (chareIdx != -1) thishandle.objPtr=(void*)chareIdx;
140 #endif
141 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
142         if(p.isUnpacking()){
143                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
144                 mlogData = new ChareMlogData();
145         }
146         mlogData->pup(p);
147 #endif
148 }
149
150 int Chare::ckGetChareType() const {
151   return -3;
152 }
153 char *Chare::ckDebugChareName(void) {
154   char buf[100];
155   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
156   return strdup(buf);
157 }
158 int Chare::ckDebugChareID(char *str, int limit) {
159   // pure chares for now do not have a valid ID
160   str[0] = 0;
161   return 1;
162 }
163 void Chare::ckDebugPup(PUP::er &p) {
164   pup(p);
165 }
166
167 /// This method is called before starting a [threaded] entry method.
168 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
169   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
170   traceAddThreadListeners(th, UsrToEnv(msg));
171 }
172
173
174 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
175   p.comment("Bytes");
176   int ts=UsrToEnv(msg)->getTotalsize();
177   int msgLen=ts-sizeof(envelope);
178   if (msgLen>0)
179     p((char*)msg,msgLen);
180 }
181
182 IrrGroup::IrrGroup(void) {
183   thisgroup = CkpvAccess(_currentGroup);
184 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
185         mlogData->objID.type = TypeGroup;
186         mlogData->objID.data.group.id = thisgroup;
187         mlogData->objID.data.group.onPE = CkMyPe();
188 #endif
189 }
190
191 IrrGroup::~IrrGroup() {
192   // remove the object pointer
193   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
194   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
195   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
196 }
197
198 void IrrGroup::pup(PUP::er &p)
199 {
200   Chare::pup(p);
201   p|thisgroup;
202 }
203
204 int IrrGroup::ckGetChareType() const {
205   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
206 }
207
208 int IrrGroup::ckDebugChareID(char *str, int limit) {
209   if (limit<5) return -1;
210   str[0] = 1;
211   *((int*)&str[1]) = thisgroup.idx;
212   return 5;
213 }
214
215 char *IrrGroup::ckDebugChareName() {
216   return strdup(_chareTable[ckGetChareType()]->name);
217 }
218
219 void IrrGroup::ckJustMigrated(void)
220 {
221 }
222
223 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
224   /* FIXME: **CW** not entirely sure what we should do here yet */
225 }
226
227 void Group::CkAddThreadListeners(CthThread th, void *msg) {
228   Chare::CkAddThreadListeners(th, msg);
229   CthSetThreadID(th, thisgroup.idx, 0, 0);
230 }
231
232 void Group::pup(PUP::er &p)
233 {
234   CkReductionMgr::pup(p);
235   reductionInfo.pup(p);
236 }
237
238 /**** Delegation Manager Group */
239 CkDelegateMgr::~CkDelegateMgr() { }
240
241 //Default delegator implementation: do not delegate-- send directly
242 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
243   { CkSendMsg(ep,m,c); }
244 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
245   { CkSendMsgBranch(ep,m,onPE,g); }
246 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
247   { CkBroadcastMsgBranch(ep,m,g); }
248 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
249   { CkSendMsgBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
250 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
251   { CkSendMsgNodeBranch(ep,m,onNode,g); }
252 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
253   { CkBroadcastMsgNodeBranch(ep,m,g); }
254 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
255   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
256 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,int onPE,CkArrayID a)
257 {
258         CProxyElement_ArrayBase ap(a,idx);
259         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
260 }
261 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,CkArrayID a)
262 {
263         CProxyElement_ArrayBase ap(a,idx);
264         ap.ckSend((CkArrayMessage *)m,ep);
265 }
266 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
267 {
268         CProxy_ArrayBase ap(a);
269         ap.ckBroadcast((CkArrayMessage *)m,ep);
270 }
271
272 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
273 {
274         CmiAbort("ArraySectionSend is not implemented!\n");
275 /*
276         CProxyElement_ArrayBase ap(a,idx);
277         ap.ckSend((CkArrayMessage *)m,ep);
278 */
279 }
280
281 /*** Proxy <-> delegator communication */
282 CkDelegateData::~CkDelegateData() {}
283
284 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
285   return pd; // default implementation ignores pup call
286 }
287
288 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
289    this tricky manual reference counting business... */
290
291 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
292         if (dPtr) dPtr->ref();
293         ckUndelegate();
294         delegatedMgr = dTo;
295         delegatedPtr = dPtr;
296 }
297 void CProxy::ckUndelegate(void) {
298         delegatedMgr=NULL;
299         if (delegatedPtr) delegatedPtr->unref();
300         delegatedPtr=NULL;
301 }
302
303 /// Copy constructor
304 CProxy::CProxy(const CProxy &src)
305     :delegatedMgr(src.delegatedMgr)
306 {
307     delegatedPtr = NULL;
308     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
309         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
310 }
311
312 /// Assignment operator
313 CProxy& CProxy::operator=(const CProxy &src) {
314         CkDelegateData *oldPtr=delegatedPtr;
315         ckUndelegate();
316         delegatedMgr=src.delegatedMgr;
317
318         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
319             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
320         else
321             delegatedPtr = NULL;
322
323         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
324         if (oldPtr) oldPtr->unref();
325         return *this;
326 }
327
328 void CProxy::pup(PUP::er &p) {
329       CkGroupID delegatedTo;
330       delegatedTo.setZero();
331       int isNodeGroup = 0;
332       if (!p.isUnpacking()) {
333         if (delegatedMgr) {
334           delegatedTo = delegatedMgr->CkGetGroupID();
335           isNodeGroup = delegatedMgr->isNodeGroup();
336         }
337       }
338       p|delegatedTo;
339       if (!delegatedTo.isZero()) {
340         p|isNodeGroup;
341         if (p.isUnpacking()) {
342           if (isNodeGroup)
343                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
344           else
345                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
346         }
347
348         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
349         if (p.isUnpacking() && delegatedPtr)
350             delegatedPtr->ref();
351       }
352 }
353
354 /**** Array sections */
355 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
356 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
357   _cookie.aid = aid;    \
358   _cookie.get_pe() = CkMyPe();  \
359   _elems = new CkArrayIndexMax[nElems]; \
360   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
361   pelist = NULL;        \
362   npes  = 0;    \
363 }
364
365 CKSECTIONID_CONSTRUCTOR_DEF(1D)
366 CKSECTIONID_CONSTRUCTOR_DEF(2D)
367 CKSECTIONID_CONSTRUCTOR_DEF(3D)
368 CKSECTIONID_CONSTRUCTOR_DEF(4D)
369 CKSECTIONID_CONSTRUCTOR_DEF(5D)
370 CKSECTIONID_CONSTRUCTOR_DEF(6D)
371 CKSECTIONID_CONSTRUCTOR_DEF(Max)
372
373 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
374   pelist = new int[npes];
375   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
376   _cookie.aid = gid;
377 }
378
379 CkSectionID::CkSectionID(const CkSectionID &sid) {
380   int i;
381   _cookie = sid._cookie;
382   _nElems = sid._nElems;
383   if (_nElems > 0) {
384     _elems = new CkArrayIndexMax[_nElems];
385     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
386   } else _elems = NULL;
387   npes = sid.npes;
388   if (npes > 0) {
389     pelist = new int[npes];
390     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
391   } else pelist = NULL;
392 }
393
394 void CkSectionID::operator=(const CkSectionID &sid) {
395   int i;
396   _cookie = sid._cookie;
397   _nElems = sid._nElems;
398   if (_nElems > 0) {
399     _elems = new CkArrayIndexMax[_nElems];
400     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
401   } else _elems = NULL;
402   npes = sid.npes;
403   if (npes > 0) {
404     pelist = new int[npes];
405     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
406   } else pelist = NULL;
407 }
408
409 void CkSectionID::pup(PUP::er &p) {
410     p | _cookie;
411     p(_nElems);
412     if (_nElems > 0) {
413       if (p.isUnpacking()) _elems = new CkArrayIndexMax[_nElems];
414       for (int i=0; i< _nElems; i++) p | _elems[i];
415       npes = 0;
416       pelist = NULL;
417     } else {
418       // If _nElems is zero, than this section describes processors instead of array elements
419       _elems = NULL;
420       p(npes);
421       if (p.isUnpacking()) pelist = new int[npes];
422       p(pelist, npes);
423     }
424 }
425
426 /**** Tiny random API routines */
427
428 #ifdef CMK_CUDA
429 void CUDACallbackManager(void *fn) {
430   if (fn != NULL) {
431     CkCallback *cb = (CkCallback*) fn;
432     cb->send();
433   }
434 }
435
436 #endif
437
438 extern "C"
439 void CkSetRefNum(void *msg, int ref)
440 {
441   UsrToEnv(msg)->setRef(ref);
442 }
443
444 extern "C"
445 int CkGetRefNum(void *msg)
446 {
447   return UsrToEnv(msg)->getRef();
448 }
449
450 extern "C"
451 int CkGetSrcPe(void *msg)
452 {
453   return UsrToEnv(msg)->getSrcPe();
454 }
455
456 extern "C"
457 int CkGetSrcNode(void *msg)
458 {
459   return CmiNodeOf(CkGetSrcPe(msg));
460 }
461
462 extern "C"
463 void *CkLocalBranch(CkGroupID gID) {
464   return _localBranch(gID);
465 }
466
467 static
468 void *_ckLocalNodeBranch(CkGroupID groupID) {
469   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
470   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
471   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
472   return retval;
473 }
474
475 extern "C"
476 void *CkLocalNodeBranch(CkGroupID groupID)
477 {
478   void *retval;
479   // we are called in a constructor
480   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
481     return CkpvAccess(_currentNodeGroupObj);
482   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
483   { // Nodegroup hasn't finished being created yet-- schedule...
484     CsdScheduler(0);
485   }
486   return retval;
487 }
488
489 extern "C"
490 void *CkLocalChare(const CkChareID *pCid)
491 {
492         int pe=pCid->onPE;
493         if (pe<0) { //A virtual chare ID
494                 if (pe!=(-(CkMyPe()+1)))
495                         return NULL;//VID block not on this PE
496                 VidBlock *v=(VidBlock *)pCid->objPtr;
497                 return v->getLocalChare();
498         }
499         else
500         { //An ordinary chare ID
501                 if (pe!=CkMyPe())
502                         return NULL;//Chare not on this PE
503                 return pCid->objPtr;
504         }
505 }
506
507 CkpvDeclare(char **,Ck_argv);
508 extern "C" char **CkGetArgv(void) {
509         return CkpvAccess(Ck_argv);
510 }
511 extern "C" int CkGetArgc(void) {
512         return CmiGetArgc(CkpvAccess(Ck_argv));
513 }
514
515 /******************** Basic support *****************/
516 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
517 {
518 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
519         CpvAccess(_currentObj) = (Chare *)obj;
520 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
521 #endif
522   //BIGSIM_OOC DEBUGGING
523   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
524   //fflush(stdout);
525 #if CMK_CHARMDEBUG
526   CpdBeforeEp(epIdx, obj, msg);
527 #endif
528   _entryTable[epIdx]->call(msg, obj);
529 #if CMK_CHARMDEBUG
530   CpdAfterEp(epIdx);
531 #endif
532   if (_entryTable[epIdx]->noKeep)
533   { /* Method doesn't keep/delete the message, so we have to: */
534     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
535   }
536 }
537 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
538 {
539   //BIGSIM_OOC DEBUGGING
540   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
541   //fflush(stdout);
542
543   void *deliverMsg;
544 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
545         CpvAccess(_currentObj) = (Chare *)obj;
546 #endif
547   if (_entryTable[epIdx]->noKeep)
548   { /* Deliver a read-only copy of the message */
549     deliverMsg=(void *)msg;
550   } else
551   { /* Method needs a copy of the message to keep/delete */
552     void *oldMsg=(void *)msg;
553     deliverMsg=CkCopyMsg(&oldMsg);
554 #if CMK_ERROR_CHECKING
555     if (oldMsg!=msg)
556       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
557 #endif
558   }
559 #if CMK_CHARMDEBUG
560   CpdBeforeEp(epIdx, obj, (void*)msg);
561 #endif
562   _entryTable[epIdx]->call(deliverMsg, obj);
563 #if CMK_CHARMDEBUG
564   CpdAfterEp(epIdx);
565 #endif
566 }
567
568 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
569 {
570   register void *msg = EnvToUsr(env);
571   _SET_USED(env, 0);
572   CkDeliverMessageFree(epIdx,msg,obj);
573 }
574
575 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
576 {
577
578 #if CMK_TRACE_ENABLED 
579   if (_entryTable[epIdx]->traceEnabled) {
580     _TRACE_BEGIN_EXECUTE(env);
581     _invokeEntryNoTrace(epIdx,env,obj);
582     _TRACE_END_EXECUTE();
583   }
584   else
585 #endif
586     _invokeEntryNoTrace(epIdx,env,obj);
587
588 }
589
590 /********************* Creation ********************/
591
592 extern "C"
593 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
594 {
595   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
596   envelope *env = UsrToEnv(msg);
597   _CHECK_USED(env);
598   if(pCid == 0) {
599     env->setMsgtype(NewChareMsg);
600   } else {
601     pCid->onPE = (-(CkMyPe()+1));
602     //  pCid->magic = _GETIDX(cIdx);
603     pCid->objPtr = (void *) new VidBlock();
604     _MEMCHECK(pCid->objPtr);
605     env->setMsgtype(NewVChareMsg);
606     env->setVidPtr(pCid->objPtr);
607 #ifndef CMK_CHARE_USE_PTR
608     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
609     int idx = CkpvAccess(vidblocks).size()-1;
610     pCid->objPtr = (void *)idx;
611     env->setVidPtr((void *)idx);
612 #endif
613   }
614   env->setEpIdx(eIdx);
615   env->setSrcPe(CkMyPe());
616   CmiSetHandler(env, _charmHandlerIdx);
617   _TRACE_CREATION_1(env);
618   CpvAccess(_qd)->create();
619   _STATS_RECORD_CREATE_CHARE_1();
620   _SET_USED(env, 1);
621   if(destPE == CK_PE_ANY)
622     env->setForAnyPE(1);
623   else
624     env->setForAnyPE(0);
625   _CldEnqueue(destPE, env, _infoIdx);
626   _TRACE_CREATION_DONE(1);
627 }
628
629 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
630 {
631   register int gIdx = _entryTable[epIdx]->chareIdx;
632   register void *obj = malloc(_chareTable[gIdx]->size);
633   _MEMCHECK(obj);
634   setMemoryTypeChare(obj);
635   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
636   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
637   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
638   CkpvAccess(_groupIDTable)->push_back(groupID);
639   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
640   if(ptrq) {
641     void *pending;
642     while((pending=ptrq->deq())!=0)
643       _CldEnqueue(CkMyPe(), pending, _infoIdx);
644 //    delete ptrq;
645       CkpvAccess(_groupTable)->find(groupID).clearPending();
646   }
647   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
648
649   CkpvAccess(_currentGroup) = groupID;
650   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
651 #ifndef CMK_CHARE_USE_PTR
652   //((Chare *)obj)->chareIdx = -1;
653   CkpvAccess(currentChareIdx) = -1;
654 #endif
655   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
656   _STATS_RECORD_PROCESS_GROUP_1();
657 }
658
659 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
660 {
661   register int gIdx = _entryTable[epIdx]->chareIdx;
662   int objSize=_chareTable[gIdx]->size;
663   register void *obj = malloc(objSize);
664   _MEMCHECK(obj);
665   setMemoryTypeChare(obj);
666   CkpvAccess(_currentGroup) = groupID;
667
668 // Now that the NodeGroup is created, add it to the table.
669 //  NodeGroups can be accessed by multiple processors, so
670 //  this is in the opposite order from groups - invoking the constructor
671 //  before registering it.
672 // User may call CkLocalNodeBranch() inside the nodegroup constructor
673 //  store nodegroup into _currentNodeGroupObj
674   CkpvAccess(_currentNodeGroupObj) = obj;
675 #ifndef CMK_CHARE_USE_PTR
676   //((Chare *)obj)->chareIdx = -1;
677   CkpvAccess(currentChareIdx) = -1;
678 #endif
679   _invokeEntryNoTrace(epIdx,env,obj);
680   CkpvAccess(_currentNodeGroupObj) = NULL;
681   _STATS_RECORD_PROCESS_NODE_GROUP_1();
682
683   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
684   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
685   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
686   CksvAccess(_nodeGroupIDTable).push_back(groupID);
687
688   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
689   if(ptrq) {
690     void *pending;
691     while((pending=ptrq->deq())!=0)
692       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
693 //    delete ptrq;
694       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
695   }
696   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
697 }
698
699 void _createGroup(CkGroupID groupID, envelope *env)
700 {
701   _CHECK_USED(env);
702   _SET_USED(env, 1);
703   register int epIdx = env->getEpIdx();
704   int gIdx = _entryTable[epIdx]->chareIdx;
705   CkNodeGroupID rednMgr;
706   if(_chareTable[gIdx]->isIrr == 0){
707                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
708                 rednMgr = rednMgrProxy;
709 //              rednMgrProxy.setAttachedGroup(groupID);
710   }else{
711         rednMgr.setZero();
712   }
713   env->setGroupNum(groupID);
714   env->setSrcPe(CkMyPe());
715   env->setRednMgr(rednMgr);
716   env->setGroupEpoch(CkpvAccess(_charmEpoch));
717
718   if(CkNumPes()>1) {
719     CkPackMessage(&env);
720     CmiSetHandler(env, _bocHandlerIdx);
721     _numInitMsgs++;
722     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
723     CpvAccess(_qd)->create(CkNumPes()-1);
724     CkUnpackMessage(&env);
725   }
726   _STATS_RECORD_CREATE_GROUP_1();
727   CkCreateLocalGroup(groupID, epIdx, env);
728 }
729
730 void _createNodeGroup(CkGroupID groupID, envelope *env)
731 {
732   _CHECK_USED(env);
733   _SET_USED(env, 1);
734   register int epIdx = env->getEpIdx();
735   env->setGroupNum(groupID);
736   env->setSrcPe(CkMyPe());
737   env->setGroupEpoch(CkpvAccess(_charmEpoch));
738   if(CkNumNodes()>1) {
739     CkPackMessage(&env);
740     CmiSetHandler(env, _bocHandlerIdx);
741     _numInitMsgs++;
742     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
743     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
744     CpvAccess(_qd)->create(CkNumNodes()-1);
745     CkUnpackMessage(&env);
746   }
747   _STATS_RECORD_CREATE_NODE_GROUP_1();
748   CkCreateLocalNodeGroup(groupID, epIdx, env);
749 }
750
751 // new _groupCreate
752
753 static CkGroupID _groupCreate(envelope *env)
754 {
755   register CkGroupID groupNum;
756
757   // check CkMyPe(). if it is 0 then idx is _numGroups++
758   // if not, then something else...
759   if(CkMyPe() == 0)
760      groupNum.idx = CkpvAccess(_numGroups)++;
761   else
762      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
763   _createGroup(groupNum, env);
764   return groupNum;
765 }
766
767 // new _nodeGroupCreate
768 static CkGroupID _nodeGroupCreate(envelope *env)
769 {
770   register CkGroupID groupNum;
771   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
772   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
773           groupNum.idx = CksvAccess(_numNodeGroups)++;
774    else
775           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
776   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
777   _createNodeGroup(groupNum, env);
778   return groupNum;
779 }
780
781 /**** generate the group idx when group is creator pe is not pe0
782  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
783  **** remaining bits contain the group creator processor number and
784  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
785
786 int _getGroupIdx(int numNodes,int myNode,int numGroups)
787 {
788         int idx;
789         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
790         int n = 32 - (x+1);                                     // number of bits remaining for the index
791         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
792                                                                 // then add the next available index
793         // of course this won't work when int is 8 bytes long on T3E
794         //idx |= 0x80000000;                                      // set the most significant bit to 1
795         idx = - idx;
796                                                                 // if int is not 32 bits, wouldn't this be wrong?
797         return idx;
798 }
799
800 extern "C"
801 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
802 {
803   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
804   register envelope *env = UsrToEnv(msg);
805   env->setMsgtype(BocInitMsg);
806   env->setEpIdx(eIdx);
807   env->setSrcPe(CkMyPe());
808   _TRACE_CREATION_N(env, CkNumPes());
809   CkGroupID gid = _groupCreate(env);
810   _TRACE_CREATION_DONE(1);
811   return gid;
812 }
813
814 extern "C"
815 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
816 {
817   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
818   register envelope *env = UsrToEnv(msg);
819   env->setMsgtype(NodeBocInitMsg);
820   env->setEpIdx(eIdx);
821   env->setSrcPe(CkMyPe());
822   _TRACE_CREATION_N(env, CkNumNodes());
823   CkGroupID gid = _nodeGroupCreate(env);
824   _TRACE_CREATION_DONE(1);
825   return gid;
826 }
827
828 static inline void *_allocNewChare(envelope *env, int &idx)
829 {
830   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
831   void *tmp=malloc(_chareTable[chareIdx]->size);
832   _MEMCHECK(tmp);
833 #ifndef CMK_CHARE_USE_PTR
834   CkpvAccess(chare_objs).push_back(tmp);
835   CkpvAccess(chare_types).push_back(chareIdx);
836   idx = CkpvAccess(chare_objs).size()-1;
837 #endif
838   setMemoryTypeChare(tmp);
839   return tmp;
840 }
841
842 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
843 {
844   int idx;
845   register void *obj = _allocNewChare(env, idx);
846 #ifndef CMK_CHARE_USE_PTR
847   //((Chare *)obj)->chareIdx = idx;
848   CkpvAccess(currentChareIdx) = idx;
849 #endif
850   _invokeEntry(env->getEpIdx(),env,obj);
851 }
852
853 void CkCreateLocalChare(int epIdx, envelope *env)
854 {
855   env->setEpIdx(epIdx);
856   _processNewChareMsg(NULL, env);
857 }
858
859 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
860 {
861   int idx;
862   register void *obj = _allocNewChare(env, idx);
863   register CkChareID *pCid = (CkChareID *)
864       _allocMsg(FillVidMsg, sizeof(CkChareID));
865   pCid->onPE = CkMyPe();
866 #ifndef CMK_CHARE_USE_PTR
867   pCid->objPtr = (void*)idx;
868 #else
869   pCid->objPtr = obj;
870 #endif
871   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
872   register envelope *ret = UsrToEnv(pCid);
873   ret->setVidPtr(env->getVidPtr());
874   register int srcPe = env->getSrcPe();
875   ret->setSrcPe(CkMyPe());
876   CmiSetHandler(ret, _charmHandlerIdx);
877   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
878 #ifndef CMK_CHARE_USE_PTR
879   // register the remote vidblock for deletion when chare is deleted
880   CkChareID vid;
881   vid.onPE = srcPe;
882   vid.objPtr = env->getVidPtr();
883   CkpvAccess(vmap)[idx] = vid;    
884 #endif
885   CpvAccess(_qd)->create();
886 #ifndef CMK_CHARE_USE_PTR
887   //((Chare *)obj)->chareIdx = idx;
888   CkpvAccess(currentChareIdx) = idx;
889 #endif
890   _invokeEntry(env->getEpIdx(),env,obj);
891 }
892
893 /************** Receive: Chares *************/
894
895 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
896 {
897   register int epIdx = env->getEpIdx();
898   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
899   register void *obj;
900   if (mainIdx != -1)  {           // mainchare
901     CmiAssert(CkMyPe()==0);
902     obj = _mainTable[mainIdx]->getObj();
903   }
904   else {
905 #ifndef CMK_CHARE_USE_PTR
906     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
907       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
908     else
909       obj = env->getObjPtr();
910 #else
911     obj = env->getObjPtr();
912 #endif
913   }
914   _invokeEntry(epIdx,env,obj);
915 }
916
917 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
918 {
919   register int epIdx = env->getEpIdx();
920   register void *obj = env->getObjPtr();
921   _invokeEntry(epIdx,env,obj);
922 }
923
924 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
925 {
926 #ifndef CMK_CHARE_USE_PTR
927   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
928 #else
929   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
930   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
931 #endif
932   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
933   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
934   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
935   CmiFree(env);
936 }
937
938 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
939 {
940 #ifndef CMK_CHARE_USE_PTR
941   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
942 #else
943   VidBlock *vptr = (VidBlock *) env->getVidPtr();
944   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
945 #endif
946   _SET_USED(env, 1);
947   vptr->send(env);
948 }
949
950 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
951 {
952 #ifndef CMK_CHARE_USE_PTR
953   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
954   delete vptr;
955   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
956 #endif
957   CmiFree(env);
958 }
959
960 /************** Receive: Groups ****************/
961
962 /**
963  Return a pointer to the local BOC of "groupID".
964  The message "env" passed in has some known dependency on this groupID
965  (either it is to be delivered to this BOC, or it depends on this BOC being there).
966  Therefore, if the return value is NULL, this function buffers the massage so that
967  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
968  The message passed in must have its handlers correctly set so that it can be
969  scheduled again.
970 */
971 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
972 {
973
974         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
975         IrrGroup *obj = ck->localBranch(groupID);
976         if (obj==NULL) { /* groupmember not yet created: stash message */
977                 ck->getGroupTable()->find(groupID).enqMsg(env);
978         }
979         else { /* will be able to process message */
980                 ck->process();
981         }
982         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
983         return obj;
984 }
985
986 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
987 {
988 #if CMK_LBDB_ON
989   // if there is a running obj being measured, stop it temporarily
990   LDObjHandle objHandle;
991   int objstopped = 0;
992   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
993   if (the_lbdb->RunningObject(&objHandle)) {
994     objstopped = 1;
995     the_lbdb->ObjectStop(objHandle);
996   }
997 #endif
998   _invokeEntry(epIdx,env,obj);
999 #if CMK_LBDB_ON
1000   if (objstopped) the_lbdb->ObjectStart(objHandle);
1001 #endif
1002   _STATS_RECORD_PROCESS_BRANCH_1();
1003 }
1004
1005 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1006 {
1007   register CkGroupID groupID =  env->getGroupNum();
1008   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1009   if(obj) {
1010     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1011   }
1012 }
1013
1014 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1015 {
1016   env->setMsgtype(ForChareMsg);
1017   env->setObjPtr(obj);
1018   _processForChareMsg(ck,env);
1019   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1020 }
1021
1022 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1023 {
1024   env->setEpIdx(epIdx);
1025   _deliverForNodeBocMsg(ck,env, obj);
1026 }
1027
1028 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1029 {
1030   register CkGroupID groupID = env->getGroupNum();
1031   register void *obj;
1032
1033   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1034   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1035   if(!obj) { // groupmember not yet created
1036 #if CMK_IMMEDIATE_MSG
1037     if (CmiIsImmediate(env))     // buffer immediate message
1038       CmiDelayImmediate();
1039     else
1040 #endif
1041     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1042     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1043     return;
1044   }
1045   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1046 #if CMK_IMMEDIATE_MSG
1047   if (!CmiIsImmediate(env))
1048 #endif
1049   ck->process();
1050   env->setMsgtype(ForChareMsg);
1051   env->setObjPtr(obj);
1052   _processForChareMsg(ck,env);
1053   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1054 }
1055
1056 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1057 {
1058   register CkGroupID groupID = env->getGroupNum();
1059   register int epIdx = env->getEpIdx();
1060   if (!env->getGroupDep().isZero()) {      // dependence
1061     CkGroupID dep = env->getGroupDep();
1062     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1063     if (obj == NULL) return;
1064   }
1065   else
1066     ck->process();
1067   CkCreateLocalGroup(groupID, epIdx, env);
1068 }
1069
1070 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1071 {
1072   register CkGroupID groupID = env->getGroupNum();
1073   register int epIdx = env->getEpIdx();
1074   CkCreateLocalNodeGroup(groupID, epIdx, env);
1075 }
1076
1077 /************** Receive: Arrays *************/
1078
1079 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1080   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1081   if (mgr) {
1082     _SET_USED(env, 0);
1083     mgr->insertElement((CkMessage *)EnvToUsr(env));
1084   }
1085 }
1086 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1087   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1088   if (mgr) {
1089     _SET_USED(env, 0);
1090     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1091   }
1092 }
1093
1094 //BIGSIM_OOC DEBUGGING
1095 #define TELLMSGTYPE(x) //x
1096
1097 /**
1098  * This is the main converse-level handler used by all of Charm++.
1099  *
1100  * \addtogroup CriticalPathFramework
1101  */
1102 void _processHandler(void *converseMsg,CkCoreState *ck)
1103 {
1104   register envelope *env = (envelope *) converseMsg;
1105
1106   MESSAGE_PHASE_CHECK(env);
1107
1108 //#if CMK_RECORD_REPLAY
1109   if (ck->watcher!=NULL) {
1110     if (!ck->watcher->processMessage(&env,ck)) return;
1111   }
1112 //#endif
1113 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1114         Chare *obj=NULL;
1115         CkObjID sender;
1116         MCount SN;
1117         MlogEntry *entry=NULL;
1118         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1119         env->getMsgtype() == ForArrayEltMsg){
1120                 sender = env->sender;
1121                 SN = env->SN;
1122                 int result = preProcessReceivedMessage(env,&obj,&entry);
1123                 if(result == 0){
1124                         return;
1125                 }
1126         }
1127 #endif
1128
1129 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1130   //  CkPrintf("START\n");
1131   criticalPath_start(env);
1132 #endif
1133
1134
1135   switch(env->getMsgtype()) {
1136 // Group support
1137     case BocInitMsg :
1138       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1139       // QD processing moved inside _processBocInitMsg because it is conditional
1140       //ck->process(); 
1141       if(env->isPacked()) CkUnpackMessage(&env);
1142       _processBocInitMsg(ck,env);
1143       break;
1144     case NodeBocInitMsg :
1145       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1146       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1147       _processNodeBocInitMsg(ck,env);
1148       break;
1149     case ForBocMsg :
1150       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1151       // QD processing moved inside _processForBocMsg because it is conditional
1152       if(env->isPacked()) CkUnpackMessage(&env);
1153       _processForBocMsg(ck,env);
1154       // stats record moved inside _processForBocMsg because it is conditional
1155       break;
1156     case ForNodeBocMsg :
1157       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1158       // QD processing moved to _processForNodeBocMsg because it is conditional
1159       if(env->isPacked()) CkUnpackMessage(&env);
1160       _processForNodeBocMsg(ck,env);
1161       // stats record moved to _processForNodeBocMsg because it is conditional
1162       break;
1163
1164 // Array support
1165     case ArrayEltInitMsg:
1166       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1167       if(env->isPacked()) CkUnpackMessage(&env);
1168       _processArrayEltInitMsg(ck,env);
1169       break;
1170     case ForArrayEltMsg:
1171       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1172       if(env->isPacked()) CkUnpackMessage(&env);
1173       _processArrayEltMsg(ck,env);
1174       break;
1175
1176 // Chare support
1177     case NewChareMsg :
1178       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1179       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1180       _processNewChareMsg(ck,env);
1181       _STATS_RECORD_PROCESS_CHARE_1();
1182       break;
1183     case NewVChareMsg :
1184       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1185       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1186       _processNewVChareMsg(ck,env);
1187       _STATS_RECORD_PROCESS_CHARE_1();
1188       break;
1189     case ForChareMsg :
1190       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1191       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1192       _processForPlainChareMsg(ck,env);
1193       _STATS_RECORD_PROCESS_MSG_1();
1194       break;
1195     case ForVidMsg   :
1196       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1197       ck->process();
1198       _processForVidMsg(ck,env);
1199       break;
1200     case FillVidMsg  :
1201       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1202       ck->process();
1203       _processFillVidMsg(ck,env);
1204       break;
1205     case DeleteVidMsg  :
1206       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1207       ck->process();
1208       _processDeleteVidMsg(ck,env);
1209       break;
1210
1211     default:
1212       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1213   }
1214 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1215         if(obj != NULL){
1216                 postProcessReceivedMessage(obj,sender,SN,entry);
1217         }
1218 #endif
1219
1220
1221 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1222   criticalPath_end();
1223   //  CkPrintf("STOP\n");
1224 #endif
1225
1226
1227 }
1228
1229
1230 /******************** Message Send **********************/
1231
1232 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1233              int *queueing, int *priobits, unsigned int **prioptr)
1234 {
1235   register envelope *env = (envelope *)converseMsg;
1236   *pfn = (CldPackFn)CkPackMessage;
1237   *len = env->getTotalsize();
1238   *queueing = env->getQueueing();
1239   *priobits = env->getPriobits();
1240   *prioptr = (unsigned int *) env->getPrioPtr();
1241 }
1242
1243 void CkPackMessage(envelope **pEnv)
1244 {
1245   register envelope *env = *pEnv;
1246   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1247     register void *msg = EnvToUsr(env);
1248     _TRACE_BEGIN_PACK();
1249     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1250     _TRACE_END_PACK();
1251     env=UsrToEnv(msg);
1252     env->setPacked(1);
1253     *pEnv = env;
1254   }
1255 }
1256
1257 void CkUnpackMessage(envelope **pEnv)
1258 {
1259   register envelope *env = *pEnv;
1260   register int msgIdx = env->getMsgIdx();
1261   if(env->isPacked()) {
1262     register void *msg = EnvToUsr(env);
1263     _TRACE_BEGIN_UNPACK();
1264     msg = _msgTable[msgIdx]->unpack(msg);
1265     _TRACE_END_UNPACK();
1266     env=UsrToEnv(msg);
1267     env->setPacked(0);
1268     *pEnv = env;
1269   }
1270 }
1271
1272 //There's no reason for most messages to go through the Cld--
1273 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1274 // Thus these accellerated versions of the Cld calls.
1275
1276 static int index_objectQHandler;
1277 int index_tokenHandler;
1278 int index_skipCldHandler;
1279
1280 void _skipCldHandler(void *converseMsg)
1281 {
1282   register envelope *env = (envelope *)(converseMsg);
1283   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1284 #if CMK_GRID_QUEUE_AVAILABLE
1285   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1286     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1287                        env, env->getQueueing (), env->getPriobits (),
1288                        (unsigned int *) env->getPrioPtr ());
1289   } else {
1290     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1291                        env, env->getQueueing (), env->getPriobits (),
1292                        (unsigned int *) env->getPrioPtr ());
1293   }
1294 #else
1295   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1296         env, env->getQueueing(),env->getPriobits(),
1297         (unsigned int *)env->getPrioPtr());
1298 #endif
1299 }
1300
1301
1302 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1303 // Made non-static to be used by ckmessagelogging
1304 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1305 {
1306 #if CMK_REPLAYSYSTEM
1307   if (_replaySystem) {
1308     CmiFree(env);
1309     return;
1310   }
1311 #endif
1312   if(pe == CkMyPe() ){
1313     if(!CmiNodeAlive(CkMyPe())){
1314         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1315 //      return;
1316     }
1317   }
1318   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1319 #if CMK_OBJECT_QUEUE_AVAILABLE
1320     Chare *obj = CkFindObjectPtr(env);
1321     if (obj && obj->CkGetObjQueue().queue()) {
1322       _enqObjQueue(obj, env);
1323     }
1324     else
1325 #endif
1326     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1327         env, env->getQueueing(),env->getPriobits(),
1328         (unsigned int *)env->getPrioPtr());
1329   } else {
1330     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1331       CkPackMessage(&env);
1332     int len=env->getTotalsize();
1333     CmiSetXHandler(env,CmiGetHandler(env));
1334 #if CMK_OBJECT_QUEUE_AVAILABLE
1335     CmiSetHandler(env,index_objectQHandler);
1336 #else
1337     CmiSetHandler(env,index_skipCldHandler);
1338 #endif
1339     CmiSetInfo(env,infoFn);
1340     if (pe==CLD_BROADCAST) {
1341 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1342                         CmiSyncBroadcast(len, (char *)env);
1343 #else
1344                         CmiSyncBroadcastAndFree(len, (char *)env); 
1345 #endif
1346
1347 }
1348     else if (pe==CLD_BROADCAST_ALL) { 
1349 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1350                         CmiSyncBroadcastAll(len, (char *)env);
1351 #else
1352                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1353 #endif
1354
1355 }
1356     else{
1357 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))             
1358                         CmiSyncSend(pe, len, (char *)env);
1359 #else
1360                         CmiSyncSendAndFree(pe, len, (char *)env);
1361 #endif
1362
1363                 }
1364   }
1365 }
1366
1367 #if CMK_BLUEGENE_CHARM
1368 #   define  _skipCldEnqueue   _CldEnqueue
1369 #endif
1370
1371 // by pass Charm++ priority queue, send as Converse message
1372 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1373 {
1374 #if CMK_REPLAYSYSTEM
1375   if (_replaySystem) {
1376     CmiFree(env);
1377     return;
1378   }
1379 #endif
1380   CkPackMessage(&env);
1381   int len=env->getTotalsize();
1382   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1383 }
1384
1385 static void _noCldEnqueue(int pe, envelope *env)
1386 {
1387 /*
1388   if (pe == CkMyPe()) {
1389     CmiHandleMessage(env);
1390   } else
1391 */
1392 #if CMK_REPLAYSYSTEM
1393   if (_replaySystem) {
1394     CmiFree(env);
1395     return;
1396   }
1397 #endif
1398   CkPackMessage(&env);
1399   int len=env->getTotalsize();
1400   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1401   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1402   else CmiSyncSendAndFree(pe, len, (char *)env);
1403 }
1404
1405 //static void _noCldNodeEnqueue(int node, envelope *env)
1406 //Made non-static to be used by ckmessagelogging
1407 void _noCldNodeEnqueue(int node, envelope *env)
1408 {
1409 /*
1410   if (node == CkMyNode()) {
1411     CmiHandleMessage(env);
1412   } else {
1413 */
1414 #if CMK_REPLAYSYSTEM
1415   if (_replaySystem) {
1416     CmiFree(env);
1417     return;
1418   }
1419 #endif
1420   CkPackMessage(&env);
1421   int len=env->getTotalsize();
1422   if (node==CLD_BROADCAST) { 
1423 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1424         CmiSyncNodeBroadcast(len, (char *)env);
1425 #else
1426         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1427 #endif
1428 }
1429   else if (node==CLD_BROADCAST_ALL) { 
1430 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1431                 CmiSyncNodeBroadcastAll(len, (char *)env);
1432 #else
1433                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1434 #endif
1435
1436 }
1437   else {
1438 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1439         CmiSyncNodeSend(node, len, (char *)env);
1440 #else
1441         CmiSyncNodeSendAndFree(node, len, (char *)env);
1442 #endif
1443   }
1444 }
1445
1446 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1447 {
1448   register envelope *env = UsrToEnv(msg);
1449   _CHECK_USED(env);
1450   _SET_USED(env, 1);
1451 #if CMK_REPLAYSYSTEM
1452   setEventID(env);
1453 #endif
1454   env->setMsgtype(ForChareMsg);
1455   env->setEpIdx(eIdx);
1456   env->setSrcPe(CkMyPe());
1457 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1458   criticalPath_send(env);
1459   automaticallySetMessagePriority(env);
1460 #endif
1461 #if CMK_CHARMDEBUG
1462   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1463 #endif
1464 #if CMK_OBJECT_QUEUE_AVAILABLE
1465   CmiSetHandler(env, index_objectQHandler);
1466 #else
1467   CmiSetHandler(env, _charmHandlerIdx);
1468 #endif
1469   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1470     register int pe = -(pCid->onPE+1);
1471     if(pe==CkMyPe()) {
1472 #ifndef CMK_CHARE_USE_PTR
1473       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1474 #else
1475       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1476 #endif
1477       void *objPtr;
1478       if (NULL!=(objPtr=vblk->getLocalChare()))
1479       { //A ready local chare
1480         env->setObjPtr(objPtr);
1481         return pe;
1482       }
1483       else { //The vidblock is not ready-- forget it
1484         vblk->send(env);
1485         return -1;
1486       }
1487     } else { //Valid vidblock for another PE:
1488       env->setMsgtype(ForVidMsg);
1489       env->setVidPtr(pCid->objPtr);
1490       return pe;
1491     }
1492   }
1493   else {
1494     env->setObjPtr(pCid->objPtr);
1495     return pCid->onPE;
1496   }
1497 }
1498
1499 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1500 {
1501   int destPE = _prepareMsg(eIdx, msg, pCid);
1502   if (destPE != -1) {
1503     register envelope *env = UsrToEnv(msg);
1504 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1505     criticalPath_send(env);
1506     automaticallySetMessagePriority(env);
1507 #endif
1508     CmiBecomeImmediate(env);
1509   }
1510   return destPE;
1511 }
1512
1513 extern "C"
1514 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1515 {
1516   if (opts & CK_MSG_INLINE) {
1517     CkSendMsgInline(entryIdx, msg, pCid, opts);
1518     return;
1519   }
1520 #if CMK_ERROR_CHECKING
1521   if (opts & CK_MSG_IMMEDIATE) {
1522     CmiAbort("Immediate message is not allowed in Chare!");
1523   }
1524 #endif
1525   register envelope *env = UsrToEnv(msg);
1526   int destPE=_prepareMsg(entryIdx,msg,pCid);
1527   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1528   // VidBlock was not yet filled). The problem is that the creation was never
1529   // traced later when the VidBlock was filled. One solution is to trace the
1530   // creation here, the other to trace it in VidBlock->msgDeliver().
1531   _TRACE_CREATION_1(env);
1532   if (destPE!=-1) {
1533     CpvAccess(_qd)->create();
1534     if (opts & CK_MSG_SKIP_OR_IMM)
1535       _noCldEnqueue(destPE, env);
1536     else
1537       _CldEnqueue(destPE, env, _infoIdx);
1538   }
1539   _TRACE_CREATION_DONE(1);
1540 }
1541
1542 extern "C"
1543 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1544 {
1545   if (pCid->onPE==CkMyPe())
1546   {
1547     if(!CmiNodeAlive(CkMyPe())){
1548         return;
1549     }
1550 #if CMK_CHARMDEBUG
1551     //Just in case we need to breakpoint or use the envelope in some way
1552     _prepareMsg(entryIndex,msg,pCid);
1553 #endif
1554                 //Just directly call the chare (skip QD handling & scheduler)
1555     register envelope *env = UsrToEnv(msg);
1556     if (env->isPacked()) CkUnpackMessage(&env);
1557     _STATS_RECORD_PROCESS_MSG_1();
1558     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1559   }
1560   else {
1561     //No way to inline a cross-processor message:
1562     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1563   }
1564 }
1565
1566 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1567 {
1568   register envelope *env = UsrToEnv(msg);
1569   CkNodeGroupID nodeRedMgr;
1570   _CHECK_USED(env);
1571   _SET_USED(env, 1);
1572 #if CMK_REPLAYSYSTEM
1573   setEventID(env);
1574 #endif
1575   env->setMsgtype(type);
1576   env->setEpIdx(eIdx);
1577   env->setGroupNum(gID);
1578   env->setSrcPe(CkMyPe());
1579 #if CMK_ERROR_CHECKING
1580   nodeRedMgr.setZero();
1581   env->setRednMgr(nodeRedMgr);
1582 #endif
1583 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1584   criticalPath_send(env);
1585   automaticallySetMessagePriority(env);
1586 #endif
1587 #if CMK_CHARMDEBUG
1588   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1589 #endif
1590   CmiSetHandler(env, _charmHandlerIdx);
1591   return env;
1592 }
1593
1594 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1595 {
1596   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1597 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1598   criticalPath_send(env);
1599   automaticallySetMessagePriority(env);
1600 #endif
1601   CmiBecomeImmediate(env);
1602   return env;
1603 }
1604
1605 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1606                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1607 {
1608   int numPes;
1609   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1610 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1611   sendTicketGroupRequest(env,pe,_infoIdx);
1612 #else
1613   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1614   _TRACE_CREATION_N(env, numPes);
1615   if (opts & CK_MSG_SKIP_OR_IMM)
1616     _noCldEnqueue(pe, env);
1617   else
1618     _skipCldEnqueue(pe, env, _infoIdx);
1619   _TRACE_CREATION_DONE(1);
1620 #endif
1621 }
1622
1623 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1624                            int npes, int *pes)
1625 {
1626   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1627   _TRACE_CREATION_MULTICAST(env, npes, pes);
1628   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1629   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1630 }
1631
1632 extern "C"
1633 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1634 {
1635 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1636   if (destPE==CkMyPe())
1637   {
1638     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1639     return;
1640   }
1641   //Can't inline-- send the usual way
1642   register envelope *env = UsrToEnv(msg);
1643   int numPes;
1644   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1645   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1646   _TRACE_CREATION_N(env, numPes);
1647   _noCldEnqueue(destPE, env);
1648   _STATS_RECORD_SEND_BRANCH_1();
1649   CkpvAccess(_coreState)->create();
1650   _TRACE_CREATION_DONE(1);
1651 #else
1652   // no support for immediate message, send inline
1653   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1654 #endif
1655 }
1656
1657 extern "C"
1658 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1659 {
1660   if (destPE==CkMyPe())
1661   {
1662     if(!CmiNodeAlive(CkMyPe())){
1663         return;
1664     }
1665     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1666     if (obj!=NULL)
1667     { //Just directly call the group:
1668 #if CMK_ERROR_CHECKING
1669       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1670 #else
1671       envelope *env=UsrToEnv(msg);
1672 #endif
1673       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1674       return;
1675     }
1676   }
1677   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1678   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1679 }
1680
1681 extern "C"
1682 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1683 {
1684   if (opts & CK_MSG_INLINE) {
1685     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1686     return;
1687   }
1688   if (opts & CK_MSG_IMMEDIATE) {
1689     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1690     return;
1691   }
1692   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1693   _STATS_RECORD_SEND_BRANCH_1();
1694   CkpvAccess(_coreState)->create();
1695 }
1696
1697 extern "C"
1698 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1699 {
1700 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1701   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1702   _TRACE_CREATION_MULTICAST(env, npes, pes);
1703   _noCldEnqueueMulti(npes, pes, env);
1704   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1705 #else
1706   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1707   CpvAccess(_qd)->create(-npes);
1708 #endif
1709   _STATS_RECORD_SEND_BRANCH_N(npes);
1710   CpvAccess(_qd)->create(npes);
1711 }
1712
1713 extern "C"
1714 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1715 {
1716   if (opts & CK_MSG_IMMEDIATE) {
1717     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1718     return;
1719   }
1720     // normal mesg
1721   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1722   _STATS_RECORD_SEND_BRANCH_N(npes);
1723   CpvAccess(_qd)->create(npes);
1724 }
1725
1726 extern "C"
1727 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1728 {
1729   int npes;
1730   int *pes;
1731   if (opts & CK_MSG_IMMEDIATE) {
1732     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1733     return;
1734   }
1735     // normal mesg
1736   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1737   CmiLookupGroup(grp, &npes, &pes);
1738   _TRACE_CREATION_MULTICAST(env, npes, pes);
1739   _CldEnqueueGroup(grp, env, _infoIdx);
1740   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1741   _STATS_RECORD_SEND_BRANCH_N(npes);
1742   CpvAccess(_qd)->create(npes);
1743 }
1744
1745 extern "C"
1746 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1747 {
1748   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1749   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1750   CpvAccess(_qd)->create(CkNumPes());
1751 }
1752
1753 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1754                 int node=CLD_BROADCAST_ALL, int opts=0)
1755 {
1756   int numPes;
1757   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1758 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1759         sendTicketNodeGroupRequest(env,node,_infoIdx);
1760 #else
1761   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1762   _TRACE_CREATION_N(env, numPes);
1763   if (opts & CK_MSG_SKIP_OR_IMM) {
1764     _noCldNodeEnqueue(node, env);
1765     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1766       CkpvAccess(_coreState)->create(-numPes);
1767     }
1768   }
1769   else
1770     _CldNodeEnqueue(node, env, _infoIdx);
1771   _TRACE_CREATION_DONE(1);
1772 #endif
1773 }
1774
1775 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1776                            int npes, int *nodes)
1777 {
1778   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1779   _TRACE_CREATION_N(env, npes);
1780   for (int i=0; i<npes; i++) {
1781     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1782   }
1783   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1784 }
1785
1786 extern "C"
1787 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1788 {
1789 #if CMK_IMMEDIATE_MSG
1790   if (node==CkMyNode())
1791   {
1792     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1793     return;
1794   }
1795   //Can't inline-- send the usual way
1796   register envelope *env = UsrToEnv(msg);
1797   int numPes;
1798   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1799   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1800   _TRACE_CREATION_N(env, numPes);
1801   _noCldNodeEnqueue(node, env);
1802   _STATS_RECORD_SEND_BRANCH_1();
1803   /* immeidate message is invisible to QD */
1804 //  CkpvAccess(_coreState)->create();
1805   _TRACE_CREATION_DONE(1);
1806 #else
1807   // no support for immediate message, send inline
1808   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1809 #endif
1810 }
1811
1812 extern "C"
1813 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1814 {
1815   if (node==CkMyNode())
1816   {
1817     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1818     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1819     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1820     if (obj!=NULL)
1821     { //Just directly call the group:
1822 #if CMK_ERROR_CHECKING
1823       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1824 #else
1825       envelope *env=UsrToEnv(msg);
1826 #endif
1827       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1828       return;
1829     }
1830   }
1831   //Can't inline-- send the usual way
1832   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1833 }
1834
1835 extern "C"
1836 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1837 {
1838   if (opts & CK_MSG_INLINE) {
1839     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1840     return;
1841   }
1842   if (opts & CK_MSG_IMMEDIATE) {
1843     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1844     return;
1845   }
1846   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1847   _STATS_RECORD_SEND_NODE_BRANCH_1();
1848   CkpvAccess(_coreState)->create();
1849 }
1850
1851 extern "C"
1852 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1853 {
1854 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1855   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1856   _noCldEnqueueMulti(npes, nodes, env);
1857 #else
1858   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1859   CpvAccess(_qd)->create(-npes);
1860 #endif
1861   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1862   CpvAccess(_qd)->create(npes);
1863 }
1864
1865 extern "C"
1866 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1867 {
1868   if (opts & CK_MSG_IMMEDIATE) {
1869     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1870     return;
1871   }
1872     // normal mesg
1873   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1874   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1875   CpvAccess(_qd)->create(npes);
1876 }
1877
1878 extern "C"
1879 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1880 {
1881   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1882   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1883   CpvAccess(_qd)->create(CkNumNodes());
1884 }
1885
1886 //Needed by delegation manager:
1887 extern "C"
1888 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1889 { return _prepareMsg(eIdx,msg,pCid); }
1890 extern "C"
1891 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1892 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1893 extern "C"
1894 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1895 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1896
1897 void _ckModuleInit(void) {
1898         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1899         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1900         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1901         CkpvInitialize(TokenPool*, _tokenPool);
1902         CkpvAccess(_tokenPool) = new TokenPool;
1903 }
1904
1905
1906 /************** Send: Arrays *************/
1907
1908 extern void CkArrayManagerInsert(int onPe,void *msg);
1909 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1910
1911 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1912 {
1913   _CHECK_USED(env);
1914   _SET_USED(env, 1);
1915   env->setMsgtype(type);
1916 #if CMK_CHARMDEBUG
1917   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1918 #endif
1919   CmiSetHandler(env, _charmHandlerIdx);
1920   CpvAccess(_qd)->create();
1921 }
1922
1923 extern "C"
1924 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1925   register envelope *env = UsrToEnv(msg);
1926   env->getsetArrayMgr()=aID;
1927   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1928   _CldEnqueue(pe, env, _infoIdx);
1929 }
1930
1931 extern "C"
1932 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1933   register envelope *env = UsrToEnv(msg);
1934   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1935 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1936    sendTicketArrayRequest(env,pe,_infoIdx);
1937 #else
1938   if (opts & CK_MSG_IMMEDIATE)
1939     CmiBecomeImmediate(env);
1940   if (opts & CK_MSG_SKIP_OR_IMM)
1941     _noCldEnqueue(pe, env);
1942   else
1943     _skipCldEnqueue(pe, env, _infoIdx);
1944 #endif
1945 }
1946
1947 class ElementDestroyer : public CkLocIterator {
1948 private:
1949         CkLocMgr *locMgr;
1950 public:
1951         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1952         void addLocation(CkLocation &loc) {
1953           loc.destroyAll();
1954         }
1955 };
1956
1957 void CkDeleteChares() {
1958   int i;
1959   int numGroups = CkpvAccess(_groupIDTable)->size();
1960
1961   // delete all plain chares
1962 #ifndef CMK_CHARE_USE_PTR
1963   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
1964         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
1965         delete obj;
1966         CkpvAccess(chare_objs)[i] = NULL;
1967   }
1968   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
1969         VidBlock *obj = CkpvAccess(vidblocks)[i];
1970         delete obj;
1971         CkpvAccess(vidblocks)[i] = NULL;
1972   }
1973 #endif
1974
1975   // delete all array elements
1976   for(i=0;i<numGroups;i++) {
1977     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1978     if(obj && obj->isLocMgr())  {
1979       CkLocMgr *mgr = (CkLocMgr*)obj;
1980       ElementDestroyer destroyer(mgr);
1981       mgr->iterate(destroyer);
1982     }
1983   }
1984
1985   // delete all groups
1986   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1987   for(i=0;i<numGroups;i++) {
1988     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1989     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1990     if (obj) delete obj;
1991   }
1992   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1993
1994   // delete all node groups
1995   if (CkMyRank() == 0) {
1996     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1997     for(i=0;i<numNodeGroups;i++) {
1998       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
1999       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2000       if (obj) delete obj;
2001     }
2002   }
2003 }
2004
2005 //------------------- Message Watcher (record/replay) ----------------
2006
2007 #include "crc32.h"
2008
2009 CkpvDeclare(int, envelopeEventID);
2010 int _recplay_crc = 0;
2011 int _recplay_checksum = 0;
2012 int _recplay_logsize = 1024*1024;
2013
2014 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2015
2016 class CkMessageRecorder : public CkMessageWatcher {
2017   char *buffer;
2018   unsigned int curpos;
2019 public:
2020   CkMessageRecorder(FILE *f_): curpos(0) { f=f_; buffer=new char[_recplay_logsize]; }
2021   ~CkMessageRecorder() {
2022     flushLog(0);
2023     fprintf(f,"-1 -1 -1 ");
2024     fclose(f);
2025     delete[] buffer;
2026 #if 0
2027     FILE *stsfp = fopen("sts", "w");
2028     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2029     traceWriteSTS(stsfp, 0);
2030     fclose(stsfp);
2031 #endif
2032     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2033   }
2034
2035 private:
2036   void flushLog(int verbose=1) {
2037     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2038     fprintf(f, "%s", buffer);
2039     curpos=0;
2040   }
2041   virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2042     if ((*envptr)->getEvent()) {
2043       char tmp[128];
2044       bool wasPacked = (*envptr)->isPacked();
2045       if (!wasPacked) CkPackMessage(envptr);
2046       envelope *env = *envptr;
2047       unsigned int crc1=0, crc2=0;
2048       if (_recplay_crc) {
2049         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2050         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2051         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2052       } else if (_recplay_checksum) {
2053         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2054         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2055       }
2056       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2057       if (curpos > _recplay_logsize-128) flushLog();
2058       if (!wasPacked) CkUnpackMessage(envptr);
2059     }
2060     return CmiTrue;
2061   }
2062   virtual int process(CthThreadToken *token,CkCoreState *ck) {
2063     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2064     if (curpos > _recplay_logsize-128) flushLog();
2065     return 1;
2066   }
2067 };
2068
2069 class CkMessageDetailRecorder : public CkMessageWatcher {
2070 public:
2071   CkMessageDetailRecorder(FILE *f_) {
2072     f=f_;
2073     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2074      * The value of 'x' is the pointer size.
2075      */
2076     CmiUInt2 little = sizeof(void*);
2077     fwrite(&little, 2, 1, f);
2078   }
2079   ~CkMessageDetailRecorder() {fclose(f);}
2080 private:
2081   virtual CmiBool process(envelope **envptr, CkCoreState *ck) {
2082     bool wasPacked = (*envptr)->isPacked();
2083     if (!wasPacked) CkPackMessage(envptr);
2084     envelope *env = *envptr;
2085     CmiUInt4 size = env->getTotalsize();
2086     fwrite(&size, 4, 1, f);
2087     fwrite(env, env->getTotalsize(), 1, f);
2088     if (!wasPacked) CkUnpackMessage(envptr);
2089     return CmiTrue;
2090   }
2091 };
2092
2093 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2094 #define REPLAYDEBUG(args) /* empty */
2095
2096 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2097 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2098
2099 #if CMK_BLUEGENE_CHARM
2100 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2101                                    int pb,unsigned int *prio);
2102 #endif
2103
2104 class CkMessageReplay : public CkMessageWatcher {
2105   int counter;
2106         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2107         int nextEP;
2108         unsigned int crc1, crc2;
2109         /// Read the next message we need from the file:
2110         void getNext(void) {
2111           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2112           if (nextSize > 0) {
2113             // We are reading a regular message
2114             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2115               CkAbort("CkMessageReplay> Syntax error reading replay file");
2116             }
2117             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2118           } else if (nextSize == -2) {
2119             // We are reading a special message (right now only thread awaken)
2120             // Nothing to do since we have already read all info
2121             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2122           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2123             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2124             CkAbort("CkMessageReplay> Unrecognized input");
2125           }
2126             /*
2127                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2128                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2129                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2130                 }
2131                 */
2132                 counter++;
2133         }
2134         /// If this is the next message we need, advance and return CmiTrue.
2135         CmiBool isNext(envelope *env) {
2136                 if (nextPE!=env->getSrcPe()) return CmiFalse;
2137                 if (nextEvent!=env->getEvent()) return CmiFalse;
2138                 if (nextSize<0) return CmiFalse; // not waiting for a regular message
2139 #if 1
2140                 if (nextEP != env->getEpIdx()) {
2141                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2142                         return CmiFalse;
2143                 }
2144 #endif
2145 #if ! CMK_BLUEGENE_CHARM
2146                 if (nextSize!=env->getTotalsize())
2147                 {
2148                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2149                         return CmiFalse;
2150                 }
2151                 if (_recplay_crc || _recplay_checksum) {
2152                   bool wasPacked = env->isPacked();
2153                   if (!wasPacked) CkPackMessage(&env);
2154                   if (_recplay_crc) {
2155                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2156                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2157                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2158                     if (crcnew1 != crc1) {
2159                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2160                     }
2161                     if (crcnew2 != crc2) {
2162                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2163                     }
2164                   } else if (_recplay_checksum) {
2165             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2166             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2167             if (crcnew1 != crc1) {
2168               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2169             }
2170             if (crcnew2 != crc2) {
2171               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2172             }               
2173                   }
2174                   if (!wasPacked) CkUnpackMessage(&env);
2175                 }
2176 #endif
2177                 return CmiTrue;
2178         }
2179         CmiBool isNext(CthThreadToken *token) {
2180           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return CmiTrue;
2181           return CmiFalse;
2182         }
2183
2184         /// This is a (short) list of messages we aren't yet ready for:
2185         CkQ<envelope *> delayedMessages;
2186         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2187         CkQ<CthThreadToken *> delayedTokens;
2188
2189         /// Try to flush out any delayed messages
2190         void flush(void) {
2191           if (nextSize>0) {
2192                 int len=delayedMessages.length();
2193                 for (int i=0;i<len;i++) {
2194                         envelope *env=delayedMessages.deq();
2195                         if (isNext(env)) { /* this is the next message: process it */
2196                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2197                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2198                                 return;
2199                         }
2200                         else /* Not ready yet-- put it back in the
2201                                 queue */
2202                           {
2203                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2204                                 delayedMessages.enq(env);
2205                           }
2206                 }
2207           } else if (nextSize==-2) {
2208             int len=delayedTokens.length();
2209             for (int i=0;i<len;++i) {
2210               CthThreadToken *token=delayedTokens.deq();
2211               if (isNext(token)) {
2212             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2213 #if ! CMK_BLUEGENE_CHARM
2214                 CsdEnqueueLifo((void*)token);
2215 #else
2216                 CthEnqueueBigSimThread(token,0,0,NULL);
2217 #endif
2218                 return;
2219               } else {
2220             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2221                 delayedTokens.enq(token);
2222               }
2223             }
2224           }
2225         }
2226
2227 public:
2228         CkMessageReplay(FILE *f_) {
2229           counter=0;
2230           f=f_;
2231           getNext();
2232           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2233           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2234         }
2235         ~CkMessageReplay() {fclose(f);}
2236
2237 private:
2238         virtual CmiBool process(envelope **envptr,CkCoreState *ck) {
2239           bool wasPacked = (*envptr)->isPacked();
2240           if (!wasPacked) CkPackMessage(envptr);
2241           envelope *env = *envptr;
2242           //CkAssert(*(int*)env == 0x34567890);
2243           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2244                 if (env->getEvent() == 0) return CmiTrue;
2245                 if (isNext(env)) { /* This is the message we were expecting */
2246                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2247                         getNext(); /* Advance over this message */
2248                         flush(); /* try to process queued-up stuff */
2249                         if (!wasPacked) CkUnpackMessage(envptr);
2250                         return CmiTrue;
2251                 }
2252 #if CMK_SMP
2253                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2254                          // try next rank, we can't just buffer the msg and left
2255                          // we need to keep unprocessed msg on the fly
2256                         int nextpe = CkMyPe()+1;
2257                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2258                         nextpe = CkNodeFirst(CkMyNode());
2259                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2260                         return CmiFalse;
2261                 }
2262 #endif
2263                 else /*!isNext(env) */ {
2264                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2265                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2266                         delayedMessages.enq(env);
2267                         flush();
2268                         return CmiFalse;
2269                 }
2270         }
2271         virtual int process(CthThreadToken *token, CkCoreState *ck) {
2272       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2273           if (isNext(token)) {
2274         REPLAYDEBUG("Executing token: "<<token->serialNo)
2275             getNext();
2276             flush();
2277             return 1;
2278           } else {
2279         REPLAYDEBUG("Queueing token: "<<token->serialNo
2280             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2281             delayedTokens.enq(token);
2282             return 0;
2283           }
2284         }
2285 };
2286
2287 class CkMessageDetailReplay : public CkMessageWatcher {
2288   void *getNext() {
2289     CmiUInt4 size; size_t nread;
2290     if ((nread=fread(&size, 4, 1, f)) < 1) {
2291       if (feof(f)) return NULL;
2292       CkPrintf("Broken record file (metadata) got %d\n",nread);
2293       CkAbort("");
2294     }
2295     void *env = CmiAlloc(size);
2296     long tell = ftell(f);
2297     if ((nread=fread(env, size, 1, f)) < 1) {
2298       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2299       CkAbort("");
2300     }
2301     //*(int*)env = 0x34567890; // set first integer as magic
2302     return env;
2303   }
2304 public:
2305   double starttime;
2306   CkMessageDetailReplay(FILE *f_) {
2307     f=f_;
2308     starttime=CkWallTimer();
2309     /* This must match what CkMessageDetailRecorder did */
2310     CmiUInt2 little;
2311     fread(&little, 2, 1, f);
2312     if (little != sizeof(void*)) {
2313       CkAbort("Replaying on a different architecture from which recording was done!");
2314     }
2315
2316     CsdEnqueue(getNext());
2317
2318     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2319   }
2320   virtual CmiBool process(envelope **env,CkCoreState *ck) {
2321     void *msg = getNext();
2322     if (msg != NULL) CsdEnqueue(msg);
2323     return CmiTrue;
2324   }
2325 };
2326
2327 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2328 #if ! CMK_BLUEGENE_CHARM
2329   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2330 #endif
2331   CkMessageReplay *replay = (CkMessageReplay*)rep;
2332   //CmiStartQD(CkMessageReplayQuiescence, replay);
2333 }
2334
2335 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2336   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2337   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2338   ConverseExit();
2339 }
2340
2341 static int CpdExecuteThreadResume(CthThreadToken *token) {
2342   CkCoreState *ck = CkpvAccess(_coreState);
2343   if (ck->watcher!=NULL) {
2344     return ck->watcher->processThread(token,ck);
2345   }
2346   return 1;
2347 }
2348
2349 CpvCExtern(int, CthResumeNormalThreadIdx);
2350 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2351 {
2352   CthThread t = token->thread;
2353
2354   if(t == NULL){
2355     free(token);
2356     return;
2357   }
2358 #if CMK_TRACE_ENABLED
2359 #if ! CMK_TRACE_IN_CHARM
2360   if(CpvAccess(traceOn))
2361     CthTraceResume(t);
2362 /*    if(CpvAccess(_traceCoreOn)) 
2363             resumeTraceCore();*/
2364 #endif
2365 #endif
2366   
2367   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2368   if (CpdExecuteThreadResume(token)) {
2369     CthResume(t);
2370   }
2371 }
2372
2373 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2374
2375 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2376
2377         int i;
2378         char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2379         strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2380         sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2381         FILE *f=fopen(fName,permissions);
2382         REPLAYDEBUG("openReplayfile "<<fName);
2383         if (f==NULL) {
2384                 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2385                         CkMyPe(),fName,permissions);
2386                 CkAbort("openReplayFile> Could not open replay file");
2387         }
2388         return f;
2389 }
2390
2391 #if CMK_BLUEGENE_CHARM
2392 CpvExtern(int      , CthResumeBigSimThreadIdx);
2393 #endif
2394
2395 #include "ckliststring.h"
2396 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2397     CmiBool forceReplay = CmiFalse;
2398     char *procs = NULL;
2399     _replaySystem = 0;
2400     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2401       _recplay_crc = 1;
2402     }
2403     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2404       _recplay_checksum = 1;
2405     }
2406     CmiGetArgIntDesc(argv,"+recplay-logsize",&_recplay_logsize,"Specify the size of the buffer used by the message recorder");
2407     REPLAYDEBUG("CkMessageWatcherInit ");
2408     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2409         CkListString list(procs);
2410         if (list.includes(CkMyPe())) {
2411           CpdSetInitializeMemory(1);
2412           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2413         }
2414     }
2415     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2416       if (CkMyPe() == 0) {
2417         CmiPrintf("Charm++> record mode.\n");
2418         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2419           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2420           _recplay_crc = _recplay_checksum = 0;
2421         }
2422       }
2423       CpdSetInitializeMemory(1);
2424       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2425       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2426     }
2427         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2428             forceReplay = CmiTrue;
2429             CpdSetInitializeMemory(1);
2430             // Set the parameters of the processor
2431 #if CMK_SHARED_VARS_UNAVAILABLE
2432             _Cmi_mype = atoi(procs);
2433             while (procs[0]!='/') procs++;
2434             procs++;
2435             _Cmi_numpes = atoi(procs);
2436 #else
2437             CkAbort("+replay-detail available only for non-SMP build");
2438 #endif
2439             _replaySystem = 1;
2440             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2441         }
2442         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2443           if (CkMyPe() == 0)  {
2444             CmiPrintf("Charm++> replay mode.\n");
2445             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2446               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2447               _recplay_crc = _recplay_checksum = 0;
2448             }
2449           }
2450           CpdSetInitializeMemory(1);
2451 #if ! CMK_BLUEGENE_CHARM
2452           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2453 #else
2454           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2455 #endif
2456           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2457         }
2458         if (_recplay_crc && _recplay_checksum) {
2459           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2460         }
2461 }
2462
2463 extern "C"
2464 int CkMessageToEpIdx(void *msg) {
2465         envelope *env=UsrToEnv(msg);
2466         int ep=env->getEpIdx();
2467         if (ep==CkIndex_CkArray::recvBroadcast(0))
2468                 return env->getsetArrayBcastEp();
2469         else
2470                 return ep;
2471 }
2472
2473 extern "C"
2474 int getCharmEnvelopeSize() {
2475   return sizeof(envelope);
2476 }
2477
2478
2479 #include "CkMarshall.def.h"
2480