Merge branch 'charm' into development
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #ifndef CMK_CHARE_USE_PTR
29 CpvDeclare(CkVec<void *>, chare_objs);
30 CpvDeclare(CkVec<int>, chare_types);
31 CpvDeclare(CkVec<VidBlock *>, vidblocks);
32 #endif
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 //Charm++ virtual functions: declaring these here results in a smaller executable
47 Chare::Chare(void) {
48   thishandle.onPE=CkMyPe();
49   thishandle.objPtr=this;
50 #ifndef CMK_CHARE_USE_PTR
51      // for plain chare, objPtr is actually the index to chare obj table
52   if (chareIdx >= 0) thishandle.objPtr=(void*)chareIdx;
53 #endif
54 #ifdef _FAULT_MLOG_
55   mlogData = new ChareMlogData();
56   mlogData->objID.type = TypeChare;
57   mlogData->objID.data.chare.id = thishandle;
58 #endif
59 #if CMK_OBJECT_QUEUE_AVAILABLE
60   if (_defaultObjectQ)  CkEnableObjQ();
61 #endif
62 }
63
64 Chare::Chare(CkMigrateMessage* m) {
65   thishandle.onPE=CkMyPe();
66   thishandle.objPtr=this;
67
68 #if CMK_OBJECT_QUEUE_AVAILABLE
69   if (_defaultObjectQ)  CkEnableObjQ();
70 #endif
71 }
72
73 void Chare::CkEnableObjQ()
74 {
75 #if CMK_OBJECT_QUEUE_AVAILABLE
76   objQ.create();
77 #endif
78 }
79
80 Chare::~Chare() {}
81
82 void Chare::pup(PUP::er &p)
83 {
84   p(thishandle.onPE);
85   thishandle.objPtr=(void *)this;
86 #ifndef CMK_CHARE_USE_PTR
87   p(chareIdx);
88   if (chareIdx != -1) thishandle.objPtr=(void*)chareIdx;
89 #endif
90 #ifdef _FAULT_MLOG_
91   if(p.isUnpacking()){
92     mlogData = new ChareMlogData();
93   }
94   mlogData->pup(p);
95 #endif
96 }
97
98 int Chare::ckGetChareType() const {
99   return -3;
100 }
101 char *Chare::ckDebugChareName(void) {
102   char buf[100];
103   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
104   return strdup(buf);
105 }
106 int Chare::ckDebugChareID(char *str, int limit) {
107   // pure chares for now do not have a valid ID
108   str[0] = 0;
109   return 1;
110 }
111 void Chare::ckDebugPup(PUP::er &p) {
112   pup(p);
113 }
114
115 /// This method is called before starting a [threaded] entry method.
116 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
117   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
118   traceAddThreadListeners(th, UsrToEnv(msg));
119 }
120
121
122 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
123   p.comment("Bytes");
124   int ts=UsrToEnv(msg)->getTotalsize();
125   int msgLen=ts-sizeof(envelope);
126   if (msgLen>0)
127     p((char*)msg,msgLen);
128 }
129
130 IrrGroup::IrrGroup(void) {
131   thisgroup = CkpvAccess(_currentGroup);
132 #ifdef _FAULT_MLOG_
133         mlogData->objID.type = TypeGroup;
134         mlogData->objID.data.group.id = thisgroup;
135         mlogData->objID.data.group.onPE = CkMyPe();
136 #endif
137 }
138
139 IrrGroup::~IrrGroup() {
140   // remove the object pointer
141   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
142   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
143   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
144 }
145
146 void IrrGroup::pup(PUP::er &p)
147 {
148   Chare::pup(p);
149   p|thisgroup;
150 }
151
152 int IrrGroup::ckGetChareType() const {
153   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
154 }
155
156 int IrrGroup::ckDebugChareID(char *str, int limit) {
157   if (limit<5) return -1;
158   str[0] = 1;
159   *((int*)&str[1]) = thisgroup.idx;
160   return 5;
161 }
162
163 char *IrrGroup::ckDebugChareName() {
164   return strdup(_chareTable[ckGetChareType()]->name);
165 }
166
167 void IrrGroup::ckJustMigrated(void)
168 {
169 }
170
171 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
172   /* FIXME: **CW** not entirely sure what we should do here yet */
173 }
174
175 void Group::CkAddThreadListeners(CthThread th, void *msg) {
176   Chare::CkAddThreadListeners(th, msg);
177   CthSetThreadID(th, thisgroup.idx, 0, 0);
178 }
179
180 void Group::pup(PUP::er &p)
181 {
182   CkReductionMgr::pup(p);
183   reductionInfo.pup(p);
184 }
185
186 /**** Delegation Manager Group */
187 CkDelegateMgr::~CkDelegateMgr() { }
188
189 //Default delegator implementation: do not delegate-- send directly
190 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
191   { CkSendMsg(ep,m,c); }
192 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
193   { CkSendMsgBranch(ep,m,onPE,g); }
194 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
195   { CkBroadcastMsgBranch(ep,m,g); }
196 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
197   { CkSendMsgBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
198 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
199   { CkSendMsgNodeBranch(ep,m,onNode,g); }
200 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
201   { CkBroadcastMsgNodeBranch(ep,m,g); }
202 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
203   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
204 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,int onPE,CkArrayID a)
205 {
206         CProxyElement_ArrayBase ap(a,idx);
207         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
208 }
209 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,CkArrayID a)
210 {
211         CProxyElement_ArrayBase ap(a,idx);
212         ap.ckSend((CkArrayMessage *)m,ep);
213 }
214 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
215 {
216         CProxy_ArrayBase ap(a);
217         ap.ckBroadcast((CkArrayMessage *)m,ep);
218 }
219
220 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
221 {
222         CmiAbort("ArraySectionSend is not implemented!\n");
223 /*
224         CProxyElement_ArrayBase ap(a,idx);
225         ap.ckSend((CkArrayMessage *)m,ep);
226 */
227 }
228
229 /*** Proxy <-> delegator communication */
230 CkDelegateData::~CkDelegateData() {}
231
232 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
233   return pd; // default implementation ignores pup call
234 }
235
236 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
237    this tricky manual reference counting business... */
238
239 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
240         if (dPtr) dPtr->ref();
241         ckUndelegate();
242         delegatedMgr = dTo;
243         delegatedPtr = dPtr;
244 }
245 void CProxy::ckUndelegate(void) {
246         delegatedMgr=NULL;
247         if (delegatedPtr) delegatedPtr->unref();
248         delegatedPtr=NULL;
249 }
250
251 /// Copy constructor
252 CProxy::CProxy(const CProxy &src)
253     :delegatedMgr(src.delegatedMgr)
254 {
255     delegatedPtr = NULL;
256     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
257         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
258 }
259
260 /// Assignment operator
261 CProxy& CProxy::operator=(const CProxy &src) {
262         CkDelegateData *oldPtr=delegatedPtr;
263         ckUndelegate();
264         delegatedMgr=src.delegatedMgr;
265
266         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
267             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
268         else
269             delegatedPtr = NULL;
270
271         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
272         if (oldPtr) oldPtr->unref();
273         return *this;
274 }
275
276 void CProxy::pup(PUP::er &p) {
277       CkGroupID delegatedTo;
278       delegatedTo.setZero();
279       int isNodeGroup = 0;
280       if (!p.isUnpacking()) {
281         if (delegatedMgr) {
282           delegatedTo = delegatedMgr->CkGetGroupID();
283           isNodeGroup = delegatedMgr->isNodeGroup();
284         }
285       }
286       p|delegatedTo;
287       if (!delegatedTo.isZero()) {
288         p|isNodeGroup;
289         if (p.isUnpacking()) {
290           if (isNodeGroup)
291                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
292           else
293                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
294         }
295
296         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
297         if (p.isUnpacking() && delegatedPtr)
298             delegatedPtr->ref();
299       }
300 }
301
302 /**** Array sections */
303 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
304 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
305   _cookie.aid = aid;    \
306   _cookie.get_pe() = CkMyPe();  \
307   _elems = new CkArrayIndexMax[nElems]; \
308   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
309   pelist = NULL;        \
310   npes  = 0;    \
311 }
312
313 CKSECTIONID_CONSTRUCTOR_DEF(1D)
314 CKSECTIONID_CONSTRUCTOR_DEF(2D)
315 CKSECTIONID_CONSTRUCTOR_DEF(3D)
316 CKSECTIONID_CONSTRUCTOR_DEF(4D)
317 CKSECTIONID_CONSTRUCTOR_DEF(5D)
318 CKSECTIONID_CONSTRUCTOR_DEF(6D)
319 CKSECTIONID_CONSTRUCTOR_DEF(Max)
320
321 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
322   pelist = new int[npes];
323   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
324   _cookie.aid = gid;
325 }
326
327 CkSectionID::CkSectionID(const CkSectionID &sid) {
328   int i;
329   _cookie = sid._cookie;
330   _nElems = sid._nElems;
331   if (_nElems > 0) {
332     _elems = new CkArrayIndexMax[_nElems];
333     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
334   } else _elems = NULL;
335   npes = sid.npes;
336   if (npes > 0) {
337     pelist = new int[npes];
338     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
339   } else pelist = NULL;
340 }
341
342 void CkSectionID::operator=(const CkSectionID &sid) {
343   int i;
344   _cookie = sid._cookie;
345   _nElems = sid._nElems;
346   if (_nElems > 0) {
347     _elems = new CkArrayIndexMax[_nElems];
348     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
349   } else _elems = NULL;
350   npes = sid.npes;
351   if (npes > 0) {
352     pelist = new int[npes];
353     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
354   } else pelist = NULL;
355 }
356
357 void CkSectionID::pup(PUP::er &p) {
358     p | _cookie;
359     p(_nElems);
360     if (_nElems > 0) {
361       if (p.isUnpacking()) _elems = new CkArrayIndexMax[_nElems];
362       for (int i=0; i< _nElems; i++) p | _elems[i];
363       npes = 0;
364       pelist = NULL;
365     } else {
366       // If _nElems is zero, than this section describes processors instead of array elements
367       _elems = NULL;
368       p(npes);
369       if (p.isUnpacking()) pelist = new int[npes];
370       p(pelist, npes);
371     }
372 }
373
374 /**** Tiny random API routines */
375
376 #ifdef CMK_CUDA
377 void CUDACallbackManager(void *fn) {
378   if (fn != NULL) {
379     CkCallback *cb = (CkCallback*) fn;
380     cb->send();
381   }
382 }
383
384 #endif
385
386 extern "C"
387 void CkSetRefNum(void *msg, int ref)
388 {
389   UsrToEnv(msg)->setRef(ref);
390 }
391
392 extern "C"
393 int CkGetRefNum(void *msg)
394 {
395   return UsrToEnv(msg)->getRef();
396 }
397
398 extern "C"
399 int CkGetSrcPe(void *msg)
400 {
401   return UsrToEnv(msg)->getSrcPe();
402 }
403
404 extern "C"
405 int CkGetSrcNode(void *msg)
406 {
407   return CmiNodeOf(CkGetSrcPe(msg));
408 }
409
410 extern "C"
411 void *CkLocalBranch(CkGroupID gID) {
412   return _localBranch(gID);
413 }
414
415 static
416 void *_ckLocalNodeBranch(CkGroupID groupID) {
417   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
418   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
419   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
420   return retval;
421 }
422
423 extern "C"
424 void *CkLocalNodeBranch(CkGroupID groupID)
425 {
426   void *retval;
427   // we are called in a constructor
428   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
429     return CkpvAccess(_currentNodeGroupObj);
430   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
431   { // Nodegroup hasn't finished being created yet-- schedule...
432     CsdScheduler(0);
433   }
434   return retval;
435 }
436
437 extern "C"
438 void *CkLocalChare(const CkChareID *pCid)
439 {
440         int pe=pCid->onPE;
441         if (pe<0) { //A virtual chare ID
442                 if (pe!=(-(CkMyPe()+1)))
443                         return NULL;//VID block not on this PE
444                 VidBlock *v=(VidBlock *)pCid->objPtr;
445                 return v->getLocalChare();
446         }
447         else
448         { //An ordinary chare ID
449                 if (pe!=CkMyPe())
450                         return NULL;//Chare not on this PE
451                 return pCid->objPtr;
452         }
453 }
454
455 CkpvDeclare(char **,Ck_argv);
456 extern "C" char **CkGetArgv(void) {
457         return CkpvAccess(Ck_argv);
458 }
459 extern "C" int CkGetArgc(void) {
460         return CmiGetArgc(CkpvAccess(Ck_argv));
461 }
462
463 /******************** Basic support *****************/
464 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
465 {
466 #ifdef _FAULT_MLOG_
467         CpvAccess(_currentObj) = (Chare *)obj;
468 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
469 #endif
470   //BIGSIM_OOC DEBUGGING
471   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
472   //fflush(stdout);
473 #ifndef CMK_OPTIMIZE
474   CpdBeforeEp(epIdx, obj, msg);
475 #endif
476   _entryTable[epIdx]->call(msg, obj);
477 #ifndef CMK_OPTIMIZE
478   CpdAfterEp(epIdx);
479 #endif
480   if (_entryTable[epIdx]->noKeep)
481   { /* Method doesn't keep/delete the message, so we have to: */
482     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
483   }
484 }
485 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
486 {
487   //BIGSIM_OOC DEBUGGING
488   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
489   //fflush(stdout);
490
491   void *deliverMsg;
492 #ifdef _FAULT_MLOG_
493         CpvAccess(_currentObj) = (Chare *)obj;
494 #endif
495   if (_entryTable[epIdx]->noKeep)
496   { /* Deliver a read-only copy of the message */
497     deliverMsg=(void *)msg;
498   } else
499   { /* Method needs a copy of the message to keep/delete */
500     void *oldMsg=(void *)msg;
501     deliverMsg=CkCopyMsg(&oldMsg);
502 #ifndef CMK_OPTIMIZE
503     if (oldMsg!=msg)
504       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
505 #endif
506   }
507 #ifndef CMK_OPTIMIZE
508   CpdBeforeEp(epIdx, obj, (void*)msg);
509 #endif
510   _entryTable[epIdx]->call(deliverMsg, obj);
511 #ifndef CMK_OPTIMIZE
512   CpdAfterEp(epIdx);
513 #endif
514 }
515
516 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
517 {
518   register void *msg = EnvToUsr(env);
519   _SET_USED(env, 0);
520   CkDeliverMessageFree(epIdx,msg,obj);
521 }
522
523 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
524 {
525
526 #ifndef CMK_OPTIMIZE /* Consider tracing: */
527   if (_entryTable[epIdx]->traceEnabled) {
528     _TRACE_BEGIN_EXECUTE(env);
529     _invokeEntryNoTrace(epIdx,env,obj);
530     _TRACE_END_EXECUTE();
531   }
532   else
533 #endif
534     _invokeEntryNoTrace(epIdx,env,obj);
535
536 }
537
538 /********************* Creation ********************/
539
540 extern "C"
541 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
542 {
543   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
544   envelope *env = UsrToEnv(msg);
545   _CHECK_USED(env);
546   if(pCid == 0) {
547     env->setMsgtype(NewChareMsg);
548   } else {
549     pCid->onPE = (-(CkMyPe()+1));
550     //  pCid->magic = _GETIDX(cIdx);
551     pCid->objPtr = (void *) new VidBlock();
552     _MEMCHECK(pCid->objPtr);
553     env->setMsgtype(NewVChareMsg);
554     env->setVidPtr(pCid->objPtr);
555 #ifndef CMK_CHARE_USE_PTR
556     CpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
557     int idx = CpvAccess(vidblocks).size()-1;
558     pCid->objPtr = (void *)idx;
559     env->setVidPtr((void *)idx);
560 #endif
561   }
562   env->setEpIdx(eIdx);
563   env->setSrcPe(CkMyPe());
564   CmiSetHandler(env, _charmHandlerIdx);
565   _TRACE_CREATION_1(env);
566   CpvAccess(_qd)->create();
567   _STATS_RECORD_CREATE_CHARE_1();
568   _SET_USED(env, 1);
569   if(destPE == CK_PE_ANY)
570     env->setForAnyPE(1);
571   else
572     env->setForAnyPE(0);
573   CldEnqueue(destPE, env, _infoIdx);
574   _TRACE_CREATION_DONE(1);
575 }
576
577 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
578 {
579   register int gIdx = _entryTable[epIdx]->chareIdx;
580   register void *obj = malloc(_chareTable[gIdx]->size);
581   _MEMCHECK(obj);
582   setMemoryTypeChare(obj);
583   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
584   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
585   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
586   CkpvAccess(_groupIDTable)->push_back(groupID);
587   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
588   if(ptrq) {
589     void *pending;
590     while((pending=ptrq->deq())!=0)
591       CldEnqueue(CkMyPe(), pending, _infoIdx);
592 //    delete ptrq;
593       CkpvAccess(_groupTable)->find(groupID).clearPending();
594   }
595   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
596
597   CkpvAccess(_currentGroup) = groupID;
598   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
599 #ifndef CMK_CHARE_USE_PTR
600   ((Chare *)obj)->chareIdx = -1;
601 #endif
602   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
603   _STATS_RECORD_PROCESS_GROUP_1();
604 }
605
606 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
607 {
608   register int gIdx = _entryTable[epIdx]->chareIdx;
609   int objSize=_chareTable[gIdx]->size;
610   register void *obj = malloc(objSize);
611   _MEMCHECK(obj);
612   setMemoryTypeChare(obj);
613   CkpvAccess(_currentGroup) = groupID;
614
615 // Now that the NodeGroup is created, add it to the table.
616 //  NodeGroups can be accessed by multiple processors, so
617 //  this is in the opposite order from groups - invoking the constructor
618 //  before registering it.
619 // User may call CkLocalNodeBranch() inside the nodegroup constructor
620 //  store nodegroup into _currentNodeGroupObj
621   CkpvAccess(_currentNodeGroupObj) = obj;
622 #ifndef CMK_CHARE_USE_PTR
623   ((Chare *)obj)->chareIdx = -1;
624 #endif
625   _invokeEntryNoTrace(epIdx,env,obj);
626   CkpvAccess(_currentNodeGroupObj) = NULL;
627   _STATS_RECORD_PROCESS_NODE_GROUP_1();
628
629   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
630   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
631   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
632   CksvAccess(_nodeGroupIDTable).push_back(groupID);
633
634   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
635   if(ptrq) {
636     void *pending;
637     while((pending=ptrq->deq())!=0)
638       CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
639 //    delete ptrq;
640       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
641   }
642   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
643 }
644
645 void _createGroup(CkGroupID groupID, envelope *env)
646 {
647   _CHECK_USED(env);
648   _SET_USED(env, 1);
649   register int epIdx = env->getEpIdx();
650   int gIdx = _entryTable[epIdx]->chareIdx;
651   CkNodeGroupID rednMgr;
652   if(_chareTable[gIdx]->isIrr == 0){
653                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
654                 rednMgr = rednMgrProxy;
655 //              rednMgrProxy.setAttachedGroup(groupID);
656   }else{
657         rednMgr.setZero();
658   }
659   env->setGroupNum(groupID);
660   env->setSrcPe(CkMyPe());
661   env->setRednMgr(rednMgr);
662   env->setGroupEpoch(CkpvAccess(_charmEpoch));
663
664   if(CkNumPes()>1) {
665     CkPackMessage(&env);
666     CmiSetHandler(env, _bocHandlerIdx);
667     _numInitMsgs++;
668     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
669     CpvAccess(_qd)->create(CkNumPes()-1);
670     CkUnpackMessage(&env);
671   }
672   _STATS_RECORD_CREATE_GROUP_1();
673   CkCreateLocalGroup(groupID, epIdx, env);
674 }
675
676 void _createNodeGroup(CkGroupID groupID, envelope *env)
677 {
678   _CHECK_USED(env);
679   _SET_USED(env, 1);
680   register int epIdx = env->getEpIdx();
681   env->setGroupNum(groupID);
682   env->setSrcPe(CkMyPe());
683   env->setGroupEpoch(CkpvAccess(_charmEpoch));
684   if(CkNumNodes()>1) {
685     CkPackMessage(&env);
686     CmiSetHandler(env, _bocHandlerIdx);
687     _numInitMsgs++;
688     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
689     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
690     CpvAccess(_qd)->create(CkNumNodes()-1);
691     CkUnpackMessage(&env);
692   }
693   _STATS_RECORD_CREATE_NODE_GROUP_1();
694   CkCreateLocalNodeGroup(groupID, epIdx, env);
695 }
696
697 // new _groupCreate
698
699 static CkGroupID _groupCreate(envelope *env)
700 {
701   register CkGroupID groupNum;
702
703   // check CkMyPe(). if it is 0 then idx is _numGroups++
704   // if not, then something else...
705   if(CkMyPe() == 0)
706      groupNum.idx = CkpvAccess(_numGroups)++;
707   else
708      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
709   _createGroup(groupNum, env);
710   return groupNum;
711 }
712
713 // new _nodeGroupCreate
714 static CkGroupID _nodeGroupCreate(envelope *env)
715 {
716   register CkGroupID groupNum;
717   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
718   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
719           groupNum.idx = CksvAccess(_numNodeGroups)++;
720    else
721           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
722   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
723   _createNodeGroup(groupNum, env);
724   return groupNum;
725 }
726
727 /**** generate the group idx when group is creator pe is not pe0
728  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
729  **** remaining bits contain the group creator processor number and
730  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
731
732 int _getGroupIdx(int numNodes,int myNode,int numGroups)
733 {
734         int idx;
735         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
736         int n = 32 - (x+1);                                     // number of bits remaining for the index
737         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
738                                                                 // then add the next available index
739         // of course this won't work when int is 8 bytes long on T3E
740         //idx |= 0x80000000;                                      // set the most significant bit to 1
741         idx = - idx;
742                                                                 // if int is not 32 bits, wouldn't this be wrong?
743         return idx;
744 }
745
746 extern "C"
747 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
748 {
749   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
750   register envelope *env = UsrToEnv(msg);
751   env->setMsgtype(BocInitMsg);
752   env->setEpIdx(eIdx);
753   env->setSrcPe(CkMyPe());
754   _TRACE_CREATION_N(env, CkNumPes());
755   CkGroupID gid = _groupCreate(env);
756   _TRACE_CREATION_DONE(1);
757   return gid;
758 }
759
760 extern "C"
761 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
762 {
763   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
764   register envelope *env = UsrToEnv(msg);
765   env->setMsgtype(NodeBocInitMsg);
766   env->setEpIdx(eIdx);
767   env->setSrcPe(CkMyPe());
768   _TRACE_CREATION_N(env, CkNumNodes());
769   CkGroupID gid = _nodeGroupCreate(env);
770   _TRACE_CREATION_DONE(1);
771   return gid;
772 }
773
774 static inline void *_allocNewChare(envelope *env, int &idx)
775 {
776   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
777   void *tmp=malloc(_chareTable[chareIdx]->size);
778   _MEMCHECK(tmp);
779 #ifndef CMK_CHARE_USE_PTR
780   CpvAccess(chare_objs).push_back(tmp);
781   CpvAccess(chare_types).push_back(chareIdx);
782   idx = CpvAccess(chare_objs).size()-1;
783 #endif
784   setMemoryTypeChare(tmp);
785   return tmp;
786 }
787
788 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
789 {
790   int idx;
791   register void *obj = _allocNewChare(env, idx);
792 #ifndef CMK_CHARE_USE_PTR
793   ((Chare *)obj)->chareIdx = idx;
794 #endif
795   _invokeEntry(env->getEpIdx(),env,obj);
796 }
797
798 void CkCreateLocalChare(int epIdx, envelope *env)
799 {
800   env->setEpIdx(epIdx);
801   _processNewChareMsg(NULL, env);
802 }
803
804 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
805 {
806   int idx;
807   register void *obj = _allocNewChare(env, idx);
808   register CkChareID *pCid = (CkChareID *)
809       _allocMsg(FillVidMsg, sizeof(CkChareID));
810   pCid->onPE = CkMyPe();
811 #ifndef CMK_CHARE_USE_PTR
812   pCid->objPtr = (void*)idx;
813 #else
814   pCid->objPtr = obj;
815 #endif
816   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
817   register envelope *ret = UsrToEnv(pCid);
818   ret->setVidPtr(env->getVidPtr());
819   register int srcPe = env->getSrcPe();
820   ret->setSrcPe(CkMyPe());
821   CmiSetHandler(ret, _charmHandlerIdx);
822   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
823   CpvAccess(_qd)->create();
824 #ifndef CMK_CHARE_USE_PTR
825   ((Chare *)obj)->chareIdx = idx;
826 #endif
827   _invokeEntry(env->getEpIdx(),env,obj);
828 }
829
830 /************** Receive: Chares *************/
831
832 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
833 {
834   register int epIdx = env->getEpIdx();
835   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
836   register void *obj;
837   if (mainIdx != -1)  {           // mainchare
838     CmiAssert(CkMyPe()==0);
839     obj = _mainTable[mainIdx]->getObj();
840   }
841   else {
842 #ifndef CMK_CHARE_USE_PTR
843     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
844       obj = CpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
845     else
846       obj = env->getObjPtr();
847 #else
848     obj = env->getObjPtr();
849 #endif
850   }
851   _invokeEntry(epIdx,env,obj);
852 }
853
854 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
855 {
856   register int epIdx = env->getEpIdx();
857   register void *obj = env->getObjPtr();
858   _invokeEntry(epIdx,env,obj);
859 }
860
861 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
862 {
863 #ifndef CMK_CHARE_USE_PTR
864   register VidBlock *vptr = CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
865 #else
866   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
867   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
868 #endif
869   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
870   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
871   vptr->fill(pcid->onPE, pcid->objPtr);
872   CmiFree(env);
873 }
874
875 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
876 {
877 #ifndef CMK_CHARE_USE_PTR
878   register VidBlock *vptr = CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
879 #else
880   VidBlock *vptr = (VidBlock *) env->getVidPtr();
881   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
882 #endif
883   _SET_USED(env, 1);
884   vptr->send(env);
885 }
886
887 /************** Receive: Groups ****************/
888
889 /**
890  Return a pointer to the local BOC of "groupID".
891  The message "env" passed in has some known dependency on this groupID
892  (either it is to be delivered to this BOC, or it depends on this BOC being there).
893  Therefore, if the return value is NULL, this function buffers the massage so that
894  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
895  The message passed in must have its handlers correctly set so that it can be
896  scheduled again.
897 */
898 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
899 {
900
901         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
902         IrrGroup *obj = ck->localBranch(groupID);
903         if (obj==NULL) { /* groupmember not yet created: stash message */
904                 ck->getGroupTable()->find(groupID).enqMsg(env);
905         }
906         else { /* will be able to process message */
907                 ck->process();
908         }
909         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
910         return obj;
911 }
912
913 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
914 {
915 #if CMK_LBDB_ON
916   // if there is a running obj being measured, stop it temporarily
917   LDObjHandle objHandle;
918   int objstopped = 0;
919   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
920   if (the_lbdb->RunningObject(&objHandle)) {
921     objstopped = 1;
922     the_lbdb->ObjectStop(objHandle);
923   }
924 #endif
925   _invokeEntry(epIdx,env,obj);
926 #if CMK_LBDB_ON
927   if (objstopped) the_lbdb->ObjectStart(objHandle);
928 #endif
929   _STATS_RECORD_PROCESS_BRANCH_1();
930 }
931
932 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
933 {
934   register CkGroupID groupID =  env->getGroupNum();
935   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
936   if(obj) {
937     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
938   }
939 }
940
941 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
942 {
943   env->setMsgtype(ForChareMsg);
944   env->setObjPtr(obj);
945   _processForChareMsg(ck,env);
946   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
947 }
948
949 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
950 {
951   env->setEpIdx(epIdx);
952   _deliverForNodeBocMsg(ck,env, obj);
953 }
954
955 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
956 {
957   register CkGroupID groupID = env->getGroupNum();
958   register void *obj;
959
960   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
961   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
962   if(!obj) { // groupmember not yet created
963 #if CMK_IMMEDIATE_MSG
964     if (CmiIsImmediate(env))     // buffer immediate message
965       CmiDelayImmediate();
966     else
967 #endif
968     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
969     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
970     return;
971   }
972   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
973 #if CMK_IMMEDIATE_MSG
974   if (!CmiIsImmediate(env))
975 #endif
976   ck->process();
977   env->setMsgtype(ForChareMsg);
978   env->setObjPtr(obj);
979   _processForChareMsg(ck,env);
980   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
981 }
982
983 void _processBocInitMsg(CkCoreState *ck,envelope *env)
984 {
985   register CkGroupID groupID = env->getGroupNum();
986   register int epIdx = env->getEpIdx();
987   if (!env->getGroupDep().isZero()) {      // dependence
988     CkGroupID dep = env->getGroupDep();
989     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
990     if (obj == NULL) return;
991   }
992   else
993     ck->process();
994   CkCreateLocalGroup(groupID, epIdx, env);
995 }
996
997 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
998 {
999   register CkGroupID groupID = env->getGroupNum();
1000   register int epIdx = env->getEpIdx();
1001   CkCreateLocalNodeGroup(groupID, epIdx, env);
1002 }
1003
1004 /************** Receive: Arrays *************/
1005
1006 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
1007   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1008   if (mgr) {
1009     _SET_USED(env, 0);
1010     mgr->insertElement((CkMessage *)EnvToUsr(env));
1011   }
1012 }
1013 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1014   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
1015   if (mgr) {
1016     _SET_USED(env, 0);
1017     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1018   }
1019 }
1020
1021 //BIGSIM_OOC DEBUGGING
1022 #define TELLMSGTYPE(x) //x
1023
1024 /**
1025  * This is the main converse-level handler used by all of Charm++.
1026  *
1027  * \addtogroup CriticalPathFramework
1028  */
1029 void _processHandler(void *converseMsg,CkCoreState *ck)
1030 {
1031   register envelope *env = (envelope *) converseMsg;
1032
1033 //#if CMK_RECORD_REPLAY
1034   if (ck->watcher!=NULL) {
1035     if (!ck->watcher->processMessage(env,ck)) return;
1036   }
1037 //#endif
1038 #ifdef _FAULT_MLOG_
1039         Chare *obj=NULL;
1040         CkObjID sender;
1041         MCount SN;
1042         MlogEntry *entry=NULL;
1043         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1044         env->getMsgtype() == ForArrayEltMsg){
1045                 sender = env->sender;
1046                 SN = env->SN;
1047                 int result = preProcessReceivedMessage(env,&obj,&entry);
1048                 if(result == 0){
1049                         return;
1050                 }
1051         }
1052 #endif
1053
1054 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1055   //  CkPrintf("START\n");
1056   criticalPath_start(env);
1057 #endif
1058
1059
1060   switch(env->getMsgtype()) {
1061 // Group support
1062     case BocInitMsg :
1063       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1064       // QD processing moved inside _processBocInitMsg because it is conditional
1065       //ck->process(); 
1066       if(env->isPacked()) CkUnpackMessage(&env);
1067       _processBocInitMsg(ck,env);
1068       break;
1069     case NodeBocInitMsg :
1070       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1071       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1072       _processNodeBocInitMsg(ck,env);
1073       break;
1074     case ForBocMsg :
1075       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1076       // QD processing moved inside _processForBocMsg because it is conditional
1077       if(env->isPacked()) CkUnpackMessage(&env);
1078       _processForBocMsg(ck,env);
1079       // stats record moved inside _processForBocMsg because it is conditional
1080       break;
1081     case ForNodeBocMsg :
1082       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1083       // QD processing moved to _processForNodeBocMsg because it is conditional
1084       if(env->isPacked()) CkUnpackMessage(&env);
1085       _processForNodeBocMsg(ck,env);
1086       // stats record moved to _processForNodeBocMsg because it is conditional
1087       break;
1088
1089 // Array support
1090     case ArrayEltInitMsg:
1091       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1092       if(env->isPacked()) CkUnpackMessage(&env);
1093       _processArrayEltInitMsg(ck,env);
1094       break;
1095     case ForArrayEltMsg:
1096       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1097       if(env->isPacked()) CkUnpackMessage(&env);
1098       _processArrayEltMsg(ck,env);
1099       break;
1100
1101 // Chare support
1102     case NewChareMsg :
1103       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1104       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1105       _processNewChareMsg(ck,env);
1106       _STATS_RECORD_PROCESS_CHARE_1();
1107       break;
1108     case NewVChareMsg :
1109       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1110       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1111       _processNewVChareMsg(ck,env);
1112       _STATS_RECORD_PROCESS_CHARE_1();
1113       break;
1114     case ForChareMsg :
1115       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1116       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1117       _processForPlainChareMsg(ck,env);
1118       _STATS_RECORD_PROCESS_MSG_1();
1119       break;
1120     case ForVidMsg   :
1121       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1122       ck->process();
1123       _processForVidMsg(ck,env);
1124       break;
1125     case FillVidMsg  :
1126       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1127       ck->process();
1128       _processFillVidMsg(ck,env);
1129       break;
1130
1131     default:
1132       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1133   }
1134 #ifdef _FAULT_MLOG_
1135         if(obj != NULL){
1136                 postProcessReceivedMessage(obj,sender,SN,entry);
1137         }
1138 #endif
1139
1140
1141 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1142   criticalPath_end();
1143   //  CkPrintf("STOP\n");
1144 #endif
1145
1146
1147 }
1148
1149
1150 /******************** Message Send **********************/
1151
1152 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1153              int *queueing, int *priobits, unsigned int **prioptr)
1154 {
1155   register envelope *env = (envelope *)converseMsg;
1156   *pfn = (CldPackFn)CkPackMessage;
1157   *len = env->getTotalsize();
1158   *queueing = env->getQueueing();
1159   *priobits = env->getPriobits();
1160   *prioptr = (unsigned int *) env->getPrioPtr();
1161 }
1162
1163 void CkPackMessage(envelope **pEnv)
1164 {
1165   register envelope *env = *pEnv;
1166   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1167     register void *msg = EnvToUsr(env);
1168     _TRACE_BEGIN_PACK();
1169     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1170     _TRACE_END_PACK();
1171     env=UsrToEnv(msg);
1172     env->setPacked(1);
1173     *pEnv = env;
1174   }
1175 }
1176
1177 void CkUnpackMessage(envelope **pEnv)
1178 {
1179   register envelope *env = *pEnv;
1180   register int msgIdx = env->getMsgIdx();
1181   if(env->isPacked()) {
1182     register void *msg = EnvToUsr(env);
1183     _TRACE_BEGIN_UNPACK();
1184     msg = _msgTable[msgIdx]->unpack(msg);
1185     _TRACE_END_UNPACK();
1186     env=UsrToEnv(msg);
1187     env->setPacked(0);
1188     *pEnv = env;
1189   }
1190 }
1191
1192 //There's no reason for most messages to go through the Cld--
1193 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1194 // Thus these accellerated versions of the Cld calls.
1195
1196 static int index_objectQHandler;
1197 int index_tokenHandler;
1198 static int index_skipCldHandler;
1199
1200 static void _skipCldHandler(void *converseMsg)
1201 {
1202   register envelope *env = (envelope *)(converseMsg);
1203   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1204 #if CMK_GRID_QUEUE_AVAILABLE
1205   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1206     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1207                        env, env->getQueueing (), env->getPriobits (),
1208                        (unsigned int *) env->getPrioPtr ());
1209   } else {
1210     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1211                        env, env->getQueueing (), env->getPriobits (),
1212                        (unsigned int *) env->getPrioPtr ());
1213   }
1214 #else
1215   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1216         env, env->getQueueing(),env->getPriobits(),
1217         (unsigned int *)env->getPrioPtr());
1218 #endif
1219 }
1220
1221
1222 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1223 // Made non-static to be used by ckmessagelogging
1224 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1225 {
1226   if(pe == CkMyPe() ){
1227     if(!CmiNodeAlive(CkMyPe())){
1228         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1229 //      return;
1230     }
1231   }
1232   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1233 #if CMK_OBJECT_QUEUE_AVAILABLE
1234     Chare *obj = CkFindObjectPtr(env);
1235     if (obj && obj->CkGetObjQueue().queue()) {
1236       _enqObjQueue(obj, env);
1237     }
1238     else
1239 #endif
1240     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1241         env, env->getQueueing(),env->getPriobits(),
1242         (unsigned int *)env->getPrioPtr());
1243   } else {
1244     CkPackMessage(&env);
1245     int len=env->getTotalsize();
1246     CmiSetXHandler(env,CmiGetHandler(env));
1247 #if CMK_OBJECT_QUEUE_AVAILABLE
1248     CmiSetHandler(env,index_objectQHandler);
1249 #else
1250     CmiSetHandler(env,index_skipCldHandler);
1251 #endif
1252     CmiSetInfo(env,infoFn);
1253     if (pe==CLD_BROADCAST) {
1254 #ifdef _FAULT_MLOG_             
1255                         CmiSyncBroadcast(len, (char *)env);
1256 #else
1257                         CmiSyncBroadcastAndFree(len, (char *)env); 
1258 #endif
1259
1260 }
1261     else if (pe==CLD_BROADCAST_ALL) { 
1262 #ifdef _FAULT_MLOG_             
1263                         CmiSyncBroadcastAll(len, (char *)env);
1264 #else
1265                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1266 #endif
1267
1268 }
1269     else{
1270 #ifdef _FAULT_MLOG_             
1271                         CmiSyncSend(pe, len, (char *)env);
1272 #else
1273                         CmiSyncSendAndFree(pe, len, (char *)env);
1274 #endif
1275
1276                 }
1277   }
1278 }
1279
1280 #if CMK_BLUEGENE_CHARM
1281 #   define  _skipCldEnqueue   CldEnqueue
1282 #endif
1283
1284 // by pass Charm++ priority queue, send as Converse message
1285 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1286 {
1287   CkPackMessage(&env);
1288   int len=env->getTotalsize();
1289   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1290 }
1291
1292 static void _noCldEnqueue(int pe, envelope *env)
1293 {
1294 /*
1295   if (pe == CkMyPe()) {
1296     CmiHandleMessage(env);
1297   } else
1298 */
1299   CkPackMessage(&env);
1300   int len=env->getTotalsize();
1301   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1302   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1303   else CmiSyncSendAndFree(pe, len, (char *)env);
1304 }
1305
1306 //static void _noCldNodeEnqueue(int node, envelope *env)
1307 //Made non-static to be used by ckmessagelogging
1308 void _noCldNodeEnqueue(int node, envelope *env)
1309 {
1310 /*
1311   if (node == CkMyNode()) {
1312     CmiHandleMessage(env);
1313   } else {
1314 */
1315   CkPackMessage(&env);
1316   int len=env->getTotalsize();
1317   if (node==CLD_BROADCAST) { 
1318 #ifdef _FAULT_MLOG_
1319         CmiSyncNodeBroadcast(len, (char *)env);
1320 #else
1321         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1322 #endif
1323 }
1324   else if (node==CLD_BROADCAST_ALL) { 
1325 #ifdef _FAULT_MLOG_
1326                 CmiSyncNodeBroadcastAll(len, (char *)env);
1327 #else
1328                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1329 #endif
1330
1331 }
1332   else {
1333 #ifdef _FAULT_MLOG_
1334         CmiSyncNodeSend(node, len, (char *)env);
1335 #else
1336         CmiSyncNodeSendAndFree(node, len, (char *)env);
1337 #endif
1338   }
1339 }
1340
1341 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1342 {
1343   register envelope *env = UsrToEnv(msg);
1344   _CHECK_USED(env);
1345   _SET_USED(env, 1);
1346   env->setMsgtype(ForChareMsg);
1347   env->setEpIdx(eIdx);
1348   env->setSrcPe(CkMyPe());
1349 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1350   criticalPath_send(env);
1351   automaticallySetMessagePriority(env);
1352 #endif
1353 #ifndef CMK_OPTIMIZE
1354   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1355 #endif
1356 #if CMK_OBJECT_QUEUE_AVAILABLE
1357   CmiSetHandler(env, index_objectQHandler);
1358 #else
1359   CmiSetHandler(env, _charmHandlerIdx);
1360 #endif
1361   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1362     register int pe = -(pCid->onPE+1);
1363     if(pe==CkMyPe()) {
1364 #ifndef CMK_CHARE_USE_PTR
1365       VidBlock *vblk = CpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1366 #else
1367       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1368 #endif
1369       void *objPtr;
1370       if (NULL!=(objPtr=vblk->getLocalChare()))
1371       { //A ready local chare
1372         env->setObjPtr(objPtr);
1373         return pe;
1374       }
1375       else { //The vidblock is not ready-- forget it
1376         vblk->send(env);
1377         return -1;
1378       }
1379     } else { //Valid vidblock for another PE:
1380       env->setMsgtype(ForVidMsg);
1381       env->setVidPtr(pCid->objPtr);
1382       return pe;
1383     }
1384   }
1385   else {
1386     env->setObjPtr(pCid->objPtr);
1387     return pCid->onPE;
1388   }
1389 }
1390
1391 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1392 {
1393   int destPE = _prepareMsg(eIdx, msg, pCid);
1394   if (destPE != -1) {
1395     register envelope *env = UsrToEnv(msg);
1396 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1397     criticalPath_send(env);
1398     automaticallySetMessagePriority(env);
1399 #endif
1400     CmiBecomeImmediate(env);
1401   }
1402   return destPE;
1403 }
1404
1405 extern "C"
1406 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1407 {
1408   if (opts & CK_MSG_INLINE) {
1409     CkSendMsgInline(entryIdx, msg, pCid, opts);
1410     return;
1411   }
1412 #ifndef CMK_OPTIMIZE
1413   if (opts & CK_MSG_IMMEDIATE) {
1414     CmiAbort("Immediate message is not allowed in Chare!");
1415   }
1416 #endif
1417   register envelope *env = UsrToEnv(msg);
1418   int destPE=_prepareMsg(entryIdx,msg,pCid);
1419   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1420   // VidBlock was not yet filled). The problem is that the creation was never
1421   // traced later when the VidBlock was filled. One solution is to trace the
1422   // creation here, the other to trace it in VidBlock->msgDeliver().
1423   _TRACE_CREATION_1(env);
1424   if (destPE!=-1) {
1425     CpvAccess(_qd)->create();
1426     if (opts & CK_MSG_SKIP_OR_IMM)
1427       _noCldEnqueue(destPE, env);
1428     else
1429       CldEnqueue(destPE, env, _infoIdx);
1430   }
1431   _TRACE_CREATION_DONE(1);
1432 }
1433
1434 extern "C"
1435 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1436 {
1437   if (pCid->onPE==CkMyPe())
1438   {
1439     if(!CmiNodeAlive(CkMyPe())){
1440         return;
1441     }
1442 #ifndef CMK_OPTIMIZE
1443     //Just in case we need to breakpoint or use the envelope in some way
1444     _prepareMsg(entryIndex,msg,pCid);
1445 #endif
1446                 //Just directly call the chare (skip QD handling & scheduler)
1447     register envelope *env = UsrToEnv(msg);
1448     if (env->isPacked()) CkUnpackMessage(&env);
1449     _STATS_RECORD_PROCESS_MSG_1();
1450     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1451   }
1452   else {
1453     //No way to inline a cross-processor message:
1454     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1455   }
1456 }
1457
1458 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1459 {
1460   register envelope *env = UsrToEnv(msg);
1461   CkNodeGroupID nodeRedMgr;
1462   _CHECK_USED(env);
1463   _SET_USED(env, 1);
1464   env->setMsgtype(type);
1465   env->setEpIdx(eIdx);
1466   env->setGroupNum(gID);
1467   env->setSrcPe(CkMyPe());
1468 #ifndef CMK_OPTIMIZE
1469   nodeRedMgr.setZero();
1470   env->setRednMgr(nodeRedMgr);
1471 #endif
1472 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1473   criticalPath_send(env);
1474   automaticallySetMessagePriority(env);
1475 #endif
1476 #ifndef CMK_OPTIMIZE
1477   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1478 #endif
1479   CmiSetHandler(env, _charmHandlerIdx);
1480   return env;
1481 }
1482
1483 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1484 {
1485   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1486 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1487   criticalPath_send(env);
1488   automaticallySetMessagePriority(env);
1489 #endif
1490   CmiBecomeImmediate(env);
1491   return env;
1492 }
1493
1494 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1495                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1496 {
1497   int numPes;
1498   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1499 #ifdef _FAULT_MLOG_
1500         sendTicketGroupRequest(env,pe,_infoIdx);
1501 #else
1502   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1503   _TRACE_CREATION_N(env, numPes);
1504   if (opts & CK_MSG_SKIP_OR_IMM)
1505     _noCldEnqueue(pe, env);
1506   else
1507     _skipCldEnqueue(pe, env, _infoIdx);
1508   _TRACE_CREATION_DONE(1);
1509 #endif
1510 }
1511
1512 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1513                            int npes, int *pes)
1514 {
1515   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1516   _TRACE_CREATION_MULTICAST(env, npes, pes);
1517   CldEnqueueMulti(npes, pes, env, _infoIdx);
1518   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1519 }
1520
1521 extern "C"
1522 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1523 {
1524 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1525   if (destPE==CkMyPe())
1526   {
1527     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1528     return;
1529   }
1530   //Can't inline-- send the usual way
1531   register envelope *env = UsrToEnv(msg);
1532   int numPes;
1533   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1534   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1535   _TRACE_CREATION_N(env, numPes);
1536   _noCldEnqueue(destPE, env);
1537   _STATS_RECORD_SEND_BRANCH_1();
1538   CkpvAccess(_coreState)->create();
1539   _TRACE_CREATION_DONE(1);
1540 #else
1541   // no support for immediate message, send inline
1542   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1543 #endif
1544 }
1545
1546 extern "C"
1547 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1548 {
1549   if (destPE==CkMyPe())
1550   {
1551     if(!CmiNodeAlive(CkMyPe())){
1552         return;
1553     }
1554     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1555     if (obj!=NULL)
1556     { //Just directly call the group:
1557 #ifndef CMK_OPTIMIZE
1558       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1559 #else
1560       envelope *env=UsrToEnv(msg);
1561 #endif
1562       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1563       return;
1564     }
1565   }
1566   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1567   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1568 }
1569
1570 extern "C"
1571 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1572 {
1573   if (opts & CK_MSG_INLINE) {
1574     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1575     return;
1576   }
1577   if (opts & CK_MSG_IMMEDIATE) {
1578     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1579     return;
1580   }
1581   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1582   _STATS_RECORD_SEND_BRANCH_1();
1583   CkpvAccess(_coreState)->create();
1584 }
1585
1586 extern "C"
1587 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1588 {
1589 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1590   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1591   _TRACE_CREATION_MULTICAST(env, npes, pes);
1592   _noCldEnqueueMulti(npes, pes, env);
1593   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1594 #else
1595   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1596   CpvAccess(_qd)->create(-npes);
1597 #endif
1598   _STATS_RECORD_SEND_BRANCH_N(npes);
1599   CpvAccess(_qd)->create(npes);
1600 }
1601
1602 extern "C"
1603 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1604 {
1605   if (opts & CK_MSG_IMMEDIATE) {
1606     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1607     return;
1608   }
1609     // normal mesg
1610   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1611   _STATS_RECORD_SEND_BRANCH_N(npes);
1612   CpvAccess(_qd)->create(npes);
1613 }
1614
1615 extern "C"
1616 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1617 {
1618   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1619   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1620   CpvAccess(_qd)->create(CkNumPes());
1621 }
1622
1623 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1624                 int node=CLD_BROADCAST_ALL, int opts=0)
1625 {
1626   int numPes;
1627   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1628 #ifdef _FAULT_MLOG_
1629         sendTicketNodeGroupRequest(env,node,_infoIdx);
1630 #else
1631   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1632   _TRACE_CREATION_N(env, numPes);
1633   if (opts & CK_MSG_SKIP_OR_IMM) {
1634     _noCldNodeEnqueue(node, env);
1635     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1636       CkpvAccess(_coreState)->create(-numPes);
1637     }
1638   }
1639   else
1640     CldNodeEnqueue(node, env, _infoIdx);
1641   _TRACE_CREATION_DONE(1);
1642 #endif
1643 }
1644
1645 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1646                            int npes, int *nodes)
1647 {
1648   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1649   _TRACE_CREATION_N(env, npes);
1650   for (int i=0; i<npes; i++) {
1651     CldNodeEnqueue(nodes[i], env, _infoIdx);
1652   }
1653   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1654 }
1655
1656 extern "C"
1657 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1658 {
1659 #if CMK_IMMEDIATE_MSG
1660   if (node==CkMyNode())
1661   {
1662     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1663     return;
1664   }
1665   //Can't inline-- send the usual way
1666   register envelope *env = UsrToEnv(msg);
1667   int numPes;
1668   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1669   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1670   _TRACE_CREATION_N(env, numPes);
1671   _noCldNodeEnqueue(node, env);
1672   _STATS_RECORD_SEND_BRANCH_1();
1673   /* immeidate message is invisible to QD */
1674 //  CkpvAccess(_coreState)->create();
1675   _TRACE_CREATION_DONE(1);
1676 #else
1677   // no support for immediate message, send inline
1678   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1679 #endif
1680 }
1681
1682 extern "C"
1683 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1684 {
1685   if (node==CkMyNode())
1686   {
1687     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1688     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1689     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1690     if (obj!=NULL)
1691     { //Just directly call the group:
1692 #ifndef CMK_OPTIMIZE
1693       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1694 #else
1695       envelope *env=UsrToEnv(msg);
1696 #endif
1697       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1698       return;
1699     }
1700   }
1701   //Can't inline-- send the usual way
1702   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1703 }
1704
1705 extern "C"
1706 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1707 {
1708   if (opts & CK_MSG_INLINE) {
1709     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1710     return;
1711   }
1712   if (opts & CK_MSG_IMMEDIATE) {
1713     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1714     return;
1715   }
1716   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1717   _STATS_RECORD_SEND_NODE_BRANCH_1();
1718   CkpvAccess(_coreState)->create();
1719 }
1720
1721 extern "C"
1722 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1723 {
1724 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1725   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1726   _noCldEnqueueMulti(npes, nodes, env);
1727 #else
1728   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1729   CpvAccess(_qd)->create(-npes);
1730 #endif
1731   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1732   CpvAccess(_qd)->create(npes);
1733 }
1734
1735 extern "C"
1736 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1737 {
1738   if (opts & CK_MSG_IMMEDIATE) {
1739     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1740     return;
1741   }
1742     // normal mesg
1743   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1744   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1745   CpvAccess(_qd)->create(npes);
1746 }
1747
1748 extern "C"
1749 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1750 {
1751   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1752   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1753   CpvAccess(_qd)->create(CkNumNodes());
1754 }
1755
1756 //Needed by delegation manager:
1757 extern "C"
1758 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1759 { return _prepareMsg(eIdx,msg,pCid); }
1760 extern "C"
1761 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1762 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1763 extern "C"
1764 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1765 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1766
1767 void _ckModuleInit(void) {
1768         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1769         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1770         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1771         CkpvInitialize(TokenPool*, _tokenPool);
1772         CkpvAccess(_tokenPool) = new TokenPool;
1773 }
1774
1775
1776 /************** Send: Arrays *************/
1777
1778 extern void CkArrayManagerInsert(int onPe,void *msg);
1779 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1780
1781 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1782 {
1783   _CHECK_USED(env);
1784   _SET_USED(env, 1);
1785   env->setMsgtype(type);
1786 #ifndef CMK_OPTIMIZE
1787   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1788 #endif
1789   CmiSetHandler(env, _charmHandlerIdx);
1790   CpvAccess(_qd)->create();
1791 }
1792
1793 extern "C"
1794 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1795   register envelope *env = UsrToEnv(msg);
1796   env->getsetArrayMgr()=aID;
1797   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1798   CldEnqueue(pe, env, _infoIdx);
1799 }
1800
1801 extern "C"
1802 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1803   register envelope *env = UsrToEnv(msg);
1804   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1805 #ifdef _FAULT_MLOG_
1806         sendTicketArrayRequest(env,pe,_infoIdx);
1807 #else
1808   if (opts & CK_MSG_IMMEDIATE)
1809     CmiBecomeImmediate(env);
1810   if (opts & CK_MSG_SKIP_OR_IMM)
1811     _noCldEnqueue(pe, env);
1812   else
1813     _skipCldEnqueue(pe, env, _infoIdx);
1814 #endif
1815 }
1816
1817 class ElementDestroyer : public CkLocIterator {
1818 private:
1819         CkLocMgr *locMgr;
1820 public:
1821         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1822         void addLocation(CkLocation &loc) {
1823           loc.destroyAll();
1824         }
1825 };
1826
1827 void CkDeleteChares() {
1828   int i;
1829   int numGroups = CkpvAccess(_groupIDTable)->size();
1830
1831   // delete all array elements
1832   for(i=0;i<numGroups;i++) {
1833     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1834     if(obj && obj->isLocMgr())  {
1835       CkLocMgr *mgr = (CkLocMgr*)obj;
1836       ElementDestroyer destroyer(mgr);
1837       mgr->iterate(destroyer);
1838 printf("[%d] DELETE!\n", CkMyPe());
1839     }
1840   }
1841
1842   // delete all groups
1843   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1844   for(i=0;i<numGroups;i++) {
1845     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1846     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1847     if (obj) delete obj;
1848   }
1849   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1850
1851   // delete all node groups
1852   if (CkMyRank() == 0) {
1853     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1854     for(i=0;i<numNodeGroups;i++) {
1855       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
1856       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1857       if (obj) delete obj;
1858     }
1859   }
1860 }
1861
1862 //------------------- Message Watcher (record/replay) ----------------
1863
1864 #include "crc32.h"
1865
1866 CkpvDeclare(int, envelopeEventID);
1867
1868 CkMessageWatcher::~CkMessageWatcher() {}
1869
1870 class CkMessageRecorder : public CkMessageWatcher {
1871 public:
1872   CkMessageRecorder(FILE *f_) { f=f_; }
1873   ~CkMessageRecorder() {
1874     fprintf(f,"-1 -1 -1 ");
1875     fclose(f);
1876   }
1877
1878 private:
1879   virtual CmiBool process(envelope *env,CkCoreState *ck) {
1880     if (env->getEvent()) {
1881       bool wasPacked = env->isPacked();
1882       if (!wasPacked) CkPackMessage(&env);
1883       //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
1884       unsigned int crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
1885       unsigned int crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
1886       fprintf(f,"%d %d %d %hhd %x %x\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2);
1887       if (!wasPacked) CkUnpackMessage(&env);
1888     }
1889     return CmiTrue;
1890   }
1891   virtual int process(CthThreadToken *token,CkCoreState *ck) {
1892     fprintf(f, "%d %d %d\n",CkMyPe(), -2, token->serialNo);
1893     return 1;
1894   }
1895 };
1896
1897 class CkMessageDetailRecorder : public CkMessageWatcher {
1898 public:
1899   CkMessageDetailRecorder(FILE *f_) {
1900     f=f_;
1901     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
1902      * The value of 'x' is the pointer size.
1903      */
1904     CmiUInt2 little = sizeof(void*);
1905     fwrite(&little, 2, 1, f);
1906   }
1907   ~CkMessageDetailRecorder() {fclose(f);}
1908 private:
1909   virtual CmiBool process(envelope *env, CkCoreState *ck) {
1910     bool wasPacked = env->isPacked();
1911     if (!wasPacked) CkPackMessage(&env);
1912     CmiUInt4 size = env->getTotalsize();
1913     fwrite(&size, 4, 1, f);
1914     fwrite(env, env->getTotalsize(), 1, f);
1915     if (!wasPacked) CkUnpackMessage(&env);
1916     return CmiTrue;
1917   }
1918 };
1919
1920 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
1921 #define REPLAYDEBUG(args) /* empty */
1922
1923 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
1924
1925 class CkMessageReplay : public CkMessageWatcher {
1926   int counter;
1927         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
1928         unsigned int crc1, crc2;
1929         /// Read the next message we need from the file:
1930         void getNext(void) {
1931           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
1932           if (nextSize > 0) {
1933             // We are reading a regular message
1934             if (3!=fscanf(f,"%d%x%x", &nexttype,&crc1,&crc2)) {
1935               CkAbort("CkMessageReplay> Syntax error reading replay file");
1936             }
1937           } else if (nextSize == -2) {
1938             // We are reading a special message (right now only thread awaken)
1939             getNext(); // For now simply skip that
1940           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
1941             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
1942             CkAbort("CkMessageReplay> Unrecognized input");
1943           }
1944             /*
1945                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
1946                         CkAbort("CkMessageReplay> Syntax error reading replay file");
1947                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
1948                 }
1949                 */
1950                 counter++;
1951         }
1952         /// If this is the next message we need, advance and return CmiTrue.
1953         CmiBool isNext(envelope *env) {
1954                 if (nextPE!=env->getSrcPe()) return CmiFalse;
1955                 if (nextEvent!=env->getEvent()) return CmiFalse;
1956                 if (nextSize!=env->getTotalsize())
1957                 {
1958                         CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]\n", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
1959                         return CmiFalse;
1960                 }
1961                 bool wasPacked = env->isPacked();
1962                 if (!wasPacked) CkPackMessage(&env);
1963                 //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
1964                 unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
1965                 unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
1966                 if (crcnew1 != crc1) {
1967                   CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
1968                 }
1969         if (crcnew2 != crc2) {
1970           CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
1971         }
1972         if (!wasPacked) CkUnpackMessage(&env);
1973                 return CmiTrue;
1974         }
1975
1976         /// This is a (short) list of messages we aren't yet ready for:
1977         CkQ<envelope *> delayed;
1978
1979         /// Try to flush out any delayed messages
1980         void flush(void) {
1981                 int len=delayed.length();
1982                 for (int i=0;i<len;i++) {
1983                         envelope *env=delayed.deq();
1984                         if (isNext(env)) { /* this is the next message: process it */
1985                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1986                                 //CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
1987                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
1988                                 return;
1989                         }
1990                         else /* Not ready yet-- put it back in the
1991                                 queue */
1992                           {
1993                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1994                                 delayed.enq(env);
1995                           }
1996                 }
1997         }
1998
1999 public:
2000         CkMessageReplay(FILE *f_) {
2001           counter=0;
2002           f=f_;
2003           getNext();
2004           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2005           CmiStartQD(CkMessageReplayQuiescence, this);
2006         }
2007         ~CkMessageReplay() {fclose(f);}
2008
2009 private:
2010         virtual CmiBool process(envelope *env,CkCoreState *ck) {
2011           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx());
2012                 if (env->getEvent() == 0) return CmiTrue;
2013                 if (isNext(env)) { /* This is the message we were expecting */
2014                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2015                         getNext(); /* Advance over this message */
2016                         flush(); /* try to process queued-up stuff */
2017                         return CmiTrue;
2018                 }
2019 #if CMK_SMP
2020                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2021                          // try next rank, we can't just buffer the msg and left
2022                          // we need to keep unprocessed msg on the fly
2023                         int nextpe = CkMyPe()+1;
2024                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2025                         nextpe = CkNodeFirst(CkMyNode());
2026                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2027                         return CmiFalse;
2028                 }
2029 #endif
2030                 else /*!isNext(env) */ {
2031                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()
2032                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2033                         delayed.enq(env);
2034                         flush();
2035                         return CmiFalse;
2036                 }
2037         }
2038         virtual int process(CthThreadToken *token, CkCoreState *ck) {
2039           return 1;
2040         }
2041 };
2042
2043 class CkMessageDetailReplay : public CkMessageWatcher {
2044   void *getNext() {
2045     CmiUInt4 size; size_t nread;
2046     if ((nread=fread(&size, 4, 1, f)) < 1) {
2047       if (feof(f)) return NULL;
2048       CkPrintf("Broken record file (metadata) got %d\n",nread);
2049       CkAbort("");
2050     }
2051     void *env = CmiAlloc(size);
2052     if ((nread=fread(env, size, 1, f)) < 1) {
2053       CkPrintf("Broken record file (data) expecting %d, got %d\n",size,nread);
2054       CkAbort("");
2055     }
2056     return env;
2057   }
2058 public:
2059   CkMessageDetailReplay(FILE *f_) {
2060     f=f_;
2061     /* This must match what CkMessageDetailRecorder did */
2062     CmiUInt2 little;
2063     fread(&little, 2, 1, f);
2064     if (little != sizeof(void*)) {
2065       CkAbort("Replaying on a different architecture from which recording was done!");
2066     }
2067
2068     CmiPushPE(CkMyPe(), getNext());
2069   }
2070   virtual CmiBool process(envelope *env,CkCoreState *ck) {
2071     CmiPushPE(CkMyPe(), getNext());
2072     return CmiTrue;
2073   }
2074 };
2075
2076 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2077   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2078   CkMessageReplay *replay = (CkMessageReplay*)rep;
2079   //CmiStartQD(CkMessageReplayQuiescence, replay);
2080 }
2081
2082 extern "C" int CmiExecuteThreadResume(CthThreadToken *token) {
2083   CkCoreState *ck = CkpvAccess(_coreState);
2084   if (ck->watcher!=NULL) {
2085     return ck->watcher->processThread(token,ck);
2086   }
2087   return 1;
2088 }
2089
2090 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2091
2092 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2093
2094         int i;
2095         char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2096         strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2097         sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2098         FILE *f=fopen(fName,permissions);
2099         REPLAYDEBUG("openReplayfile "<<fName);
2100         if (f==NULL) {
2101                 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2102                         CkMyPe(),fName,permissions);
2103                 CkAbort("openReplayFile> Could not open replay file");
2104         }
2105         return f;
2106 }
2107
2108 #include "ckliststring.h"
2109 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2110     char *procs = NULL;
2111     replaySystem = 0;
2112         REPLAYDEBUG("CkMessageWatcherInit ");
2113     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2114         CkListString list(procs);
2115         if (list.includes(CkMyPe())) {
2116           CpdSetInitializeMemory(1);
2117           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2118         }
2119     }
2120         if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2121             CpdSetInitializeMemory(1);
2122                 ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2123         }
2124         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2125             CpdSetInitializeMemory(1);
2126             // Set the parameters of the processor
2127 #if CMK_SHARED_VARS_UNAVAILABLE
2128             _Cmi_mype = atoi(procs);
2129             while (procs[0]!='/') procs++;
2130             procs++;
2131             _Cmi_numpes = atoi(procs);
2132 #else
2133             CkAbort("+replay-detail available only for non-SMP build");
2134 #endif
2135             replaySystem = 1;
2136             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2137         }
2138     if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream")) {
2139         CpdSetInitializeMemory(1);
2140         ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2141     }
2142 }
2143
2144 extern "C"
2145 int CkMessageToEpIdx(void *msg) {
2146         envelope *env=UsrToEnv(msg);
2147         int ep=env->getEpIdx();
2148         if (ep==CkIndex_CkArray::recvBroadcast(0))
2149                 return env->getsetArrayBcastEp();
2150         else
2151                 return ep;
2152 }
2153
2154 extern "C"
2155 int getCharmEnvelopeSize() {
2156   return sizeof(envelope);
2157 }
2158
2159
2160 #include "CkMarshall.def.h"
2161