080b91b0f6b1191b6917d7d8eff4f6ae17b07a20
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
20 #include "pathHistory.h"
21 void automaticallySetMessagePriority(envelope *env); // in control point framework.
22 #endif
23
24 #if CMK_LBDB_ON
25 #include "LBDatabase.h"
26 #endif // CMK_LBDB_ON
27
28 #if CMK_FT_CHARE
29 CpvDeclare(CkVec<void *>, chare_objs);
30 CpvDeclare(CkVec<VidBlock *>, vidblocks);
31 #endif
32
33 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
34
35 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
36
37 int CMessage_CkMessage::__idx=-1;
38 int CMessage_CkArgMsg::__idx=0;
39 int CkIndex_Chare::__idx;
40 int CkIndex_Group::__idx;
41 int CkIndex_ArrayBase::__idx=-1;
42
43 extern int _defaultObjectQ;
44
45 //Charm++ virtual functions: declaring these here results in a smaller executable
46 Chare::Chare(void) {
47   thishandle.onPE=CkMyPe();
48   thishandle.objPtr=this;
49 #if CMK_FT_CHARE
50      // for plain chare, objPtr is actually the index to chare table
51   if (chareIdx >= 0) thishandle.objPtr=(void*)chareIdx;
52 #endif
53 #ifdef _FAULT_MLOG_
54   mlogData = new ChareMlogData();
55   mlogData->objID.type = TypeChare;
56   mlogData->objID.data.chare.id = thishandle;
57 #endif
58 #if CMK_OBJECT_QUEUE_AVAILABLE
59   if (_defaultObjectQ)  CkEnableObjQ();
60 #endif
61 }
62
63 Chare::Chare(CkMigrateMessage* m) {
64   thishandle.onPE=CkMyPe();
65   thishandle.objPtr=this;
66
67 #if CMK_OBJECT_QUEUE_AVAILABLE
68   if (_defaultObjectQ)  CkEnableObjQ();
69 #endif
70 }
71
72 void Chare::CkEnableObjQ()
73 {
74 #if CMK_OBJECT_QUEUE_AVAILABLE
75   objQ.create();
76 #endif
77 }
78
79 Chare::~Chare() {}
80
81 void Chare::pup(PUP::er &p)
82 {
83   p(thishandle.onPE);
84   thishandle.objPtr=(void *)this;
85 #if CMK_FT_CHARE
86   p(chareIdx);
87   if (chareIdx != -1) thishandle.objPtr=(void*)chareIdx;
88 #endif
89 #ifdef _FAULT_MLOG_
90   if(p.isUnpacking()){
91     mlogData = new ChareMlogData();
92   }
93   mlogData->pup(p);
94 #endif
95 }
96
97 int Chare::ckGetChareType() const {
98   return -3;
99 }
100 char *Chare::ckDebugChareName(void) {
101   char buf[100];
102   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
103   return strdup(buf);
104 }
105 int Chare::ckDebugChareID(char *str, int limit) {
106   // pure chares for now do not have a valid ID
107   str[0] = 0;
108   return 1;
109 }
110 void Chare::ckDebugPup(PUP::er &p) {
111   pup(p);
112 }
113
114 /// This method is called before starting a [threaded] entry method.
115 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
116   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
117   traceAddThreadListeners(th, UsrToEnv(msg));
118 }
119
120
121 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
122   p.comment("Bytes");
123   int ts=UsrToEnv(msg)->getTotalsize();
124   int msgLen=ts-sizeof(envelope);
125   if (msgLen>0)
126     p((char*)msg,msgLen);
127 }
128
129 IrrGroup::IrrGroup(void) {
130   thisgroup = CkpvAccess(_currentGroup);
131 #ifdef _FAULT_MLOG_
132         mlogData->objID.type = TypeGroup;
133         mlogData->objID.data.group.id = thisgroup;
134         mlogData->objID.data.group.onPE = CkMyPe();
135 #endif
136 }
137
138 IrrGroup::~IrrGroup() {
139   // remove the object pointer
140   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
141   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
142   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
143 }
144
145 void IrrGroup::pup(PUP::er &p)
146 {
147   Chare::pup(p);
148   p|thisgroup;
149 }
150
151 int IrrGroup::ckGetChareType() const {
152   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
153 }
154
155 int IrrGroup::ckDebugChareID(char *str, int limit) {
156   if (limit<5) return -1;
157   str[0] = 1;
158   *((int*)&str[1]) = thisgroup.idx;
159   return 5;
160 }
161
162 char *IrrGroup::ckDebugChareName() {
163   return strdup(_chareTable[ckGetChareType()]->name);
164 }
165
166 void IrrGroup::ckJustMigrated(void)
167 {
168 }
169
170 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
171   /* FIXME: **CW** not entirely sure what we should do here yet */
172 }
173
174 void Group::CkAddThreadListeners(CthThread th, void *msg) {
175   Chare::CkAddThreadListeners(th, msg);
176   CthSetThreadID(th, thisgroup.idx, 0, 0);
177 }
178
179 void Group::pup(PUP::er &p)
180 {
181   CkReductionMgr::pup(p);
182   reductionInfo.pup(p);
183 }
184
185 /**** Delegation Manager Group */
186 CkDelegateMgr::~CkDelegateMgr() { }
187
188 //Default delegator implementation: do not delegate-- send directly
189 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
190   { CkSendMsg(ep,m,c); }
191 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
192   { CkSendMsgBranch(ep,m,onPE,g); }
193 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
194   { CkBroadcastMsgBranch(ep,m,g); }
195 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
196   { CkSendMsgBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
197 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
198   { CkSendMsgNodeBranch(ep,m,onNode,g); }
199 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
200   { CkBroadcastMsgNodeBranch(ep,m,g); }
201 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
202   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.aid,s->npes,s->pelist); }
203 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,int onPE,CkArrayID a)
204 {
205         CProxyElement_ArrayBase ap(a,idx);
206         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
207 }
208 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,CkArrayID a)
209 {
210         CProxyElement_ArrayBase ap(a,idx);
211         ap.ckSend((CkArrayMessage *)m,ep);
212 }
213 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
214 {
215         CProxy_ArrayBase ap(a);
216         ap.ckBroadcast((CkArrayMessage *)m,ep);
217 }
218
219 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
220 {
221         CmiAbort("ArraySectionSend is not implemented!\n");
222 /*
223         CProxyElement_ArrayBase ap(a,idx);
224         ap.ckSend((CkArrayMessage *)m,ep);
225 */
226 }
227
228 /*** Proxy <-> delegator communication */
229 CkDelegateData::~CkDelegateData() {}
230
231 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
232   return pd; // default implementation ignores pup call
233 }
234
235 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
236    this tricky manual reference counting business... */
237
238 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
239         if (dPtr) dPtr->ref();
240         ckUndelegate();
241         delegatedMgr = dTo;
242         delegatedPtr = dPtr;
243 }
244 void CProxy::ckUndelegate(void) {
245         delegatedMgr=NULL;
246         if (delegatedPtr) delegatedPtr->unref();
247         delegatedPtr=NULL;
248 }
249
250 /// Copy constructor
251 CProxy::CProxy(const CProxy &src)
252     :delegatedMgr(src.delegatedMgr)
253 {
254     delegatedPtr = NULL;
255     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
256         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
257 }
258
259 /// Assignment operator
260 CProxy& CProxy::operator=(const CProxy &src) {
261         CkDelegateData *oldPtr=delegatedPtr;
262         ckUndelegate();
263         delegatedMgr=src.delegatedMgr;
264
265         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
266             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
267         else
268             delegatedPtr = NULL;
269
270         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
271         if (oldPtr) oldPtr->unref();
272         return *this;
273 }
274
275 void CProxy::pup(PUP::er &p) {
276       CkGroupID delegatedTo;
277       delegatedTo.setZero();
278       int isNodeGroup = 0;
279       if (!p.isUnpacking()) {
280         if (delegatedMgr) {
281           delegatedTo = delegatedMgr->CkGetGroupID();
282           isNodeGroup = delegatedMgr->isNodeGroup();
283         }
284       }
285       p|delegatedTo;
286       if (!delegatedTo.isZero()) {
287         p|isNodeGroup;
288         if (p.isUnpacking()) {
289           if (isNodeGroup)
290                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
291           else
292                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
293         }
294
295         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
296         if (p.isUnpacking() && delegatedPtr)
297             delegatedPtr->ref();
298       }
299 }
300
301 /**** Array sections */
302 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
303 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
304   _cookie.aid = aid;    \
305   _cookie.get_pe() = CkMyPe();  \
306   _elems = new CkArrayIndexMax[nElems]; \
307   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
308   pelist = NULL;        \
309   npes  = 0;    \
310 }
311
312 CKSECTIONID_CONSTRUCTOR_DEF(1D)
313 CKSECTIONID_CONSTRUCTOR_DEF(2D)
314 CKSECTIONID_CONSTRUCTOR_DEF(3D)
315 CKSECTIONID_CONSTRUCTOR_DEF(4D)
316 CKSECTIONID_CONSTRUCTOR_DEF(5D)
317 CKSECTIONID_CONSTRUCTOR_DEF(6D)
318 CKSECTIONID_CONSTRUCTOR_DEF(Max)
319
320 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
321   pelist = new int[npes];
322   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
323   _cookie.aid = gid;
324 }
325
326 CkSectionID::CkSectionID(const CkSectionID &sid) {
327   int i;
328   _cookie = sid._cookie;
329   _nElems = sid._nElems;
330   if (_nElems > 0) {
331     _elems = new CkArrayIndexMax[_nElems];
332     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
333   } else _elems = NULL;
334   npes = sid.npes;
335   if (npes > 0) {
336     pelist = new int[npes];
337     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
338   } else pelist = NULL;
339 }
340
341 void CkSectionID::operator=(const CkSectionID &sid) {
342   int i;
343   _cookie = sid._cookie;
344   _nElems = sid._nElems;
345   if (_nElems > 0) {
346     _elems = new CkArrayIndexMax[_nElems];
347     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
348   } else _elems = NULL;
349   npes = sid.npes;
350   if (npes > 0) {
351     pelist = new int[npes];
352     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
353   } else pelist = NULL;
354 }
355
356 void CkSectionID::pup(PUP::er &p) {
357     p | _cookie;
358     p(_nElems);
359     if (_nElems > 0) {
360       if (p.isUnpacking()) _elems = new CkArrayIndexMax[_nElems];
361       for (int i=0; i< _nElems; i++) p | _elems[i];
362       npes = 0;
363       pelist = NULL;
364     } else {
365       // If _nElems is zero, than this section describes processors instead of array elements
366       _elems = NULL;
367       p(npes);
368       if (p.isUnpacking()) pelist = new int[npes];
369       p(pelist, npes);
370     }
371 }
372
373 /**** Tiny random API routines */
374
375 #ifdef CMK_CUDA
376 void CUDACallbackManager(void *fn) {
377   if (fn != NULL) {
378     CkCallback *cb = (CkCallback*) fn;
379     cb->send();
380   }
381 }
382
383 #endif
384
385 extern "C"
386 void CkSetRefNum(void *msg, int ref)
387 {
388   UsrToEnv(msg)->setRef(ref);
389 }
390
391 extern "C"
392 int CkGetRefNum(void *msg)
393 {
394   return UsrToEnv(msg)->getRef();
395 }
396
397 extern "C"
398 int CkGetSrcPe(void *msg)
399 {
400   return UsrToEnv(msg)->getSrcPe();
401 }
402
403 extern "C"
404 int CkGetSrcNode(void *msg)
405 {
406   return CmiNodeOf(CkGetSrcPe(msg));
407 }
408
409 extern "C"
410 void *CkLocalBranch(CkGroupID gID) {
411   return _localBranch(gID);
412 }
413
414 static
415 void *_ckLocalNodeBranch(CkGroupID groupID) {
416   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
417   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
418   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
419   return retval;
420 }
421
422 extern "C"
423 void *CkLocalNodeBranch(CkGroupID groupID)
424 {
425   void *retval;
426   // we are called in a constructor
427   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
428     return CkpvAccess(_currentNodeGroupObj);
429   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
430   { // Nodegroup hasn't finished being created yet-- schedule...
431     CsdScheduler(0);
432   }
433   return retval;
434 }
435
436 extern "C"
437 void *CkLocalChare(const CkChareID *pCid)
438 {
439         int pe=pCid->onPE;
440         if (pe<0) { //A virtual chare ID
441                 if (pe!=(-(CkMyPe()+1)))
442                         return NULL;//VID block not on this PE
443                 VidBlock *v=(VidBlock *)pCid->objPtr;
444                 return v->getLocalChare();
445         }
446         else
447         { //An ordinary chare ID
448                 if (pe!=CkMyPe())
449                         return NULL;//Chare not on this PE
450                 return pCid->objPtr;
451         }
452 }
453
454 CkpvDeclare(char **,Ck_argv);
455 extern "C" char **CkGetArgv(void) {
456         return CkpvAccess(Ck_argv);
457 }
458 extern "C" int CkGetArgc(void) {
459         return CmiGetArgc(CkpvAccess(Ck_argv));
460 }
461
462 /******************** Basic support *****************/
463 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
464 {
465 #ifdef _FAULT_MLOG_
466         CpvAccess(_currentObj) = (Chare *)obj;
467 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
468 #endif
469   //BIGSIM_OOC DEBUGGING
470   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
471   //fflush(stdout);
472 #ifndef CMK_OPTIMIZE
473   CpdBeforeEp(epIdx, obj, msg);
474 #endif
475   _entryTable[epIdx]->call(msg, obj);
476 #ifndef CMK_OPTIMIZE
477   CpdAfterEp(epIdx);
478 #endif
479   if (_entryTable[epIdx]->noKeep)
480   { /* Method doesn't keep/delete the message, so we have to: */
481     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
482   }
483 }
484 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
485 {
486   //BIGSIM_OOC DEBUGGING
487   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
488   //fflush(stdout);
489
490   void *deliverMsg;
491 #ifdef _FAULT_MLOG_
492         CpvAccess(_currentObj) = (Chare *)obj;
493 #endif
494   if (_entryTable[epIdx]->noKeep)
495   { /* Deliver a read-only copy of the message */
496     deliverMsg=(void *)msg;
497   } else
498   { /* Method needs a copy of the message to keep/delete */
499     void *oldMsg=(void *)msg;
500     deliverMsg=CkCopyMsg(&oldMsg);
501 #ifndef CMK_OPTIMIZE
502     if (oldMsg!=msg)
503       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
504 #endif
505   }
506 #ifndef CMK_OPTIMIZE
507   CpdBeforeEp(epIdx, obj, (void*)msg);
508 #endif
509   _entryTable[epIdx]->call(deliverMsg, obj);
510 #ifndef CMK_OPTIMIZE
511   CpdAfterEp(epIdx);
512 #endif
513 }
514
515 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
516 {
517   register void *msg = EnvToUsr(env);
518   _SET_USED(env, 0);
519   CkDeliverMessageFree(epIdx,msg,obj);
520 }
521
522 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
523 {
524
525 #ifndef CMK_OPTIMIZE /* Consider tracing: */
526   if (_entryTable[epIdx]->traceEnabled) {
527     _TRACE_BEGIN_EXECUTE(env);
528     _invokeEntryNoTrace(epIdx,env,obj);
529     _TRACE_END_EXECUTE();
530   }
531   else
532 #endif
533     _invokeEntryNoTrace(epIdx,env,obj);
534
535 }
536
537 /********************* Creation ********************/
538
539 extern "C"
540 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
541 {
542   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
543   envelope *env = UsrToEnv(msg);
544   _CHECK_USED(env);
545   if(pCid == 0) {
546     env->setMsgtype(NewChareMsg);
547   } else {
548     pCid->onPE = (-(CkMyPe()+1));
549     //  pCid->magic = _GETIDX(cIdx);
550     pCid->objPtr = (void *) new VidBlock();
551     _MEMCHECK(pCid->objPtr);
552     env->setMsgtype(NewVChareMsg);
553     env->setVidPtr(pCid->objPtr);
554 #if CMK_FT_CHARE
555     CpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
556     int idx = CpvAccess(vidblocks).size()-1;
557     pCid->objPtr = (void *)idx;
558     env->setVidPtr((void *)idx);
559 #endif
560   }
561   env->setEpIdx(eIdx);
562   env->setSrcPe(CkMyPe());
563   CmiSetHandler(env, _charmHandlerIdx);
564   _TRACE_CREATION_1(env);
565   CpvAccess(_qd)->create();
566   _STATS_RECORD_CREATE_CHARE_1();
567   _SET_USED(env, 1);
568   if(destPE == CK_PE_ANY)
569     env->setForAnyPE(1);
570   else
571     env->setForAnyPE(0);
572   CldEnqueue(destPE, env, _infoIdx);
573   _TRACE_CREATION_DONE(1);
574 }
575
576 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
577 {
578   register int gIdx = _entryTable[epIdx]->chareIdx;
579   register void *obj = malloc(_chareTable[gIdx]->size);
580   _MEMCHECK(obj);
581   setMemoryTypeChare(obj);
582   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
583   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
584   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
585   CkpvAccess(_groupIDTable)->push_back(groupID);
586   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
587   if(ptrq) {
588     void *pending;
589     while((pending=ptrq->deq())!=0)
590       CldEnqueue(CkMyPe(), pending, _infoIdx);
591 //    delete ptrq;
592       CkpvAccess(_groupTable)->find(groupID).clearPending();
593   }
594   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
595
596   CkpvAccess(_currentGroup) = groupID;
597   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
598 #if CMK_FT_CHARE
599   ((Chare *)obj)->chareIdx = -1;
600 #endif
601   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
602   _STATS_RECORD_PROCESS_GROUP_1();
603 }
604
605 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
606 {
607   register int gIdx = _entryTable[epIdx]->chareIdx;
608   int objSize=_chareTable[gIdx]->size;
609   register void *obj = malloc(objSize);
610   _MEMCHECK(obj);
611   setMemoryTypeChare(obj);
612   CkpvAccess(_currentGroup) = groupID;
613
614 // Now that the NodeGroup is created, add it to the table.
615 //  NodeGroups can be accessed by multiple processors, so
616 //  this is in the opposite order from groups - invoking the constructor
617 //  before registering it.
618 // User may call CkLocalNodeBranch() inside the nodegroup constructor
619 //  store nodegroup into _currentNodeGroupObj
620   CkpvAccess(_currentNodeGroupObj) = obj;
621 #if CMK_FT_CHARE
622   ((Chare *)obj)->chareIdx = -1;
623 #endif
624   _invokeEntryNoTrace(epIdx,env,obj);
625   CkpvAccess(_currentNodeGroupObj) = NULL;
626   _STATS_RECORD_PROCESS_NODE_GROUP_1();
627
628   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
629   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
630   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
631   CksvAccess(_nodeGroupIDTable).push_back(groupID);
632
633   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
634   if(ptrq) {
635     void *pending;
636     while((pending=ptrq->deq())!=0)
637       CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
638 //    delete ptrq;
639       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
640   }
641   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
642 }
643
644 void _createGroup(CkGroupID groupID, envelope *env)
645 {
646   _CHECK_USED(env);
647   _SET_USED(env, 1);
648   register int epIdx = env->getEpIdx();
649   int gIdx = _entryTable[epIdx]->chareIdx;
650   CkNodeGroupID rednMgr;
651   if(_chareTable[gIdx]->isIrr == 0){
652                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
653                 rednMgr = rednMgrProxy;
654 //              rednMgrProxy.setAttachedGroup(groupID);
655   }else{
656         rednMgr.setZero();
657   }
658   env->setGroupNum(groupID);
659   env->setSrcPe(CkMyPe());
660   env->setRednMgr(rednMgr);
661   env->setGroupEpoch(CkpvAccess(_charmEpoch));
662
663   if(CkNumPes()>1) {
664     CkPackMessage(&env);
665     CmiSetHandler(env, _bocHandlerIdx);
666     _numInitMsgs++;
667     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
668     CpvAccess(_qd)->create(CkNumPes()-1);
669     CkUnpackMessage(&env);
670   }
671   _STATS_RECORD_CREATE_GROUP_1();
672   CkCreateLocalGroup(groupID, epIdx, env);
673 }
674
675 void _createNodeGroup(CkGroupID groupID, envelope *env)
676 {
677   _CHECK_USED(env);
678   _SET_USED(env, 1);
679   register int epIdx = env->getEpIdx();
680   env->setGroupNum(groupID);
681   env->setSrcPe(CkMyPe());
682   env->setGroupEpoch(CkpvAccess(_charmEpoch));
683   if(CkNumNodes()>1) {
684     CkPackMessage(&env);
685     CmiSetHandler(env, _bocHandlerIdx);
686     _numInitMsgs++;
687     CksvAccess(_numInitNodeMsgs)++;
688     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
689     CpvAccess(_qd)->create(CkNumNodes()-1);
690     CkUnpackMessage(&env);
691   }
692   _STATS_RECORD_CREATE_NODE_GROUP_1();
693   CkCreateLocalNodeGroup(groupID, epIdx, env);
694 }
695
696 // new _groupCreate
697
698 static CkGroupID _groupCreate(envelope *env)
699 {
700   register CkGroupID groupNum;
701
702   // check CkMyPe(). if it is 0 then idx is _numGroups++
703   // if not, then something else...
704   if(CkMyPe() == 0)
705      groupNum.idx = CkpvAccess(_numGroups)++;
706   else
707      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
708   _createGroup(groupNum, env);
709   return groupNum;
710 }
711
712 // new _nodeGroupCreate
713 static CkGroupID _nodeGroupCreate(envelope *env)
714 {
715   register CkGroupID groupNum;
716   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
717   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
718           groupNum.idx = CksvAccess(_numNodeGroups)++;
719    else
720           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
721   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
722   _createNodeGroup(groupNum, env);
723   return groupNum;
724 }
725
726 /**** generate the group idx when group is creator pe is not pe0
727  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
728  **** remaining bits contain the group creator processor number and
729  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
730
731 int _getGroupIdx(int numNodes,int myNode,int numGroups)
732 {
733         int idx;
734         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
735         int n = 32 - (x+1);                                     // number of bits remaining for the index
736         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
737                                                                 // then add the next available index
738         // of course this won't work when int is 8 bytes long on T3E
739         //idx |= 0x80000000;                                      // set the most significant bit to 1
740         idx = - idx;
741                                                                 // if int is not 32 bits, wouldn't this be wrong?
742         return idx;
743 }
744
745 extern "C"
746 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
747 {
748   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
749   register envelope *env = UsrToEnv(msg);
750   env->setMsgtype(BocInitMsg);
751   env->setEpIdx(eIdx);
752   env->setSrcPe(CkMyPe());
753   _TRACE_CREATION_N(env, CkNumPes());
754   CkGroupID gid = _groupCreate(env);
755   _TRACE_CREATION_DONE(1);
756   return gid;
757 }
758
759 extern "C"
760 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
761 {
762   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
763   register envelope *env = UsrToEnv(msg);
764   env->setMsgtype(NodeBocInitMsg);
765   env->setEpIdx(eIdx);
766   env->setSrcPe(CkMyPe());
767   _TRACE_CREATION_N(env, CkNumNodes());
768   CkGroupID gid = _nodeGroupCreate(env);
769   _TRACE_CREATION_DONE(1);
770   return gid;
771 }
772
773 static inline void *_allocNewChare(envelope *env, int &idx)
774 {
775   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
776   void *tmp=malloc(_chareTable[chareIdx]->size);
777 #if CMK_FT_CHARE
778   CpvAccess(chare_objs).push_back(tmp);
779   idx = CpvAccess(chare_objs).size()-1;
780 #endif
781   _MEMCHECK(tmp);
782   setMemoryTypeChare(tmp);
783   return tmp;
784 }
785
786 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
787 {
788   int idx;
789   register void *obj = _allocNewChare(env, idx);
790 #if CMK_FT_CHARE
791   ((Chare *)obj)->chareIdx = idx;
792 #endif
793   _invokeEntry(env->getEpIdx(),env,obj);
794 }
795
796 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
797 {
798   int idx;
799   register void *obj = _allocNewChare(env, idx);
800   register CkChareID *pCid = (CkChareID *)
801       _allocMsg(FillVidMsg, sizeof(CkChareID));
802   pCid->onPE = CkMyPe();
803 #if CMK_FT_CHARE
804   pCid->objPtr = (void*)idx;
805 #else
806   pCid->objPtr = obj;
807 #endif
808   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
809   register envelope *ret = UsrToEnv(pCid);
810   ret->setVidPtr(env->getVidPtr());
811   register int srcPe = env->getSrcPe();
812   ret->setSrcPe(CkMyPe());
813   CmiSetHandler(ret, _charmHandlerIdx);
814   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
815   CpvAccess(_qd)->create();
816 #if CMK_FT_CHARE
817   ((Chare *)obj)->chareIdx = idx;
818 #endif
819   _invokeEntry(env->getEpIdx(),env,obj);
820 }
821
822 /************** Receive: Chares *************/
823
824 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
825 {
826   register int epIdx = env->getEpIdx();
827   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
828   register void *obj;
829   if (mainIdx != -1)  {
830     CmiAssert(CkMyPe()==0);
831     obj = _mainTable[mainIdx]->getObj();
832   }
833   else {
834 #if CMK_FT_CHARE
835     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
836       obj = CpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
837     else
838       obj = env->getObjPtr();
839 #else
840     obj = env->getObjPtr();
841 #endif
842   }
843   _invokeEntry(epIdx,env,obj);
844 }
845
846 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
847 {
848   register int epIdx = env->getEpIdx();
849   register void *obj = env->getObjPtr();
850   _invokeEntry(epIdx,env,obj);
851 }
852
853 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
854 {
855 #if CMK_FT_CHARE
856   register VidBlock *vptr = CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
857 #else
858   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
859   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
860 #endif
861   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
862   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
863   vptr->fill(pcid->onPE, pcid->objPtr);
864   CmiFree(env);
865 }
866
867 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
868 {
869 #if CMK_FT_CHARE
870   register VidBlock *vptr = CpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
871 #else
872   VidBlock *vptr = (VidBlock *) env->getVidPtr();
873   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
874 #endif
875   _SET_USED(env, 1);
876   vptr->send(env);
877 }
878
879 /************** Receive: Groups ****************/
880
881 /**
882  This message is sent to this groupID--prepare to
883  handle this message by looking up the group,
884  and possibly stashing the message.
885 */
886 IrrGroup *_lookupGroup(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
887 {
888
889         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
890         IrrGroup *obj = ck->localBranch(groupID);
891         if (obj==NULL) { /* groupmember not yet created: stash message */
892                 ck->getGroupTable()->find(groupID).enqMsg(env);
893         }
894         else { /* will be able to process message */
895                 ck->process();
896         }
897         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
898         return obj;
899 }
900
901 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
902 {
903 #if CMK_LBDB_ON
904   // if there is a running obj being measured, stop it temporarily
905   LDObjHandle objHandle;
906   int objstopped = 0;
907   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
908   if (the_lbdb->RunningObject(&objHandle)) {
909     objstopped = 1;
910     the_lbdb->ObjectStop(objHandle);
911   }
912 #endif
913   _invokeEntry(epIdx,env,obj);
914 #if CMK_LBDB_ON
915   if (objstopped) the_lbdb->ObjectStart(objHandle);
916 #endif
917   _STATS_RECORD_PROCESS_BRANCH_1();
918 }
919
920 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
921 {
922   register CkGroupID groupID =  env->getGroupNum();
923   register IrrGroup *obj = _lookupGroup(ck,env,env->getGroupNum());
924   if(obj) {
925     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
926   }
927 }
928
929 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
930 {
931   env->setMsgtype(ForChareMsg);
932   env->setObjPtr(obj);
933   _processForChareMsg(ck,env);
934   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
935 }
936
937 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
938 {
939   env->setEpIdx(epIdx);
940   _deliverForNodeBocMsg(ck,env, obj);
941 }
942
943 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
944 {
945   register CkGroupID groupID = env->getGroupNum();
946   register void *obj;
947
948   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
949   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
950   if(!obj) { // groupmember not yet created
951 #if CMK_IMMEDIATE_MSG
952     if (CmiIsImmediate(env))     // buffer immediate message
953       CmiDelayImmediate();
954     else
955 #endif
956     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
957     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
958     return;
959   }
960   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
961 #if CMK_IMMEDIATE_MSG
962   if (!CmiIsImmediate(env))
963 #endif
964   ck->process();
965   env->setMsgtype(ForChareMsg);
966   env->setObjPtr(obj);
967   _processForChareMsg(ck,env);
968   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
969 }
970
971 void _processBocInitMsg(CkCoreState *ck,envelope *env)
972 {
973   register CkGroupID groupID = env->getGroupNum();
974   register int epIdx = env->getEpIdx();
975   CkCreateLocalGroup(groupID, epIdx, env);
976 }
977
978 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
979 {
980   register CkGroupID groupID = env->getGroupNum();
981   register int epIdx = env->getEpIdx();
982   CkCreateLocalNodeGroup(groupID, epIdx, env);
983 }
984
985 /************** Receive: Arrays *************/
986
987 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
988   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
989   if (mgr) {
990     _SET_USED(env, 0);
991     mgr->insertElement((CkMessage *)EnvToUsr(env));
992   }
993 }
994 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
995   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
996   if (mgr) {
997     _SET_USED(env, 0);
998     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
999   }
1000 }
1001
1002 //BIGSIM_OOC DEBUGGING
1003 #define TELLMSGTYPE(x) //x
1004
1005 /**
1006  * This is the main converse-level handler used by all of Charm++.
1007  *
1008  * \addtogroup CriticalPathFramework
1009  */
1010 void _processHandler(void *converseMsg,CkCoreState *ck)
1011 {
1012   register envelope *env = (envelope *) converseMsg;
1013
1014 //#if CMK_RECORD_REPLAY
1015   if (ck->watcher!=NULL) {
1016     if (!ck->watcher->processMessage(env,ck)) return;
1017   }
1018 //#endif
1019 #ifdef _FAULT_MLOG_
1020         Chare *obj=NULL;
1021         CkObjID sender;
1022         MCount SN;
1023         MlogEntry *entry=NULL;
1024         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
1025         env->getMsgtype() == ForArrayEltMsg){
1026                 sender = env->sender;
1027                 SN = env->SN;
1028                 int result = preProcessReceivedMessage(env,&obj,&entry);
1029                 if(result == 0){
1030                         return;
1031                 }
1032         }
1033 #endif
1034
1035 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1036   //  CkPrintf("START\n");
1037   criticalPath_start(env);
1038 #endif
1039
1040
1041   switch(env->getMsgtype()) {
1042 // Group support
1043     case BocInitMsg :
1044       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1045       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1046       _processBocInitMsg(ck,env);
1047       break;
1048     case NodeBocInitMsg :
1049       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1050       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1051       _processNodeBocInitMsg(ck,env);
1052       break;
1053     case ForBocMsg :
1054       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1055       // QD processing moved inside _processForBocMsg because it is conditional
1056       if(env->isPacked()) CkUnpackMessage(&env);
1057       _processForBocMsg(ck,env);
1058       // stats record moved inside _processForBocMsg because it is conditional
1059       break;
1060     case ForNodeBocMsg :
1061       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1062       // QD processing moved to _processForNodeBocMsg because it is conditional
1063       if(env->isPacked()) CkUnpackMessage(&env);
1064       _processForNodeBocMsg(ck,env);
1065       // stats record moved to _processForNodeBocMsg because it is conditional
1066       break;
1067
1068 // Array support
1069     case ArrayEltInitMsg:
1070       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ArrayEltInitMsg\n", CkMyPe());)
1071       if(env->isPacked()) CkUnpackMessage(&env);
1072       _processArrayEltInitMsg(ck,env);
1073       break;
1074     case ForArrayEltMsg:
1075       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1076       if(env->isPacked()) CkUnpackMessage(&env);
1077       _processArrayEltMsg(ck,env);
1078       break;
1079
1080 // Chare support
1081     case NewChareMsg :
1082       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1083       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1084       _processNewChareMsg(ck,env);
1085       _STATS_RECORD_PROCESS_CHARE_1();
1086       break;
1087     case NewVChareMsg :
1088       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1089       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1090       _processNewVChareMsg(ck,env);
1091       _STATS_RECORD_PROCESS_CHARE_1();
1092       break;
1093     case ForChareMsg :
1094       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1095       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1096       _processForPlainChareMsg(ck,env);
1097       _STATS_RECORD_PROCESS_MSG_1();
1098       break;
1099     case ForVidMsg   :
1100       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1101       ck->process();
1102       _processForVidMsg(ck,env);
1103       break;
1104     case FillVidMsg  :
1105       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1106       ck->process();
1107       _processFillVidMsg(ck,env);
1108       break;
1109
1110     default:
1111       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1112   }
1113 #ifdef _FAULT_MLOG_
1114         if(obj != NULL){
1115                 postProcessReceivedMessage(obj,sender,SN,entry);
1116         }
1117 #endif
1118
1119
1120 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1121   criticalPath_end();
1122   //  CkPrintf("STOP\n");
1123 #endif
1124
1125
1126 }
1127
1128
1129 /******************** Message Send **********************/
1130
1131 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1132              int *queueing, int *priobits, unsigned int **prioptr)
1133 {
1134   register envelope *env = (envelope *)converseMsg;
1135   *pfn = (CldPackFn)CkPackMessage;
1136   *len = env->getTotalsize();
1137   *queueing = env->getQueueing();
1138   *priobits = env->getPriobits();
1139   *prioptr = (unsigned int *) env->getPrioPtr();
1140 }
1141
1142 void CkPackMessage(envelope **pEnv)
1143 {
1144   register envelope *env = *pEnv;
1145   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1146     register void *msg = EnvToUsr(env);
1147     _TRACE_BEGIN_PACK();
1148     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1149     _TRACE_END_PACK();
1150     env=UsrToEnv(msg);
1151     env->setPacked(1);
1152     *pEnv = env;
1153   }
1154 }
1155
1156 void CkUnpackMessage(envelope **pEnv)
1157 {
1158   register envelope *env = *pEnv;
1159   register int msgIdx = env->getMsgIdx();
1160   if(env->isPacked()) {
1161     register void *msg = EnvToUsr(env);
1162     _TRACE_BEGIN_UNPACK();
1163     msg = _msgTable[msgIdx]->unpack(msg);
1164     _TRACE_END_UNPACK();
1165     env=UsrToEnv(msg);
1166     env->setPacked(0);
1167     *pEnv = env;
1168   }
1169 }
1170
1171 //There's no reason for most messages to go through the Cld--
1172 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1173 // Thus these accellerated versions of the Cld calls.
1174
1175 int index_objectQHandler;
1176 int index_tokenHandler;
1177 static int index_skipCldHandler;
1178
1179 static void _skipCldHandler(void *converseMsg)
1180 {
1181   register envelope *env = (envelope *)(converseMsg);
1182   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1183 #if CMK_GRID_QUEUE_AVAILABLE
1184   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1185     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1186                        env, env->getQueueing (), env->getPriobits (),
1187                        (unsigned int *) env->getPrioPtr ());
1188   } else {
1189     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1190                        env, env->getQueueing (), env->getPriobits (),
1191                        (unsigned int *) env->getPrioPtr ());
1192   }
1193 #else
1194   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1195         env, env->getQueueing(),env->getPriobits(),
1196         (unsigned int *)env->getPrioPtr());
1197 #endif
1198 }
1199
1200
1201 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1202 // Made non-static to be used by ckmessagelogging
1203 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1204 {
1205   if(pe == CkMyPe() ){
1206     if(!CmiNodeAlive(CkMyPe())){
1207         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1208 //      return;
1209     }
1210   }
1211   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1212 #if CMK_OBJECT_QUEUE_AVAILABLE
1213     Chare *obj = CkFindObjectPtr(env);
1214     if (obj && obj->CkGetObjQueue().queue()) {
1215       _enqObjQueue(obj, env);
1216     }
1217     else
1218 #endif
1219     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1220         env, env->getQueueing(),env->getPriobits(),
1221         (unsigned int *)env->getPrioPtr());
1222   } else {
1223     CkPackMessage(&env);
1224     int len=env->getTotalsize();
1225     CmiSetXHandler(env,CmiGetHandler(env));
1226 #if CMK_OBJECT_QUEUE_AVAILABLE
1227     CmiSetHandler(env,index_objectQHandler);
1228 #else
1229     CmiSetHandler(env,index_skipCldHandler);
1230 #endif
1231     CmiSetInfo(env,infoFn);
1232     if (pe==CLD_BROADCAST) {
1233 #ifdef _FAULT_MLOG_             
1234                         CmiSyncBroadcast(len, (char *)env);
1235 #else
1236                         CmiSyncBroadcastAndFree(len, (char *)env); 
1237 #endif
1238
1239 }
1240     else if (pe==CLD_BROADCAST_ALL) { 
1241 #ifdef _FAULT_MLOG_             
1242                         CmiSyncBroadcastAll(len, (char *)env);
1243 #else
1244                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1245 #endif
1246
1247 }
1248     else{
1249 #ifdef _FAULT_MLOG_             
1250                         CmiSyncSend(pe, len, (char *)env);
1251 #else
1252                         CmiSyncSendAndFree(pe, len, (char *)env);
1253 #endif
1254
1255                 }
1256   }
1257 }
1258
1259 #if CMK_BLUEGENE_CHARM
1260 #   define  _skipCldEnqueue   CldEnqueue
1261 #endif
1262
1263 // by pass Charm++ priority queue, send as Converse message
1264 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1265 {
1266   CkPackMessage(&env);
1267   int len=env->getTotalsize();
1268   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1269 }
1270
1271 static void _noCldEnqueue(int pe, envelope *env)
1272 {
1273 /*
1274   if (pe == CkMyPe()) {
1275     CmiHandleMessage(env);
1276   } else
1277 */
1278   CkPackMessage(&env);
1279   int len=env->getTotalsize();
1280   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1281   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1282   else CmiSyncSendAndFree(pe, len, (char *)env);
1283 }
1284
1285 //static void _noCldNodeEnqueue(int node, envelope *env)
1286 //Made non-static to be used by ckmessagelogging
1287 void _noCldNodeEnqueue(int node, envelope *env)
1288 {
1289 /*
1290   if (node == CkMyNode()) {
1291     CmiHandleMessage(env);
1292   } else {
1293 */
1294   CkPackMessage(&env);
1295   int len=env->getTotalsize();
1296   if (node==CLD_BROADCAST) { 
1297 #ifdef _FAULT_MLOG_
1298         CmiSyncNodeBroadcast(len, (char *)env);
1299 #else
1300         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1301 #endif
1302 }
1303   else if (node==CLD_BROADCAST_ALL) { 
1304 #ifdef _FAULT_MLOG_
1305                 CmiSyncNodeBroadcastAll(len, (char *)env);
1306 #else
1307                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1308 #endif
1309
1310 }
1311   else {
1312 #ifdef _FAULT_MLOG_
1313         CmiSyncNodeSend(node, len, (char *)env);
1314 #else
1315         CmiSyncNodeSendAndFree(node, len, (char *)env);
1316 #endif
1317   }
1318 }
1319
1320 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1321 {
1322   register envelope *env = UsrToEnv(msg);
1323   _CHECK_USED(env);
1324   _SET_USED(env, 1);
1325   env->setMsgtype(ForChareMsg);
1326   env->setEpIdx(eIdx);
1327   env->setSrcPe(CkMyPe());
1328 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1329   criticalPath_send(env);
1330   automaticallySetMessagePriority(env);
1331 #endif
1332 #ifndef CMK_OPTIMIZE
1333   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1334 #endif
1335 #if CMK_OBJECT_QUEUE_AVAILABLE
1336   CmiSetHandler(env, index_objectQHandler);
1337 #else
1338   CmiSetHandler(env, _charmHandlerIdx);
1339 #endif
1340   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1341     register int pe = -(pCid->onPE+1);
1342     if(pe==CkMyPe()) {
1343 #if CMK_FT_CHARE
1344       VidBlock *vblk = CpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1345 #else
1346       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1347 #endif
1348       void *objPtr;
1349       if (NULL!=(objPtr=vblk->getLocalChare()))
1350       { //A ready local chare
1351         env->setObjPtr(objPtr);
1352         return pe;
1353       }
1354       else { //The vidblock is not ready-- forget it
1355         vblk->send(env);
1356         return -1;
1357       }
1358     } else { //Valid vidblock for another PE:
1359       env->setMsgtype(ForVidMsg);
1360       env->setVidPtr(pCid->objPtr);
1361       return pe;
1362     }
1363   }
1364   else {
1365     env->setObjPtr(pCid->objPtr);
1366     return pCid->onPE;
1367   }
1368 }
1369
1370 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1371 {
1372   int destPE = _prepareMsg(eIdx, msg, pCid);
1373   if (destPE != -1) {
1374     register envelope *env = UsrToEnv(msg);
1375 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1376     criticalPath_send(env);
1377     automaticallySetMessagePriority(env);
1378 #endif
1379     CmiBecomeImmediate(env);
1380   }
1381   return destPE;
1382 }
1383
1384 extern "C"
1385 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1386 {
1387   if (opts & CK_MSG_INLINE) {
1388     CkSendMsgInline(entryIdx, msg, pCid, opts);
1389     return;
1390   }
1391 #ifndef CMK_OPTIMIZE
1392   if (opts & CK_MSG_IMMEDIATE) {
1393     CmiAbort("Immediate message is not allowed in Chare!");
1394   }
1395 #endif
1396   register envelope *env = UsrToEnv(msg);
1397   int destPE=_prepareMsg(entryIdx,msg,pCid);
1398   if (destPE!=-1) {
1399     _TRACE_CREATION_1(env);
1400     CpvAccess(_qd)->create();
1401     if (opts & CK_MSG_SKIP_OR_IMM)
1402       _noCldEnqueue(destPE, env);
1403     else
1404       CldEnqueue(destPE, env, _infoIdx);
1405     _TRACE_CREATION_DONE(1);
1406   }
1407 }
1408
1409 extern "C"
1410 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1411 {
1412   if (pCid->onPE==CkMyPe())
1413   {
1414     if(!CmiNodeAlive(CkMyPe())){
1415         return;
1416     }
1417 #ifndef CMK_OPTIMIZE
1418     //Just in case we need to breakpoint or use the envelope in some way
1419     _prepareMsg(entryIndex,msg,pCid);
1420 #endif
1421                 //Just directly call the chare (skip QD handling & scheduler)
1422     register envelope *env = UsrToEnv(msg);
1423     if (env->isPacked()) CkUnpackMessage(&env);
1424     _STATS_RECORD_PROCESS_MSG_1();
1425     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1426   }
1427   else {
1428     //No way to inline a cross-processor message:
1429     CkSendMsg(entryIndex,msg,pCid,opts&!CK_MSG_INLINE);
1430   }
1431 }
1432
1433 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1434 {
1435   register envelope *env = UsrToEnv(msg);
1436   _CHECK_USED(env);
1437   _SET_USED(env, 1);
1438   env->setMsgtype(type);
1439   env->setEpIdx(eIdx);
1440   env->setGroupNum(gID);
1441   env->setSrcPe(CkMyPe());
1442 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1443   criticalPath_send(env);
1444   automaticallySetMessagePriority(env);
1445 #endif
1446 #ifndef CMK_OPTIMIZE
1447   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1448 #endif
1449   CmiSetHandler(env, _charmHandlerIdx);
1450   return env;
1451 }
1452
1453 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1454 {
1455   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1456 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1457   criticalPath_send(env);
1458   automaticallySetMessagePriority(env);
1459 #endif
1460   CmiBecomeImmediate(env);
1461   return env;
1462 }
1463
1464 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1465                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1466 {
1467   int numPes;
1468   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1469 #ifdef _FAULT_MLOG_
1470         sendTicketGroupRequest(env,pe,_infoIdx);
1471 #else
1472   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1473   _TRACE_CREATION_N(env, numPes);
1474   if (opts & CK_MSG_SKIP_OR_IMM)
1475     _noCldEnqueue(pe, env);
1476   else
1477     _skipCldEnqueue(pe, env, _infoIdx);
1478   _TRACE_CREATION_DONE(1);
1479 #endif
1480 }
1481
1482 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1483                            int npes, int *pes)
1484 {
1485   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1486   _TRACE_CREATION_MULTICAST(env, npes, pes);
1487   CldEnqueueMulti(npes, pes, env, _infoIdx);
1488   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1489 }
1490
1491 extern "C"
1492 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1493 {
1494 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1495   if (destPE==CkMyPe())
1496   {
1497     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1498     return;
1499   }
1500   //Can't inline-- send the usual way
1501   register envelope *env = UsrToEnv(msg);
1502   int numPes;
1503   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1504   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1505   _TRACE_CREATION_N(env, numPes);
1506   _noCldEnqueue(destPE, env);
1507   _STATS_RECORD_SEND_BRANCH_1();
1508   CkpvAccess(_coreState)->create();
1509   _TRACE_CREATION_DONE(1);
1510 #else
1511   // no support for immediate message, send inline
1512   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1513 #endif
1514 }
1515
1516 extern "C"
1517 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1518 {
1519   if (destPE==CkMyPe())
1520   {
1521     if(!CmiNodeAlive(CkMyPe())){
1522         return;
1523     }
1524     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1525     if (obj!=NULL)
1526     { //Just directly call the group:
1527 #ifndef CMK_OPTIMIZE
1528       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1529 #else
1530       envelope *env=UsrToEnv(msg);
1531 #endif
1532       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1533       return;
1534     }
1535   }
1536   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1537   CkSendMsgBranch(eIdx,msg,destPE,gID,opts&!CK_MSG_INLINE);
1538 }
1539
1540 extern "C"
1541 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1542 {
1543   if (opts & CK_MSG_INLINE) {
1544     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1545     return;
1546   }
1547   if (opts & CK_MSG_IMMEDIATE) {
1548     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1549     return;
1550   }
1551   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1552   _STATS_RECORD_SEND_BRANCH_1();
1553   CkpvAccess(_coreState)->create();
1554 }
1555
1556 extern "C"
1557 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1558 {
1559 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1560   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1561   _TRACE_CREATION_MULTICAST(env, npes, pes);
1562   _noCldEnqueueMulti(npes, pes, env);
1563   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1564 #else
1565   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1566   CpvAccess(_qd)->create(-npes);
1567 #endif
1568   _STATS_RECORD_SEND_BRANCH_N(npes);
1569   CpvAccess(_qd)->create(npes);
1570 }
1571
1572 extern "C"
1573 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1574 {
1575   if (opts & CK_MSG_IMMEDIATE) {
1576     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1577     return;
1578   }
1579     // normal mesg
1580   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1581   _STATS_RECORD_SEND_BRANCH_N(npes);
1582   CpvAccess(_qd)->create(npes);
1583 }
1584
1585 extern "C"
1586 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1587 {
1588   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1589   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1590   CpvAccess(_qd)->create(CkNumPes());
1591 }
1592
1593 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1594                 int node=CLD_BROADCAST_ALL, int opts=0)
1595 {
1596   int numPes;
1597   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1598 #ifdef _FAULT_MLOG_
1599         sendTicketNodeGroupRequest(env,node,_infoIdx);
1600 #else
1601   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1602   _TRACE_CREATION_N(env, numPes);
1603   if (opts & CK_MSG_SKIP_OR_IMM) {
1604     _noCldNodeEnqueue(node, env);
1605     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1606       CkpvAccess(_coreState)->create(-numPes);
1607     }
1608   }
1609   else
1610     CldNodeEnqueue(node, env, _infoIdx);
1611   _TRACE_CREATION_DONE(1);
1612 #endif
1613 }
1614
1615 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1616                            int npes, int *nodes)
1617 {
1618   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1619   _TRACE_CREATION_N(env, npes);
1620   for (int i=0; i<npes; i++) {
1621     CldNodeEnqueue(nodes[i], env, _infoIdx);
1622   }
1623   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1624 }
1625
1626 extern "C"
1627 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1628 {
1629 #if CMK_IMMEDIATE_MSG
1630   if (node==CkMyNode())
1631   {
1632     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1633     return;
1634   }
1635   //Can't inline-- send the usual way
1636   register envelope *env = UsrToEnv(msg);
1637   int numPes;
1638   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1639   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1640   _TRACE_CREATION_N(env, numPes);
1641   _noCldNodeEnqueue(node, env);
1642   _STATS_RECORD_SEND_BRANCH_1();
1643   /* immeidate message is invisible to QD */
1644 //  CkpvAccess(_coreState)->create();
1645   _TRACE_CREATION_DONE(1);
1646 #else
1647   // no support for immediate message, send inline
1648   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1649 #endif
1650 }
1651
1652 extern "C"
1653 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1654 {
1655   if (node==CkMyNode())
1656   {
1657     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1658     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1659     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1660     if (obj!=NULL)
1661     { //Just directly call the group:
1662 #ifndef CMK_OPTIMIZE
1663       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1664 #else
1665       envelope *env=UsrToEnv(msg);
1666 #endif
1667       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1668       return;
1669     }
1670   }
1671   //Can't inline-- send the usual way
1672   CkSendMsgNodeBranch(eIdx,msg,node,gID,opts&!CK_MSG_INLINE);
1673 }
1674
1675 extern "C"
1676 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1677 {
1678   if (opts & CK_MSG_INLINE) {
1679     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1680     return;
1681   }
1682   if (opts & CK_MSG_IMMEDIATE) {
1683     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1684     return;
1685   }
1686   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1687   _STATS_RECORD_SEND_NODE_BRANCH_1();
1688   CkpvAccess(_coreState)->create();
1689 }
1690
1691 extern "C"
1692 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1693 {
1694 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1695   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1696   _noCldEnqueueMulti(npes, nodes, env);
1697 #else
1698   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1699   CpvAccess(_qd)->create(-npes);
1700 #endif
1701   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1702   CpvAccess(_qd)->create(npes);
1703 }
1704
1705 extern "C"
1706 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1707 {
1708   if (opts & CK_MSG_IMMEDIATE) {
1709     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1710     return;
1711   }
1712     // normal mesg
1713   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1714   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1715   CpvAccess(_qd)->create(npes);
1716 }
1717
1718 extern "C"
1719 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1720 {
1721   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1722   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1723   CpvAccess(_qd)->create(CkNumNodes());
1724 }
1725
1726 //Needed by delegation manager:
1727 extern "C"
1728 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1729 { return _prepareMsg(eIdx,msg,pCid); }
1730 extern "C"
1731 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1732 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1733 extern "C"
1734 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1735 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1736
1737 void _ckModuleInit(void) {
1738         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1739         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1740         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1741         CkpvInitialize(TokenPool*, _tokenPool);
1742         CkpvAccess(_tokenPool) = new TokenPool;
1743 }
1744
1745
1746 /************** Send: Arrays *************/
1747
1748 extern void CkArrayManagerInsert(int onPe,void *msg);
1749 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1750
1751 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1752 {
1753   _CHECK_USED(env);
1754   _SET_USED(env, 1);
1755   env->setMsgtype(type);
1756 #ifndef CMK_OPTIMIZE
1757   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1758 #endif
1759   CmiSetHandler(env, _charmHandlerIdx);
1760   CpvAccess(_qd)->create();
1761 }
1762
1763 extern "C"
1764 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1765   register envelope *env = UsrToEnv(msg);
1766   env->getsetArrayMgr()=aID;
1767   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1768   CldEnqueue(pe, env, _infoIdx);
1769 }
1770
1771 extern "C"
1772 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1773   register envelope *env = UsrToEnv(msg);
1774   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1775 #ifdef _FAULT_MLOG_
1776         sendTicketArrayRequest(env,pe,_infoIdx);
1777 #else
1778   if (opts & CK_MSG_IMMEDIATE)
1779     CmiBecomeImmediate(env);
1780   if (opts & CK_MSG_SKIP_OR_IMM)
1781     _noCldEnqueue(pe, env);
1782   else
1783     _skipCldEnqueue(pe, env, _infoIdx);
1784 #endif
1785 }
1786
1787 class ElementDestroyer : public CkLocIterator {
1788 private:
1789         CkLocMgr *locMgr;
1790 public:
1791         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1792         void addLocation(CkLocation &loc) {
1793           loc.destroyAll();
1794         }
1795 };
1796
1797 void CkDeleteChares() {
1798   int i;
1799   int numGroups = CkpvAccess(_groupIDTable)->size();
1800
1801   // delete all array elements
1802   for(i=0;i<numGroups;i++) {
1803     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1804     if(obj && obj->isLocMgr())  {
1805       CkLocMgr *mgr = (CkLocMgr*)obj;
1806       ElementDestroyer destroyer(mgr);
1807       mgr->iterate(destroyer);
1808 printf("[%d] DELETE!\n", CkMyPe());
1809     }
1810   }
1811
1812   // delete all groups
1813   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1814   for(i=0;i<numGroups;i++) {
1815     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1816     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1817     if (obj) delete obj;
1818   }
1819   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1820
1821   // delete all node groups
1822   if (CkMyRank() == 0) {
1823     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1824     for(i=0;i<numNodeGroups;i++) {
1825       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
1826       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1827       if (obj) delete obj;
1828     }
1829   }
1830 }
1831
1832 //------------------- Message Watcher (record/replay) ----------------
1833
1834 #include "crc32.h"
1835 CkMessageWatcher::~CkMessageWatcher() {}
1836
1837 class CkMessageRecorder : public CkMessageWatcher {
1838 public:
1839   CkMessageRecorder(FILE *f_) { f=f_; }
1840   ~CkMessageRecorder() {
1841     fprintf(f,"-1 -1 -1");
1842     fclose(f);
1843   }
1844
1845 private:
1846   virtual CmiBool process(envelope *env,CkCoreState *ck) {
1847     if (env->getEvent()) {
1848       bool wasPacked = env->isPacked();
1849       if (!wasPacked) CkPackMessage(&env);
1850       //unsigned int crc = crc32(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
1851       unsigned int crc1 = crc32(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
1852       unsigned int crc2 = crc32(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
1853       fprintf(f,"%d %d %d %hhd %x %x\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2);
1854       if (!wasPacked) CkUnpackMessage(&env);
1855     }
1856     return CmiTrue;
1857   }
1858 };
1859
1860 class CkMessageDetailRecorder : public CkMessageWatcher {
1861 public:
1862   CkMessageDetailRecorder(FILE *f_) {
1863     f=f_;
1864     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
1865      * The value of 'x' is the pointer size.
1866      */
1867     CmiUInt2 little = sizeof(void*);
1868     fwrite(&little, 2, 1, f);
1869   }
1870   ~CkMessageDetailRecorder() {fclose(f);}
1871 private:
1872   virtual CmiBool process(envelope *env, CkCoreState *ck) {
1873     bool wasPacked = env->isPacked();
1874     if (!wasPacked) CkPackMessage(&env);
1875     CmiUInt4 size = env->getTotalsize();
1876     fwrite(&size, 4, 1, f);
1877     fwrite(env, env->getTotalsize(), 1, f);
1878     if (!wasPacked) CkUnpackMessage(&env);
1879     return CmiTrue;
1880   }
1881 };
1882
1883 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
1884 #define REPLAYDEBUG(args) /* empty */
1885
1886 class CkMessageReplay : public CkMessageWatcher {
1887         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
1888         unsigned int crc1, crc2;
1889         /// Read the next message we need from the file:
1890         void getNext(void) {
1891                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
1892                         // CkAbort("CkMessageReplay> Syntax error reading replay file");
1893                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
1894                 }
1895         }
1896         /// If this is the next message we need, advance and return CmiTrue.
1897         CmiBool isNext(envelope *env) {
1898                 if (nextPE!=env->getSrcPe()) return CmiFalse;
1899                 if (nextEvent!=env->getEvent()) return CmiFalse;
1900                 if (nextSize!=env->getTotalsize())
1901                 {
1902                         CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]\n", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
1903                         return CmiFalse;
1904                 }
1905                 bool wasPacked = env->isPacked();
1906                 if (!wasPacked) CkPackMessage(&env);
1907                 //unsigned int crcnew = crc32(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
1908                 unsigned int crcnew1 = crc32(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
1909                 unsigned int crcnew2 = crc32(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
1910                 if (crcnew1 != crc1) {
1911                   CkPrintf("CkMessageReplay> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",crc1,crcnew1);
1912                 }
1913         if (crcnew2 != crc2) {
1914           CkPrintf("CkMessageReplay> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",crc2,crcnew2);
1915         }
1916         if (!wasPacked) CkUnpackMessage(&env);
1917                 return CmiTrue;
1918         }
1919
1920         /// This is a (short) list of messages we aren't yet ready for:
1921         CkQ<envelope *> delayed;
1922
1923         /// Try to flush out any delayed messages
1924         void flush(void) {
1925                 int len=delayed.length();
1926                 for (int i=0;i<len;i++) {
1927                         envelope *env=delayed.deq();
1928                         if (isNext(env)) { /* this is the next message: process it */
1929                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1930                                 CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
1931                                 return;
1932                         }
1933                         else /* Not ready yet-- put it back in the
1934                                 queue */
1935                           {
1936                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1937                                 delayed.enq(env);
1938                           }
1939                 }
1940         }
1941
1942 public:
1943         CkMessageReplay(FILE *f_) {
1944           f=f_;
1945           getNext();
1946           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
1947         }
1948         ~CkMessageReplay() {fclose(f);}
1949
1950 private:
1951         virtual CmiBool process(envelope *env,CkCoreState *ck) {
1952           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx());
1953                 if (env->getEvent() == 0) return CmiTrue;
1954                 if (isNext(env)) { /* This is the message we were expecting */
1955                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1956                         getNext(); /* Advance over this message */
1957                         flush(); /* try to process queued-up stuff */
1958                         return CmiTrue;
1959                 }
1960 #if CMK_SMP
1961                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
1962                          // try next rank, we can't just buffer the msg and left
1963                          // we need to keep unprocessed msg on the fly
1964                         int nextpe = CkMyPe()+1;
1965                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
1966                         nextpe = CkNodeFirst(CkMyNode());
1967                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
1968                         return CmiFalse;
1969                 }
1970 #endif
1971                 else /*!isNext(env) */ {
1972                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()
1973                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
1974                         delayed.enq(env);
1975                         flush();
1976                         return CmiFalse;
1977                 }
1978         }
1979 };
1980
1981 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
1982
1983 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
1984
1985         int i;
1986         char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
1987         strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
1988         sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
1989         FILE *f=fopen(fName,permissions);
1990         REPLAYDEBUG("openReplayfile "<<fName);
1991         if (f==NULL) {
1992                 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
1993                         CkMyPe(),fName,permissions);
1994                 CkAbort("openReplayFile> Could not open replay file");
1995         }
1996         return f;
1997 }
1998
1999 #include "ckliststring.h"
2000 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2001     char *procs = NULL;
2002         REPLAYDEBUG("CkMessageWaterInit ");
2003     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2004         CkListString list(procs);
2005         if (list.includes(CkMyPe())) {
2006           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2007         }
2008     }
2009         if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2010                 ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2011         }
2012         if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream")) {
2013                 ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2014         }
2015         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Re-play the specified processors from recorded message content")) {
2016           /*Nothing yet*/
2017         }
2018 }
2019
2020 extern "C"
2021 int CkMessageToEpIdx(void *msg) {
2022         envelope *env=UsrToEnv(msg);
2023         int ep=env->getEpIdx();
2024         if (ep==CkIndex_CkArray::recvBroadcast(0))
2025                 return env->getsetArrayBcastEp();
2026         else
2027                 return ep;
2028 }
2029
2030 extern "C"
2031 int getCharmEnvelopeSize() {
2032   return sizeof(envelope);
2033 }
2034
2035
2036 #include "CkMarshall.def.h"
2037