fixed CProxySection_ArrayBase default constructor which may resulted in random _nsid...
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
29
30 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
31
32 int CMessage_CkMessage::__idx=-1;
33 int CMessage_CkArgMsg::__idx=0;
34 int CkIndex_Chare::__idx;
35 int CkIndex_Group::__idx;
36 int CkIndex_ArrayBase::__idx=-1;
37
38 extern int _defaultObjectQ;
39
40 //Charm++ virtual functions: declaring these here results in a smaller executable
41 Chare::Chare(void) {
42   thishandle.onPE=CkMyPe();
43   thishandle.objPtr=this;
44 #ifdef _FAULT_MLOG_
45         mlogData = new ChareMlogData();
46         mlogData->objID.type = TypeChare;
47         mlogData->objID.data.chare.id = thishandle;
48 #endif
49 #if CMK_OBJECT_QUEUE_AVAILABLE
50   if (_defaultObjectQ)  CkEnableObjQ();
51 #endif
52 }
53
54 Chare::Chare(CkMigrateMessage* m) {
55   thishandle.onPE=CkMyPe();
56   thishandle.objPtr=this;
57
58 #if CMK_OBJECT_QUEUE_AVAILABLE
59   if (_defaultObjectQ)  CkEnableObjQ();
60 #endif
61 }
62
63 void Chare::CkEnableObjQ()
64 {
65 #if CMK_OBJECT_QUEUE_AVAILABLE
66   objQ.create();
67 #endif
68 }
69
70 Chare::~Chare() {}
71
72 void Chare::pup(PUP::er &p)
73 {
74   p(thishandle.onPE);
75   thishandle.objPtr=(void *)this;
76 #ifdef _FAULT_MLOG_
77         if(p.isUnpacking()){
78                 mlogData = new ChareMlogData();
79         }
80         mlogData->pup(p);
81 #endif
82 }
83
84 int Chare::ckGetChareType() const {
85   return -3;
86 }
87 char *Chare::ckDebugChareName(void) {
88   char buf[100];
89   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
90   return strdup(buf);
91 }
92 int Chare::ckDebugChareID(char *str, int limit) {
93   // pure chares for now do not have a valid ID
94   str[0] = 0;
95   return 1;
96 }
97 void Chare::ckDebugPup(PUP::er &p) {
98   pup(p);
99 }
100
101 /// This method is called before starting a [threaded] entry method.
102 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
103   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
104   traceAddThreadListeners(th, UsrToEnv(msg));
105 }
106
107
108 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
109   p.comment("Bytes");
110   int ts=UsrToEnv(msg)->getTotalsize();
111   int msgLen=ts-sizeof(envelope);
112   if (msgLen>0)
113     p((char*)msg,msgLen);
114 }
115
116 IrrGroup::IrrGroup(void) {
117   thisgroup = CkpvAccess(_currentGroup);
118 #ifdef _FAULT_MLOG_
119         mlogData->objID.type = TypeGroup;
120         mlogData->objID.data.group.id = thisgroup;
121         mlogData->objID.data.group.onPE = CkMyPe();
122 #endif
123 }
124
125 IrrGroup::~IrrGroup() {
126   // remove the object pointer
127   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
128   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
129   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
130 }
131
132 void IrrGroup::pup(PUP::er &p)
133 {
134   Chare::pup(p);
135   p|thisgroup;
136 }
137
138 int IrrGroup::ckGetChareType() const {
139   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
140 }
141
142 int IrrGroup::ckDebugChareID(char *str, int limit) {
143   if (limit<5) return -1;
144   str[0] = 1;
145   *((int*)&str[1]) = thisgroup.idx;
146   return 5;
147 }
148
149 char *IrrGroup::ckDebugChareName() {
150   return strdup(_chareTable[ckGetChareType()]->name);
151 }
152
153 void IrrGroup::ckJustMigrated(void)
154 {
155 }
156
157 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
158   /* FIXME: **CW** not entirely sure what we should do here yet */
159 }
160
161 void Group::CkAddThreadListeners(CthThread th, void *msg) {
162   Chare::CkAddThreadListeners(th, msg);
163   CthSetThreadID(th, thisgroup.idx, 0, 0);
164 }
165
166 void Group::pup(PUP::er &p)
167 {
168   CkReductionMgr::pup(p);
169   reductionInfo.pup(p);
170 }
171
172 /**** Delegation Manager Group */
173 CkDelegateMgr::~CkDelegateMgr() { }
174
175 //Default delegator implementation: do not delegate-- send directly
176 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
177   { CkSendMsg(ep,m,c); }
178 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
179   { CkSendMsgBranch(ep,m,onPE,g); }
180 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
181   { CkBroadcastMsgBranch(ep,m,g); }
182 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
183   { CkSendMsgBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
184 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
185   { CkSendMsgNodeBranch(ep,m,onNode,g); }
186 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
187   { CkBroadcastMsgNodeBranch(ep,m,g); }
188 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
189   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
190 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,int onPE,CkArrayID a)
191 {
192         CProxyElement_ArrayBase ap(a,idx);
193         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
194 }
195 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,CkArrayID a)
196 {
197         CProxyElement_ArrayBase ap(a,idx);
198         ap.ckSend((CkArrayMessage *)m,ep);
199 }
200 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
201 {
202         CProxy_ArrayBase ap(a);
203         ap.ckBroadcast((CkArrayMessage *)m,ep);
204 }
205
206 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
207 {
208         CmiAbort("ArraySectionSend is not implemented!\n");
209 /*
210         CProxyElement_ArrayBase ap(a,idx);
211         ap.ckSend((CkArrayMessage *)m,ep);
212 */
213 }
214
215 /*** Proxy <-> delegator communication */
216 CkDelegateData::~CkDelegateData() {}
217
218 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
219   return pd; // default implementation ignores pup call
220 }
221
222 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
223    this tricky manual reference counting business... */
224
225 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
226         if (dPtr) dPtr->ref();
227         ckUndelegate();
228         delegatedMgr = dTo;
229         delegatedPtr = dPtr;
230 }
231 void CProxy::ckUndelegate(void) {
232         delegatedMgr=NULL;
233         if (delegatedPtr) delegatedPtr->unref();
234         delegatedPtr=NULL;
235 }
236
237 /// Copy constructor
238 CProxy::CProxy(const CProxy &src)
239     :delegatedMgr(src.delegatedMgr)
240 {
241     delegatedPtr = NULL;
242     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
243         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
244 }
245
246 /// Assignment operator
247 CProxy& CProxy::operator=(const CProxy &src) {
248         CkDelegateData *oldPtr=delegatedPtr;
249         ckUndelegate();
250         delegatedMgr=src.delegatedMgr;
251
252         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
253             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
254         else
255             delegatedPtr = NULL;
256
257         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
258         if (oldPtr) oldPtr->unref();
259         return *this;
260 }
261
262 void CProxy::pup(PUP::er &p) {
263       CkGroupID delegatedTo;
264       delegatedTo.setZero();
265       int isNodeGroup = 0;
266       if (!p.isUnpacking()) {
267         if (delegatedMgr) {
268           delegatedTo = delegatedMgr->CkGetGroupID();
269           isNodeGroup = delegatedMgr->isNodeGroup();
270         }
271       }
272       p|delegatedTo;
273       if (!delegatedTo.isZero()) {
274         p|isNodeGroup;
275         if (p.isUnpacking()) {
276           if (isNodeGroup)
277                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
278           else
279                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
280         }
281
282         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
283         if (p.isUnpacking() && delegatedPtr)
284             delegatedPtr->ref();
285       }
286 }
287
288 /**** Array sections */
289 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
290 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
291   _cookie.aid = aid;    \
292   _cookie.get_pe() = CkMyPe();  \
293   _elems = new CkArrayIndexMax[nElems]; \
294   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
295   pelist = NULL;        \
296   npes  = 0;    \
297 }
298
299 CKSECTIONID_CONSTRUCTOR_DEF(1D)
300 CKSECTIONID_CONSTRUCTOR_DEF(2D)
301 CKSECTIONID_CONSTRUCTOR_DEF(3D)
302 CKSECTIONID_CONSTRUCTOR_DEF(4D)
303 CKSECTIONID_CONSTRUCTOR_DEF(5D)
304 CKSECTIONID_CONSTRUCTOR_DEF(6D)
305 CKSECTIONID_CONSTRUCTOR_DEF(Max)
306
307 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
308   pelist = new int[npes];
309   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
310   _cookie.aid = gid;
311 }
312
313 CkSectionID::CkSectionID(const CkSectionID &sid) {
314   int i;
315   _cookie = sid._cookie;
316   _nElems = sid._nElems;
317   if (_nElems > 0) {
318     _elems = new CkArrayIndexMax[_nElems];
319     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
320   } else _elems = NULL;
321   npes = sid.npes;
322   if (npes > 0) {
323     pelist = new int[npes];
324     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
325   } else pelist = NULL;
326 }
327
328 void CkSectionID::operator=(const CkSectionID &sid) {
329   int i;
330   _cookie = sid._cookie;
331   _nElems = sid._nElems;
332   if (_nElems > 0) {
333     _elems = new CkArrayIndexMax[_nElems];
334     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
335   } else _elems = NULL;
336   npes = sid.npes;
337   if (npes > 0) {
338     pelist = new int[npes];
339     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
340   } else pelist = NULL;
341 }
342
343 void CkSectionID::pup(PUP::er &p) {
344     p | _cookie;
345     p(_nElems);
346     if (_nElems > 0) {
347       if (p.isUnpacking()) _elems = new CkArrayIndexMax[_nElems];
348       for (int i=0; i< _nElems; i++) p | _elems[i];
349       npes = 0;
350       pelist = NULL;
351     } else {
352       // If _nElems is zero, than this section describes processors instead of array elements
353       _elems = NULL;
354       p(npes);
355       if (p.isUnpacking()) pelist = new int[npes];
356       p(pelist, npes);
357     }
358 }
359
360 /**** Tiny random API routines */
361
362 #ifdef CMK_CUDA
363 void CUDACallbackManager(void *fn) {
364   if (fn != NULL) {
365     CkCallback *cb = (CkCallback*) fn;
366     cb->send();
367   }
368 }
369
370 #endif
371
372 extern "C"
373 void CkSetRefNum(void *msg, int ref)
374 {
375   UsrToEnv(msg)->setRef(ref);
376 }
377
378 extern "C"
379 int CkGetRefNum(void *msg)
380 {
381   return UsrToEnv(msg)->getRef();
382 }
383
384 extern "C"
385 int CkGetSrcPe(void *msg)
386 {
387   return UsrToEnv(msg)->getSrcPe();
388 }
389
390 extern "C"
391 int CkGetSrcNode(void *msg)
392 {
393   return CmiNodeOf(CkGetSrcPe(msg));
394 }
395
396 extern "C"
397 void *CkLocalBranch(CkGroupID gID) {
398   return _localBranch(gID);
399 }
400
401 static
402 void *_ckLocalNodeBranch(CkGroupID groupID) {
403   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
404   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
405   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
406   return retval;
407 }
408
409 extern "C"
410 void *CkLocalNodeBranch(CkGroupID groupID)
411 {
412   void *retval;
413   // we are called in a constructor
414   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
415     return CkpvAccess(_currentNodeGroupObj);
416   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
417   { // Nodegroup hasn't finished being created yet-- schedule...
418     CsdScheduler(0);
419   }
420   return retval;
421 }
422
423 extern "C"
424 void *CkLocalChare(const CkChareID *pCid)
425 {
426         int pe=pCid->onPE;
427         if (pe<0) { //A virtual chare ID
428                 if (pe!=(-(CkMyPe()+1)))
429                         return NULL;//VID block not on this PE
430                 VidBlock *v=(VidBlock *)pCid->objPtr;
431                 return v->getLocalChare();
432         }
433         else
434         { //An ordinary chare ID
435                 if (pe!=CkMyPe())
436                         return NULL;//Chare not on this PE
437                 return pCid->objPtr;
438         }
439 }
440
441 CkpvDeclare(char **,Ck_argv);
442 extern "C" char **CkGetArgv(void) {
443         return CkpvAccess(Ck_argv);
444 }
445 extern "C" int CkGetArgc(void) {
446         return CmiGetArgc(CkpvAccess(Ck_argv));
447 }
448
449 /******************** Basic support *****************/
450 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
451 {
452 #ifdef _FAULT_MLOG_
453         CpvAccess(_currentObj) = (Chare *)obj;
454 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
455 #endif
456   //BIGSIM_OOC DEBUGGING
457   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
458   //fflush(stdout);
459 #ifndef CMK_OPTIMIZE
460   CpdBeforeEp(epIdx, obj, msg);
461 #endif
462   _entryTable[epIdx]->call(msg, obj);
463 #ifndef CMK_OPTIMIZE
464   CpdAfterEp(epIdx);
465 #endif
466   if (_entryTable[epIdx]->noKeep)
467   { /* Method doesn't keep/delete the message, so we have to: */
468     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
469   }
470 }
471 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
472 {
473   //BIGSIM_OOC DEBUGGING
474   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
475   //fflush(stdout);
476
477   void *deliverMsg;
478 #ifdef _FAULT_MLOG_
479         CpvAccess(_currentObj) = (Chare *)obj;
480 #endif
481   if (_entryTable[epIdx]->noKeep)
482   { /* Deliver a read-only copy of the message */
483     deliverMsg=(void *)msg;
484   } else
485   { /* Method needs a copy of the message to keep/delete */
486     void *oldMsg=(void *)msg;
487     deliverMsg=CkCopyMsg(&oldMsg);
488 #ifndef CMK_OPTIMIZE
489     if (oldMsg!=msg)
490       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
491 #endif
492   }
493 #ifndef CMK_OPTIMIZE
494   CpdBeforeEp(epIdx, obj, (void*)msg);
495 #endif
496   _entryTable[epIdx]->call(deliverMsg, obj);
497 #ifndef CMK_OPTIMIZE
498   CpdAfterEp(epIdx);
499 #endif
500 }
501
502 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
503 {
504   register void *msg = EnvToUsr(env);
505   _SET_USED(env, 0);
506   CkDeliverMessageFree(epIdx,msg,obj);
507 }
508
509 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
510 {
511
512 #ifndef CMK_OPTIMIZE /* Consider tracing: */
513   if (_entryTable[epIdx]->traceEnabled) {
514     _TRACE_BEGIN_EXECUTE(env);
515     _invokeEntryNoTrace(epIdx,env,obj);
516     _TRACE_END_EXECUTE();
517   }
518   else
519 #endif
520     _invokeEntryNoTrace(epIdx,env,obj);
521
522 }
523
524 /********************* Creation ********************/
525
526 extern "C"
527 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
528 {
529   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
530   envelope *env = UsrToEnv(msg);
531   _CHECK_USED(env);
532   if(pCid == 0) {
533     env->setMsgtype(NewChareMsg);
534   } else {
535     pCid->onPE = (-(CkMyPe()+1));
536     //  pCid->magic = _GETIDX(cIdx);
537     pCid->objPtr = (void *) new VidBlock();
538     _MEMCHECK(pCid->objPtr);
539     env->setMsgtype(NewVChareMsg);
540     env->setVidPtr(pCid->objPtr);
541   }
542   env->setEpIdx(eIdx);
543   env->setSrcPe(CkMyPe());
544   CmiSetHandler(env, _charmHandlerIdx);
545   _TRACE_CREATION_1(env);
546   CpvAccess(_qd)->create();
547   _STATS_RECORD_CREATE_CHARE_1();
548   _SET_USED(env, 1);
549   if(destPE == CK_PE_ANY)
550     env->setForAnyPE(1);
551   else
552     env->setForAnyPE(0);
553   CldEnqueue(destPE, env, _infoIdx);
554   _TRACE_CREATION_DONE(1);
555 }
556
557 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
558 {
559   register int gIdx = _entryTable[epIdx]->chareIdx;
560   register void *obj = malloc(_chareTable[gIdx]->size);
561   _MEMCHECK(obj);
562   setMemoryTypeChare(obj);
563   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
564   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
565   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
566   CkpvAccess(_groupIDTable)->push_back(groupID);
567   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
568   if(ptrq) {
569     void *pending;
570     while((pending=ptrq->deq())!=0)
571       CldEnqueue(CkMyPe(), pending, _infoIdx);
572 //    delete ptrq;
573       CkpvAccess(_groupTable)->find(groupID).clearPending();
574   }
575   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
576
577   CkpvAccess(_currentGroup) = groupID;
578   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
579   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
580   _STATS_RECORD_PROCESS_GROUP_1();
581 }
582
583 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
584 {
585   register int gIdx = _entryTable[epIdx]->chareIdx;
586   int objSize=_chareTable[gIdx]->size;
587   register void *obj = malloc(objSize);
588   _MEMCHECK(obj);
589   setMemoryTypeChare(obj);
590   CkpvAccess(_currentGroup) = groupID;
591
592 // Now that the NodeGroup is created, add it to the table.
593 //  NodeGroups can be accessed by multiple processors, so
594 //  this is in the opposite order from groups - invoking the constructor
595 //  before registering it.
596 // User may call CkLocalNodeBranch() inside the nodegroup constructor
597 //  store nodegroup into _currentNodeGroupObj
598   CkpvAccess(_currentNodeGroupObj) = obj;
599   _invokeEntryNoTrace(epIdx,env,obj);
600   CkpvAccess(_currentNodeGroupObj) = NULL;
601   _STATS_RECORD_PROCESS_NODE_GROUP_1();
602
603   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
604   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
605   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
606   CksvAccess(_nodeGroupIDTable).push_back(groupID);
607
608   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
609   if(ptrq) {
610     void *pending;
611     while((pending=ptrq->deq())!=0)
612       CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
613 //    delete ptrq;
614       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
615   }
616   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
617 }
618
619 void _createGroup(CkGroupID groupID, envelope *env)
620 {
621   _CHECK_USED(env);
622   _SET_USED(env, 1);
623   register int epIdx = env->getEpIdx();
624   int gIdx = _entryTable[epIdx]->chareIdx;
625   CkNodeGroupID rednMgr;
626   if(_chareTable[gIdx]->isIrr == 0){
627                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
628                 rednMgr = rednMgrProxy;
629 //              rednMgrProxy.setAttachedGroup(groupID);
630   }else{
631         rednMgr.setZero();
632   }
633   env->setGroupNum(groupID);
634   env->setSrcPe(CkMyPe());
635   env->setRednMgr(rednMgr);
636   env->setGroupEpoch(CkpvAccess(_charmEpoch));
637
638   if(CkNumPes()>1) {
639     CkPackMessage(&env);
640     CmiSetHandler(env, _bocHandlerIdx);
641     _numInitMsgs++;
642     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
643     CpvAccess(_qd)->create(CkNumPes()-1);
644     CkUnpackMessage(&env);
645   }
646   _STATS_RECORD_CREATE_GROUP_1();
647   CkCreateLocalGroup(groupID, epIdx, env);
648 }
649
650 void _createNodeGroup(CkGroupID groupID, envelope *env)
651 {
652   _CHECK_USED(env);
653   _SET_USED(env, 1);
654   register int epIdx = env->getEpIdx();
655   env->setGroupNum(groupID);
656   env->setSrcPe(CkMyPe());
657   env->setGroupEpoch(CkpvAccess(_charmEpoch));
658   if(CkNumNodes()>1) {
659     CkPackMessage(&env);
660     CmiSetHandler(env, _bocHandlerIdx);
661     _numInitMsgs++;
662     CksvAccess(_numInitNodeMsgs)++;
663     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
664     CpvAccess(_qd)->create(CkNumNodes()-1);
665     CkUnpackMessage(&env);
666   }
667   _STATS_RECORD_CREATE_NODE_GROUP_1();
668   CkCreateLocalNodeGroup(groupID, epIdx, env);
669 }
670
671 // new _groupCreate
672
673 static CkGroupID _groupCreate(envelope *env)
674 {
675   register CkGroupID groupNum;
676
677   // check CkMyPe(). if it is 0 then idx is _numGroups++
678   // if not, then something else...
679   if(CkMyPe() == 0)
680      groupNum.idx = CkpvAccess(_numGroups)++;
681   else
682      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
683   _createGroup(groupNum, env);
684   return groupNum;
685 }
686
687 // new _nodeGroupCreate
688 static CkGroupID _nodeGroupCreate(envelope *env)
689 {
690   register CkGroupID groupNum;
691   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
692   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
693           groupNum.idx = CksvAccess(_numNodeGroups)++;
694    else
695           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
696   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
697   _createNodeGroup(groupNum, env);
698   return groupNum;
699 }
700
701 /**** generate the group idx when group is creator pe is not pe0
702  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
703  **** remaining bits contain the group creator processor number and
704  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
705
706 int _getGroupIdx(int numNodes,int myNode,int numGroups)
707 {
708         int idx;
709         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
710         int n = 32 - (x+1);                                     // number of bits remaining for the index
711         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
712                                                                 // then add the next available index
713         // of course this won't work when int is 8 bytes long on T3E
714         //idx |= 0x80000000;                                      // set the most significant bit to 1
715         idx = - idx;
716                                                                 // if int is not 32 bits, wouldn't this be wrong?
717         return idx;
718 }
719
720 extern "C"
721 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
722 {
723   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
724   register envelope *env = UsrToEnv(msg);
725   env->setMsgtype(BocInitMsg);
726   env->setEpIdx(eIdx);
727   env->setSrcPe(CkMyPe());
728   _TRACE_CREATION_N(env, CkNumPes());
729   CkGroupID gid = _groupCreate(env);
730   _TRACE_CREATION_DONE(1);
731   return gid;
732 }
733
734 extern "C"
735 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
736 {
737   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
738   register envelope *env = UsrToEnv(msg);
739   env->setMsgtype(NodeBocInitMsg);
740   env->setEpIdx(eIdx);
741   env->setSrcPe(CkMyPe());
742   _TRACE_CREATION_N(env, CkNumNodes());
743   CkGroupID gid = _nodeGroupCreate(env);
744   _TRACE_CREATION_DONE(1);
745   return gid;
746 }
747
748 static inline void *_allocNewChare(envelope *env)
749 {
750   void *tmp=malloc(_chareTable[_entryTable[env->getEpIdx()]->chareIdx]->size);
751   _MEMCHECK(tmp);
752   setMemoryTypeChare(tmp);
753   return tmp;
754 }
755
756 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
757 {
758   register void *obj = _allocNewChare(env);
759   _invokeEntry(env->getEpIdx(),env,obj);
760 }
761
762 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
763 {
764   register void *obj = _allocNewChare(env);
765   register CkChareID *pCid = (CkChareID *)
766       _allocMsg(FillVidMsg, sizeof(CkChareID));
767   pCid->onPE = CkMyPe();
768   pCid->objPtr = obj;
769   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
770   register envelope *ret = UsrToEnv(pCid);
771   ret->setVidPtr(env->getVidPtr());
772   register int srcPe = env->getSrcPe();
773   ret->setSrcPe(CkMyPe());
774   CmiSetHandler(ret, _charmHandlerIdx);
775   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
776   CpvAccess(_qd)->create();
777   _invokeEntry(env->getEpIdx(),env,obj);
778 }
779
780 /************** Receive: Chares *************/
781
782 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
783 {
784   register int epIdx = env->getEpIdx();
785   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
786   register void *obj;
787   if (mainIdx != -1)  {
788     CmiAssert(CkMyPe()==0);
789     obj = _mainTable[mainIdx]->getObj();
790   }
791   else
792     obj = env->getObjPtr();
793   _invokeEntry(epIdx,env,obj);
794 }
795
796 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
797 {
798   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
799   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
800   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
801   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
802   vptr->fill(pcid->onPE, pcid->objPtr);
803   CmiFree(env);
804 }
805
806 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
807 {
808   VidBlock *vptr = (VidBlock *) env->getVidPtr();
809   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
810   _SET_USED(env, 1);
811   vptr->send(env);
812 }
813
814 /************** Receive: Groups ****************/
815
816 /**
817  This message is sent to this groupID--prepare to
818  handle this message by looking up the group,
819  and possibly stashing the message.
820 */
821 IrrGroup *_lookupGroup(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
822 {
823
824         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
825         IrrGroup *obj = ck->localBranch(groupID);
826         if (obj==NULL) { /* groupmember not yet created: stash message */
827                 ck->getGroupTable()->find(groupID).enqMsg(env);
828         }
829         else { /* will be able to process message */
830                 ck->process();
831         }
832         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
833         return obj;
834 }
835
836 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
837 {
838 #if CMK_LBDB_ON
839   // if there is a running obj being measured, stop it temporarily
840   LDObjHandle objHandle;
841   int objstopped = 0;
842   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
843   if (the_lbdb->RunningObject(&objHandle)) {
844     objstopped = 1;
845     the_lbdb->ObjectStop(objHandle);
846   }
847 #endif
848   _invokeEntry(epIdx,env,obj);
849 #if CMK_LBDB_ON
850   if (objstopped) the_lbdb->ObjectStart(objHandle);
851 #endif
852   _STATS_RECORD_PROCESS_BRANCH_1();
853 }
854
855 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
856 {
857   register CkGroupID groupID =  env->getGroupNum();
858   register IrrGroup *obj = _lookupGroup(ck,env,env->getGroupNum());
859   if(obj) {
860     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
861   }
862 }
863
864 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
865 {
866   env->setMsgtype(ForChareMsg);
867   env->setObjPtr(obj);
868   _processForChareMsg(ck,env);
869   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
870 }
871
872 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
873 {
874   env->setEpIdx(epIdx);
875   _deliverForNodeBocMsg(ck,env, obj);
876 }
877
878 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
879 {
880   register CkGroupID groupID = env->getGroupNum();
881   register void *obj;
882
883   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
884   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
885   if(!obj) { // groupmember not yet created
886 #if CMK_IMMEDIATE_MSG
887     if (CmiIsImmediate(env))     // buffer immediate message
888       CmiDelayImmediate();
889     else
890 #endif
891     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
892     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
893     return;
894   }
895   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
896 #if CMK_IMMEDIATE_MSG
897   if (!CmiIsImmediate(env))
898 #endif
899   ck->process();
900   env->setMsgtype(ForChareMsg);
901   env->setObjPtr(obj);
902   _processForChareMsg(ck,env);
903   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
904 }
905
906 void _processBocInitMsg(CkCoreState *ck,envelope *env)
907 {
908   register CkGroupID groupID = env->getGroupNum();
909   register int epIdx = env->getEpIdx();
910   CkCreateLocalGroup(groupID, epIdx, env);
911 }
912
913 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
914 {
915   register CkGroupID groupID = env->getGroupNum();
916   register int epIdx = env->getEpIdx();
917   CkCreateLocalNodeGroup(groupID, epIdx, env);
918 }
919
920 /************** Receive: Arrays *************/
921
922 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
923   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
924   if (mgr) {
925     _SET_USED(env, 0);
926     mgr->insertElement((CkMessage *)EnvToUsr(env));
927   }
928 }
929 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
930   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
931   if (mgr) {
932     _SET_USED(env, 0);
933     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
934   }
935 }
936
937 //BIGSIM_OOC DEBUGGING
938 #define TELLMSGTYPE(x) //x
939
940 /**
941  * This is the main converse-level handler used by all of Charm++.
942  *
943  * \addtogroup CriticalPathFramework
944  */
945 void _processHandler(void *converseMsg,CkCoreState *ck)
946 {
947   register envelope *env = (envelope *) converseMsg;
948
949 //#if CMK_RECORD_REPLAY
950   if (ck->watcher!=NULL) {
951     if (!ck->watcher->processMessage(env,ck)) return;
952   }
953 //#endif
954 #ifdef _FAULT_MLOG_
955         Chare *obj=NULL;
956         CkObjID sender;
957         MCount SN;
958         MlogEntry *entry=NULL;
959         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
960         env->getMsgtype() == ForArrayEltMsg){
961                 sender = env->sender;
962                 SN = env->SN;
963                 int result = preProcessReceivedMessage(env,&obj,&entry);
964                 if(result == 0){
965                         return;
966                 }
967         }
968 #endif
969
970 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
971   //  CkPrintf("START\n");
972   criticalPath_start(env);
973 #endif
974
975
976   switch(env->getMsgtype()) {
977 // Group support
978     case BocInitMsg :
979       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
980       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
981       _processBocInitMsg(ck,env);
982       break;
983     case NodeBocInitMsg :
984       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
985       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
986       _processNodeBocInitMsg(ck,env);
987       break;
988     case ForBocMsg :
989       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
990       // QD processing moved inside _processForBocMsg because it is conditional
991       if(env->isPacked()) CkUnpackMessage(&env);
992       _processForBocMsg(ck,env);
993       // stats record moved inside _processForBocMsg because it is conditional
994       break;
995     case ForNodeBocMsg :
996       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
997       // QD processing moved to _processForNodeBocMsg because it is conditional
998       if(env->isPacked()) CkUnpackMessage(&env);
999       _processForNodeBocMsg(ck,env);
1000       // stats record moved to _processForNodeBocMsg because it is conditional
1001       break;
1002
1003 // Array support
1004     case ArrayEltInitMsg:
1005       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1006       if(env->isPacked()) CkUnpackMessage(&env);
1007       _processArrayEltInitMsg(ck,env);
1008       break;
1009     case ForArrayEltMsg:
1010       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1011       if(env->isPacked()) CkUnpackMessage(&env);
1012       _processArrayEltMsg(ck,env);
1013       break;
1014
1015 // Chare support
1016     case NewChareMsg :
1017       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1018       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1019       _processNewChareMsg(ck,env);
1020       _STATS_RECORD_PROCESS_CHARE_1();
1021       break;
1022     case NewVChareMsg :
1023       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1024       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1025       _processNewVChareMsg(ck,env);
1026       _STATS_RECORD_PROCESS_CHARE_1();
1027       break;
1028     case ForChareMsg :
1029       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1030       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1031       _processForChareMsg(ck,env);
1032       _STATS_RECORD_PROCESS_MSG_1();
1033       break;
1034     case ForVidMsg   :
1035       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1036       ck->process();
1037       _processForVidMsg(ck,env);
1038       break;
1039     case FillVidMsg  :
1040       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1041       ck->process();
1042       _processFillVidMsg(ck,env);
1043       break;
1044
1045     default:
1046       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1047   }
1048 #ifdef _FAULT_MLOG_
1049         if(obj != NULL){
1050                 postProcessReceivedMessage(obj,sender,SN,entry);
1051         }
1052 #endif
1053
1054
1055 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1056   criticalPath_end();
1057   //  CkPrintf("STOP\n");
1058 #endif
1059
1060
1061 }
1062
1063
1064 /******************** Message Send **********************/
1065
1066 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1067              int *queueing, int *priobits, unsigned int **prioptr)
1068 {
1069   register envelope *env = (envelope *)converseMsg;
1070   *pfn = (CldPackFn)CkPackMessage;
1071   *len = env->getTotalsize();
1072   *queueing = env->getQueueing();
1073   *priobits = env->getPriobits();
1074   *prioptr = (unsigned int *) env->getPrioPtr();
1075 }
1076
1077 void CkPackMessage(envelope **pEnv)
1078 {
1079   register envelope *env = *pEnv;
1080   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1081     register void *msg = EnvToUsr(env);
1082     _TRACE_BEGIN_PACK();
1083     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1084     _TRACE_END_PACK();
1085     env=UsrToEnv(msg);
1086     env->setPacked(1);
1087     *pEnv = env;
1088   }
1089 }
1090
1091 void CkUnpackMessage(envelope **pEnv)
1092 {
1093   register envelope *env = *pEnv;
1094   register int msgIdx = env->getMsgIdx();
1095   if(env->isPacked()) {
1096     register void *msg = EnvToUsr(env);
1097     _TRACE_BEGIN_UNPACK();
1098     msg = _msgTable[msgIdx]->unpack(msg);
1099     _TRACE_END_UNPACK();
1100     env=UsrToEnv(msg);
1101     env->setPacked(0);
1102     *pEnv = env;
1103   }
1104 }
1105
1106 //There's no reason for most messages to go through the Cld--
1107 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1108 // Thus these accellerated versions of the Cld calls.
1109
1110 int index_objectQHandler;
1111 int index_tokenHandler;
1112 static int index_skipCldHandler;
1113
1114 static void _skipCldHandler(void *converseMsg)
1115 {
1116   register envelope *env = (envelope *)(converseMsg);
1117   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1118 #if CMK_GRID_QUEUE_AVAILABLE
1119   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1120     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1121                        env, env->getQueueing (), env->getPriobits (),
1122                        (unsigned int *) env->getPrioPtr ());
1123   } else {
1124     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1125                        env, env->getQueueing (), env->getPriobits (),
1126                        (unsigned int *) env->getPrioPtr ());
1127   }
1128 #else
1129   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1130         env, env->getQueueing(),env->getPriobits(),
1131         (unsigned int *)env->getPrioPtr());
1132 #endif
1133 }
1134
1135
1136 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1137 // Made non-static to be used by ckmessagelogging
1138 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1139 {
1140   if(pe == CkMyPe() ){
1141     if(!CmiNodeAlive(CkMyPe())){
1142         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1143 //      return;
1144     }
1145   }
1146   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1147 #if CMK_OBJECT_QUEUE_AVAILABLE
1148     Chare *obj = CkFindObjectPtr(env);
1149     if (obj && obj->CkGetObjQueue().queue()) {
1150       _enqObjQueue(obj, env);
1151     }
1152     else
1153 #endif
1154     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1155         env, env->getQueueing(),env->getPriobits(),
1156         (unsigned int *)env->getPrioPtr());
1157   } else {
1158     CkPackMessage(&env);
1159     int len=env->getTotalsize();
1160     CmiSetXHandler(env,CmiGetHandler(env));
1161 #if CMK_OBJECT_QUEUE_AVAILABLE
1162     CmiSetHandler(env,index_objectQHandler);
1163 #else
1164     CmiSetHandler(env,index_skipCldHandler);
1165 #endif
1166     CmiSetInfo(env,infoFn);
1167     if (pe==CLD_BROADCAST) {
1168 #ifdef _FAULT_MLOG_             
1169                         CmiSyncBroadcast(len, (char *)env);
1170 #else
1171                         CmiSyncBroadcastAndFree(len, (char *)env); 
1172 #endif
1173
1174 }
1175     else if (pe==CLD_BROADCAST_ALL) { 
1176 #ifdef _FAULT_MLOG_             
1177                         CmiSyncBroadcastAll(len, (char *)env);
1178 #else
1179                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1180 #endif
1181
1182 }
1183     else{
1184 #ifdef _FAULT_MLOG_             
1185                         CmiSyncSend(pe, len, (char *)env);
1186 #else
1187                         CmiSyncSendAndFree(pe, len, (char *)env);
1188 #endif
1189
1190                 }
1191   }
1192 }
1193
1194 #if CMK_BLUEGENE_CHARM
1195 #   define  _skipCldEnqueue   CldEnqueue
1196 #endif
1197
1198 // by pass Charm++ priority queue, send as Converse message
1199 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1200 {
1201   CkPackMessage(&env);
1202   int len=env->getTotalsize();
1203   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1204 }
1205
1206 static void _noCldEnqueue(int pe, envelope *env)
1207 {
1208 /*
1209   if (pe == CkMyPe()) {
1210     CmiHandleMessage(env);
1211   } else
1212 */
1213   CkPackMessage(&env);
1214   int len=env->getTotalsize();
1215   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1216   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1217   else CmiSyncSendAndFree(pe, len, (char *)env);
1218 }
1219
1220 //static void _noCldNodeEnqueue(int node, envelope *env)
1221 //Made non-static to be used by ckmessagelogging
1222 void _noCldNodeEnqueue(int node, envelope *env)
1223 {
1224 /*
1225   if (node == CkMyNode()) {
1226     CmiHandleMessage(env);
1227   } else {
1228 */
1229   CkPackMessage(&env);
1230   int len=env->getTotalsize();
1231   if (node==CLD_BROADCAST) { 
1232 #ifdef _FAULT_MLOG_
1233         CmiSyncNodeBroadcast(len, (char *)env);
1234 #else
1235         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1236 #endif
1237 }
1238   else if (node==CLD_BROADCAST_ALL) { 
1239 #ifdef _FAULT_MLOG_
1240                 CmiSyncNodeBroadcastAll(len, (char *)env);
1241 #else
1242                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1243 #endif
1244
1245 }
1246   else {
1247 #ifdef _FAULT_MLOG_
1248         CmiSyncNodeSend(node, len, (char *)env);
1249 #else
1250         CmiSyncNodeSendAndFree(node, len, (char *)env);
1251 #endif
1252   }
1253 }
1254
1255 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1256 {
1257   register envelope *env = UsrToEnv(msg);
1258   _CHECK_USED(env);
1259   _SET_USED(env, 1);
1260   env->setMsgtype(ForChareMsg);
1261   env->setEpIdx(eIdx);
1262   env->setSrcPe(CkMyPe());
1263 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1264   criticalPath_send(env);
1265   automaticallySetMessagePriority(env);
1266 #endif
1267 #ifndef CMK_OPTIMIZE
1268   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1269 #endif
1270 #if CMK_OBJECT_QUEUE_AVAILABLE
1271   CmiSetHandler(env, index_objectQHandler);
1272 #else
1273   CmiSetHandler(env, _charmHandlerIdx);
1274 #endif
1275   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1276     register int pe = -(pCid->onPE+1);
1277     if(pe==CkMyPe()) {
1278       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1279       void *objPtr;
1280       if (NULL!=(objPtr=vblk->getLocalChare()))
1281       { //A ready local chare
1282         env->setObjPtr(objPtr);
1283         return pe;
1284       }
1285       else { //The vidblock is not ready-- forget it
1286         vblk->send(env);
1287         return -1;
1288       }
1289     } else { //Valid vidblock for another PE:
1290       env->setMsgtype(ForVidMsg);
1291       env->setVidPtr(pCid->objPtr);
1292       return pe;
1293     }
1294   }
1295   else {
1296     env->setObjPtr(pCid->objPtr);
1297     return pCid->onPE;
1298   }
1299 }
1300
1301 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1302 {
1303   int destPE = _prepareMsg(eIdx, msg, pCid);
1304   if (destPE != -1) {
1305     register envelope *env = UsrToEnv(msg);
1306 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1307     criticalPath_send(env);
1308     automaticallySetMessagePriority(env);
1309 #endif
1310     CmiBecomeImmediate(env);
1311   }
1312   return destPE;
1313 }
1314
1315 extern "C"
1316 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1317 {
1318   if (opts & CK_MSG_INLINE) {
1319     CkSendMsgInline(entryIdx, msg, pCid, opts);
1320     return;
1321   }
1322 #ifndef CMK_OPTIMIZE
1323   if (opts & CK_MSG_IMMEDIATE) {
1324     CmiAbort("Immediate message is not allowed in Chare!");
1325   }
1326 #endif
1327   register envelope *env = UsrToEnv(msg);
1328   int destPE=_prepareMsg(entryIdx,msg,pCid);
1329   if (destPE!=-1) {
1330     _TRACE_CREATION_1(env);
1331     CpvAccess(_qd)->create();
1332     if (opts & CK_MSG_SKIP_OR_IMM)
1333       _noCldEnqueue(destPE, env);
1334     else
1335       CldEnqueue(destPE, env, _infoIdx);
1336     _TRACE_CREATION_DONE(1);
1337   }
1338 }
1339
1340 extern "C"
1341 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1342 {
1343   if (pCid->onPE==CkMyPe())
1344   {
1345     if(!CmiNodeAlive(CkMyPe())){
1346         return;
1347     }
1348 #ifndef CMK_OPTIMIZE
1349     //Just in case we need to breakpoint or use the envelope in some way
1350     _prepareMsg(entryIndex,msg,pCid);
1351 #endif
1352                 //Just directly call the chare (skip QD handling & scheduler)
1353     register envelope *env = UsrToEnv(msg);
1354     if (env->isPacked()) CkUnpackMessage(&env);
1355     _STATS_RECORD_PROCESS_MSG_1();
1356     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1357   }
1358   else {
1359     //No way to inline a cross-processor message:
1360     CkSendMsg(entryIndex,msg,pCid,opts&!CK_MSG_INLINE);
1361   }
1362 }
1363
1364 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1365 {
1366   register envelope *env = UsrToEnv(msg);
1367   _CHECK_USED(env);
1368   _SET_USED(env, 1);
1369   env->setMsgtype(type);
1370   env->setEpIdx(eIdx);
1371   env->setGroupNum(gID);
1372   env->setSrcPe(CkMyPe());
1373 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1374   criticalPath_send(env);
1375   automaticallySetMessagePriority(env);
1376 #endif
1377 #ifndef CMK_OPTIMIZE
1378   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1379 #endif
1380   CmiSetHandler(env, _charmHandlerIdx);
1381   return env;
1382 }
1383
1384 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1385 {
1386   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1387 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1388   criticalPath_send(env);
1389   automaticallySetMessagePriority(env);
1390 #endif
1391   CmiBecomeImmediate(env);
1392   return env;
1393 }
1394
1395 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1396                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1397 {
1398   int numPes;
1399   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1400 #ifdef _FAULT_MLOG_
1401         sendTicketGroupRequest(env,pe,_infoIdx);
1402 #else
1403   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1404   _TRACE_CREATION_N(env, numPes);
1405   if (opts & CK_MSG_SKIP_OR_IMM)
1406     _noCldEnqueue(pe, env);
1407   else
1408     _skipCldEnqueue(pe, env, _infoIdx);
1409   _TRACE_CREATION_DONE(1);
1410 #endif
1411 }
1412
1413 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1414                            int npes, int *pes)
1415 {
1416   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1417   _TRACE_CREATION_MULTICAST(env, npes, pes);
1418   CldEnqueueMulti(npes, pes, env, _infoIdx);
1419   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1420 }
1421
1422 extern "C"
1423 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1424 {
1425 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1426   if (destPE==CkMyPe())
1427   {
1428     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1429     return;
1430   }
1431   //Can't inline-- send the usual way
1432   register envelope *env = UsrToEnv(msg);
1433   int numPes;
1434   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1435   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1436   _TRACE_CREATION_N(env, numPes);
1437   _noCldEnqueue(destPE, env);
1438   _STATS_RECORD_SEND_BRANCH_1();
1439   CkpvAccess(_coreState)->create();
1440   _TRACE_CREATION_DONE(1);
1441 #else
1442   // no support for immediate message, send inline
1443   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1444 #endif
1445 }
1446
1447 extern "C"
1448 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1449 {
1450   if (destPE==CkMyPe())
1451   {
1452     if(!CmiNodeAlive(CkMyPe())){
1453         return;
1454     }
1455     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1456     if (obj!=NULL)
1457     { //Just directly call the group:
1458 #ifndef CMK_OPTIMIZE
1459       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1460 #else
1461       envelope *env=UsrToEnv(msg);
1462 #endif
1463       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1464       return;
1465     }
1466   }
1467   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1468   CkSendMsgBranch(eIdx,msg,destPE,gID,opts&!CK_MSG_INLINE);
1469 }
1470
1471 extern "C"
1472 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1473 {
1474   if (opts & CK_MSG_INLINE) {
1475     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1476     return;
1477   }
1478   if (opts & CK_MSG_IMMEDIATE) {
1479     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1480     return;
1481   }
1482   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1483   _STATS_RECORD_SEND_BRANCH_1();
1484   CkpvAccess(_coreState)->create();
1485 }
1486
1487 extern "C"
1488 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1489 {
1490 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1491   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1492   _TRACE_CREATION_MULTICAST(env, npes, pes);
1493   _noCldEnqueueMulti(npes, pes, env);
1494   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1495 #else
1496   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1497   CpvAccess(_qd)->create(-npes);
1498 #endif
1499   _STATS_RECORD_SEND_BRANCH_N(npes);
1500   CpvAccess(_qd)->create(npes);
1501 }
1502
1503 extern "C"
1504 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1505 {
1506   if (opts & CK_MSG_IMMEDIATE) {
1507     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1508     return;
1509   }
1510     // normal mesg
1511   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1512   _STATS_RECORD_SEND_BRANCH_N(npes);
1513   CpvAccess(_qd)->create(npes);
1514 }
1515
1516 extern "C"
1517 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1518 {
1519   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1520   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1521   CpvAccess(_qd)->create(CkNumPes());
1522 }
1523
1524 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1525                 int node=CLD_BROADCAST_ALL, int opts=0)
1526 {
1527   int numPes;
1528   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1529 #ifdef _FAULT_MLOG_
1530         sendTicketNodeGroupRequest(env,node,_infoIdx);
1531 #else
1532   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1533   _TRACE_CREATION_N(env, numPes);
1534   if (opts & CK_MSG_SKIP_OR_IMM) {
1535     _noCldNodeEnqueue(node, env);
1536     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1537       CkpvAccess(_coreState)->create(-numPes);
1538     }
1539   }
1540   else
1541     CldNodeEnqueue(node, env, _infoIdx);
1542   _TRACE_CREATION_DONE(1);
1543 #endif
1544 }
1545
1546 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1547                            int npes, int *nodes)
1548 {
1549   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1550   _TRACE_CREATION_N(env, npes);
1551   for (int i=0; i<npes; i++) {
1552     CldNodeEnqueue(nodes[i], env, _infoIdx);
1553   }
1554   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1555 }
1556
1557 extern "C"
1558 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1559 {
1560 #if CMK_IMMEDIATE_MSG
1561   if (node==CkMyNode())
1562   {
1563     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1564     return;
1565   }
1566   //Can't inline-- send the usual way
1567   register envelope *env = UsrToEnv(msg);
1568   int numPes;
1569   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1570   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1571   _TRACE_CREATION_N(env, numPes);
1572   _noCldNodeEnqueue(node, env);
1573   _STATS_RECORD_SEND_BRANCH_1();
1574   /* immeidate message is invisible to QD */
1575 //  CkpvAccess(_coreState)->create();
1576   _TRACE_CREATION_DONE(1);
1577 #else
1578   // no support for immediate message, send inline
1579   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1580 #endif
1581 }
1582
1583 extern "C"
1584 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1585 {
1586   if (node==CkMyNode())
1587   {
1588     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1589     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1590     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1591     if (obj!=NULL)
1592     { //Just directly call the group:
1593 #ifndef CMK_OPTIMIZE
1594       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1595 #else
1596       envelope *env=UsrToEnv(msg);
1597 #endif
1598       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1599       return;
1600     }
1601   }
1602   //Can't inline-- send the usual way
1603   CkSendMsgNodeBranch(eIdx,msg,node,gID,opts&!CK_MSG_INLINE);
1604 }
1605
1606 extern "C"
1607 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1608 {
1609   if (opts & CK_MSG_INLINE) {
1610     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1611     return;
1612   }
1613   if (opts & CK_MSG_IMMEDIATE) {
1614     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1615     return;
1616   }
1617   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1618   _STATS_RECORD_SEND_NODE_BRANCH_1();
1619   CkpvAccess(_coreState)->create();
1620 }
1621
1622 extern "C"
1623 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1624 {
1625 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1626   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1627   _noCldEnqueueMulti(npes, nodes, env);
1628 #else
1629   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1630   CpvAccess(_qd)->create(-npes);
1631 #endif
1632   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1633   CpvAccess(_qd)->create(npes);
1634 }
1635
1636 extern "C"
1637 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1638 {
1639   if (opts & CK_MSG_IMMEDIATE) {
1640     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1641     return;
1642   }
1643     // normal mesg
1644   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1645   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1646   CpvAccess(_qd)->create(npes);
1647 }
1648
1649 extern "C"
1650 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1651 {
1652   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1653   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1654   CpvAccess(_qd)->create(CkNumNodes());
1655 }
1656
1657 //Needed by delegation manager:
1658 extern "C"
1659 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1660 { return _prepareMsg(eIdx,msg,pCid); }
1661 extern "C"
1662 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1663 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1664 extern "C"
1665 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1666 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1667
1668 void _ckModuleInit(void) {
1669         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1670         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1671         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1672         CkpvInitialize(TokenPool*, _tokenPool);
1673         CkpvAccess(_tokenPool) = new TokenPool;
1674 }
1675
1676
1677 /************** Send: Arrays *************/
1678
1679 extern void CkArrayManagerInsert(int onPe,void *msg);
1680 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1681
1682 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1683 {
1684   _CHECK_USED(env);
1685   _SET_USED(env, 1);
1686   env->setMsgtype(type);
1687 #ifndef CMK_OPTIMIZE
1688   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1689 #endif
1690   CmiSetHandler(env, _charmHandlerIdx);
1691   CpvAccess(_qd)->create();
1692 }
1693
1694 extern "C"
1695 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1696   register envelope *env = UsrToEnv(msg);
1697   env->getsetArrayMgr()=aID;
1698   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1699   CldEnqueue(pe, env, _infoIdx);
1700 }
1701
1702 extern "C"
1703 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1704   register envelope *env = UsrToEnv(msg);
1705   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1706 #ifdef _FAULT_MLOG_
1707         sendTicketArrayRequest(env,pe,_infoIdx);
1708 #else
1709   if (opts & CK_MSG_IMMEDIATE)
1710     CmiBecomeImmediate(env);
1711   if (opts & CK_MSG_SKIP_OR_IMM)
1712     _noCldEnqueue(pe, env);
1713   else
1714     _skipCldEnqueue(pe, env, _infoIdx);
1715 #endif
1716 }
1717
1718 class ElementDestroyer : public CkLocIterator {
1719 private:
1720         CkLocMgr *locMgr;
1721 public:
1722         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1723         void addLocation(CkLocation &loc) {
1724           loc.destroyAll();
1725         }
1726 };
1727
1728 void CkDeleteChares() {
1729   int i;
1730   int numGroups = CkpvAccess(_groupIDTable)->size();
1731
1732   // delete all array elements
1733   for(i=0;i<numGroups;i++) {
1734     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1735     if(obj && obj->isLocMgr())  {
1736       CkLocMgr *mgr = (CkLocMgr*)obj;
1737       ElementDestroyer destroyer(mgr);
1738       mgr->iterate(destroyer);
1739 printf("[%d] DELETE!\n", CkMyPe());
1740     }
1741   }
1742
1743   // delete all groups
1744   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1745   for(i=0;i<numGroups;i++) {
1746     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1747     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1748     if (obj) delete obj;
1749   }
1750   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1751
1752   // delete all node groups
1753   if (CkMyRank() == 0) {
1754     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1755     for(i=0;i<numNodeGroups;i++) {
1756       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
1757       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1758       if (obj) delete obj;
1759     }
1760   }
1761 }
1762
1763 //------------------- Message Watcher (record/replay) ----------------
1764
1765 CkMessageWatcher::~CkMessageWatcher() {}
1766
1767 class CkMessageRecorder : public CkMessageWatcher {
1768         FILE *f;
1769 public:
1770         CkMessageRecorder(FILE *f_) :f(f_) {}
1771         ~CkMessageRecorder() {
1772                 fprintf(f,"-1 -1 -1");
1773                 fclose(f);
1774         }
1775
1776         virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
1777                 if (env->getEvent())
1778                      fprintf(f,"%d %d %d %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg);
1779                 return CmiTrue;
1780         }
1781 };
1782
1783 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
1784 #define REPLAYDEBUG(args) /* empty */
1785
1786 class CkMessageReplay : public CkMessageWatcher {
1787         FILE *f;
1788         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
1789         /// Read the next message we need from the file:
1790         void getNext(void) {
1791                 if (4!=fscanf(f,"%d%d%d%d", &nextPE,&nextSize,&nextEvent,&nexttype)) {
1792                         // CkAbort("CkMessageReplay> Syntax error reading replay file");
1793                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
1794                 }
1795         }
1796         /// If this is the next message we need, advance and return CmiTrue.
1797         CmiBool isNext(envelope *env) {
1798                 if (nextPE!=env->getSrcPe()) return CmiFalse;
1799                 if (nextEvent!=env->getEvent()) return CmiFalse;
1800                 if (nextSize!=env->getTotalsize())
1801                 {
1802                         CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
1803                         return CmiFalse;
1804                 }
1805                 return CmiTrue;
1806         }
1807
1808         /// This is a (short) list of messages we aren't yet ready for:
1809         CkQ<envelope *> delayed;
1810
1811         /// Try to flush out any delayed messages
1812         void flush(void) {
1813                 int len=delayed.length();
1814                 for (int i=0;i<len;i++) {
1815                         envelope *env=delayed.deq();
1816                         if (isNext(env)) { /* this is the next message: process it */
1817                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1818                                 CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
1819                                 return;
1820                         }
1821                         else /* Not ready yet-- put it back in the
1822                                 queue */
1823                           {
1824                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1825                                 delayed.enq(env);
1826                           }
1827                 }
1828         }
1829
1830 public:
1831         CkMessageReplay(FILE *f_) :f(f_) { getNext();
1832         REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
1833                     }
1834         ~CkMessageReplay() {fclose(f);}
1835
1836         virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
1837           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx());
1838                 if (env->getEvent() == 0) return CmiTrue;
1839                 if (isNext(env)) { /* This is the message we were expecting */
1840                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1841                         getNext(); /* Advance over this message */
1842                         flush(); /* try to process queued-up stuff */
1843                         return CmiTrue;
1844                 }
1845 #if CMK_SMP
1846                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
1847                          // try next rank, we can't just buffer the msg and left
1848                          // we need to keep unprocessed msg on the fly
1849                         int nextpe = CkMyPe()+1;
1850                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
1851                         nextpe = CkNodeFirst(CkMyNode());
1852                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
1853                         return CmiFalse;
1854                 }
1855 #endif
1856                 else /*!isNext(env) */ {
1857                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()
1858                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
1859                         delayed.enq(env);
1860                         flush();
1861                         return CmiFalse;
1862                 }
1863         }
1864 };
1865
1866 static FILE *openReplayFile(const char *permissions) {
1867
1868         char fName[200];
1869         sprintf(fName,"ckreplay_%06d.log",CkMyPe());
1870         FILE *f=fopen(fName,permissions);
1871         REPLAYDEBUG("openReplayfile "<<fName);
1872         if (f==NULL) {
1873                 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
1874                         CkMyPe(),fName,permissions);
1875                 CkAbort("openReplayFile> Could not open replay file");
1876         }
1877         return f;
1878 }
1879
1880 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
1881         REPLAYDEBUG("CkMessageWaterInit ");
1882         if (CmiGetArgFlagDesc(argv,"+record","Record message processing order"))
1883                 ck->watcher=new CkMessageRecorder(openReplayFile("w"));
1884         if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream"))
1885                 ck->watcher=new CkMessageReplay(openReplayFile("r"));
1886 }
1887
1888 extern "C"
1889 int CkMessageToEpIdx(void *msg) {
1890         envelope *env=UsrToEnv(msg);
1891         int ep=env->getEpIdx();
1892         if (ep==CkIndex_CkArray::recvBroadcast(0))
1893                 return env->getsetArrayBcastEp();
1894         else
1895                 return ep;
1896 }
1897
1898 extern "C"
1899 int getCharmEnvelopeSize() {
1900   return sizeof(envelope);
1901 }
1902
1903
1904 #include "CkMarshall.def.h"
1905