3673107e71c3172af41e14ce48a3212aee3a27f8
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #if CMK_LBDB_ON
20 #include "LBDatabase.h"
21 #endif // CMK_LBDB_ON
22
23 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
24
25 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
26
27 int CMessage_CkMessage::__idx=-1;
28 int CMessage_CkArgMsg::__idx=0;
29 int CkIndex_Chare::__idx;
30 int CkIndex_Group::__idx;
31 int CkIndex_ArrayBase::__idx=-1;
32
33 extern int _defaultObjectQ;
34
35 //Charm++ virtual functions: declaring these here results in a smaller executable
36 Chare::Chare(void) {
37   thishandle.onPE=CkMyPe();
38   thishandle.objPtr=this;
39 #if CMK_OBJECT_QUEUE_AVAILABLE
40   if (_defaultObjectQ)  CkEnableObjQ();
41 #endif
42 }
43
44 Chare::Chare(CkMigrateMessage* m) {
45   thishandle.onPE=CkMyPe();
46   thishandle.objPtr=this;
47 #if CMK_OBJECT_QUEUE_AVAILABLE
48   if (_defaultObjectQ)  CkEnableObjQ();
49 #endif
50 }
51
52 void Chare::CkEnableObjQ()
53 {
54 #if CMK_OBJECT_QUEUE_AVAILABLE
55   objQ.create();
56 #endif
57 }
58
59 Chare::~Chare() {}
60
61 void Chare::pup(PUP::er &p)
62 {
63   p(thishandle.onPE);
64   thishandle.objPtr=(void *)this;
65 }
66
67 int Chare::ckGetChareType() const {
68   return -3;
69 }
70 char *Chare::ckDebugChareName(void) {
71   char buf[100];
72   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
73   return strdup(buf);
74 }
75 int Chare::ckDebugChareID(char *str, int limit) {
76   // pure chares for now do not have a valid ID
77   str[0] = 0;
78   return 1;
79 }
80 void Chare::ckDebugPup(PUP::er &p) {
81   pup(p);
82 }
83
84 /// This method is called before starting a [threaded] entry method.
85 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
86   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
87   traceAddThreadListeners(th, UsrToEnv(msg));
88 }
89
90
91 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
92   p.comment("Message has no debug pup routine.  Bytes:");
93   int ts=UsrToEnv(msg)->getTotalsize();
94   int msgLen=ts-sizeof(envelope);
95   if (msgLen>0)
96     p((char*)msg,msgLen);
97 }
98
99 IrrGroup::IrrGroup(void) {
100   thisgroup = CkpvAccess(_currentGroup);
101 }
102
103 IrrGroup::~IrrGroup() {
104   // remove the object pointer
105   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
106   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
107   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
108 }
109
110 void IrrGroup::pup(PUP::er &p)
111 {
112   Chare::pup(p);
113   p|thisgroup;
114 }
115
116 int IrrGroup::ckGetChareType() const {
117   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
118 }
119
120 int IrrGroup::ckDebugChareID(char *str, int limit) {
121   if (limit<5) return -1;
122   str[0] = 1;
123   *((int*)&str[1]) = thisgroup.idx;
124   return 5;
125 }
126
127 char *IrrGroup::ckDebugChareName() {
128   return strdup(_chareTable[ckGetChareType()]->name);
129 }
130
131 void IrrGroup::ckJustMigrated(void)
132 {
133 }
134
135 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
136   /* FIXME: **CW** not entirely sure what we should do here yet */
137 }
138
139 void Group::CkAddThreadListeners(CthThread th, void *msg) {
140   Chare::CkAddThreadListeners(th, msg);
141   CthSetThreadID(th, thisgroup.idx, 0, 0);
142 }
143
144 void Group::pup(PUP::er &p)
145 {
146   CkReductionMgr::pup(p);
147   reductionInfo.pup(p);
148 }
149
150 /**** Delegation Manager Group */
151 CkDelegateMgr::~CkDelegateMgr() { }
152
153 //Default delegator implementation: do not delegate-- send directly
154 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
155   { CkSendMsg(ep,m,c); }
156 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
157   { CkSendMsgBranch(ep,m,onPE,g); }
158 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
159   { CkBroadcastMsgBranch(ep,m,g); }
160 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
161   { CkSendMsgNodeBranch(ep,m,onNode,g); }
162 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
163   { CkBroadcastMsgNodeBranch(ep,m,g); }
164 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,int onPE,CkArrayID a)
165 {
166         CProxyElement_ArrayBase ap(a,idx);
167         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
168 }
169 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,CkArrayID a)
170 {
171         CProxyElement_ArrayBase ap(a,idx);
172         ap.ckSend((CkArrayMessage *)m,ep);
173 }
174 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
175 {
176         CProxy_ArrayBase ap(a);
177         ap.ckBroadcast((CkArrayMessage *)m,ep);
178 }
179
180 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, CkArrayID a,CkSectionID &s, int opts)
181 {
182         CmiAbort("ArraySectionSend is not implemented!\n");
183 /*
184         CProxyElement_ArrayBase ap(a,idx);
185         ap.ckSend((CkArrayMessage *)m,ep);
186 */
187 }
188
189 /*** Proxy <-> delegator communication */
190 CkDelegateData::~CkDelegateData() {}
191
192 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
193   return pd; // default implementation ignores pup call
194 }
195
196 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
197    this tricky manual reference counting business... */
198
199 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
200         if (dPtr) dPtr->ref();
201         ckUndelegate();
202         delegatedMgr = dTo;
203         delegatedPtr = dPtr;
204 }
205 void CProxy::ckUndelegate(void) {
206         delegatedMgr=NULL;
207         if (delegatedPtr) delegatedPtr->unref();
208         delegatedPtr=NULL;
209 }
210
211 /// Copy constructor
212 CProxy::CProxy(const CProxy &src)
213     :delegatedMgr(src.delegatedMgr)
214 {
215     delegatedPtr = NULL;
216     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
217         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
218 }
219
220 /// Assignment operator
221 CProxy& CProxy::operator=(const CProxy &src) {
222         CkDelegateData *oldPtr=delegatedPtr;
223         ckUndelegate();
224         delegatedMgr=src.delegatedMgr;
225
226         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
227             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
228         else
229             delegatedPtr = NULL;
230
231         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
232         if (oldPtr) oldPtr->unref();
233         return *this;
234 }
235
236 void CProxy::pup(PUP::er &p) {
237       CkGroupID delegatedTo;
238       delegatedTo.setZero();
239       int isNodeGroup = 0;
240       if (!p.isUnpacking()) {
241         if (delegatedMgr) {
242           delegatedTo = delegatedMgr->CkGetGroupID();
243           isNodeGroup = delegatedMgr->isNodeGroup();
244         }
245       }
246       p|delegatedTo;
247       if (!delegatedTo.isZero()) {
248         p|isNodeGroup;
249         if (p.isUnpacking()) {
250           if (isNodeGroup)
251                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
252           else
253                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
254         }
255
256         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
257         if (p.isUnpacking() && delegatedPtr)
258             delegatedPtr->ref();
259       }
260 }
261
262 /**** Array sections */
263 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
264 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
265   _cookie.aid = aid;    \
266   _cookie.get_pe() = CkMyPe();  \
267   _elems = new CkArrayIndexMax[nElems]; \
268   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
269   pelist = NULL;        \
270   npes  = 0;    \
271 }
272
273 CKSECTIONID_CONSTRUCTOR_DEF(1D)
274 CKSECTIONID_CONSTRUCTOR_DEF(2D)
275 CKSECTIONID_CONSTRUCTOR_DEF(3D)
276 CKSECTIONID_CONSTRUCTOR_DEF(4D)
277 CKSECTIONID_CONSTRUCTOR_DEF(5D)
278 CKSECTIONID_CONSTRUCTOR_DEF(6D)
279 CKSECTIONID_CONSTRUCTOR_DEF(Max)
280
281 CkSectionID::CkSectionID(const CkSectionID &sid) {
282   _cookie = sid._cookie;
283   _nElems = sid._nElems;
284   _elems = new CkArrayIndexMax[_nElems];
285   for (int i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
286   pelist = NULL;
287   npes = 0;
288 }
289
290 void CkSectionID::operator=(const CkSectionID &sid) {
291   _cookie = sid._cookie;
292   _nElems = sid._nElems;
293   _elems = new CkArrayIndexMax[_nElems];
294   for (int i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
295 }
296
297 CkSectionID::~CkSectionID() {
298     delete [] _elems;
299     if(pelist != NULL)
300         delete [] pelist;
301 }
302
303 void CkSectionID::pup(PUP::er &p) {
304     p | _cookie;
305     p(_nElems);
306     if (p.isUnpacking()) _elems = new CkArrayIndexMax[_nElems];
307     for (int i=0; i< _nElems; i++) p | _elems[i];
308 }
309
310 /**** Tiny random API routines */
311
312 #ifdef CMK_CUDA
313 void CUDACallbackManager(void *fn) {
314   if (fn != NULL) {
315     CkCallback *cb = (CkCallback*) fn;
316     cb->send();
317   }
318 }
319
320 #endif
321
322 extern "C"
323 void CkSetRefNum(void *msg, int ref)
324 {
325   UsrToEnv(msg)->setRef(ref);
326 }
327
328 extern "C"
329 int CkGetRefNum(void *msg)
330 {
331   return UsrToEnv(msg)->getRef();
332 }
333
334 extern "C"
335 int CkGetSrcPe(void *msg)
336 {
337   return UsrToEnv(msg)->getSrcPe();
338 }
339
340 extern "C"
341 int CkGetSrcNode(void *msg)
342 {
343   return CmiNodeOf(CkGetSrcPe(msg));
344 }
345
346 extern "C"
347 void *CkLocalBranch(CkGroupID gID) {
348   return _localBranch(gID);
349 }
350
351 static
352 void *_ckLocalNodeBranch(CkGroupID groupID) {
353   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
354   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
355   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
356   return retval;
357 }
358
359 extern "C"
360 void *CkLocalNodeBranch(CkGroupID groupID)
361 {
362   void *retval;
363   // we are called in a constructor
364   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
365     return CkpvAccess(_currentNodeGroupObj);
366   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
367   { // Nodegroup hasn't finished being created yet-- schedule...
368     CsdScheduler(0);
369   }
370   return retval;
371 }
372
373 extern "C"
374 void *CkLocalChare(const CkChareID *pCid)
375 {
376         int pe=pCid->onPE;
377         if (pe<0) { //A virtual chare ID
378                 if (pe!=(-(CkMyPe()+1)))
379                         return NULL;//VID block not on this PE
380                 VidBlock *v=(VidBlock *)pCid->objPtr;
381                 return v->getLocalChare();
382         }
383         else
384         { //An ordinary chare ID
385                 if (pe!=CkMyPe())
386                         return NULL;//Chare not on this PE
387                 return pCid->objPtr;
388         }
389 }
390
391 CkpvDeclare(char **,Ck_argv);
392 extern "C" char **CkGetArgv(void) {
393         return CkpvAccess(Ck_argv);
394 }
395 extern "C" int CkGetArgc(void) {
396         return CmiGetArgc(CkpvAccess(Ck_argv));
397 }
398
399 /******************** Basic support *****************/
400 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
401 {
402 #ifndef CMK_OPTIMIZE
403   CpdBeforeEp(epIdx, obj);
404 #endif
405   _entryTable[epIdx]->call(msg, obj);
406 #ifndef CMK_OPTIMIZE
407   CpdAfterEp(epIdx);
408 #endif
409   if (_entryTable[epIdx]->noKeep)
410   { /* Method doesn't keep/delete the message, so we have to: */
411      CkFreeMsg(msg);
412   }
413 }
414 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
415 {
416   void *deliverMsg;
417   if (_entryTable[epIdx]->noKeep)
418   { /* Deliver a read-only copy of the message */
419     deliverMsg=(void *)msg;
420   } else
421   { /* Method needs a copy of the message to keep/delete */
422     void *oldMsg=(void *)msg;
423     deliverMsg=CkCopyMsg(&oldMsg);
424 #ifndef CMK_OPTIMIZE
425     if (oldMsg!=msg)
426       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
427 #endif
428   }
429 #ifndef CMK_OPTIMIZE
430   CpdBeforeEp(epIdx, obj);
431 #endif
432   _entryTable[epIdx]->call(deliverMsg, obj);
433 #ifndef CMK_OPTIMIZE
434   CpdAfterEp(epIdx);
435 #endif
436 }
437
438 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
439 {
440   register void *msg = EnvToUsr(env);
441   _SET_USED(env, 0);
442   CkDeliverMessageFree(epIdx,msg,obj);
443 }
444
445 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
446 {
447 #ifndef CMK_OPTIMIZE /* Consider tracing: */
448   if (_entryTable[epIdx]->traceEnabled) {
449     _TRACE_BEGIN_EXECUTE(env);
450     _invokeEntryNoTrace(epIdx,env,obj);
451     _TRACE_END_EXECUTE();
452   }
453   else
454 #endif
455     _invokeEntryNoTrace(epIdx,env,obj);
456 }
457
458 /********************* Creation ********************/
459
460 extern "C"
461 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
462 {
463   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
464   envelope *env = UsrToEnv(msg);
465   _CHECK_USED(env);
466   if(pCid == 0) {
467     env->setMsgtype(NewChareMsg);
468   } else {
469     pCid->onPE = (-(CkMyPe()+1));
470     //  pCid->magic = _GETIDX(cIdx);
471     pCid->objPtr = (void *) new VidBlock();
472     _MEMCHECK(pCid->objPtr);
473     env->setMsgtype(NewVChareMsg);
474     env->setVidPtr(pCid->objPtr);
475   }
476   env->setEpIdx(eIdx);
477   env->setSrcPe(CkMyPe());
478   CmiSetHandler(env, _charmHandlerIdx);
479   _TRACE_CREATION_1(env);
480   CpvAccess(_qd)->create();
481   _STATS_RECORD_CREATE_CHARE_1();
482   _SET_USED(env, 1);
483   if(destPE == CK_PE_ANY)
484     env->setForAnyPE(1);
485   else
486     env->setForAnyPE(0);
487   CldEnqueue(destPE, env, _infoIdx);
488   _TRACE_CREATION_DONE(1);
489 }
490
491 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
492 {
493   register int gIdx = _entryTable[epIdx]->chareIdx;
494   register void *obj = malloc(_chareTable[gIdx]->size);
495   _MEMCHECK(obj);
496   setMemoryTypeChare(obj);
497   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
498   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
499   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
500   CkpvAccess(_groupIDTable)->push_back(groupID);
501   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
502   if(ptrq) {
503     void *pending;
504     while((pending=ptrq->deq())!=0)
505       CldEnqueue(CkMyPe(), pending, _infoIdx);
506 //    delete ptrq;
507       CkpvAccess(_groupTable)->find(groupID).clearPending();
508   }
509   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
510
511   CkpvAccess(_currentGroup) = groupID;
512   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
513   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
514   _STATS_RECORD_PROCESS_GROUP_1();
515 }
516
517 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
518 {
519   register int gIdx = _entryTable[epIdx]->chareIdx;
520   int objSize=_chareTable[gIdx]->size;
521   register void *obj = malloc(objSize);
522   _MEMCHECK(obj);
523   setMemoryTypeChare(obj);
524   CkpvAccess(_currentGroup) = groupID;
525
526 // Now that the NodeGroup is created, add it to the table.
527 //  NodeGroups can be accessed by multiple processors, so
528 //  this is in the opposite order from groups - invoking the constructor
529 //  before registering it.
530 // User may call CkLocalNodeBranch() inside the nodegroup constructor
531 //  store nodegroup into _currentNodeGroupObj
532   CkpvAccess(_currentNodeGroupObj) = obj;
533   _invokeEntryNoTrace(epIdx,env,obj);
534   CkpvAccess(_currentNodeGroupObj) = NULL;
535   _STATS_RECORD_PROCESS_NODE_GROUP_1();
536
537   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
538   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
539   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
540   CksvAccess(_nodeGroupIDTable).push_back(groupID);
541
542   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
543   if(ptrq) {
544     void *pending;
545     while((pending=ptrq->deq())!=0)
546       CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
547 //    delete ptrq;
548       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
549   }
550   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
551 }
552
553 void _createGroup(CkGroupID groupID, envelope *env)
554 {
555   _CHECK_USED(env);
556   _SET_USED(env, 1);
557   register int epIdx = env->getEpIdx();
558   register int msgIdx = env->getMsgIdx();
559   int gIdx = _entryTable[epIdx]->chareIdx;
560   CkNodeGroupID rednMgr;
561   if(_chareTable[gIdx]->isIrr == 0){
562                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
563                 rednMgr = rednMgrProxy;
564 //              rednMgrProxy.setAttachedGroup(groupID);
565   }else{
566         rednMgr.setZero();
567   }
568   env->setGroupNum(groupID);
569   env->setSrcPe(CkMyPe());
570   env->setRednMgr(rednMgr);
571   env->setGroupEpoch(CkpvAccess(_charmEpoch));
572
573   if(CkNumPes()>1) {
574     CkPackMessage(&env);
575     CmiSetHandler(env, _bocHandlerIdx);
576     _numInitMsgs++;
577     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
578     CpvAccess(_qd)->create(CkNumPes()-1);
579     CkUnpackMessage(&env);
580   }
581   _STATS_RECORD_CREATE_GROUP_1();
582   CkCreateLocalGroup(groupID, epIdx, env);
583 }
584
585 void _createNodeGroup(CkGroupID groupID, envelope *env)
586 {
587   _CHECK_USED(env);
588   _SET_USED(env, 1);
589   register int epIdx = env->getEpIdx();
590   register int msgIdx = env->getMsgIdx();
591   env->setGroupNum(groupID);
592   env->setSrcPe(CkMyPe());
593   env->setGroupEpoch(CkpvAccess(_charmEpoch));
594   register void *msg =  EnvToUsr(env);
595   if(CkNumNodes()>1) {
596     CkPackMessage(&env);
597     CmiSetHandler(env, _bocHandlerIdx);
598     _numInitMsgs++;
599     CksvAccess(_numInitNodeMsgs)++;
600     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
601     CpvAccess(_qd)->create(CkNumNodes()-1);
602     CkUnpackMessage(&env);
603   }
604   _STATS_RECORD_CREATE_NODE_GROUP_1();
605   CkCreateLocalNodeGroup(groupID, epIdx, env);
606 }
607
608 // new _groupCreate
609
610 static CkGroupID _groupCreate(envelope *env)
611 {
612   register CkGroupID groupNum;
613
614   // check CkMyPe(). if it is 0 then idx is _numGroups++
615   // if not, then something else...
616   if(CkMyPe() == 0)
617      groupNum.idx = CkpvAccess(_numGroups)++;
618   else
619      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
620   _createGroup(groupNum, env);
621   return groupNum;
622 }
623
624 // new _nodeGroupCreate
625 static CkGroupID _nodeGroupCreate(envelope *env)
626 {
627   register CkGroupID groupNum;
628   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
629   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
630           groupNum.idx = CksvAccess(_numNodeGroups)++;
631    else
632           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
633   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
634   _createNodeGroup(groupNum, env);
635   return groupNum;
636 }
637
638 /**** generate the group idx when group is creator pe is not pe0
639  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
640  **** remaining bits contain the group creator processor number and
641  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
642
643 int _getGroupIdx(int numNodes,int myNode,int numGroups)
644 {
645         int idx;
646         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
647         int n = 32 - (x+1);                                     // number of bits remaining for the index
648         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
649                                                                 // then add the next available index
650         // of course this won't work when int is 8 bytes long on T3E
651         //idx |= 0x80000000;                                      // set the most significant bit to 1
652         idx = - idx;
653                                                                 // if int is not 32 bits, wouldn't this be wrong?
654         return idx;
655 }
656
657 extern "C"
658 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
659 {
660   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
661   register envelope *env = UsrToEnv(msg);
662   env->setMsgtype(BocInitMsg);
663   env->setEpIdx(eIdx);
664   env->setSrcPe(CkMyPe());
665   _TRACE_CREATION_N(env, CkNumPes());
666   CkGroupID gid = _groupCreate(env);
667   _TRACE_CREATION_DONE(1);
668   return gid;
669 }
670
671 extern "C"
672 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
673 {
674   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
675   register envelope *env = UsrToEnv(msg);
676   env->setMsgtype(NodeBocInitMsg);
677   env->setEpIdx(eIdx);
678   env->setSrcPe(CkMyPe());
679   _TRACE_CREATION_N(env, CkNumNodes());
680   CkGroupID gid = _nodeGroupCreate(env);
681   _TRACE_CREATION_DONE(1);
682   return gid;
683 }
684
685 static inline void *_allocNewChare(envelope *env)
686 {
687   void *tmp=malloc(_chareTable[_entryTable[env->getEpIdx()]->chareIdx]->size);
688   _MEMCHECK(tmp);
689   setMemoryTypeChare(tmp);
690   return tmp;
691 }
692
693 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
694 {
695   register void *obj = _allocNewChare(env);
696   _invokeEntry(env->getEpIdx(),env,obj);
697 }
698
699 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
700 {
701   register void *obj = _allocNewChare(env);
702   register CkChareID *pCid = (CkChareID *)
703       _allocMsg(FillVidMsg, sizeof(CkChareID));
704   pCid->onPE = CkMyPe();
705   pCid->objPtr = obj;
706   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
707   register envelope *ret = UsrToEnv(pCid);
708   ret->setVidPtr(env->getVidPtr());
709   register int srcPe = env->getSrcPe();
710   ret->setSrcPe(CkMyPe());
711   CmiSetHandler(ret, _charmHandlerIdx);
712   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
713   CpvAccess(_qd)->create();
714   _invokeEntry(env->getEpIdx(),env,obj);
715 }
716
717 /************** Receive: Chares *************/
718
719 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
720 {
721   register int epIdx = env->getEpIdx();
722   register void *obj = env->getObjPtr();
723   _invokeEntry(epIdx,env,obj);
724 }
725
726 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
727 {
728   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
729   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
730   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
731   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
732   vptr->fill(pcid->onPE, pcid->objPtr);
733   CmiFree(env);
734 }
735
736 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
737 {
738   VidBlock *vptr = (VidBlock *) env->getVidPtr();
739   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
740   _SET_USED(env, 1);
741   vptr->send(env);
742 }
743
744 /************** Receive: Groups ****************/
745
746 /**
747  This message is sent to this groupID--prepare to
748  handle this message by looking up the group,
749  and possibly stashing the message.
750 */
751 IrrGroup *_lookupGroup(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
752 {
753
754         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
755         IrrGroup *obj = ck->localBranch(groupID);
756         if (obj==NULL) { /* groupmember not yet created: stash message */
757                 ck->getGroupTable()->find(groupID).enqMsg(env);
758         }
759         else { /* will be able to process message */
760                 ck->process();
761         }
762         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
763         return obj;
764 }
765
766 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
767 {
768 #if CMK_LBDB_ON
769   // if there is a running obj being measured, stop it temporarily
770   LDObjHandle objHandle;
771   int objstopped = 0;
772   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
773   if (the_lbdb->RunningObject(&objHandle)) {
774     objstopped = 1;
775     the_lbdb->ObjectStop(objHandle);
776   }
777 #endif
778   _invokeEntry(epIdx,env,obj);
779 #if CMK_LBDB_ON
780   if (objstopped) the_lbdb->ObjectStart(objHandle);
781 #endif
782   _STATS_RECORD_PROCESS_BRANCH_1();
783 }
784
785 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
786 {
787   register CkGroupID groupID =  env->getGroupNum();
788   register IrrGroup *obj = _lookupGroup(ck,env,env->getGroupNum());
789   if(obj) {
790     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
791   }
792 }
793
794 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
795 {
796   env->setMsgtype(ForChareMsg);
797   env->setObjPtr(obj);
798   _processForChareMsg(ck,env);
799   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
800 }
801
802 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
803 {
804   env->setEpIdx(epIdx);
805   _deliverForNodeBocMsg(ck,env, obj);
806 }
807
808 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
809 {
810   register CkGroupID groupID = env->getGroupNum();
811   register void *obj;
812
813   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
814   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
815   if(!obj) { // groupmember not yet created
816 #if CMK_IMMEDIATE_MSG
817     if (CmiIsImmediate(env))     // buffer immediate message
818       CmiDelayImmediate();
819     else
820 #endif
821     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
822     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
823     return;
824   }
825   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
826 #if CMK_IMMEDIATE_MSG
827   if (!CmiIsImmediate(env))
828 #endif
829   ck->process();
830   env->setMsgtype(ForChareMsg);
831   env->setObjPtr(obj);
832   _processForChareMsg(ck,env);
833   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
834 }
835
836 void _processBocInitMsg(CkCoreState *ck,envelope *env)
837 {
838   register CkGroupID groupID = env->getGroupNum();
839   register int epIdx = env->getEpIdx();
840   CkCreateLocalGroup(groupID, epIdx, env);
841 }
842
843 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
844 {
845   register CkGroupID groupID = env->getGroupNum();
846   register int epIdx = env->getEpIdx();
847   CkCreateLocalNodeGroup(groupID, epIdx, env);
848 }
849
850 /************** Receive: Arrays *************/
851
852 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
853   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
854   if (mgr) {
855     _SET_USED(env, 0);
856     mgr->insertElement((CkMessage *)EnvToUsr(env));
857   }
858 }
859 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
860   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
861   if (mgr) {
862     _SET_USED(env, 0);
863     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
864   }
865 }
866
867 /**
868  * This is the main converse-level handler used by all of Charm++.
869  */
870 void _processHandler(void *converseMsg,CkCoreState *ck)
871 {
872   register envelope *env = (envelope *) converseMsg;
873 //#if CMK_RECORD_REPLAY
874   if (ck->watcher!=NULL) {
875     if (!ck->watcher->processMessage(env,ck)) return;
876   }
877 //#endif
878   switch(env->getMsgtype()) {
879 // Group support
880     case BocInitMsg :
881       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
882       _processBocInitMsg(ck,env);
883       break;
884     case NodeBocInitMsg :
885       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
886       _processNodeBocInitMsg(ck,env);
887       break;
888     case ForBocMsg :
889       // QD processing moved inside _processForBocMsg because it is conditional
890       if(env->isPacked()) CkUnpackMessage(&env);
891       _processForBocMsg(ck,env);
892       // stats record moved inside _processForBocMsg because it is conditional
893       break;
894     case ForNodeBocMsg :
895       // QD processing moved to _processForNodeBocMsg because it is conditional
896       if(env->isPacked()) CkUnpackMessage(&env);
897       _processForNodeBocMsg(ck,env);
898       // stats record moved to _processForNodeBocMsg because it is conditional
899       break;
900
901 // Array support
902     case ArrayEltInitMsg:
903       if(env->isPacked()) CkUnpackMessage(&env);
904       _processArrayEltInitMsg(ck,env);
905       break;
906     case ForArrayEltMsg:
907       if(env->isPacked()) CkUnpackMessage(&env);
908       _processArrayEltMsg(ck,env);
909       break;
910
911 // Chare support
912     case NewChareMsg :
913       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
914       _processNewChareMsg(ck,env);
915       _STATS_RECORD_PROCESS_CHARE_1();
916       break;
917     case NewVChareMsg :
918       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
919       _processNewVChareMsg(ck,env);
920       _STATS_RECORD_PROCESS_CHARE_1();
921       break;
922     case ForChareMsg :
923       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
924       _processForChareMsg(ck,env);
925       _STATS_RECORD_PROCESS_MSG_1();
926       break;
927     case ForVidMsg   :
928       ck->process();
929       _processForVidMsg(ck,env);
930       break;
931     case FillVidMsg  :
932       ck->process();
933       _processFillVidMsg(ck,env);
934       break;
935
936     default:
937       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
938   }
939 }
940
941
942 /******************** Message Send **********************/
943
944 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
945              int *queueing, int *priobits, unsigned int **prioptr)
946 {
947   register envelope *env = (envelope *)converseMsg;
948   *pfn = (CldPackFn)CkPackMessage;
949   *len = env->getTotalsize();
950   *queueing = env->getQueueing();
951   *priobits = env->getPriobits();
952   *prioptr = (unsigned int *) env->getPrioPtr();
953 }
954
955 void CkPackMessage(envelope **pEnv)
956 {
957   register envelope *env = *pEnv;
958   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
959     register void *msg = EnvToUsr(env);
960     _TRACE_BEGIN_PACK();
961     msg = _msgTable[env->getMsgIdx()]->pack(msg);
962     _TRACE_END_PACK();
963     env=UsrToEnv(msg);
964     env->setPacked(1);
965     *pEnv = env;
966   }
967 }
968
969 void CkUnpackMessage(envelope **pEnv)
970 {
971   register envelope *env = *pEnv;
972   register int msgIdx = env->getMsgIdx();
973   if(env->isPacked()) {
974     register void *msg = EnvToUsr(env);
975     _TRACE_BEGIN_UNPACK();
976     msg = _msgTable[msgIdx]->unpack(msg);
977     _TRACE_END_UNPACK();
978     env=UsrToEnv(msg);
979     env->setPacked(0);
980     *pEnv = env;
981   }
982 }
983
984 //There's no reason for most messages to go through the Cld--
985 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
986 // Thus these accellerated versions of the Cld calls.
987
988 int index_objectQHandler;
989 int index_tokenHandler;
990 static int index_skipCldHandler;
991
992 static void _skipCldHandler(void *converseMsg)
993 {
994   register envelope *env = (envelope *)(converseMsg);
995   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
996 #if CMK_GRID_QUEUE_AVAILABLE
997   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
998     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
999                        env, env->getQueueing (), env->getPriobits (),
1000                        (unsigned int *) env->getPrioPtr ());
1001   } else {
1002     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1003                        env, env->getQueueing (), env->getPriobits (),
1004                        (unsigned int *) env->getPrioPtr ());
1005   }
1006 #else
1007   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1008         env, env->getQueueing(),env->getPriobits(),
1009         (unsigned int *)env->getPrioPtr());
1010 #endif
1011 }
1012
1013
1014 static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1015 {
1016   if(pe == CkMyPe() ){
1017     if(!CmiNodeAlive(CkMyPe())){
1018         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1019 //      return;
1020     }
1021   }
1022   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1023 #if CMK_OBJECT_QUEUE_AVAILABLE
1024     Chare *obj = CkFindObjectPtr(env);
1025     if (obj && obj->CkGetObjQueue().queue()) {
1026       _enqObjQueue(obj, env);
1027     }
1028     else
1029 #endif
1030     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1031         env, env->getQueueing(),env->getPriobits(),
1032         (unsigned int *)env->getPrioPtr());
1033   } else {
1034     CkPackMessage(&env);
1035     int len=env->getTotalsize();
1036     CmiSetXHandler(env,CmiGetHandler(env));
1037 #if CMK_OBJECT_QUEUE_AVAILABLE
1038     CmiSetHandler(env,index_objectQHandler);
1039 #else
1040     CmiSetHandler(env,index_skipCldHandler);
1041 #endif
1042     CmiSetInfo(env,infoFn);
1043     if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1044     else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1045     else{
1046                         CmiSyncSendAndFree(pe, len, (char *)env);
1047                 }
1048   }
1049 }
1050
1051 #if CMK_BLUEGENE_CHARM
1052 #   define  _skipCldEnqueue   CldEnqueue
1053 #endif
1054
1055 // by pass Charm++ priority queue, send as Converse message
1056 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1057 {
1058   CkPackMessage(&env);
1059   int len=env->getTotalsize();
1060   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1061 }
1062
1063 static void _noCldEnqueue(int pe, envelope *env)
1064 {
1065 /*
1066   if (pe == CkMyPe()) {
1067     CmiHandleMessage(env);
1068   } else
1069 */
1070   CkPackMessage(&env);
1071   int len=env->getTotalsize();
1072   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1073   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1074   else CmiSyncSendAndFree(pe, len, (char *)env);
1075 }
1076
1077 static void _noCldNodeEnqueue(int node, envelope *env)
1078 {
1079 /*
1080   if (node == CkMyNode()) {
1081     CmiHandleMessage(env);
1082   } else {
1083 */
1084   CkPackMessage(&env);
1085   int len=env->getTotalsize();
1086   if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, (char *)env); }
1087   else if (node==CLD_BROADCAST_ALL) { CmiSyncNodeBroadcastAllAndFree(len, (char *)env); }
1088   else CmiSyncNodeSendAndFree(node, len, (char *)env);
1089 }
1090
1091 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1092 {
1093   register envelope *env = UsrToEnv(msg);
1094   _CHECK_USED(env);
1095   _SET_USED(env, 1);
1096   env->setMsgtype(ForChareMsg);
1097   env->setEpIdx(eIdx);
1098   env->setSrcPe(CkMyPe());
1099 #ifndef CMK_OPTIMIZE
1100   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1101 #endif
1102 #if CMK_OBJECT_QUEUE_AVAILABLE
1103   CmiSetHandler(env, index_objectQHandler);
1104 #else
1105   CmiSetHandler(env, _charmHandlerIdx);
1106 #endif
1107   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1108     register int pe = -(pCid->onPE+1);
1109     if(pe==CkMyPe()) {
1110       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1111       void *objPtr;
1112       if (NULL!=(objPtr=vblk->getLocalChare()))
1113       { //A ready local chare
1114         env->setObjPtr(objPtr);
1115         return pe;
1116       }
1117       else { //The vidblock is not ready-- forget it
1118         vblk->send(env);
1119         return -1;
1120       }
1121     } else { //Valid vidblock for another PE:
1122       env->setMsgtype(ForVidMsg);
1123       env->setVidPtr(pCid->objPtr);
1124       return pe;
1125     }
1126   }
1127   else {
1128     env->setObjPtr(pCid->objPtr);
1129     return pCid->onPE;
1130   }
1131 }
1132
1133 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1134 {
1135   int destPE = _prepareMsg(eIdx, msg, pCid);
1136   if (destPE != -1) {
1137     register envelope *env = UsrToEnv(msg);
1138     CmiBecomeImmediate(env);
1139   }
1140   return destPE;
1141 }
1142
1143 extern "C"
1144 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1145 {
1146   if (opts & CK_MSG_INLINE) {
1147     CkSendMsgInline(entryIdx, msg, pCid, opts);
1148     return;
1149   }
1150 #ifndef CMK_OPTIMIZE
1151   if (opts & CK_MSG_IMMEDIATE) {
1152     CmiAbort("Immediate message is not allowed in Chare!");
1153   }
1154 #endif
1155   register envelope *env = UsrToEnv(msg);
1156   int destPE=_prepareMsg(entryIdx,msg,pCid);
1157   if (destPE!=-1) {
1158     _TRACE_CREATION_1(env);
1159     CpvAccess(_qd)->create();
1160     if (opts & CK_MSG_SKIP_OR_IMM)
1161       _noCldEnqueue(destPE, env);
1162     else
1163       CldEnqueue(destPE, env, _infoIdx);
1164     _TRACE_CREATION_DONE(1);
1165   }
1166 }
1167
1168 extern "C"
1169 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1170 {
1171   if (pCid->onPE==CkMyPe())
1172   {
1173     if(!CmiNodeAlive(CkMyPe())){
1174         return;
1175     }
1176 #ifndef CMK_OPTIMIZE
1177     //Just in case we need to breakpoint or use the envelope in some way
1178     _prepareMsg(entryIndex,msg,pCid);
1179 #endif
1180                 //Just directly call the chare (skip QD handling & scheduler)
1181     register envelope *env = UsrToEnv(msg);
1182     if (env->isPacked()) CkUnpackMessage(&env);
1183     _STATS_RECORD_PROCESS_MSG_1();
1184     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1185   }
1186   else {
1187     //No way to inline a cross-processor message:
1188     CkSendMsg(entryIndex,msg,pCid,opts&!CK_MSG_INLINE);
1189   }
1190 }
1191
1192 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1193 {
1194   register envelope *env = UsrToEnv(msg);
1195   _CHECK_USED(env);
1196   _SET_USED(env, 1);
1197   env->setMsgtype(type);
1198   env->setEpIdx(eIdx);
1199   env->setGroupNum(gID);
1200   env->setSrcPe(CkMyPe());
1201 #ifndef CMK_OPTIMIZE
1202   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1203 #endif
1204   CmiSetHandler(env, _charmHandlerIdx);
1205   return env;
1206 }
1207
1208 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1209 {
1210   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1211   CmiBecomeImmediate(env);
1212   return env;
1213 }
1214
1215 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1216                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1217 {
1218   int numPes;
1219   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1220   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1221   _TRACE_CREATION_N(env, numPes);
1222   if (opts & CK_MSG_SKIP_OR_IMM)
1223     _noCldEnqueue(pe, env);
1224   else
1225     _skipCldEnqueue(pe, env, _infoIdx);
1226   _TRACE_CREATION_DONE(1);
1227 }
1228
1229 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1230                            int npes, int *pes)
1231 {
1232   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1233   _TRACE_CREATION_MULTICAST(env, npes, pes);
1234   CldEnqueueMulti(npes, pes, env, _infoIdx);
1235   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1236 }
1237
1238 extern "C"
1239 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1240 {
1241 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1242   if (destPE==CkMyPe())
1243   {
1244     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1245     return;
1246   }
1247   //Can't inline-- send the usual way
1248   register envelope *env = UsrToEnv(msg);
1249   int numPes;
1250   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1251   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1252   _TRACE_CREATION_N(env, numPes);
1253   _noCldEnqueue(destPE, env);
1254   _STATS_RECORD_SEND_BRANCH_1();
1255   CkpvAccess(_coreState)->create();
1256   _TRACE_CREATION_DONE(1);
1257 #else
1258   // no support for immediate message, send inline
1259   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1260 #endif
1261 }
1262
1263 extern "C"
1264 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1265 {
1266   if (destPE==CkMyPe())
1267   {
1268     if(!CmiNodeAlive(CkMyPe())){
1269         return;
1270     }
1271     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1272     if (obj!=NULL)
1273     { //Just directly call the group:
1274 #ifndef CMK_OPTIMIZE
1275       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1276 #else
1277       envelope *env=UsrToEnv(msg);
1278 #endif
1279       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1280       return;
1281     }
1282   }
1283   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1284   CkSendMsgBranch(eIdx,msg,destPE,gID,opts&!CK_MSG_INLINE);
1285 }
1286
1287 extern "C"
1288 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1289 {
1290   if (opts & CK_MSG_INLINE) {
1291     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1292     return;
1293   }
1294   if (opts & CK_MSG_IMMEDIATE) {
1295     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1296     return;
1297   }
1298   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1299   _STATS_RECORD_SEND_BRANCH_1();
1300   CkpvAccess(_coreState)->create();
1301 }
1302
1303 extern "C"
1304 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,int npes,int *pes,CkGroupID gID)
1305 {
1306 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1307   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1308   _TRACE_CREATION_MULTICAST(env, npes, pes);
1309   _noCldEnqueueMulti(npes, pes, env);
1310   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1311 #else
1312   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1313   CpvAccess(_qd)->create(-npes);
1314 #endif
1315   _STATS_RECORD_SEND_BRANCH_N(npes);
1316   CpvAccess(_qd)->create(npes);
1317 }
1318
1319 extern "C"
1320 void CkSendMsgBranchMulti(int eIdx,void *msg,int npes,int *pes,CkGroupID gID, int opts)
1321 {
1322   if (opts & CK_MSG_IMMEDIATE) {
1323     CkSendMsgBranchMultiImmediate(eIdx,msg,npes,pes,gID);
1324     return;
1325   }
1326     // normal mesg
1327   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1328   _STATS_RECORD_SEND_BRANCH_N(npes);
1329   CpvAccess(_qd)->create(npes);
1330 }
1331
1332 extern "C"
1333 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1334 {
1335   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1336   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1337   CpvAccess(_qd)->create(CkNumPes());
1338 }
1339
1340 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1341                 int node=CLD_BROADCAST_ALL, int opts=0)
1342 {
1343   int numPes;
1344   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1345   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1346   _TRACE_CREATION_N(env, numPes);
1347   if (opts & CK_MSG_SKIP_OR_IMM) {
1348     _noCldNodeEnqueue(node, env);
1349     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1350       CkpvAccess(_coreState)->create(-numPes);
1351     }
1352   }
1353   else
1354     CldNodeEnqueue(node, env, _infoIdx);
1355   _TRACE_CREATION_DONE(1);
1356 }
1357
1358 extern "C"
1359 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1360 {
1361 #if CMK_IMMEDIATE_MSG
1362   if (node==CkMyNode())
1363   {
1364     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1365     return;
1366   }
1367   //Can't inline-- send the usual way
1368   register envelope *env = UsrToEnv(msg);
1369   int numPes;
1370   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1371   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1372   _TRACE_CREATION_N(env, numPes);
1373   _noCldNodeEnqueue(node, env);
1374   _STATS_RECORD_SEND_BRANCH_1();
1375   /* immeidate message is invisible to QD */
1376 //  CkpvAccess(_coreState)->create();
1377   _TRACE_CREATION_DONE(1);
1378 #else
1379   // no support for immediate message, send inline
1380   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1381 #endif
1382 }
1383
1384 extern "C"
1385 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1386 {
1387   if (node==CkMyNode())
1388   {
1389     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1390     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1391     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1392     if (obj!=NULL)
1393     { //Just directly call the group:
1394 #ifndef CMK_OPTIMIZE
1395       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1396 #else
1397       envelope *env=UsrToEnv(msg);
1398 #endif
1399       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1400       return;
1401     }
1402   }
1403   //Can't inline-- send the usual way
1404   CkSendMsgNodeBranch(eIdx,msg,node,gID,opts&!CK_MSG_INLINE);
1405 }
1406
1407 extern "C"
1408 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1409 {
1410   if (opts & CK_MSG_INLINE) {
1411     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1412     return;
1413   }
1414   if (opts & CK_MSG_IMMEDIATE) {
1415     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1416     return;
1417   }
1418   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1419   _STATS_RECORD_SEND_NODE_BRANCH_1();
1420   CkpvAccess(_coreState)->create();
1421 }
1422
1423 extern "C"
1424 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1425 {
1426   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1427   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1428   CpvAccess(_qd)->create(CkNumNodes());
1429 }
1430
1431 //Needed by delegation manager:
1432 extern "C"
1433 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1434 { return _prepareMsg(eIdx,msg,pCid); }
1435 extern "C"
1436 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1437 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1438 extern "C"
1439 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1440 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1441
1442 void _ckModuleInit(void) {
1443         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1444         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1445         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1446         CkpvInitialize(TokenPool*, _tokenPool);
1447         CkpvAccess(_tokenPool) = new TokenPool;
1448 }
1449
1450
1451 /************** Send: Arrays *************/
1452
1453 extern void CkArrayManagerInsert(int onPe,void *msg);
1454 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1455
1456 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1457 {
1458   _CHECK_USED(env);
1459   _SET_USED(env, 1);
1460   env->setMsgtype(type);
1461 #ifndef CMK_OPTIMIZE
1462   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1463 #endif
1464   CmiSetHandler(env, _charmHandlerIdx);
1465   CpvAccess(_qd)->create();
1466 }
1467
1468 extern "C"
1469 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1470   register envelope *env = UsrToEnv(msg);
1471   env->getsetArrayMgr()=aID;
1472   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1473   CldEnqueue(pe, env, _infoIdx);
1474 }
1475
1476 extern "C"
1477 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1478   register envelope *env = UsrToEnv(msg);
1479   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1480   if (opts & CK_MSG_IMMEDIATE)
1481     CmiBecomeImmediate(env);
1482   if (opts & CK_MSG_SKIP_OR_IMM)
1483     _noCldEnqueue(pe, env);
1484   else
1485     _skipCldEnqueue(pe, env, _infoIdx);
1486 }
1487
1488 class ElementDestroyer : public CkLocIterator {
1489 private:
1490         CkLocMgr *locMgr;
1491 public:
1492         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1493         void addLocation(CkLocation &loc) {
1494           loc.destroyAll();
1495         }
1496 };
1497
1498 void CkDeleteChares() {
1499   int i;
1500   int numGroups = CkpvAccess(_groupIDTable)->size();
1501
1502   // delete all array elements
1503   for(i=0;i<numGroups;i++) {
1504     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1505     if(obj && obj->isLocMgr())  {
1506       CkLocMgr *mgr = (CkLocMgr*)obj;
1507       ElementDestroyer destroyer(mgr);
1508       mgr->iterate(destroyer);
1509 printf("[%d] DELETE!\n", CkMyPe());
1510     }
1511   }
1512
1513   // delete all groups
1514   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1515   for(i=0;i<numGroups;i++) {
1516     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1517     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1518     if (obj) delete obj;
1519   }
1520   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1521
1522   // delete all node groups
1523   if (CkMyRank() == 0) {
1524     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1525     for(i=0;i<numNodeGroups;i++) {
1526       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
1527       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1528       if (obj) delete obj;
1529     }
1530   }
1531 }
1532
1533 //------------------- Message Watcher (record/replay) ----------------
1534
1535 CkMessageWatcher::~CkMessageWatcher() {}
1536
1537 class CkMessageRecorder : public CkMessageWatcher {
1538         FILE *f;
1539 public:
1540         CkMessageRecorder(FILE *f_) :f(f_) {}
1541         ~CkMessageRecorder() {
1542                 fprintf(f,"-1 -1 -1");
1543                 fclose(f);
1544         }
1545
1546         virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
1547                 if (env->getEvent())
1548                      fprintf(f,"%d %d %d %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg);
1549                 return CmiTrue;
1550         }
1551 };
1552
1553 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
1554 #define REPLAYDEBUG(args) /* empty */
1555
1556 class CkMessageReplay : public CkMessageWatcher {
1557         FILE *f;
1558         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
1559         /// Read the next message we need from the file:
1560         void getNext(void) {
1561                 if (4!=fscanf(f,"%d%d%d%d", &nextPE,&nextSize,&nextEvent,&nexttype)) {
1562                         // CkAbort("CkMessageReplay> Syntax error reading replay file");
1563                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
1564                 }
1565         }
1566         /// If this is the next message we need, advance and return CmiTrue.
1567         CmiBool isNext(envelope *env) {
1568                 if (nextPE!=env->getSrcPe()) return CmiFalse;
1569                 if (nextEvent!=env->getEvent()) return CmiFalse;
1570                 if (nextSize!=env->getTotalsize())
1571                 {
1572                         CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
1573                         return CmiFalse;
1574                 }
1575                 return CmiTrue;
1576         }
1577
1578         /// This is a (short) list of messages we aren't yet ready for:
1579         CkQ<envelope *> delayed;
1580
1581         /// Try to flush out any delayed messages
1582         void flush(void) {
1583                 int len=delayed.length();
1584                 for (int i=0;i<len;i++) {
1585                         envelope *env=delayed.deq();
1586                         if (isNext(env)) { /* this is the next message: process it */
1587                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1588                                 CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
1589                                 return;
1590                         }
1591                         else /* Not ready yet-- put it back in the
1592                                 queue */
1593                           {
1594                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1595                                 delayed.enq(env);
1596                           }
1597                 }
1598         }
1599
1600 public:
1601         CkMessageReplay(FILE *f_) :f(f_) { getNext();
1602         REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
1603                     }
1604         ~CkMessageReplay() {fclose(f);}
1605
1606         virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
1607           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx());
1608                 if (env->getEvent() == 0) return CmiTrue;
1609                 if (isNext(env)) { /* This is the message we were expecting */
1610                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1611                         getNext(); /* Advance over this message */
1612                         flush(); /* try to process queued-up stuff */
1613                         return CmiTrue;
1614                 }
1615 #if CMK_SMP
1616                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
1617                          // try next rank, we can't just buffer the msg and left
1618                          // we need to keep unprocessed msg on the fly
1619                         int nextpe = CkMyPe()+1;
1620                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
1621                         nextpe = CkNodeFirst(CkMyNode());
1622                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
1623                         return CmiFalse;
1624                 }
1625 #endif
1626                 else /*!isNext(env) */ {
1627                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()
1628                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
1629                         delayed.enq(env);
1630                         flush();
1631                         return CmiFalse;
1632                 }
1633         }
1634 };
1635
1636 static FILE *openReplayFile(const char *permissions) {
1637
1638         char fName[200];
1639         sprintf(fName,"ckreplay_%06d.log",CkMyPe());
1640         FILE *f=fopen(fName,permissions);
1641         REPLAYDEBUG("openReplayfile "<<fName);
1642         if (f==NULL) {
1643                 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
1644                         CkMyPe(),fName,permissions);
1645                 CkAbort("openReplayFile> Could not open replay file");
1646         }
1647         return f;
1648 }
1649
1650 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
1651         REPLAYDEBUG("CkMessageWaterInit ");
1652         if (CmiGetArgFlagDesc(argv,"+record","Record message processing order"))
1653                 ck->watcher=new CkMessageRecorder(openReplayFile("w"));
1654         if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream"))
1655                 ck->watcher=new CkMessageReplay(openReplayFile("r"));
1656 }
1657
1658 extern "C"
1659 int CkMessageToEpIdx(void *msg) {
1660         envelope *env=UsrToEnv(msg);
1661         int ep=env->getEpIdx();
1662         if (ep==CkIndex_CkArray::recvBroadcast(0))
1663                 return env->getsetArrayBcastEp();
1664         else
1665                 return ep;
1666 }
1667
1668 extern "C"
1669 int getCharmEnvelopeSize() {
1670   return sizeof(envelope);
1671 }
1672
1673
1674 #include "CkMarshall.def.h"
1675