b61be8633b62dec92652272132c37850ab82d042
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
29
30 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
31
32 int CMessage_CkMessage::__idx=-1;
33 int CMessage_CkArgMsg::__idx=0;
34 int CkIndex_Chare::__idx;
35 int CkIndex_Group::__idx;
36 int CkIndex_ArrayBase::__idx=-1;
37
38 extern int _defaultObjectQ;
39
40 //Charm++ virtual functions: declaring these here results in a smaller executable
41 Chare::Chare(void) {
42   thishandle.onPE=CkMyPe();
43   thishandle.objPtr=this;
44 #ifdef _FAULT_MLOG_
45         mlogData = new ChareMlogData();
46         mlogData->objID.type = TypeChare;
47         mlogData->objID.data.chare.id = thishandle;
48 #endif
49 #if CMK_OBJECT_QUEUE_AVAILABLE
50   if (_defaultObjectQ)  CkEnableObjQ();
51 #endif
52 }
53
54 Chare::Chare(CkMigrateMessage* m) {
55   thishandle.onPE=CkMyPe();
56   thishandle.objPtr=this;
57
58 #if CMK_OBJECT_QUEUE_AVAILABLE
59   if (_defaultObjectQ)  CkEnableObjQ();
60 #endif
61 }
62
63 void Chare::CkEnableObjQ()
64 {
65 #if CMK_OBJECT_QUEUE_AVAILABLE
66   objQ.create();
67 #endif
68 }
69
70 Chare::~Chare() {}
71
72 void Chare::pup(PUP::er &p)
73 {
74   p(thishandle.onPE);
75   thishandle.objPtr=(void *)this;
76 #ifdef _FAULT_MLOG_
77         if(p.isUnpacking()){
78                 mlogData = new ChareMlogData();
79         }
80         mlogData->pup(p);
81 #endif
82 }
83
84 int Chare::ckGetChareType() const {
85   return -3;
86 }
87 char *Chare::ckDebugChareName(void) {
88   char buf[100];
89   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
90   return strdup(buf);
91 }
92 int Chare::ckDebugChareID(char *str, int limit) {
93   // pure chares for now do not have a valid ID
94   str[0] = 0;
95   return 1;
96 }
97 void Chare::ckDebugPup(PUP::er &p) {
98   pup(p);
99 }
100
101 /// This method is called before starting a [threaded] entry method.
102 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
103   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
104   traceAddThreadListeners(th, UsrToEnv(msg));
105 }
106
107
108 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
109   p.comment("Bytes");
110   int ts=UsrToEnv(msg)->getTotalsize();
111   int msgLen=ts-sizeof(envelope);
112   if (msgLen>0)
113     p((char*)msg,msgLen);
114 }
115
116 IrrGroup::IrrGroup(void) {
117   thisgroup = CkpvAccess(_currentGroup);
118 #ifdef _FAULT_MLOG_
119         mlogData->objID.type = TypeGroup;
120         mlogData->objID.data.group.id = thisgroup;
121         mlogData->objID.data.group.onPE = CkMyPe();
122 #endif
123 }
124
125 IrrGroup::~IrrGroup() {
126   // remove the object pointer
127   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
128   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
129   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
130 }
131
132 void IrrGroup::pup(PUP::er &p)
133 {
134   Chare::pup(p);
135   p|thisgroup;
136 }
137
138 int IrrGroup::ckGetChareType() const {
139   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
140 }
141
142 int IrrGroup::ckDebugChareID(char *str, int limit) {
143   if (limit<5) return -1;
144   str[0] = 1;
145   *((int*)&str[1]) = thisgroup.idx;
146   return 5;
147 }
148
149 char *IrrGroup::ckDebugChareName() {
150   return strdup(_chareTable[ckGetChareType()]->name);
151 }
152
153 void IrrGroup::ckJustMigrated(void)
154 {
155 }
156
157 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
158   /* FIXME: **CW** not entirely sure what we should do here yet */
159 }
160
161 void Group::CkAddThreadListeners(CthThread th, void *msg) {
162   Chare::CkAddThreadListeners(th, msg);
163   CthSetThreadID(th, thisgroup.idx, 0, 0);
164 }
165
166 void Group::pup(PUP::er &p)
167 {
168   CkReductionMgr::pup(p);
169   reductionInfo.pup(p);
170 }
171
172 /**** Delegation Manager Group */
173 CkDelegateMgr::~CkDelegateMgr() { }
174
175 //Default delegator implementation: do not delegate-- send directly
176 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
177   { CkSendMsg(ep,m,c); }
178 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
179   { CkSendMsgBranch(ep,m,onPE,g); }
180 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
181   { CkBroadcastMsgBranch(ep,m,g); }
182 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
183   { CkSendMsgBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
184 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
185   { CkSendMsgNodeBranch(ep,m,onNode,g); }
186 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
187   { CkBroadcastMsgNodeBranch(ep,m,g); }
188 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
189   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
190 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,int onPE,CkArrayID a)
191 {
192         CProxyElement_ArrayBase ap(a,idx);
193         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
194 }
195 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,CkArrayID a)
196 {
197         CProxyElement_ArrayBase ap(a,idx);
198         ap.ckSend((CkArrayMessage *)m,ep);
199 }
200 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
201 {
202         CProxy_ArrayBase ap(a);
203         ap.ckBroadcast((CkArrayMessage *)m,ep);
204 }
205
206 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
207 {
208         CmiAbort("ArraySectionSend is not implemented!\n");
209 /*
210         CProxyElement_ArrayBase ap(a,idx);
211         ap.ckSend((CkArrayMessage *)m,ep);
212 */
213 }
214
215 /*** Proxy <-> delegator communication */
216 CkDelegateData::~CkDelegateData() {}
217
218 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
219   return pd; // default implementation ignores pup call
220 }
221
222 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
223    this tricky manual reference counting business... */
224
225 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
226         if (dPtr) dPtr->ref();
227         ckUndelegate();
228         delegatedMgr = dTo;
229         delegatedPtr = dPtr;
230 }
231 void CProxy::ckUndelegate(void) {
232         delegatedMgr=NULL;
233         if (delegatedPtr) delegatedPtr->unref();
234         delegatedPtr=NULL;
235 }
236
237 /// Copy constructor
238 CProxy::CProxy(const CProxy &src)
239     :delegatedMgr(src.delegatedMgr)
240 {
241     delegatedPtr = NULL;
242     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
243         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
244 }
245
246 /// Assignment operator
247 CProxy& CProxy::operator=(const CProxy &src) {
248         CkDelegateData *oldPtr=delegatedPtr;
249         ckUndelegate();
250         delegatedMgr=src.delegatedMgr;
251
252         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
253             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
254         else
255             delegatedPtr = NULL;
256
257         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
258         if (oldPtr) oldPtr->unref();
259         return *this;
260 }
261
262 void CProxy::pup(PUP::er &p) {
263       CkGroupID delegatedTo;
264       delegatedTo.setZero();
265       int isNodeGroup = 0;
266       if (!p.isUnpacking()) {
267         if (delegatedMgr) {
268           delegatedTo = delegatedMgr->CkGetGroupID();
269           isNodeGroup = delegatedMgr->isNodeGroup();
270         }
271       }
272       p|delegatedTo;
273       if (!delegatedTo.isZero()) {
274         p|isNodeGroup;
275         if (p.isUnpacking()) {
276           if (isNodeGroup)
277                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
278           else
279                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
280         }
281
282         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
283         if (p.isUnpacking() && delegatedPtr)
284             delegatedPtr->ref();
285       }
286 }
287
288 /**** Array sections */
289 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
290 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
291   _cookie.aid = aid;    \
292   _cookie.get_pe() = CkMyPe();  \
293   _elems = new CkArrayIndexMax[nElems]; \
294   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
295   pelist = NULL;        \
296   npes  = 0;    \
297 }
298
299 CKSECTIONID_CONSTRUCTOR_DEF(1D)
300 CKSECTIONID_CONSTRUCTOR_DEF(2D)
301 CKSECTIONID_CONSTRUCTOR_DEF(3D)
302 CKSECTIONID_CONSTRUCTOR_DEF(4D)
303 CKSECTIONID_CONSTRUCTOR_DEF(5D)
304 CKSECTIONID_CONSTRUCTOR_DEF(6D)
305 CKSECTIONID_CONSTRUCTOR_DEF(Max)
306
307 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
308   pelist = new int[npes];
309   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
310   _cookie.aid = gid;
311 }
312
313 CkSectionID::CkSectionID(const CkSectionID &sid) {
314   int i;
315   _cookie = sid._cookie;
316   _nElems = sid._nElems;
317   if (_nElems > 0) {
318     _elems = new CkArrayIndexMax[_nElems];
319     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
320   } else _elems = NULL;
321   npes = sid.npes;
322   if (npes > 0) {
323     pelist = new int[npes];
324     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
325   } else pelist = NULL;
326 }
327
328 void CkSectionID::operator=(const CkSectionID &sid) {
329   int i;
330   _cookie = sid._cookie;
331   _nElems = sid._nElems;
332   if (_nElems > 0) {
333     _elems = new CkArrayIndexMax[_nElems];
334     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
335   } else _elems = NULL;
336   npes = sid.npes;
337   if (npes > 0) {
338     pelist = new int[npes];
339     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
340   } else pelist = NULL;
341 }
342
343 void CkSectionID::pup(PUP::er &p) {
344     p | _cookie;
345     p(_nElems);
346     if (_nElems > 0) {
347       if (p.isUnpacking()) _elems = new CkArrayIndexMax[_nElems];
348       for (int i=0; i< _nElems; i++) p | _elems[i];
349     } else {
350       // If _nElems is zero, than this section describes processors instead of array elements
351       p(npes);
352       p(pelist, npes);
353     }
354 }
355
356 /**** Tiny random API routines */
357
358 #ifdef CMK_CUDA
359 void CUDACallbackManager(void *fn) {
360   if (fn != NULL) {
361     CkCallback *cb = (CkCallback*) fn;
362     cb->send();
363   }
364 }
365
366 #endif
367
368 extern "C"
369 void CkSetRefNum(void *msg, int ref)
370 {
371   UsrToEnv(msg)->setRef(ref);
372 }
373
374 extern "C"
375 int CkGetRefNum(void *msg)
376 {
377   return UsrToEnv(msg)->getRef();
378 }
379
380 extern "C"
381 int CkGetSrcPe(void *msg)
382 {
383   return UsrToEnv(msg)->getSrcPe();
384 }
385
386 extern "C"
387 int CkGetSrcNode(void *msg)
388 {
389   return CmiNodeOf(CkGetSrcPe(msg));
390 }
391
392 extern "C"
393 void *CkLocalBranch(CkGroupID gID) {
394   return _localBranch(gID);
395 }
396
397 static
398 void *_ckLocalNodeBranch(CkGroupID groupID) {
399   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
400   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
401   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
402   return retval;
403 }
404
405 extern "C"
406 void *CkLocalNodeBranch(CkGroupID groupID)
407 {
408   void *retval;
409   // we are called in a constructor
410   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
411     return CkpvAccess(_currentNodeGroupObj);
412   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
413   { // Nodegroup hasn't finished being created yet-- schedule...
414     CsdScheduler(0);
415   }
416   return retval;
417 }
418
419 extern "C"
420 void *CkLocalChare(const CkChareID *pCid)
421 {
422         int pe=pCid->onPE;
423         if (pe<0) { //A virtual chare ID
424                 if (pe!=(-(CkMyPe()+1)))
425                         return NULL;//VID block not on this PE
426                 VidBlock *v=(VidBlock *)pCid->objPtr;
427                 return v->getLocalChare();
428         }
429         else
430         { //An ordinary chare ID
431                 if (pe!=CkMyPe())
432                         return NULL;//Chare not on this PE
433                 return pCid->objPtr;
434         }
435 }
436
437 CkpvDeclare(char **,Ck_argv);
438 extern "C" char **CkGetArgv(void) {
439         return CkpvAccess(Ck_argv);
440 }
441 extern "C" int CkGetArgc(void) {
442         return CmiGetArgc(CkpvAccess(Ck_argv));
443 }
444
445 /******************** Basic support *****************/
446 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
447 {
448 #ifdef _FAULT_MLOG_
449         CpvAccess(_currentObj) = (Chare *)obj;
450 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
451 #endif
452   //BIGSIM_OOC DEBUGGING
453   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
454   //fflush(stdout);
455 #ifndef CMK_OPTIMIZE
456   CpdBeforeEp(epIdx, obj, msg);
457 #endif
458   _entryTable[epIdx]->call(msg, obj);
459 #ifndef CMK_OPTIMIZE
460   CpdAfterEp(epIdx);
461 #endif
462   if (_entryTable[epIdx]->noKeep)
463   { /* Method doesn't keep/delete the message, so we have to: */
464     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
465   }
466 }
467 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
468 {
469   //BIGSIM_OOC DEBUGGING
470   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
471   //fflush(stdout);
472
473   void *deliverMsg;
474 #ifdef _FAULT_MLOG_
475         CpvAccess(_currentObj) = (Chare *)obj;
476 #endif
477   if (_entryTable[epIdx]->noKeep)
478   { /* Deliver a read-only copy of the message */
479     deliverMsg=(void *)msg;
480   } else
481   { /* Method needs a copy of the message to keep/delete */
482     void *oldMsg=(void *)msg;
483     deliverMsg=CkCopyMsg(&oldMsg);
484 #ifndef CMK_OPTIMIZE
485     if (oldMsg!=msg)
486       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
487 #endif
488   }
489 #ifndef CMK_OPTIMIZE
490   CpdBeforeEp(epIdx, obj, (void*)msg);
491 #endif
492   _entryTable[epIdx]->call(deliverMsg, obj);
493 #ifndef CMK_OPTIMIZE
494   CpdAfterEp(epIdx);
495 #endif
496 }
497
498 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
499 {
500   register void *msg = EnvToUsr(env);
501   _SET_USED(env, 0);
502   CkDeliverMessageFree(epIdx,msg,obj);
503 }
504
505 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
506 {
507
508 #ifndef CMK_OPTIMIZE /* Consider tracing: */
509   if (_entryTable[epIdx]->traceEnabled) {
510     _TRACE_BEGIN_EXECUTE(env);
511     _invokeEntryNoTrace(epIdx,env,obj);
512     _TRACE_END_EXECUTE();
513   }
514   else
515 #endif
516     _invokeEntryNoTrace(epIdx,env,obj);
517
518 }
519
520 /********************* Creation ********************/
521
522 extern "C"
523 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
524 {
525   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
526   envelope *env = UsrToEnv(msg);
527   _CHECK_USED(env);
528   if(pCid == 0) {
529     env->setMsgtype(NewChareMsg);
530   } else {
531     pCid->onPE = (-(CkMyPe()+1));
532     //  pCid->magic = _GETIDX(cIdx);
533     pCid->objPtr = (void *) new VidBlock();
534     _MEMCHECK(pCid->objPtr);
535     env->setMsgtype(NewVChareMsg);
536     env->setVidPtr(pCid->objPtr);
537   }
538   env->setEpIdx(eIdx);
539   env->setSrcPe(CkMyPe());
540   CmiSetHandler(env, _charmHandlerIdx);
541   _TRACE_CREATION_1(env);
542   CpvAccess(_qd)->create();
543   _STATS_RECORD_CREATE_CHARE_1();
544   _SET_USED(env, 1);
545   if(destPE == CK_PE_ANY)
546     env->setForAnyPE(1);
547   else
548     env->setForAnyPE(0);
549   CldEnqueue(destPE, env, _infoIdx);
550   _TRACE_CREATION_DONE(1);
551 }
552
553 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
554 {
555   register int gIdx = _entryTable[epIdx]->chareIdx;
556   register void *obj = malloc(_chareTable[gIdx]->size);
557   _MEMCHECK(obj);
558   setMemoryTypeChare(obj);
559   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
560   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
561   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
562   CkpvAccess(_groupIDTable)->push_back(groupID);
563   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
564   if(ptrq) {
565     void *pending;
566     while((pending=ptrq->deq())!=0)
567       CldEnqueue(CkMyPe(), pending, _infoIdx);
568 //    delete ptrq;
569       CkpvAccess(_groupTable)->find(groupID).clearPending();
570   }
571   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
572
573   CkpvAccess(_currentGroup) = groupID;
574   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
575   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
576   _STATS_RECORD_PROCESS_GROUP_1();
577 }
578
579 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
580 {
581   register int gIdx = _entryTable[epIdx]->chareIdx;
582   int objSize=_chareTable[gIdx]->size;
583   register void *obj = malloc(objSize);
584   _MEMCHECK(obj);
585   setMemoryTypeChare(obj);
586   CkpvAccess(_currentGroup) = groupID;
587
588 // Now that the NodeGroup is created, add it to the table.
589 //  NodeGroups can be accessed by multiple processors, so
590 //  this is in the opposite order from groups - invoking the constructor
591 //  before registering it.
592 // User may call CkLocalNodeBranch() inside the nodegroup constructor
593 //  store nodegroup into _currentNodeGroupObj
594   CkpvAccess(_currentNodeGroupObj) = obj;
595   _invokeEntryNoTrace(epIdx,env,obj);
596   CkpvAccess(_currentNodeGroupObj) = NULL;
597   _STATS_RECORD_PROCESS_NODE_GROUP_1();
598
599   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
600   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
601   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
602   CksvAccess(_nodeGroupIDTable).push_back(groupID);
603
604   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
605   if(ptrq) {
606     void *pending;
607     while((pending=ptrq->deq())!=0)
608       CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
609 //    delete ptrq;
610       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
611   }
612   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
613 }
614
615 void _createGroup(CkGroupID groupID, envelope *env)
616 {
617   _CHECK_USED(env);
618   _SET_USED(env, 1);
619   register int epIdx = env->getEpIdx();
620   int gIdx = _entryTable[epIdx]->chareIdx;
621   CkNodeGroupID rednMgr;
622   if(_chareTable[gIdx]->isIrr == 0){
623                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
624                 rednMgr = rednMgrProxy;
625 //              rednMgrProxy.setAttachedGroup(groupID);
626   }else{
627         rednMgr.setZero();
628   }
629   env->setGroupNum(groupID);
630   env->setSrcPe(CkMyPe());
631   env->setRednMgr(rednMgr);
632   env->setGroupEpoch(CkpvAccess(_charmEpoch));
633
634   if(CkNumPes()>1) {
635     CkPackMessage(&env);
636     CmiSetHandler(env, _bocHandlerIdx);
637     _numInitMsgs++;
638     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
639     CpvAccess(_qd)->create(CkNumPes()-1);
640     CkUnpackMessage(&env);
641   }
642   _STATS_RECORD_CREATE_GROUP_1();
643   CkCreateLocalGroup(groupID, epIdx, env);
644 }
645
646 void _createNodeGroup(CkGroupID groupID, envelope *env)
647 {
648   _CHECK_USED(env);
649   _SET_USED(env, 1);
650   register int epIdx = env->getEpIdx();
651   env->setGroupNum(groupID);
652   env->setSrcPe(CkMyPe());
653   env->setGroupEpoch(CkpvAccess(_charmEpoch));
654   if(CkNumNodes()>1) {
655     CkPackMessage(&env);
656     CmiSetHandler(env, _bocHandlerIdx);
657     _numInitMsgs++;
658     CksvAccess(_numInitNodeMsgs)++;
659     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
660     CpvAccess(_qd)->create(CkNumNodes()-1);
661     CkUnpackMessage(&env);
662   }
663   _STATS_RECORD_CREATE_NODE_GROUP_1();
664   CkCreateLocalNodeGroup(groupID, epIdx, env);
665 }
666
667 // new _groupCreate
668
669 static CkGroupID _groupCreate(envelope *env)
670 {
671   register CkGroupID groupNum;
672
673   // check CkMyPe(). if it is 0 then idx is _numGroups++
674   // if not, then something else...
675   if(CkMyPe() == 0)
676      groupNum.idx = CkpvAccess(_numGroups)++;
677   else
678      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
679   _createGroup(groupNum, env);
680   return groupNum;
681 }
682
683 // new _nodeGroupCreate
684 static CkGroupID _nodeGroupCreate(envelope *env)
685 {
686   register CkGroupID groupNum;
687   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
688   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
689           groupNum.idx = CksvAccess(_numNodeGroups)++;
690    else
691           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
692   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
693   _createNodeGroup(groupNum, env);
694   return groupNum;
695 }
696
697 /**** generate the group idx when group is creator pe is not pe0
698  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
699  **** remaining bits contain the group creator processor number and
700  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
701
702 int _getGroupIdx(int numNodes,int myNode,int numGroups)
703 {
704         int idx;
705         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
706         int n = 32 - (x+1);                                     // number of bits remaining for the index
707         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
708                                                                 // then add the next available index
709         // of course this won't work when int is 8 bytes long on T3E
710         //idx |= 0x80000000;                                      // set the most significant bit to 1
711         idx = - idx;
712                                                                 // if int is not 32 bits, wouldn't this be wrong?
713         return idx;
714 }
715
716 extern "C"
717 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
718 {
719   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
720   register envelope *env = UsrToEnv(msg);
721   env->setMsgtype(BocInitMsg);
722   env->setEpIdx(eIdx);
723   env->setSrcPe(CkMyPe());
724   _TRACE_CREATION_N(env, CkNumPes());
725   CkGroupID gid = _groupCreate(env);
726   _TRACE_CREATION_DONE(1);
727   return gid;
728 }
729
730 extern "C"
731 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
732 {
733   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
734   register envelope *env = UsrToEnv(msg);
735   env->setMsgtype(NodeBocInitMsg);
736   env->setEpIdx(eIdx);
737   env->setSrcPe(CkMyPe());
738   _TRACE_CREATION_N(env, CkNumNodes());
739   CkGroupID gid = _nodeGroupCreate(env);
740   _TRACE_CREATION_DONE(1);
741   return gid;
742 }
743
744 static inline void *_allocNewChare(envelope *env)
745 {
746   void *tmp=malloc(_chareTable[_entryTable[env->getEpIdx()]->chareIdx]->size);
747   _MEMCHECK(tmp);
748   setMemoryTypeChare(tmp);
749   return tmp;
750 }
751
752 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
753 {
754   register void *obj = _allocNewChare(env);
755   _invokeEntry(env->getEpIdx(),env,obj);
756 }
757
758 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
759 {
760   register void *obj = _allocNewChare(env);
761   register CkChareID *pCid = (CkChareID *)
762       _allocMsg(FillVidMsg, sizeof(CkChareID));
763   pCid->onPE = CkMyPe();
764   pCid->objPtr = obj;
765   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
766   register envelope *ret = UsrToEnv(pCid);
767   ret->setVidPtr(env->getVidPtr());
768   register int srcPe = env->getSrcPe();
769   ret->setSrcPe(CkMyPe());
770   CmiSetHandler(ret, _charmHandlerIdx);
771   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
772   CpvAccess(_qd)->create();
773   _invokeEntry(env->getEpIdx(),env,obj);
774 }
775
776 /************** Receive: Chares *************/
777
778 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
779 {
780   register int epIdx = env->getEpIdx();
781   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
782   register void *obj;
783   if (mainIdx != -1)  {
784     CmiAssert(CkMyPe()==0);
785     obj = _mainTable[mainIdx]->getObj();
786   }
787   else
788     obj = env->getObjPtr();
789   _invokeEntry(epIdx,env,obj);
790 }
791
792 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
793 {
794   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
795   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
796   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
797   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
798   vptr->fill(pcid->onPE, pcid->objPtr);
799   CmiFree(env);
800 }
801
802 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
803 {
804   VidBlock *vptr = (VidBlock *) env->getVidPtr();
805   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
806   _SET_USED(env, 1);
807   vptr->send(env);
808 }
809
810 /************** Receive: Groups ****************/
811
812 /**
813  This message is sent to this groupID--prepare to
814  handle this message by looking up the group,
815  and possibly stashing the message.
816 */
817 IrrGroup *_lookupGroup(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
818 {
819
820         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
821         IrrGroup *obj = ck->localBranch(groupID);
822         if (obj==NULL) { /* groupmember not yet created: stash message */
823                 ck->getGroupTable()->find(groupID).enqMsg(env);
824         }
825         else { /* will be able to process message */
826                 ck->process();
827         }
828         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
829         return obj;
830 }
831
832 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
833 {
834 #if CMK_LBDB_ON
835   // if there is a running obj being measured, stop it temporarily
836   LDObjHandle objHandle;
837   int objstopped = 0;
838   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
839   if (the_lbdb->RunningObject(&objHandle)) {
840     objstopped = 1;
841     the_lbdb->ObjectStop(objHandle);
842   }
843 #endif
844   _invokeEntry(epIdx,env,obj);
845 #if CMK_LBDB_ON
846   if (objstopped) the_lbdb->ObjectStart(objHandle);
847 #endif
848   _STATS_RECORD_PROCESS_BRANCH_1();
849 }
850
851 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
852 {
853   register CkGroupID groupID =  env->getGroupNum();
854   register IrrGroup *obj = _lookupGroup(ck,env,env->getGroupNum());
855   if(obj) {
856     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
857   }
858 }
859
860 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
861 {
862   env->setMsgtype(ForChareMsg);
863   env->setObjPtr(obj);
864   _processForChareMsg(ck,env);
865   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
866 }
867
868 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
869 {
870   env->setEpIdx(epIdx);
871   _deliverForNodeBocMsg(ck,env, obj);
872 }
873
874 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
875 {
876   register CkGroupID groupID = env->getGroupNum();
877   register void *obj;
878
879   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
880   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
881   if(!obj) { // groupmember not yet created
882 #if CMK_IMMEDIATE_MSG
883     if (CmiIsImmediate(env))     // buffer immediate message
884       CmiDelayImmediate();
885     else
886 #endif
887     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
888     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
889     return;
890   }
891   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
892 #if CMK_IMMEDIATE_MSG
893   if (!CmiIsImmediate(env))
894 #endif
895   ck->process();
896   env->setMsgtype(ForChareMsg);
897   env->setObjPtr(obj);
898   _processForChareMsg(ck,env);
899   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
900 }
901
902 void _processBocInitMsg(CkCoreState *ck,envelope *env)
903 {
904   register CkGroupID groupID = env->getGroupNum();
905   register int epIdx = env->getEpIdx();
906   CkCreateLocalGroup(groupID, epIdx, env);
907 }
908
909 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
910 {
911   register CkGroupID groupID = env->getGroupNum();
912   register int epIdx = env->getEpIdx();
913   CkCreateLocalNodeGroup(groupID, epIdx, env);
914 }
915
916 /************** Receive: Arrays *************/
917
918 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
919   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
920   if (mgr) {
921     _SET_USED(env, 0);
922     mgr->insertElement((CkMessage *)EnvToUsr(env));
923   }
924 }
925 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
926   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
927   if (mgr) {
928     _SET_USED(env, 0);
929     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
930   }
931 }
932
933 //BIGSIM_OOC DEBUGGING
934 #define TELLMSGTYPE(x) //x
935
936 /**
937  * This is the main converse-level handler used by all of Charm++.
938  *
939  * \addtogroup CriticalPathFramework
940  */
941 void _processHandler(void *converseMsg,CkCoreState *ck)
942 {
943   register envelope *env = (envelope *) converseMsg;
944
945 //#if CMK_RECORD_REPLAY
946   if (ck->watcher!=NULL) {
947     if (!ck->watcher->processMessage(env,ck)) return;
948   }
949 //#endif
950 #ifdef _FAULT_MLOG_
951         Chare *obj=NULL;
952         CkObjID sender;
953         MCount SN;
954         MlogEntry *entry=NULL;
955         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
956         env->getMsgtype() == ForArrayEltMsg){
957                 sender = env->sender;
958                 SN = env->SN;
959                 int result = preProcessReceivedMessage(env,&obj,&entry);
960                 if(result == 0){
961                         return;
962                 }
963         }
964 #endif
965
966 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
967   //  CkPrintf("START\n");
968   criticalPath_start(env);
969 #endif
970
971
972   switch(env->getMsgtype()) {
973 // Group support
974     case BocInitMsg :
975       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
976       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
977       _processBocInitMsg(ck,env);
978       break;
979     case NodeBocInitMsg :
980       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
981       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
982       _processNodeBocInitMsg(ck,env);
983       break;
984     case ForBocMsg :
985       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
986       // QD processing moved inside _processForBocMsg because it is conditional
987       if(env->isPacked()) CkUnpackMessage(&env);
988       _processForBocMsg(ck,env);
989       // stats record moved inside _processForBocMsg because it is conditional
990       break;
991     case ForNodeBocMsg :
992       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
993       // QD processing moved to _processForNodeBocMsg because it is conditional
994       if(env->isPacked()) CkUnpackMessage(&env);
995       _processForNodeBocMsg(ck,env);
996       // stats record moved to _processForNodeBocMsg because it is conditional
997       break;
998
999 // Array support
1000     case ArrayEltInitMsg:
1001       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1002       if(env->isPacked()) CkUnpackMessage(&env);
1003       _processArrayEltInitMsg(ck,env);
1004       break;
1005     case ForArrayEltMsg:
1006       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1007       if(env->isPacked()) CkUnpackMessage(&env);
1008       _processArrayEltMsg(ck,env);
1009       break;
1010
1011 // Chare support
1012     case NewChareMsg :
1013       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1014       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1015       _processNewChareMsg(ck,env);
1016       _STATS_RECORD_PROCESS_CHARE_1();
1017       break;
1018     case NewVChareMsg :
1019       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1020       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1021       _processNewVChareMsg(ck,env);
1022       _STATS_RECORD_PROCESS_CHARE_1();
1023       break;
1024     case ForChareMsg :
1025       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1026       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1027       _processForChareMsg(ck,env);
1028       _STATS_RECORD_PROCESS_MSG_1();
1029       break;
1030     case ForVidMsg   :
1031       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1032       ck->process();
1033       _processForVidMsg(ck,env);
1034       break;
1035     case FillVidMsg  :
1036       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1037       ck->process();
1038       _processFillVidMsg(ck,env);
1039       break;
1040
1041     default:
1042       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1043   }
1044 #ifdef _FAULT_MLOG_
1045         if(obj != NULL){
1046                 postProcessReceivedMessage(obj,sender,SN,entry);
1047         }
1048 #endif
1049
1050
1051 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1052   criticalPath_end();
1053   //  CkPrintf("STOP\n");
1054 #endif
1055
1056
1057 }
1058
1059
1060 /******************** Message Send **********************/
1061
1062 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1063              int *queueing, int *priobits, unsigned int **prioptr)
1064 {
1065   register envelope *env = (envelope *)converseMsg;
1066   *pfn = (CldPackFn)CkPackMessage;
1067   *len = env->getTotalsize();
1068   *queueing = env->getQueueing();
1069   *priobits = env->getPriobits();
1070   *prioptr = (unsigned int *) env->getPrioPtr();
1071 }
1072
1073 void CkPackMessage(envelope **pEnv)
1074 {
1075   register envelope *env = *pEnv;
1076   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1077     register void *msg = EnvToUsr(env);
1078     _TRACE_BEGIN_PACK();
1079     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1080     _TRACE_END_PACK();
1081     env=UsrToEnv(msg);
1082     env->setPacked(1);
1083     *pEnv = env;
1084   }
1085 }
1086
1087 void CkUnpackMessage(envelope **pEnv)
1088 {
1089   register envelope *env = *pEnv;
1090   register int msgIdx = env->getMsgIdx();
1091   if(env->isPacked()) {
1092     register void *msg = EnvToUsr(env);
1093     _TRACE_BEGIN_UNPACK();
1094     msg = _msgTable[msgIdx]->unpack(msg);
1095     _TRACE_END_UNPACK();
1096     env=UsrToEnv(msg);
1097     env->setPacked(0);
1098     *pEnv = env;
1099   }
1100 }
1101
1102 //There's no reason for most messages to go through the Cld--
1103 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1104 // Thus these accellerated versions of the Cld calls.
1105
1106 int index_objectQHandler;
1107 int index_tokenHandler;
1108 static int index_skipCldHandler;
1109
1110 static void _skipCldHandler(void *converseMsg)
1111 {
1112   register envelope *env = (envelope *)(converseMsg);
1113   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1114 #if CMK_GRID_QUEUE_AVAILABLE
1115   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1116     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1117                        env, env->getQueueing (), env->getPriobits (),
1118                        (unsigned int *) env->getPrioPtr ());
1119   } else {
1120     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1121                        env, env->getQueueing (), env->getPriobits (),
1122                        (unsigned int *) env->getPrioPtr ());
1123   }
1124 #else
1125   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1126         env, env->getQueueing(),env->getPriobits(),
1127         (unsigned int *)env->getPrioPtr());
1128 #endif
1129 }
1130
1131
1132 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1133 // Made non-static to be used by ckmessagelogging
1134 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1135 {
1136   if(pe == CkMyPe() ){
1137     if(!CmiNodeAlive(CkMyPe())){
1138         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1139 //      return;
1140     }
1141   }
1142   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1143 #if CMK_OBJECT_QUEUE_AVAILABLE
1144     Chare *obj = CkFindObjectPtr(env);
1145     if (obj && obj->CkGetObjQueue().queue()) {
1146       _enqObjQueue(obj, env);
1147     }
1148     else
1149 #endif
1150     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1151         env, env->getQueueing(),env->getPriobits(),
1152         (unsigned int *)env->getPrioPtr());
1153   } else {
1154     CkPackMessage(&env);
1155     int len=env->getTotalsize();
1156     CmiSetXHandler(env,CmiGetHandler(env));
1157 #if CMK_OBJECT_QUEUE_AVAILABLE
1158     CmiSetHandler(env,index_objectQHandler);
1159 #else
1160     CmiSetHandler(env,index_skipCldHandler);
1161 #endif
1162     CmiSetInfo(env,infoFn);
1163     if (pe==CLD_BROADCAST) {
1164 #ifdef _FAULT_MLOG_             
1165                         CmiSyncBroadcast(len, (char *)env);
1166 #else
1167                         CmiSyncBroadcastAndFree(len, (char *)env); 
1168 #endif
1169
1170 }
1171     else if (pe==CLD_BROADCAST_ALL) { 
1172 #ifdef _FAULT_MLOG_             
1173                         CmiSyncBroadcastAll(len, (char *)env);
1174 #else
1175                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1176 #endif
1177
1178 }
1179     else{
1180 #ifdef _FAULT_MLOG_             
1181                         CmiSyncSend(pe, len, (char *)env);
1182 #else
1183                         CmiSyncSendAndFree(pe, len, (char *)env);
1184 #endif
1185
1186                 }
1187   }
1188 }
1189
1190 #if CMK_BLUEGENE_CHARM
1191 #   define  _skipCldEnqueue   CldEnqueue
1192 #endif
1193
1194 // by pass Charm++ priority queue, send as Converse message
1195 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1196 {
1197   CkPackMessage(&env);
1198   int len=env->getTotalsize();
1199   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1200 }
1201
1202 static void _noCldEnqueue(int pe, envelope *env)
1203 {
1204 /*
1205   if (pe == CkMyPe()) {
1206     CmiHandleMessage(env);
1207   } else
1208 */
1209   CkPackMessage(&env);
1210   int len=env->getTotalsize();
1211   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1212   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1213   else CmiSyncSendAndFree(pe, len, (char *)env);
1214 }
1215
1216 //static void _noCldNodeEnqueue(int node, envelope *env)
1217 //Made non-static to be used by ckmessagelogging
1218 void _noCldNodeEnqueue(int node, envelope *env)
1219 {
1220 /*
1221   if (node == CkMyNode()) {
1222     CmiHandleMessage(env);
1223   } else {
1224 */
1225   CkPackMessage(&env);
1226   int len=env->getTotalsize();
1227   if (node==CLD_BROADCAST) { 
1228 #ifdef _FAULT_MLOG_
1229         CmiSyncNodeBroadcast(len, (char *)env);
1230 #else
1231         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1232 #endif
1233 }
1234   else if (node==CLD_BROADCAST_ALL) { 
1235 #ifdef _FAULT_MLOG_
1236                 CmiSyncNodeBroadcastAll(len, (char *)env);
1237 #else
1238                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1239 #endif
1240
1241 }
1242   else {
1243 #ifdef _FAULT_MLOG_
1244         CmiSyncNodeSend(node, len, (char *)env);
1245 #else
1246         CmiSyncNodeSendAndFree(node, len, (char *)env);
1247 #endif
1248   }
1249 }
1250
1251 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1252 {
1253   register envelope *env = UsrToEnv(msg);
1254   _CHECK_USED(env);
1255   _SET_USED(env, 1);
1256   env->setMsgtype(ForChareMsg);
1257   env->setEpIdx(eIdx);
1258   env->setSrcPe(CkMyPe());
1259 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1260   criticalPath_send(env);
1261   automaticallySetMessagePriority(env);
1262 #endif
1263 #ifndef CMK_OPTIMIZE
1264   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1265 #endif
1266 #if CMK_OBJECT_QUEUE_AVAILABLE
1267   CmiSetHandler(env, index_objectQHandler);
1268 #else
1269   CmiSetHandler(env, _charmHandlerIdx);
1270 #endif
1271   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1272     register int pe = -(pCid->onPE+1);
1273     if(pe==CkMyPe()) {
1274       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1275       void *objPtr;
1276       if (NULL!=(objPtr=vblk->getLocalChare()))
1277       { //A ready local chare
1278         env->setObjPtr(objPtr);
1279         return pe;
1280       }
1281       else { //The vidblock is not ready-- forget it
1282         vblk->send(env);
1283         return -1;
1284       }
1285     } else { //Valid vidblock for another PE:
1286       env->setMsgtype(ForVidMsg);
1287       env->setVidPtr(pCid->objPtr);
1288       return pe;
1289     }
1290   }
1291   else {
1292     env->setObjPtr(pCid->objPtr);
1293     return pCid->onPE;
1294   }
1295 }
1296
1297 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1298 {
1299   int destPE = _prepareMsg(eIdx, msg, pCid);
1300   if (destPE != -1) {
1301     register envelope *env = UsrToEnv(msg);
1302 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1303     criticalPath_send(env);
1304     automaticallySetMessagePriority(env);
1305 #endif
1306     CmiBecomeImmediate(env);
1307   }
1308   return destPE;
1309 }
1310
1311 extern "C"
1312 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1313 {
1314   if (opts & CK_MSG_INLINE) {
1315     CkSendMsgInline(entryIdx, msg, pCid, opts);
1316     return;
1317   }
1318 #ifndef CMK_OPTIMIZE
1319   if (opts & CK_MSG_IMMEDIATE) {
1320     CmiAbort("Immediate message is not allowed in Chare!");
1321   }
1322 #endif
1323   register envelope *env = UsrToEnv(msg);
1324   int destPE=_prepareMsg(entryIdx,msg,pCid);
1325   if (destPE!=-1) {
1326     _TRACE_CREATION_1(env);
1327     CpvAccess(_qd)->create();
1328     if (opts & CK_MSG_SKIP_OR_IMM)
1329       _noCldEnqueue(destPE, env);
1330     else
1331       CldEnqueue(destPE, env, _infoIdx);
1332     _TRACE_CREATION_DONE(1);
1333   }
1334 }
1335
1336 extern "C"
1337 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1338 {
1339   if (pCid->onPE==CkMyPe())
1340   {
1341     if(!CmiNodeAlive(CkMyPe())){
1342         return;
1343     }
1344 #ifndef CMK_OPTIMIZE
1345     //Just in case we need to breakpoint or use the envelope in some way
1346     _prepareMsg(entryIndex,msg,pCid);
1347 #endif
1348                 //Just directly call the chare (skip QD handling & scheduler)
1349     register envelope *env = UsrToEnv(msg);
1350     if (env->isPacked()) CkUnpackMessage(&env);
1351     _STATS_RECORD_PROCESS_MSG_1();
1352     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1353   }
1354   else {
1355     //No way to inline a cross-processor message:
1356     CkSendMsg(entryIndex,msg,pCid,opts&!CK_MSG_INLINE);
1357   }
1358 }
1359
1360 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1361 {
1362   register envelope *env = UsrToEnv(msg);
1363   _CHECK_USED(env);
1364   _SET_USED(env, 1);
1365   env->setMsgtype(type);
1366   env->setEpIdx(eIdx);
1367   env->setGroupNum(gID);
1368   env->setSrcPe(CkMyPe());
1369 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1370   criticalPath_send(env);
1371   automaticallySetMessagePriority(env);
1372 #endif
1373 #ifndef CMK_OPTIMIZE
1374   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1375 #endif
1376   CmiSetHandler(env, _charmHandlerIdx);
1377   return env;
1378 }
1379
1380 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1381 {
1382   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1383 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1384   criticalPath_send(env);
1385   automaticallySetMessagePriority(env);
1386 #endif
1387   CmiBecomeImmediate(env);
1388   return env;
1389 }
1390
1391 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1392                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1393 {
1394   int numPes;
1395   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1396 #ifdef _FAULT_MLOG_
1397         sendTicketGroupRequest(env,pe,_infoIdx);
1398 #else
1399   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1400   _TRACE_CREATION_N(env, numPes);
1401   if (opts & CK_MSG_SKIP_OR_IMM)
1402     _noCldEnqueue(pe, env);
1403   else
1404     _skipCldEnqueue(pe, env, _infoIdx);
1405   _TRACE_CREATION_DONE(1);
1406 #endif
1407 }
1408
1409 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1410                            int npes, int *pes)
1411 {
1412   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1413   _TRACE_CREATION_MULTICAST(env, npes, pes);
1414   CldEnqueueMulti(npes, pes, env, _infoIdx);
1415   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1416 }
1417
1418 extern "C"
1419 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1420 {
1421 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1422   if (destPE==CkMyPe())
1423   {
1424     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1425     return;
1426   }
1427   //Can't inline-- send the usual way
1428   register envelope *env = UsrToEnv(msg);
1429   int numPes;
1430   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1431   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1432   _TRACE_CREATION_N(env, numPes);
1433   _noCldEnqueue(destPE, env);
1434   _STATS_RECORD_SEND_BRANCH_1();
1435   CkpvAccess(_coreState)->create();
1436   _TRACE_CREATION_DONE(1);
1437 #else
1438   // no support for immediate message, send inline
1439   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1440 #endif
1441 }
1442
1443 extern "C"
1444 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1445 {
1446   if (destPE==CkMyPe())
1447   {
1448     if(!CmiNodeAlive(CkMyPe())){
1449         return;
1450     }
1451     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1452     if (obj!=NULL)
1453     { //Just directly call the group:
1454 #ifndef CMK_OPTIMIZE
1455       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1456 #else
1457       envelope *env=UsrToEnv(msg);
1458 #endif
1459       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1460       return;
1461     }
1462   }
1463   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1464   CkSendMsgBranch(eIdx,msg,destPE,gID,opts&!CK_MSG_INLINE);
1465 }
1466
1467 extern "C"
1468 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1469 {
1470   if (opts & CK_MSG_INLINE) {
1471     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1472     return;
1473   }
1474   if (opts & CK_MSG_IMMEDIATE) {
1475     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1476     return;
1477   }
1478   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1479   _STATS_RECORD_SEND_BRANCH_1();
1480   CkpvAccess(_coreState)->create();
1481 }
1482
1483 extern "C"
1484 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1485 {
1486 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1487   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1488   _TRACE_CREATION_MULTICAST(env, npes, pes);
1489   _noCldEnqueueMulti(npes, pes, env);
1490   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1491 #else
1492   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1493   CpvAccess(_qd)->create(-npes);
1494 #endif
1495   _STATS_RECORD_SEND_BRANCH_N(npes);
1496   CpvAccess(_qd)->create(npes);
1497 }
1498
1499 extern "C"
1500 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1501 {
1502   if (opts & CK_MSG_IMMEDIATE) {
1503     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1504     return;
1505   }
1506     // normal mesg
1507   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1508   _STATS_RECORD_SEND_BRANCH_N(npes);
1509   CpvAccess(_qd)->create(npes);
1510 }
1511
1512 extern "C"
1513 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1514 {
1515   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1516   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1517   CpvAccess(_qd)->create(CkNumPes());
1518 }
1519
1520 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1521                 int node=CLD_BROADCAST_ALL, int opts=0)
1522 {
1523   int numPes;
1524   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1525 #ifdef _FAULT_MLOG_
1526         sendTicketNodeGroupRequest(env,node,_infoIdx);
1527 #else
1528   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1529   _TRACE_CREATION_N(env, numPes);
1530   if (opts & CK_MSG_SKIP_OR_IMM) {
1531     _noCldNodeEnqueue(node, env);
1532     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1533       CkpvAccess(_coreState)->create(-numPes);
1534     }
1535   }
1536   else
1537     CldNodeEnqueue(node, env, _infoIdx);
1538   _TRACE_CREATION_DONE(1);
1539 #endif
1540 }
1541
1542 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1543                            int npes, int *nodes)
1544 {
1545   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1546   _TRACE_CREATION_N(env, npes);
1547   for (int i=0; i<npes; i++) {
1548     CldNodeEnqueue(nodes[i], env, _infoIdx);
1549   }
1550   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1551 }
1552
1553 extern "C"
1554 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1555 {
1556 #if CMK_IMMEDIATE_MSG
1557   if (node==CkMyNode())
1558   {
1559     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1560     return;
1561   }
1562   //Can't inline-- send the usual way
1563   register envelope *env = UsrToEnv(msg);
1564   int numPes;
1565   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1566   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1567   _TRACE_CREATION_N(env, numPes);
1568   _noCldNodeEnqueue(node, env);
1569   _STATS_RECORD_SEND_BRANCH_1();
1570   /* immeidate message is invisible to QD */
1571 //  CkpvAccess(_coreState)->create();
1572   _TRACE_CREATION_DONE(1);
1573 #else
1574   // no support for immediate message, send inline
1575   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1576 #endif
1577 }
1578
1579 extern "C"
1580 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1581 {
1582   if (node==CkMyNode())
1583   {
1584     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1585     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1586     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1587     if (obj!=NULL)
1588     { //Just directly call the group:
1589 #ifndef CMK_OPTIMIZE
1590       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1591 #else
1592       envelope *env=UsrToEnv(msg);
1593 #endif
1594       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1595       return;
1596     }
1597   }
1598   //Can't inline-- send the usual way
1599   CkSendMsgNodeBranch(eIdx,msg,node,gID,opts&!CK_MSG_INLINE);
1600 }
1601
1602 extern "C"
1603 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1604 {
1605   if (opts & CK_MSG_INLINE) {
1606     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1607     return;
1608   }
1609   if (opts & CK_MSG_IMMEDIATE) {
1610     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1611     return;
1612   }
1613   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1614   _STATS_RECORD_SEND_NODE_BRANCH_1();
1615   CkpvAccess(_coreState)->create();
1616 }
1617
1618 extern "C"
1619 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1620 {
1621 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1622   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1623   _noCldEnqueueMulti(npes, nodes, env);
1624 #else
1625   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1626   CpvAccess(_qd)->create(-npes);
1627 #endif
1628   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1629   CpvAccess(_qd)->create(npes);
1630 }
1631
1632 extern "C"
1633 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1634 {
1635   if (opts & CK_MSG_IMMEDIATE) {
1636     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1637     return;
1638   }
1639     // normal mesg
1640   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1641   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1642   CpvAccess(_qd)->create(npes);
1643 }
1644
1645 extern "C"
1646 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1647 {
1648   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1649   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1650   CpvAccess(_qd)->create(CkNumNodes());
1651 }
1652
1653 //Needed by delegation manager:
1654 extern "C"
1655 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1656 { return _prepareMsg(eIdx,msg,pCid); }
1657 extern "C"
1658 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1659 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1660 extern "C"
1661 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1662 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1663
1664 void _ckModuleInit(void) {
1665         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1666         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1667         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1668         CkpvInitialize(TokenPool*, _tokenPool);
1669         CkpvAccess(_tokenPool) = new TokenPool;
1670 }
1671
1672
1673 /************** Send: Arrays *************/
1674
1675 extern void CkArrayManagerInsert(int onPe,void *msg);
1676 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1677
1678 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1679 {
1680   _CHECK_USED(env);
1681   _SET_USED(env, 1);
1682   env->setMsgtype(type);
1683 #ifndef CMK_OPTIMIZE
1684   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1685 #endif
1686   CmiSetHandler(env, _charmHandlerIdx);
1687   CpvAccess(_qd)->create();
1688 }
1689
1690 extern "C"
1691 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1692   register envelope *env = UsrToEnv(msg);
1693   env->getsetArrayMgr()=aID;
1694   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1695   CldEnqueue(pe, env, _infoIdx);
1696 }
1697
1698 extern "C"
1699 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1700   register envelope *env = UsrToEnv(msg);
1701   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1702 #ifdef _FAULT_MLOG_
1703         sendTicketArrayRequest(env,pe,_infoIdx);
1704 #else
1705   if (opts & CK_MSG_IMMEDIATE)
1706     CmiBecomeImmediate(env);
1707   if (opts & CK_MSG_SKIP_OR_IMM)
1708     _noCldEnqueue(pe, env);
1709   else
1710     _skipCldEnqueue(pe, env, _infoIdx);
1711 #endif
1712 }
1713
1714 class ElementDestroyer : public CkLocIterator {
1715 private:
1716         CkLocMgr *locMgr;
1717 public:
1718         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1719         void addLocation(CkLocation &loc) {
1720           loc.destroyAll();
1721         }
1722 };
1723
1724 void CkDeleteChares() {
1725   int i;
1726   int numGroups = CkpvAccess(_groupIDTable)->size();
1727
1728   // delete all array elements
1729   for(i=0;i<numGroups;i++) {
1730     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1731     if(obj && obj->isLocMgr())  {
1732       CkLocMgr *mgr = (CkLocMgr*)obj;
1733       ElementDestroyer destroyer(mgr);
1734       mgr->iterate(destroyer);
1735 printf("[%d] DELETE!\n", CkMyPe());
1736     }
1737   }
1738
1739   // delete all groups
1740   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1741   for(i=0;i<numGroups;i++) {
1742     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1743     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1744     if (obj) delete obj;
1745   }
1746   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1747
1748   // delete all node groups
1749   if (CkMyRank() == 0) {
1750     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1751     for(i=0;i<numNodeGroups;i++) {
1752       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
1753       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1754       if (obj) delete obj;
1755     }
1756   }
1757 }
1758
1759 //------------------- Message Watcher (record/replay) ----------------
1760
1761 CkMessageWatcher::~CkMessageWatcher() {}
1762
1763 class CkMessageRecorder : public CkMessageWatcher {
1764         FILE *f;
1765 public:
1766         CkMessageRecorder(FILE *f_) :f(f_) {}
1767         ~CkMessageRecorder() {
1768                 fprintf(f,"-1 -1 -1");
1769                 fclose(f);
1770         }
1771
1772         virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
1773                 if (env->getEvent())
1774                      fprintf(f,"%d %d %d %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg);
1775                 return CmiTrue;
1776         }
1777 };
1778
1779 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
1780 #define REPLAYDEBUG(args) /* empty */
1781
1782 class CkMessageReplay : public CkMessageWatcher {
1783         FILE *f;
1784         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
1785         /// Read the next message we need from the file:
1786         void getNext(void) {
1787                 if (4!=fscanf(f,"%d%d%d%d", &nextPE,&nextSize,&nextEvent,&nexttype)) {
1788                         // CkAbort("CkMessageReplay> Syntax error reading replay file");
1789                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
1790                 }
1791         }
1792         /// If this is the next message we need, advance and return CmiTrue.
1793         CmiBool isNext(envelope *env) {
1794                 if (nextPE!=env->getSrcPe()) return CmiFalse;
1795                 if (nextEvent!=env->getEvent()) return CmiFalse;
1796                 if (nextSize!=env->getTotalsize())
1797                 {
1798                         CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
1799                         return CmiFalse;
1800                 }
1801                 return CmiTrue;
1802         }
1803
1804         /// This is a (short) list of messages we aren't yet ready for:
1805         CkQ<envelope *> delayed;
1806
1807         /// Try to flush out any delayed messages
1808         void flush(void) {
1809                 int len=delayed.length();
1810                 for (int i=0;i<len;i++) {
1811                         envelope *env=delayed.deq();
1812                         if (isNext(env)) { /* this is the next message: process it */
1813                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1814                                 CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
1815                                 return;
1816                         }
1817                         else /* Not ready yet-- put it back in the
1818                                 queue */
1819                           {
1820                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1821                                 delayed.enq(env);
1822                           }
1823                 }
1824         }
1825
1826 public:
1827         CkMessageReplay(FILE *f_) :f(f_) { getNext();
1828         REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
1829                     }
1830         ~CkMessageReplay() {fclose(f);}
1831
1832         virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
1833           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx());
1834                 if (env->getEvent() == 0) return CmiTrue;
1835                 if (isNext(env)) { /* This is the message we were expecting */
1836                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1837                         getNext(); /* Advance over this message */
1838                         flush(); /* try to process queued-up stuff */
1839                         return CmiTrue;
1840                 }
1841 #if CMK_SMP
1842                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
1843                          // try next rank, we can't just buffer the msg and left
1844                          // we need to keep unprocessed msg on the fly
1845                         int nextpe = CkMyPe()+1;
1846                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
1847                         nextpe = CkNodeFirst(CkMyNode());
1848                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
1849                         return CmiFalse;
1850                 }
1851 #endif
1852                 else /*!isNext(env) */ {
1853                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()
1854                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
1855                         delayed.enq(env);
1856                         flush();
1857                         return CmiFalse;
1858                 }
1859         }
1860 };
1861
1862 static FILE *openReplayFile(const char *permissions) {
1863
1864         char fName[200];
1865         sprintf(fName,"ckreplay_%06d.log",CkMyPe());
1866         FILE *f=fopen(fName,permissions);
1867         REPLAYDEBUG("openReplayfile "<<fName);
1868         if (f==NULL) {
1869                 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
1870                         CkMyPe(),fName,permissions);
1871                 CkAbort("openReplayFile> Could not open replay file");
1872         }
1873         return f;
1874 }
1875
1876 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
1877         REPLAYDEBUG("CkMessageWaterInit ");
1878         if (CmiGetArgFlagDesc(argv,"+record","Record message processing order"))
1879                 ck->watcher=new CkMessageRecorder(openReplayFile("w"));
1880         if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream"))
1881                 ck->watcher=new CkMessageReplay(openReplayFile("r"));
1882 }
1883
1884 extern "C"
1885 int CkMessageToEpIdx(void *msg) {
1886         envelope *env=UsrToEnv(msg);
1887         int ep=env->getEpIdx();
1888         if (ep==CkIndex_CkArray::recvBroadcast(0))
1889                 return env->getsetArrayBcastEp();
1890         else
1891                 return ep;
1892 }
1893
1894 extern "C"
1895 int getCharmEnvelopeSize() {
1896   return sizeof(envelope);
1897 }
1898
1899
1900 #include "CkMarshall.def.h"
1901