Adding hook when resuming a normal thread of execution (i.e not the main thread or...
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #ifndef CMK_CHARE_USE_PTR
29 CpvDeclare(CkVec<void *>, chare_objs);
30 CpvDeclare(CkVec<int>, chare_types);
31 CpvDeclare(CkVec<VidBlock *>, vidblocks);
32 #endif
33
34 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35
36 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37
38 int CMessage_CkMessage::__idx=-1;
39 int CMessage_CkArgMsg::__idx=0;
40 int CkIndex_Chare::__idx;
41 int CkIndex_Group::__idx;
42 int CkIndex_ArrayBase::__idx=-1;
43
44 extern int _defaultObjectQ;
45
46 //Charm++ virtual functions: declaring these here results in a smaller executable
47 Chare::Chare(void) {
48   thishandle.onPE=CkMyPe();
49   thishandle.objPtr=this;
50 #ifndef CMK_CHARE_USE_PTR
51      // for plain chare, objPtr is actually the index to chare obj table
52   if (chareIdx >= 0) thishandle.objPtr=(void*)chareIdx;
53 #endif
54 #ifdef _FAULT_MLOG_
55   mlogData = new ChareMlogData();
56   mlogData->objID.type = TypeChare;
57   mlogData->objID.data.chare.id = thishandle;
58 #endif
59 #if CMK_OBJECT_QUEUE_AVAILABLE
60   if (_defaultObjectQ)  CkEnableObjQ();
61 #endif
62 }
63
64 Chare::Chare(CkMigrateMessage* m) {
65   thishandle.onPE=CkMyPe();
66   thishandle.objPtr=this;
67
68 #if CMK_OBJECT_QUEUE_AVAILABLE
69   if (_defaultObjectQ)  CkEnableObjQ();
70 #endif
71 }
72
73 void Chare::CkEnableObjQ()
74 {
75 #if CMK_OBJECT_QUEUE_AVAILABLE
76   objQ.create();
77 #endif
78 }
79
80 Chare::~Chare() {}
81
82 void Chare::pup(PUP::er &p)
83 {
84   p(thishandle.onPE);
85   thishandle.objPtr=(void *)this;
86 #ifndef CMK_CHARE_USE_PTR
87   p(chareIdx);
88   if (chareIdx != -1) thishandle.objPtr=(void*)chareIdx;
89 #endif
90 #ifdef _FAULT_MLOG_
91   if(p.isUnpacking()){
92     mlogData = new ChareMlogData();
93   }
94   mlogData->pup(p);
95 #endif
96 }
97
98 int Chare::ckGetChareType() const {
99   return -3;
100 }
101 char *Chare::ckDebugChareName(void) {
102   char buf[100];
103   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
104   return strdup(buf);
105 }
106 int Chare::ckDebugChareID(char *str, int limit) {
107   // pure chares for now do not have a valid ID
108   str[0] = 0;
109   return 1;
110 }
111 void Chare::ckDebugPup(PUP::er &p) {
112   pup(p);
113 }
114
115 /// This method is called before starting a [threaded] entry method.
116 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
117   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
118   traceAddThreadListeners(th, UsrToEnv(msg));
119 }
120
121
122 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
123   p.comment("Bytes");
124   int ts=UsrToEnv(msg)->getTotalsize();
125   int msgLen=ts-sizeof(envelope);
126   if (msgLen>0)
127     p((char*)msg,msgLen);
128 }
129
130 IrrGroup::IrrGroup(void) {
131   thisgroup = CkpvAccess(_currentGroup);
132 #ifdef _FAULT_MLOG_
133         mlogData->objID.type = TypeGroup;
134         mlogData->objID.data.group.id = thisgroup;
135         mlogData->objID.data.group.onPE = CkMyPe();
136 #endif
137 }
138
139 IrrGroup::~IrrGroup() {
140   // remove the object pointer
141   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
142   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
143   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
144 }
145
146 void IrrGroup::pup(PUP::er &p)
147 {
148   Chare::pup(p);
149   p|thisgroup;
150 }
151
152 int IrrGroup::ckGetChareType() const {
153   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
154 }
155
156 int IrrGroup::ckDebugChareID(char *str, int limit) {
157   if (limit<5) return -1;
158   str[0] = 1;
159   *((int*)&str[1]) = thisgroup.idx;
160   return 5;
161 }
162
163 char *IrrGroup::ckDebugChareName() {
164   return strdup(_chareTable[ckGetChareType()]->name);
165 }
166
167 void IrrGroup::ckJustMigrated(void)
168 {
169 }
170
171 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
172   /* FIXME: **CW** not entirely sure what we should do here yet */
173 }
174
175 void Group::CkAddThreadListeners(CthThread th, void *msg) {
176   Chare::CkAddThreadListeners(th, msg);
177   CthSetThreadID(th, thisgroup.idx, 0, 0);
178 }
179
180 void Group::pup(PUP::er &p)
181 {
182   CkReductionMgr::pup(p);
183   reductionInfo.pup(p);
184 }
185
186 /**** Delegation Manager Group */
187 CkDelegateMgr::~CkDelegateMgr() { }
188
189 //Default delegator implementation: do not delegate-- send directly
190 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
191   { CkSendMsg(ep,m,c); }
192 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
193   { CkSendMsgBranch(ep,m,onPE,g); }
194 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
195   { CkBroadcastMsgBranch(ep,m,g); }
196 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
197   { CkSendMsgBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
198 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
199   { CkSendMsgNodeBranch(ep,m,onNode,g); }
200 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
201   { CkBroadcastMsgNodeBranch(ep,m,g); }
202 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
203   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
204 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,int onPE,CkArrayID a)
205 {
206         CProxyElement_ArrayBase ap(a,idx);
207         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
208 }
209 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,CkArrayID a)
210 {
211         CProxyElement_ArrayBase ap(a,idx);
212         ap.ckSend((CkArrayMessage *)m,ep);
213 }
214 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
215 {
216         CProxy_ArrayBase ap(a);
217         ap.ckBroadcast((CkArrayMessage *)m,ep);
218 }
219
220 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
221 {
222         CmiAbort("ArraySectionSend is not implemented!\n");
223 /*
224         CProxyElement_ArrayBase ap(a,idx);
225         ap.ckSend((CkArrayMessage *)m,ep);
226 */
227 }
228
229 /*** Proxy <-> delegator communication */
230 CkDelegateData::~CkDelegateData() {}
231
232 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
233   return pd; // default implementation ignores pup call
234 }
235
236 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
237    this tricky manual reference counting business... */
238
239 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
240         if (dPtr) dPtr->ref();
241         ckUndelegate();
242         delegatedMgr = dTo;
243         delegatedPtr = dPtr;
244 }
245 void CProxy::ckUndelegate(void) {
246         delegatedMgr=NULL;
247         if (delegatedPtr) delegatedPtr->unref();
248         delegatedPtr=NULL;
249 }
250
251 /// Copy constructor
252 CProxy::CProxy(const CProxy &src)
253     :delegatedMgr(src.delegatedMgr)
254 {
255     delegatedPtr = NULL;
256     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
257         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
258 }
259
260 /// Assignment operator
261 CProxy& CProxy::operator=(const CProxy &src) {
262         CkDelegateData *oldPtr=delegatedPtr;
263         ckUndelegate();
264         delegatedMgr=src.delegatedMgr;
265
266         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
267             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
268         else
269             delegatedPtr = NULL;
270
271         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
272         if (oldPtr) oldPtr->unref();
273         return *this;
274 }
275
276 void CProxy::pup(PUP::er &p) {
277       CkGroupID delegatedTo;
278       delegatedTo.setZero();
279       int isNodeGroup = 0;
280       if (!p.isUnpacking()) {
281         if (delegatedMgr) {
282           delegatedTo = delegatedMgr->CkGetGroupID();
283           isNodeGroup = delegatedMgr->isNodeGroup();
284         }
285       }
286       p|delegatedTo;
287       if (!delegatedTo.isZero()) {
288         p|isNodeGroup;
289         if (p.isUnpacking()) {
290           if (isNodeGroup)
291                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
292           else
293                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
294         }
295
296         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
297         if (p.isUnpacking() && delegatedPtr)
298             delegatedPtr->ref();
299       }
300 }
301
302 /**** Array sections */
303 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
304 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
305   _cookie.aid = aid;    \
306   _cookie.get_pe() = CkMyPe();  \
307   _elems = new CkArrayIndexMax[nElems]; \
308   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
309   pelist = NULL;        \
310   npes  = 0;    \
311 }
312
313 CKSECTIONID_CONSTRUCTOR_DEF(1D)
314 CKSECTIONID_CONSTRUCTOR_DEF(2D)
315 CKSECTIONID_CONSTRUCTOR_DEF(3D)
316 CKSECTIONID_CONSTRUCTOR_DEF(4D)
317 CKSECTIONID_CONSTRUCTOR_DEF(5D)
318 CKSECTIONID_CONSTRUCTOR_DEF(6D)
319 CKSECTIONID_CONSTRUCTOR_DEF(Max)
320
321 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
322   pelist = new int[npes];
323   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
324   _cookie.aid = gid;
325 }
326
327 CkSectionID::CkSectionID(const CkSectionID &sid) {
328   int i;
329   _cookie = sid._cookie;
330   _nElems = sid._nElems;
331   if (_nElems > 0) {
332     _elems = new CkArrayIndexMax[_nElems];
333     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
334   } else _elems = NULL;
335   npes = sid.npes;
336   if (npes > 0) {
337     pelist = new int[npes];
338     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
339   } else pelist = NULL;
340 }
341
342 void CkSectionID::operator=(const CkSectionID &sid) {
343   int i;
344   _cookie = sid._cookie;
345   _nElems = sid._nElems;
346   if (_nElems > 0) {
347     _elems = new CkArrayIndexMax[_nElems];
348     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
349   } else _elems = NULL;
350   npes = sid.npes;
351   if (npes > 0) {
352     pelist = new int[npes];
353     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
354   } else pelist = NULL;
355 }
356
357 void CkSectionID::pup(PUP::er &p) {
358     p | _cookie;
359     p(_nElems);
360     if (_nElems > 0) {
361       if (p.isUnpacking()) _elems = new CkArrayIndexMax[_nElems];
362       for (int i=0; i< _nElems; i++) p | _elems[i];
363       npes = 0;
364       pelist = NULL;
365     } else {
366       // If _nElems is zero, than this section describes processors instead of array elements
367       _elems = NULL;
368       p(npes);
369       if (p.isUnpacking()) pelist = new int[npes];
370       p(pelist, npes);
371     }
372 }
373
374 /**** Tiny random API routines */
375
376 #ifdef CMK_CUDA
377 void CUDACallbackManager(void *fn) {
378   if (fn != NULL) {
379     CkCallback *cb = (CkCallback*) fn;
380     cb->send();
381   }
382 }
383
384 #endif
385
386 extern "C"
387 void CkSetRefNum(void *msg, int ref)
388 {
389   UsrToEnv(msg)->setRef(ref);
390 }
391
392 extern "C"
393 int CkGetRefNum(void *msg)
394 {
395   return UsrToEnv(msg)->getRef();
396 }
397
398 extern "C"
399 int CkGetSrcPe(void *msg)
400 {
401   return UsrToEnv(msg)->getSrcPe();
402 }
403
404 extern "C"
405 int CkGetSrcNode(void *msg)
406 {
407   return CmiNodeOf(CkGetSrcPe(msg));
408 }
409
410 extern "C"
411 void *CkLocalBranch(CkGroupID gID) {
412   return _localBranch(gID);
413 }
414
415 static
416 void *_ckLocalNodeBranch(CkGroupID groupID) {
417   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
418   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
419   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
420   return retval;
421 }
422
423 extern "C"
424 void *CkLocalNodeBranch(CkGroupID groupID)
425 {
426   void *retval;
427   // we are called in a constructor
428   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
429     return CkpvAccess(_currentNodeGroupObj);
430   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
431   { // Nodegroup hasn't finished being created yet-- schedule...
432     CsdScheduler(0);
433   }
434   return retval;
435 }
436
437 extern "C"
438 void *CkLocalChare(const CkChareID *pCid)
439 {
440         int pe=pCid->onPE;
441         if (pe<0) { //A virtual chare ID
442                 if (pe!=(-(CkMyPe()+1)))
443                         return NULL;//VID block not on this PE
444                 VidBlock *v=(VidBlock *)pCid->objPtr;
445                 return v->getLocalChare();
446         }
447         else
448         { //An ordinary chare ID
449                 if (pe!=CkMyPe())
450                         return NULL;//Chare not on this PE
451                 return pCid->objPtr;
452         }
453 }
454
455 CkpvDeclare(char **,Ck_argv);
456 extern "C" char **CkGetArgv(void) {
457         return CkpvAccess(Ck_argv);
458 }
459 extern "C" int CkGetArgc(void) {
460         return CmiGetArgc(CkpvAccess(Ck_argv));
461 }
462
463 /******************** Basic support *****************/
464 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
465 {
466 #ifdef _FAULT_MLOG_
467         CpvAccess(_currentObj) = (Chare *)obj;
468 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
469 #endif
470   //BIGSIM_OOC DEBUGGING
471   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
472   //fflush(stdout);
473 #ifndef CMK_OPTIMIZE
474   CpdBeforeEp(epIdx, obj, msg);
475 #endif
476   _entryTable[epIdx]->call(msg, obj);
477 #ifndef CMK_OPTIMIZE
478   CpdAfterEp(epIdx);
479 #endif
480   if (_entryTable[epIdx]->noKeep)
481   { /* Method doesn't keep/delete the message, so we have to: */
482     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
483   }
484 }
485 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
486 {
487   //BIGSIM_OOC DEBUGGING
488   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
489   //fflush(stdout);
490
491   void *deliverMsg;
492 #ifdef _FAULT_MLOG_
493         CpvAccess(_currentObj) = (Chare *)obj;
494 #endif
495   if (_entryTable[epIdx]->noKeep)
496   { /* Deliver a read-only copy of the message */
497     deliverMsg=(void *)msg;
498   } else
499   { /* Method needs a copy of the message to keep/delete */
500     void *oldMsg=(void *)msg;
501     deliverMsg=CkCopyMsg(&oldMsg);
502 #ifndef CMK_OPTIMIZE
503     if (oldMsg!=msg)
504       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
505 #endif
506   }
507 #ifndef CMK_OPTIMIZE
508   CpdBeforeEp(epIdx, obj, (void*)msg);
509 #endif
510   _entryTable[epIdx]->call(deliverMsg, obj);
511 #ifndef CMK_OPTIMIZE
512   CpdAfterEp(epIdx);
513 #endif
514 }
515
516 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
517 {
518   register void *msg = EnvToUsr(env);
519   _SET_USED(env, 0);
520   CkDeliverMessageFree(epIdx,msg,obj);
521 }
522
523 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
524 {
525
526 #ifndef CMK_OPTIMIZE /* Consider tracing: */
527   if (_entryTable[epIdx]->traceEnabled) {
528     _TRACE_BEGIN_EXECUTE(env);
529     _invokeEntryNoTrace(epIdx,env,obj);
530     _TRACE_END_EXECUTE();
531   }
532   else
533 #endif
534     _invokeEntryNoTrace(epIdx,env,obj);
535
536 }
537
538 /********************* Creation ********************/
539
540 extern "C"
541 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
542 {
543   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
544   envelope *env = UsrToEnv(msg);
545   _CHECK_USED(env);
546   if(pCid == 0) {
547     env->setMsgtype(NewChareMsg);
548   } else {
549     pCid->onPE = (-(CkMyPe()+1));
550     //  pCid->magic = _GETIDX(cIdx);
551     pCid->objPtr = (void *) new VidBlock();
552     _MEMCHECK(pCid->objPtr);
553     env->setMsgtype(NewVChareMsg);
554     env->setVidPtr(pCid->objPtr);
555 #ifndef CMK_CHARE_USE_PTR
556     CpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
557     int idx = CpvAccess(vidblocks).size()-1;
558     pCid->objPtr = (void *)idx;
559     env->setVidPtr((void *)idx);
560 #endif
561   }
562   env->setEpIdx(eIdx);
563   env->setSrcPe(CkMyPe());
564   CmiSetHandler(env, _charmHandlerIdx);
565   _TRACE_CREATION_1(env);
566   CpvAccess(_qd)->create();
567   _STATS_RECORD_CREATE_CHARE_1();
568   _SET_USED(env, 1);
569   if(destPE == CK_PE_ANY)
570     env->setForAnyPE(1);
571   else
572     env->setForAnyPE(0);
573   CldEnqueue(destPE, env, _infoIdx);
574   _TRACE_CREATION_DONE(1);
575 }
576
577 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
578 {
579   register int gIdx = _entryTable[epIdx]->chareIdx;
580   register void *obj = malloc(_chareTable[gIdx]->size);
581   _MEMCHECK(obj);
582   setMemoryTypeChare(obj);
583   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
584   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
585   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
586   CkpvAccess(_groupIDTable)->push_back(groupID);
587   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
588   if(ptrq) {
589     void *pending;
590     while((pending=ptrq->deq())!=0)
591       CldEnqueue(CkMyPe(), pending, _infoIdx);
592 //    delete ptrq;
593       CkpvAccess(_groupTable)->find(groupID).clearPending();
594   }
595   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
596
597   CkpvAccess(_currentGroup) = groupID;
598   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
599 #ifndef CMK_CHARE_USE_PTR
600   ((Chare *)obj)->chareIdx = -1;
601 #endif
602   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
603   _STATS_RECORD_PROCESS_GROUP_1();
604 }
605
606 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
607 {
608   register int gIdx = _entryTable[epIdx]->chareIdx;
609   int objSize=_chareTable[gIdx]->size;
610   register void *obj = malloc(objSize);
611   _MEMCHECK(obj);
612   setMemoryTypeChare(obj);
613   CkpvAccess(_currentGroup) = groupID;
614
615 // Now that the NodeGroup is created, add it to the table.
616 //  NodeGroups can be accessed by multiple processors, so
617 //  this is in the opposite order from groups - invoking the constructor
618 //  before registering it.
619 // User may call CkLocalNodeBranch() inside the nodegroup constructor
620 //  store nodegroup into _currentNodeGroupObj
621   CkpvAccess(_currentNodeGroupObj) = obj;
622 #ifndef CMK_CHARE_USE_PTR
623   ((Chare *)obj)->chareIdx = -1;
624 #endif
625   _invokeEntryNoTrace(epIdx,env,obj);
626   CkpvAccess(_currentNodeGroupObj) = NULL;
627   _STATS_RECORD_PROCESS_NODE_GROUP_1();
628
629   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
630   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
631   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
632   CksvAccess(_nodeGroupIDTable).push_back(groupID);
633
634   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
635   if(ptrq) {
636     void *pending;
637     while((pending=ptrq->deq())!=0)
638       CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
639 //    delete ptrq;
640       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
641   }
642   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
643 }
644
645 void _createGroup(CkGroupID groupID, envelope *env)
646 {
647   _CHECK_USED(env);
648   _SET_USED(env, 1);
649   register int epIdx = env->getEpIdx();
650   int gIdx = _entryTable[epIdx]->chareIdx;
651   CkNodeGroupID rednMgr;
652   if(_chareTable[gIdx]->isIrr == 0){
653                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
654                 rednMgr = rednMgrProxy;
655 //              rednMgrProxy.setAttachedGroup(groupID);
656   }else{
657         rednMgr.setZero();
658   }
659   env->setGroupNum(groupID);
660   env->setSrcPe(CkMyPe());
661   env->setRednMgr(rednMgr);
662   env->setGroupEpoch(CkpvAccess(_charmEpoch));
663
664   if(CkNumPes()>1) {
665     CkPackMessage(&env);
666     CmiSetHandler(env, _bocHandlerIdx);
667     _numInitMsgs++;
668     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
669     CpvAccess(_qd)->create(CkNumPes()-1);
670     CkUnpackMessage(&env);
671   }
672   _STATS_RECORD_CREATE_GROUP_1();
673   CkCreateLocalGroup(groupID, epIdx, env);
674 }
675
676 void _createNodeGroup(CkGroupID groupID, envelope *env)
677 {
678   _CHECK_USED(env);
679   _SET_USED(env, 1);
680   register int epIdx = env->getEpIdx();
681   env->setGroupNum(groupID);
682   env->setSrcPe(CkMyPe());
683   env->setGroupEpoch(CkpvAccess(_charmEpoch));
684   if(CkNumNodes()>1) {
685     CkPackMessage(&env);
686     CmiSetHandler(env, _bocHandlerIdx);
687     _numInitMsgs++;
688     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
689     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
690     CpvAccess(_qd)->create(CkNumNodes()-1);
691     CkUnpackMessage(&env);
692   }
693   _STATS_RECORD_CREATE_NODE_GROUP_1();
694   CkCreateLocalNodeGroup(groupID, epIdx, env);
695 }
696
697 // new _groupCreate
698
699 static CkGroupID _groupCreate(envelope *env)
700 {
701   register CkGroupID groupNum;
702
703   // check CkMyPe(). if it is 0 then idx is _numGroups++
704   // if not, then something else...
705   if(CkMyPe() == 0)
706      groupNum.idx = CkpvAccess(_numGroups)++;
707   else
708      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
709   _createGroup(groupNum, env);
710   return groupNum;
711 }
712
713 // new _nodeGroupCreate
714 static CkGroupID _nodeGroupCreate(envelope *env)
715 {
716   register CkGroupID groupNum;
717   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
718   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
719           groupNum.idx = CksvAccess(_numNodeGroups)++;
720    else
721           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
722   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
723   _createNodeGroup(groupNum, env);
724   return groupNum;
725 }
726
727 /**** generate the group idx when group is creator pe is not pe0
728  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
729  **** remaining bits contain the group creator processor number and
730  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
731
732 int _getGroupIdx(int numNodes,int myNode,int numGroups)
733 {
734         int idx;
735         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
736         int n = 32 - (x+1);                                     // number of bits remaining for the index
737         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
738                                                                 // then add the next available index
739         // of course this won't work when int is 8 bytes long on T3E
740         //idx |= 0x80000000;                                      // set the most significant bit to 1
741         idx = - idx;
742                                                                 // if int is not 32 bits, wouldn't this be wrong?
743         return idx;
744 }
745
746 extern "C"
747 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
748 {
749   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
750   register envelope *env = UsrToEnv(msg);
751   env->setMsgtype(BocInitMsg);
752   env->setEpIdx(eIdx);
753   env->setSrcPe(CkMyPe());
754   _TRACE_CREATION_N(env, CkNumPes());
755   CkGroupID gid = _groupCreate(env);
756   _TRACE_CREATION_DONE(1);
757   return gid;
758 }
759
760 extern "C"
761 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
762 {
763   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
764   register envelope *env = UsrToEnv(msg);
765   env->setMsgtype(NodeBocInitMsg);
766   env->setEpIdx(eIdx);
767   env->setSrcPe(CkMyPe());
768   _TRACE_CREATION_N(env, CkNumNodes());
769   CkGroupID gid = _nodeGroupCreate(env);
770   _TRACE_CREATION_DONE(1);
771   return gid;
772 }
773
774 static inline void *_allocNewChare(envelope *env, int &idx)
775 {
776   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
777   void *tmp=malloc(_chareTable[chareIdx]->size);
778   _MEMCHECK(tmp);
779 #ifndef CMK_CHARE_USE_PTR
780   CpvAccess(chare_objs).push_back(tmp);
781   CpvAccess(chare_types).push_back(chareIdx);
782   idx = CpvAccess(chare_objs).size()-1;
783 #endif
784   setMemoryTypeChare(tmp);
785   return tmp;
786 }
787
788 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
789 {
790   int idx;
791   register void *obj = _allocNewChare(env, idx);
792 #ifndef CMK_CHARE_USE_PTR
793   ((Chare *)obj)->chareIdx = idx;
794 #endif
795   _invokeEntry(env->getEpIdx(),env,obj);
796 }
797
798 void CkCreateLocalChare(int epIdx, envelope *env)
799 {
800   env->setEpIdx(epIdx);
801   _processNewChareMsg(NULL, env);
802 }
803
804 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
805 {
806   int idx;
807   register void *obj = _allocNewChare(env, idx);
808   register CkChareID *pCid = (CkChareID *)
809       _allocMsg(FillVidMsg, sizeof(CkChareID));
810   pCid->onPE = CkMyPe();
811 #ifndef CMK_CHARE_USE_PTR
812   pCid->objPtr = (void*)idx;
813 #else
814   pCid->objPtr = obj;
815 #endif
816   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
817   register envelope *ret = UsrToEnv(pCid);
818   ret->setVidPtr(env->getVidPtr());
819   register int srcPe = env->getSrcPe();
820   ret->setSrcPe(CkMyPe());
821   CmiSetHandler(ret, _charmHandlerIdx);
822   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
823   CpvAccess(_qd)->create();
824 #ifndef CMK_CHARE_USE_PTR
825   ((Chare *)obj)->chareIdx = idx;
826 #endif
827   _invokeEntry(env->getEpIdx(),env,obj);
828 }
829
830 /************** Receive: Chares *************/
831
832 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
833 {
834   register int epIdx = env->getEpIdx();
835   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
836   register void *obj;
837   if (mainIdx != -1)  {           // mainchare
838     CmiAssert(CkMyPe()==0);
839     obj = _mainTable[mainIdx]->getObj();
840   }
841   else {
842 #ifndef CMK_CHARE_USE_PTR
843     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
844       obj = CpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
845     else
846       obj = env->getObjPtr();
847 #else
848     obj = env->getObjPtr();
849 #endif
850   }
851   _invokeEntry(epIdx,env,obj);
852 }
853
854 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
855 {
856   register int epIdx = env->getEpIdx();
857   register void *obj = env->getObjPtr();
858   _invokeEntry(epIdx,env,obj);
859 }
860
861 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
862 {
863 #ifndef CMK_CHARE_USE_PTR
864   register VidBlock *vptr = CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
865 #else
866   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
867   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
868 #endif
869   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
870   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
871   vptr->fill(pcid->onPE, pcid->objPtr);
872   CmiFree(env);
873 }
874
875 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
876 {
877 #ifndef CMK_CHARE_USE_PTR
878   register VidBlock *vptr = CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
879 #else
880   VidBlock *vptr = (VidBlock *) env->getVidPtr();
881   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
882 #endif
883   _SET_USED(env, 1);
884   vptr->send(env);
885 }
886
887 /************** Receive: Groups ****************/
888
889 /**
890  This message is sent to this groupID--prepare to
891  handle this message by looking up the group,
892  and possibly stashing the message.
893 */
894 IrrGroup *_lookupGroup(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
895 {
896
897         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
898         IrrGroup *obj = ck->localBranch(groupID);
899         if (obj==NULL) { /* groupmember not yet created: stash message */
900                 ck->getGroupTable()->find(groupID).enqMsg(env);
901         }
902         else { /* will be able to process message */
903                 ck->process();
904         }
905         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
906         return obj;
907 }
908
909 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
910 {
911 #if CMK_LBDB_ON
912   // if there is a running obj being measured, stop it temporarily
913   LDObjHandle objHandle;
914   int objstopped = 0;
915   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
916   if (the_lbdb->RunningObject(&objHandle)) {
917     objstopped = 1;
918     the_lbdb->ObjectStop(objHandle);
919   }
920 #endif
921   _invokeEntry(epIdx,env,obj);
922 #if CMK_LBDB_ON
923   if (objstopped) the_lbdb->ObjectStart(objHandle);
924 #endif
925   _STATS_RECORD_PROCESS_BRANCH_1();
926 }
927
928 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
929 {
930   register CkGroupID groupID =  env->getGroupNum();
931   register IrrGroup *obj = _lookupGroup(ck,env,env->getGroupNum());
932   if(obj) {
933     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
934   }
935 }
936
937 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
938 {
939   env->setMsgtype(ForChareMsg);
940   env->setObjPtr(obj);
941   _processForChareMsg(ck,env);
942   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
943 }
944
945 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
946 {
947   env->setEpIdx(epIdx);
948   _deliverForNodeBocMsg(ck,env, obj);
949 }
950
951 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
952 {
953   register CkGroupID groupID = env->getGroupNum();
954   register void *obj;
955
956   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
957   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
958   if(!obj) { // groupmember not yet created
959 #if CMK_IMMEDIATE_MSG
960     if (CmiIsImmediate(env))     // buffer immediate message
961       CmiDelayImmediate();
962     else
963 #endif
964     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
965     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
966     return;
967   }
968   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
969 #if CMK_IMMEDIATE_MSG
970   if (!CmiIsImmediate(env))
971 #endif
972   ck->process();
973   env->setMsgtype(ForChareMsg);
974   env->setObjPtr(obj);
975   _processForChareMsg(ck,env);
976   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
977 }
978
979 void _processBocInitMsg(CkCoreState *ck,envelope *env)
980 {
981   register CkGroupID groupID = env->getGroupNum();
982   register int epIdx = env->getEpIdx();
983   CkCreateLocalGroup(groupID, epIdx, env);
984 }
985
986 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
987 {
988   register CkGroupID groupID = env->getGroupNum();
989   register int epIdx = env->getEpIdx();
990   CkCreateLocalNodeGroup(groupID, epIdx, env);
991 }
992
993 /************** Receive: Arrays *************/
994
995 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
996   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
997   if (mgr) {
998     _SET_USED(env, 0);
999     mgr->insertElement((CkMessage *)EnvToUsr(env));
1000   }
1001 }
1002 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1003   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
1004   if (mgr) {
1005     _SET_USED(env, 0);
1006     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
1007   }
1008 }
1009
1010 //BIGSIM_OOC DEBUGGING
1011 #define TELLMSGTYPE(x) //x
1012
1013 /**
1014  * This is the main converse-level handler used by all of Charm++.
1015  *
1016  * \addtogroup CriticalPathFramework
1017  */
1018 void _processHandler(void *converseMsg,CkCoreState *ck)
1019 {
1020   register envelope *env = (envelope *) converseMsg;
1021
1022 //#if CMK_RECORD_REPLAY
1023   if (ck->watcher!=NULL) {
1024     if (!ck->watcher->processMessage(env,ck)) return;
1025   }
1026 //#endif
1027 #ifdef _FAULT_MLOG_
1028         Chare *obj=NULL;
1029         CkObjID sender;
1030         MCount SN;
1031         MlogEntry *entry=NULL;
1032         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1033         env->getMsgtype() == ForArrayEltMsg){
1034                 sender = env->sender;
1035                 SN = env->SN;
1036                 int result = preProcessReceivedMessage(env,&obj,&entry);
1037                 if(result == 0){
1038                         return;
1039                 }
1040         }
1041 #endif
1042
1043 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1044   //  CkPrintf("START\n");
1045   criticalPath_start(env);
1046 #endif
1047
1048
1049   switch(env->getMsgtype()) {
1050 // Group support
1051     case BocInitMsg :
1052       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1053       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1054       _processBocInitMsg(ck,env);
1055       break;
1056     case NodeBocInitMsg :
1057       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1058       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1059       _processNodeBocInitMsg(ck,env);
1060       break;
1061     case ForBocMsg :
1062       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1063       // QD processing moved inside _processForBocMsg because it is conditional
1064       if(env->isPacked()) CkUnpackMessage(&env);
1065       _processForBocMsg(ck,env);
1066       // stats record moved inside _processForBocMsg because it is conditional
1067       break;
1068     case ForNodeBocMsg :
1069       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1070       // QD processing moved to _processForNodeBocMsg because it is conditional
1071       if(env->isPacked()) CkUnpackMessage(&env);
1072       _processForNodeBocMsg(ck,env);
1073       // stats record moved to _processForNodeBocMsg because it is conditional
1074       break;
1075
1076 // Array support
1077     case ArrayEltInitMsg:
1078       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1079       if(env->isPacked()) CkUnpackMessage(&env);
1080       _processArrayEltInitMsg(ck,env);
1081       break;
1082     case ForArrayEltMsg:
1083       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1084       if(env->isPacked()) CkUnpackMessage(&env);
1085       _processArrayEltMsg(ck,env);
1086       break;
1087
1088 // Chare support
1089     case NewChareMsg :
1090       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1091       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1092       _processNewChareMsg(ck,env);
1093       _STATS_RECORD_PROCESS_CHARE_1();
1094       break;
1095     case NewVChareMsg :
1096       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1097       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1098       _processNewVChareMsg(ck,env);
1099       _STATS_RECORD_PROCESS_CHARE_1();
1100       break;
1101     case ForChareMsg :
1102       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1103       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1104       _processForPlainChareMsg(ck,env);
1105       _STATS_RECORD_PROCESS_MSG_1();
1106       break;
1107     case ForVidMsg   :
1108       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1109       ck->process();
1110       _processForVidMsg(ck,env);
1111       break;
1112     case FillVidMsg  :
1113       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1114       ck->process();
1115       _processFillVidMsg(ck,env);
1116       break;
1117
1118     default:
1119       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1120   }
1121 #ifdef _FAULT_MLOG_
1122         if(obj != NULL){
1123                 postProcessReceivedMessage(obj,sender,SN,entry);
1124         }
1125 #endif
1126
1127
1128 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1129   criticalPath_end();
1130   //  CkPrintf("STOP\n");
1131 #endif
1132
1133
1134 }
1135
1136
1137 /******************** Message Send **********************/
1138
1139 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1140              int *queueing, int *priobits, unsigned int **prioptr)
1141 {
1142   register envelope *env = (envelope *)converseMsg;
1143   *pfn = (CldPackFn)CkPackMessage;
1144   *len = env->getTotalsize();
1145   *queueing = env->getQueueing();
1146   *priobits = env->getPriobits();
1147   *prioptr = (unsigned int *) env->getPrioPtr();
1148 }
1149
1150 void CkPackMessage(envelope **pEnv)
1151 {
1152   register envelope *env = *pEnv;
1153   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1154     register void *msg = EnvToUsr(env);
1155     _TRACE_BEGIN_PACK();
1156     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1157     _TRACE_END_PACK();
1158     env=UsrToEnv(msg);
1159     env->setPacked(1);
1160     *pEnv = env;
1161   }
1162 }
1163
1164 void CkUnpackMessage(envelope **pEnv)
1165 {
1166   register envelope *env = *pEnv;
1167   register int msgIdx = env->getMsgIdx();
1168   if(env->isPacked()) {
1169     register void *msg = EnvToUsr(env);
1170     _TRACE_BEGIN_UNPACK();
1171     msg = _msgTable[msgIdx]->unpack(msg);
1172     _TRACE_END_UNPACK();
1173     env=UsrToEnv(msg);
1174     env->setPacked(0);
1175     *pEnv = env;
1176   }
1177 }
1178
1179 //There's no reason for most messages to go through the Cld--
1180 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1181 // Thus these accellerated versions of the Cld calls.
1182
1183 static int index_objectQHandler;
1184 int index_tokenHandler;
1185 static int index_skipCldHandler;
1186
1187 static void _skipCldHandler(void *converseMsg)
1188 {
1189   register envelope *env = (envelope *)(converseMsg);
1190   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1191 #if CMK_GRID_QUEUE_AVAILABLE
1192   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1193     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1194                        env, env->getQueueing (), env->getPriobits (),
1195                        (unsigned int *) env->getPrioPtr ());
1196   } else {
1197     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1198                        env, env->getQueueing (), env->getPriobits (),
1199                        (unsigned int *) env->getPrioPtr ());
1200   }
1201 #else
1202   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1203         env, env->getQueueing(),env->getPriobits(),
1204         (unsigned int *)env->getPrioPtr());
1205 #endif
1206 }
1207
1208
1209 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1210 // Made non-static to be used by ckmessagelogging
1211 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1212 {
1213   if(pe == CkMyPe() ){
1214     if(!CmiNodeAlive(CkMyPe())){
1215         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1216 //      return;
1217     }
1218   }
1219   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1220 #if CMK_OBJECT_QUEUE_AVAILABLE
1221     Chare *obj = CkFindObjectPtr(env);
1222     if (obj && obj->CkGetObjQueue().queue()) {
1223       _enqObjQueue(obj, env);
1224     }
1225     else
1226 #endif
1227     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1228         env, env->getQueueing(),env->getPriobits(),
1229         (unsigned int *)env->getPrioPtr());
1230   } else {
1231     CkPackMessage(&env);
1232     int len=env->getTotalsize();
1233     CmiSetXHandler(env,CmiGetHandler(env));
1234 #if CMK_OBJECT_QUEUE_AVAILABLE
1235     CmiSetHandler(env,index_objectQHandler);
1236 #else
1237     CmiSetHandler(env,index_skipCldHandler);
1238 #endif
1239     CmiSetInfo(env,infoFn);
1240     if (pe==CLD_BROADCAST) {
1241 #ifdef _FAULT_MLOG_             
1242                         CmiSyncBroadcast(len, (char *)env);
1243 #else
1244                         CmiSyncBroadcastAndFree(len, (char *)env); 
1245 #endif
1246
1247 }
1248     else if (pe==CLD_BROADCAST_ALL) { 
1249 #ifdef _FAULT_MLOG_             
1250                         CmiSyncBroadcastAll(len, (char *)env);
1251 #else
1252                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1253 #endif
1254
1255 }
1256     else{
1257 #ifdef _FAULT_MLOG_             
1258                         CmiSyncSend(pe, len, (char *)env);
1259 #else
1260                         CmiSyncSendAndFree(pe, len, (char *)env);
1261 #endif
1262
1263                 }
1264   }
1265 }
1266
1267 #if CMK_BLUEGENE_CHARM
1268 #   define  _skipCldEnqueue   CldEnqueue
1269 #endif
1270
1271 // by pass Charm++ priority queue, send as Converse message
1272 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1273 {
1274   CkPackMessage(&env);
1275   int len=env->getTotalsize();
1276   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1277 }
1278
1279 static void _noCldEnqueue(int pe, envelope *env)
1280 {
1281 /*
1282   if (pe == CkMyPe()) {
1283     CmiHandleMessage(env);
1284   } else
1285 */
1286   CkPackMessage(&env);
1287   int len=env->getTotalsize();
1288   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1289   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1290   else CmiSyncSendAndFree(pe, len, (char *)env);
1291 }
1292
1293 //static void _noCldNodeEnqueue(int node, envelope *env)
1294 //Made non-static to be used by ckmessagelogging
1295 void _noCldNodeEnqueue(int node, envelope *env)
1296 {
1297 /*
1298   if (node == CkMyNode()) {
1299     CmiHandleMessage(env);
1300   } else {
1301 */
1302   CkPackMessage(&env);
1303   int len=env->getTotalsize();
1304   if (node==CLD_BROADCAST) { 
1305 #ifdef _FAULT_MLOG_
1306         CmiSyncNodeBroadcast(len, (char *)env);
1307 #else
1308         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1309 #endif
1310 }
1311   else if (node==CLD_BROADCAST_ALL) { 
1312 #ifdef _FAULT_MLOG_
1313                 CmiSyncNodeBroadcastAll(len, (char *)env);
1314 #else
1315                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1316 #endif
1317
1318 }
1319   else {
1320 #ifdef _FAULT_MLOG_
1321         CmiSyncNodeSend(node, len, (char *)env);
1322 #else
1323         CmiSyncNodeSendAndFree(node, len, (char *)env);
1324 #endif
1325   }
1326 }
1327
1328 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1329 {
1330   register envelope *env = UsrToEnv(msg);
1331   _CHECK_USED(env);
1332   _SET_USED(env, 1);
1333   env->setMsgtype(ForChareMsg);
1334   env->setEpIdx(eIdx);
1335   env->setSrcPe(CkMyPe());
1336 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1337   criticalPath_send(env);
1338   automaticallySetMessagePriority(env);
1339 #endif
1340 #ifndef CMK_OPTIMIZE
1341   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1342 #endif
1343 #if CMK_OBJECT_QUEUE_AVAILABLE
1344   CmiSetHandler(env, index_objectQHandler);
1345 #else
1346   CmiSetHandler(env, _charmHandlerIdx);
1347 #endif
1348   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1349     register int pe = -(pCid->onPE+1);
1350     if(pe==CkMyPe()) {
1351 #ifndef CMK_CHARE_USE_PTR
1352       VidBlock *vblk = CpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1353 #else
1354       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1355 #endif
1356       void *objPtr;
1357       if (NULL!=(objPtr=vblk->getLocalChare()))
1358       { //A ready local chare
1359         env->setObjPtr(objPtr);
1360         return pe;
1361       }
1362       else { //The vidblock is not ready-- forget it
1363         vblk->send(env);
1364         return -1;
1365       }
1366     } else { //Valid vidblock for another PE:
1367       env->setMsgtype(ForVidMsg);
1368       env->setVidPtr(pCid->objPtr);
1369       return pe;
1370     }
1371   }
1372   else {
1373     env->setObjPtr(pCid->objPtr);
1374     return pCid->onPE;
1375   }
1376 }
1377
1378 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1379 {
1380   int destPE = _prepareMsg(eIdx, msg, pCid);
1381   if (destPE != -1) {
1382     register envelope *env = UsrToEnv(msg);
1383 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1384     criticalPath_send(env);
1385     automaticallySetMessagePriority(env);
1386 #endif
1387     CmiBecomeImmediate(env);
1388   }
1389   return destPE;
1390 }
1391
1392 extern "C"
1393 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1394 {
1395   if (opts & CK_MSG_INLINE) {
1396     CkSendMsgInline(entryIdx, msg, pCid, opts);
1397     return;
1398   }
1399 #ifndef CMK_OPTIMIZE
1400   if (opts & CK_MSG_IMMEDIATE) {
1401     CmiAbort("Immediate message is not allowed in Chare!");
1402   }
1403 #endif
1404   register envelope *env = UsrToEnv(msg);
1405   int destPE=_prepareMsg(entryIdx,msg,pCid);
1406   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1407   // VidBlock was not yet filled). The problem is that the creation was never
1408   // traced later when the VidBlock was filled. One solution is to trace the
1409   // creation here, the other to trace it in VidBlock->msgDeliver().
1410   _TRACE_CREATION_1(env);
1411   if (destPE!=-1) {
1412     CpvAccess(_qd)->create();
1413     if (opts & CK_MSG_SKIP_OR_IMM)
1414       _noCldEnqueue(destPE, env);
1415     else
1416       CldEnqueue(destPE, env, _infoIdx);
1417   }
1418   _TRACE_CREATION_DONE(1);
1419 }
1420
1421 extern "C"
1422 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1423 {
1424   if (pCid->onPE==CkMyPe())
1425   {
1426     if(!CmiNodeAlive(CkMyPe())){
1427         return;
1428     }
1429 #ifndef CMK_OPTIMIZE
1430     //Just in case we need to breakpoint or use the envelope in some way
1431     _prepareMsg(entryIndex,msg,pCid);
1432 #endif
1433                 //Just directly call the chare (skip QD handling & scheduler)
1434     register envelope *env = UsrToEnv(msg);
1435     if (env->isPacked()) CkUnpackMessage(&env);
1436     _STATS_RECORD_PROCESS_MSG_1();
1437     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1438   }
1439   else {
1440     //No way to inline a cross-processor message:
1441     CkSendMsg(entryIndex,msg,pCid,opts&!CK_MSG_INLINE);
1442   }
1443 }
1444
1445 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1446 {
1447   register envelope *env = UsrToEnv(msg);
1448   CkNodeGroupID nodeRedMgr;
1449   _CHECK_USED(env);
1450   _SET_USED(env, 1);
1451   env->setMsgtype(type);
1452   env->setEpIdx(eIdx);
1453   env->setGroupNum(gID);
1454   env->setSrcPe(CkMyPe());
1455 #ifndef CMK_OPTIMIZE
1456   nodeRedMgr.setZero();
1457   env->setRednMgr(nodeRedMgr);
1458 #endif
1459 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1460   criticalPath_send(env);
1461   automaticallySetMessagePriority(env);
1462 #endif
1463 #ifndef CMK_OPTIMIZE
1464   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1465 #endif
1466   CmiSetHandler(env, _charmHandlerIdx);
1467   return env;
1468 }
1469
1470 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1471 {
1472   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1473 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1474   criticalPath_send(env);
1475   automaticallySetMessagePriority(env);
1476 #endif
1477   CmiBecomeImmediate(env);
1478   return env;
1479 }
1480
1481 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1482                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1483 {
1484   int numPes;
1485   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1486 #ifdef _FAULT_MLOG_
1487         sendTicketGroupRequest(env,pe,_infoIdx);
1488 #else
1489   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1490   _TRACE_CREATION_N(env, numPes);
1491   if (opts & CK_MSG_SKIP_OR_IMM)
1492     _noCldEnqueue(pe, env);
1493   else
1494     _skipCldEnqueue(pe, env, _infoIdx);
1495   _TRACE_CREATION_DONE(1);
1496 #endif
1497 }
1498
1499 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1500                            int npes, int *pes)
1501 {
1502   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1503   _TRACE_CREATION_MULTICAST(env, npes, pes);
1504   CldEnqueueMulti(npes, pes, env, _infoIdx);
1505   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1506 }
1507
1508 extern "C"
1509 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1510 {
1511 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1512   if (destPE==CkMyPe())
1513   {
1514     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1515     return;
1516   }
1517   //Can't inline-- send the usual way
1518   register envelope *env = UsrToEnv(msg);
1519   int numPes;
1520   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1521   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1522   _TRACE_CREATION_N(env, numPes);
1523   _noCldEnqueue(destPE, env);
1524   _STATS_RECORD_SEND_BRANCH_1();
1525   CkpvAccess(_coreState)->create();
1526   _TRACE_CREATION_DONE(1);
1527 #else
1528   // no support for immediate message, send inline
1529   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1530 #endif
1531 }
1532
1533 extern "C"
1534 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1535 {
1536   if (destPE==CkMyPe())
1537   {
1538     if(!CmiNodeAlive(CkMyPe())){
1539         return;
1540     }
1541     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1542     if (obj!=NULL)
1543     { //Just directly call the group:
1544 #ifndef CMK_OPTIMIZE
1545       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1546 #else
1547       envelope *env=UsrToEnv(msg);
1548 #endif
1549       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1550       return;
1551     }
1552   }
1553   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1554   CkSendMsgBranch(eIdx,msg,destPE,gID,opts&!CK_MSG_INLINE);
1555 }
1556
1557 extern "C"
1558 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1559 {
1560   if (opts & CK_MSG_INLINE) {
1561     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1562     return;
1563   }
1564   if (opts & CK_MSG_IMMEDIATE) {
1565     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1566     return;
1567   }
1568   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1569   _STATS_RECORD_SEND_BRANCH_1();
1570   CkpvAccess(_coreState)->create();
1571 }
1572
1573 extern "C"
1574 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1575 {
1576 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1577   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1578   _TRACE_CREATION_MULTICAST(env, npes, pes);
1579   _noCldEnqueueMulti(npes, pes, env);
1580   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1581 #else
1582   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1583   CpvAccess(_qd)->create(-npes);
1584 #endif
1585   _STATS_RECORD_SEND_BRANCH_N(npes);
1586   CpvAccess(_qd)->create(npes);
1587 }
1588
1589 extern "C"
1590 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1591 {
1592   if (opts & CK_MSG_IMMEDIATE) {
1593     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1594     return;
1595   }
1596     // normal mesg
1597   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1598   _STATS_RECORD_SEND_BRANCH_N(npes);
1599   CpvAccess(_qd)->create(npes);
1600 }
1601
1602 extern "C"
1603 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1604 {
1605   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1606   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1607   CpvAccess(_qd)->create(CkNumPes());
1608 }
1609
1610 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1611                 int node=CLD_BROADCAST_ALL, int opts=0)
1612 {
1613   int numPes;
1614   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1615 #ifdef _FAULT_MLOG_
1616         sendTicketNodeGroupRequest(env,node,_infoIdx);
1617 #else
1618   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1619   _TRACE_CREATION_N(env, numPes);
1620   if (opts & CK_MSG_SKIP_OR_IMM) {
1621     _noCldNodeEnqueue(node, env);
1622     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1623       CkpvAccess(_coreState)->create(-numPes);
1624     }
1625   }
1626   else
1627     CldNodeEnqueue(node, env, _infoIdx);
1628   _TRACE_CREATION_DONE(1);
1629 #endif
1630 }
1631
1632 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1633                            int npes, int *nodes)
1634 {
1635   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1636   _TRACE_CREATION_N(env, npes);
1637   for (int i=0; i<npes; i++) {
1638     CldNodeEnqueue(nodes[i], env, _infoIdx);
1639   }
1640   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1641 }
1642
1643 extern "C"
1644 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1645 {
1646 #if CMK_IMMEDIATE_MSG
1647   if (node==CkMyNode())
1648   {
1649     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1650     return;
1651   }
1652   //Can't inline-- send the usual way
1653   register envelope *env = UsrToEnv(msg);
1654   int numPes;
1655   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1656   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1657   _TRACE_CREATION_N(env, numPes);
1658   _noCldNodeEnqueue(node, env);
1659   _STATS_RECORD_SEND_BRANCH_1();
1660   /* immeidate message is invisible to QD */
1661 //  CkpvAccess(_coreState)->create();
1662   _TRACE_CREATION_DONE(1);
1663 #else
1664   // no support for immediate message, send inline
1665   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1666 #endif
1667 }
1668
1669 extern "C"
1670 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1671 {
1672   if (node==CkMyNode())
1673   {
1674     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1675     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1676     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1677     if (obj!=NULL)
1678     { //Just directly call the group:
1679 #ifndef CMK_OPTIMIZE
1680       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1681 #else
1682       envelope *env=UsrToEnv(msg);
1683 #endif
1684       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1685       return;
1686     }
1687   }
1688   //Can't inline-- send the usual way
1689   CkSendMsgNodeBranch(eIdx,msg,node,gID,opts&!CK_MSG_INLINE);
1690 }
1691
1692 extern "C"
1693 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1694 {
1695   if (opts & CK_MSG_INLINE) {
1696     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1697     return;
1698   }
1699   if (opts & CK_MSG_IMMEDIATE) {
1700     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1701     return;
1702   }
1703   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1704   _STATS_RECORD_SEND_NODE_BRANCH_1();
1705   CkpvAccess(_coreState)->create();
1706 }
1707
1708 extern "C"
1709 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1710 {
1711 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1712   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1713   _noCldEnqueueMulti(npes, nodes, env);
1714 #else
1715   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1716   CpvAccess(_qd)->create(-npes);
1717 #endif
1718   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1719   CpvAccess(_qd)->create(npes);
1720 }
1721
1722 extern "C"
1723 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1724 {
1725   if (opts & CK_MSG_IMMEDIATE) {
1726     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1727     return;
1728   }
1729     // normal mesg
1730   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1731   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1732   CpvAccess(_qd)->create(npes);
1733 }
1734
1735 extern "C"
1736 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1737 {
1738   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1739   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1740   CpvAccess(_qd)->create(CkNumNodes());
1741 }
1742
1743 //Needed by delegation manager:
1744 extern "C"
1745 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1746 { return _prepareMsg(eIdx,msg,pCid); }
1747 extern "C"
1748 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1749 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1750 extern "C"
1751 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1752 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1753
1754 void _ckModuleInit(void) {
1755         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1756         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1757         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1758         CkpvInitialize(TokenPool*, _tokenPool);
1759         CkpvAccess(_tokenPool) = new TokenPool;
1760 }
1761
1762
1763 /************** Send: Arrays *************/
1764
1765 extern void CkArrayManagerInsert(int onPe,void *msg);
1766 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1767
1768 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1769 {
1770   _CHECK_USED(env);
1771   _SET_USED(env, 1);
1772   env->setMsgtype(type);
1773 #ifndef CMK_OPTIMIZE
1774   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1775 #endif
1776   CmiSetHandler(env, _charmHandlerIdx);
1777   CpvAccess(_qd)->create();
1778 }
1779
1780 extern "C"
1781 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1782   register envelope *env = UsrToEnv(msg);
1783   env->getsetArrayMgr()=aID;
1784   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1785   CldEnqueue(pe, env, _infoIdx);
1786 }
1787
1788 extern "C"
1789 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1790   register envelope *env = UsrToEnv(msg);
1791   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1792 #ifdef _FAULT_MLOG_
1793         sendTicketArrayRequest(env,pe,_infoIdx);
1794 #else
1795   if (opts & CK_MSG_IMMEDIATE)
1796     CmiBecomeImmediate(env);
1797   if (opts & CK_MSG_SKIP_OR_IMM)
1798     _noCldEnqueue(pe, env);
1799   else
1800     _skipCldEnqueue(pe, env, _infoIdx);
1801 #endif
1802 }
1803
1804 class ElementDestroyer : public CkLocIterator {
1805 private:
1806         CkLocMgr *locMgr;
1807 public:
1808         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1809         void addLocation(CkLocation &loc) {
1810           loc.destroyAll();
1811         }
1812 };
1813
1814 void CkDeleteChares() {
1815   int i;
1816   int numGroups = CkpvAccess(_groupIDTable)->size();
1817
1818   // delete all array elements
1819   for(i=0;i<numGroups;i++) {
1820     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1821     if(obj && obj->isLocMgr())  {
1822       CkLocMgr *mgr = (CkLocMgr*)obj;
1823       ElementDestroyer destroyer(mgr);
1824       mgr->iterate(destroyer);
1825 printf("[%d] DELETE!\n", CkMyPe());
1826     }
1827   }
1828
1829   // delete all groups
1830   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1831   for(i=0;i<numGroups;i++) {
1832     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1833     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1834     if (obj) delete obj;
1835   }
1836   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1837
1838   // delete all node groups
1839   if (CkMyRank() == 0) {
1840     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1841     for(i=0;i<numNodeGroups;i++) {
1842       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
1843       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1844       if (obj) delete obj;
1845     }
1846   }
1847 }
1848
1849 //------------------- Message Watcher (record/replay) ----------------
1850
1851 #include "crc32.h"
1852
1853 CkpvDeclare(int, envelopeEventID);
1854
1855 CkMessageWatcher::~CkMessageWatcher() {}
1856
1857 class CkMessageRecorder : public CkMessageWatcher {
1858 public:
1859   CkMessageRecorder(FILE *f_) { f=f_; }
1860   ~CkMessageRecorder() {
1861     fprintf(f,"-1 -1 -1");
1862     fclose(f);
1863   }
1864
1865 private:
1866   virtual CmiBool process(envelope *env,CkCoreState *ck) {
1867     if (env->getEvent()) {
1868       bool wasPacked = env->isPacked();
1869       if (!wasPacked) CkPackMessage(&env);
1870       //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
1871       unsigned int crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
1872       unsigned int crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
1873       fprintf(f,"%d %d %d %hhd %x %x\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2);
1874       if (!wasPacked) CkUnpackMessage(&env);
1875     }
1876     return CmiTrue;
1877   }
1878 };
1879
1880 class CkMessageDetailRecorder : public CkMessageWatcher {
1881 public:
1882   CkMessageDetailRecorder(FILE *f_) {
1883     f=f_;
1884     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
1885      * The value of 'x' is the pointer size.
1886      */
1887     CmiUInt2 little = sizeof(void*);
1888     fwrite(&little, 2, 1, f);
1889   }
1890   ~CkMessageDetailRecorder() {fclose(f);}
1891 private:
1892   virtual CmiBool process(envelope *env, CkCoreState *ck) {
1893     bool wasPacked = env->isPacked();
1894     if (!wasPacked) CkPackMessage(&env);
1895     CmiUInt4 size = env->getTotalsize();
1896     fwrite(&size, 4, 1, f);
1897     fwrite(env, env->getTotalsize(), 1, f);
1898     if (!wasPacked) CkUnpackMessage(&env);
1899     return CmiTrue;
1900   }
1901 };
1902
1903 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
1904 #define REPLAYDEBUG(args) /* empty */
1905
1906 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
1907
1908 class CkMessageReplay : public CkMessageWatcher {
1909   int counter;
1910         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
1911         unsigned int crc1, crc2;
1912         /// Read the next message we need from the file:
1913         void getNext(void) {
1914                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
1915                         // CkAbort("CkMessageReplay> Syntax error reading replay file");
1916                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
1917                 }
1918                 counter++;
1919         }
1920         /// If this is the next message we need, advance and return CmiTrue.
1921         CmiBool isNext(envelope *env) {
1922                 if (nextPE!=env->getSrcPe()) return CmiFalse;
1923                 if (nextEvent!=env->getEvent()) return CmiFalse;
1924                 if (nextSize!=env->getTotalsize())
1925                 {
1926                         CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]\n", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
1927                         return CmiFalse;
1928                 }
1929                 bool wasPacked = env->isPacked();
1930                 if (!wasPacked) CkPackMessage(&env);
1931                 //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
1932                 unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
1933                 unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
1934                 if (crcnew1 != crc1) {
1935                   CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
1936                 }
1937         if (crcnew2 != crc2) {
1938           CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
1939         }
1940         if (!wasPacked) CkUnpackMessage(&env);
1941                 return CmiTrue;
1942         }
1943
1944         /// This is a (short) list of messages we aren't yet ready for:
1945         CkQ<envelope *> delayed;
1946
1947         /// Try to flush out any delayed messages
1948         void flush(void) {
1949                 int len=delayed.length();
1950                 for (int i=0;i<len;i++) {
1951                         envelope *env=delayed.deq();
1952                         if (isNext(env)) { /* this is the next message: process it */
1953                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1954                                 //CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
1955                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
1956                                 return;
1957                         }
1958                         else /* Not ready yet-- put it back in the
1959                                 queue */
1960                           {
1961                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1962                                 delayed.enq(env);
1963                           }
1964                 }
1965         }
1966
1967 public:
1968         CkMessageReplay(FILE *f_) {
1969           counter=0;
1970           f=f_;
1971           getNext();
1972           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
1973           CmiStartQD(CkMessageReplayQuiescence, this);
1974         }
1975         ~CkMessageReplay() {fclose(f);}
1976
1977 private:
1978         virtual CmiBool process(envelope *env,CkCoreState *ck) {
1979           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx());
1980                 if (env->getEvent() == 0) return CmiTrue;
1981                 if (isNext(env)) { /* This is the message we were expecting */
1982                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1983                         getNext(); /* Advance over this message */
1984                         flush(); /* try to process queued-up stuff */
1985                         return CmiTrue;
1986                 }
1987 #if CMK_SMP
1988                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
1989                          // try next rank, we can't just buffer the msg and left
1990                          // we need to keep unprocessed msg on the fly
1991                         int nextpe = CkMyPe()+1;
1992                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
1993                         nextpe = CkNodeFirst(CkMyNode());
1994                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
1995                         return CmiFalse;
1996                 }
1997 #endif
1998                 else /*!isNext(env) */ {
1999                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()
2000                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2001                         delayed.enq(env);
2002                         flush();
2003                         return CmiFalse;
2004                 }
2005         }
2006 };
2007
2008 class CkMessageDetailReplay : public CkMessageWatcher {
2009   void *getNext() {
2010     CmiUInt4 size; size_t nread;
2011     if ((nread=fread(&size, 4, 1, f)) < 1) {
2012       if (feof(f)) return NULL;
2013       CkPrintf("Broken record file (metadata) got %d\n",nread);
2014       CkAbort("");
2015     }
2016     void *env = CmiAlloc(size);
2017     if ((nread=fread(env, size, 1, f)) < 1) {
2018       CkPrintf("Broken record file (data) expecting %d, got %d\n",size,nread);
2019       CkAbort("");
2020     }
2021     return env;
2022   }
2023 public:
2024   CkMessageDetailReplay(FILE *f_) {
2025     f=f_;
2026     /* This must match what CkMessageDetailRecorder did */
2027     CmiUInt2 little;
2028     fread(&little, 2, 1, f);
2029     if (little != sizeof(void*)) {
2030       CkAbort("Replaying on a different architecture from which recording was done!");
2031     }
2032
2033     CmiPushPE(CkMyPe(), getNext());
2034   }
2035   virtual CmiBool process(envelope *env,CkCoreState *ck) {
2036     CmiPushPE(CkMyPe(), getNext());
2037     return CmiTrue;
2038   }
2039 };
2040
2041 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2042   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2043   CkMessageReplay *replay = (CkMessageReplay*)rep;
2044   //CmiStartQD(CkMessageReplayQuiescence, replay);
2045 }
2046
2047 extern "C" int CmiExecuteThreadResume(CthThreadToken *token) {
2048   CkCoreState *ck = CkpvAccess(_coreState);
2049   if (ck->watcher!=NULL) {
2050     return ck->watcher->processThread(token,ck);
2051   }
2052   return 1;
2053 }
2054
2055 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2056
2057 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2058
2059         int i;
2060         char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2061         strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2062         sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2063         FILE *f=fopen(fName,permissions);
2064         REPLAYDEBUG("openReplayfile "<<fName);
2065         if (f==NULL) {
2066                 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2067                         CkMyPe(),fName,permissions);
2068                 CkAbort("openReplayFile> Could not open replay file");
2069         }
2070         return f;
2071 }
2072
2073 #include "ckliststring.h"
2074 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2075     char *procs = NULL;
2076     replaySystem = 0;
2077         REPLAYDEBUG("CkMessageWatcherInit ");
2078     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2079         CkListString list(procs);
2080         if (list.includes(CkMyPe())) {
2081           CpdSetInitializeMemory(1);
2082           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2083         }
2084     }
2085         if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2086             CpdSetInitializeMemory(1);
2087                 ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2088         }
2089         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2090             CpdSetInitializeMemory(1);
2091             // Set the parameters of the processor
2092 #if CMK_SHARED_VARS_UNAVAILABLE
2093             _Cmi_mype = atoi(procs);
2094             while (procs[0]!='/') procs++;
2095             procs++;
2096             _Cmi_numpes = atoi(procs);
2097 #else
2098             CkAbort("+replay-detail available only for non-SMP build");
2099 #endif
2100             replaySystem = 1;
2101             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2102         }
2103     if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream")) {
2104         CpdSetInitializeMemory(1);
2105         ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2106     }
2107 }
2108
2109 extern "C"
2110 int CkMessageToEpIdx(void *msg) {
2111         envelope *env=UsrToEnv(msg);
2112         int ep=env->getEpIdx();
2113         if (ep==CkIndex_CkArray::recvBroadcast(0))
2114                 return env->getsetArrayBcastEp();
2115         else
2116                 return ep;
2117 }
2118
2119 extern "C"
2120 int getCharmEnvelopeSize() {
2121   return sizeof(envelope);
2122 }
2123
2124
2125 #include "CkMarshall.def.h"
2126