Adding optional support for critical path detection(currently disabled by default...
[charm.git] / src / ck-core / ck.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /**
8 \addtogroup Ck
9
10 These routines implement a basic remote-method-invocation system
11 consisting of chares and groups.  There is no migration. All
12 the bindings are written to the C language, although most
13 clients, including the rest of Charm++, are actually C++.
14 */
15 #include "ck.h"
16 #include "trace.h"
17 #include "queueing.h"
18
19 #if CMK_LBDB_ON
20 #include "LBDatabase.h"
21 #endif // CMK_LBDB_ON
22
23 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
24
25
26
27
28
29 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
30 // store the pointer to the currently executing msg
31 // see envelope.h
32 // TODO: convert to CkPv
33 envelope * currentlyExecutingMsg = NULL;
34 bool thisMethodSentAMessage = false;
35 double timeEntryMethodStarted = 0.0;
36
37 void resetCricitalPathDetection(){
38   CkAbort("shouldn't be called anymore. Deprecated");
39   
40   // First we should register this currently executing message as a path, because it is likely an important one to consider.
41   registerTerminalEntryMethod();
42
43   // Print the Critical path known to this PE
44   if(CkMyPe() == 0){
45     CkPrintf("[pe=%d] Critical paths discovered on this pe (in resetCriticalPathCounts() ):\n", CkMyPe());
46     printPECriticalPath();
47   }
48   
49   // Reset the counts for the critical path on this PE
50   resetPECriticalPath();
51   
52   // Reset the counts for the currently executing message
53   resetThisEntryPath();
54   
55 }
56
57 void resetThisEntryPath(){
58   // Reset the counts for the currently executing message
59   if(currentlyExecutingMsg != NULL){
60     currentlyExecutingMsg->resetEpIdxHistory();
61   }
62 }
63
64
65 #endif
66
67
68 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
69
70 int CMessage_CkMessage::__idx=-1;
71 int CMessage_CkArgMsg::__idx=0;
72 int CkIndex_Chare::__idx;
73 int CkIndex_Group::__idx;
74 int CkIndex_ArrayBase::__idx=-1;
75
76 extern int _defaultObjectQ;
77
78 //Charm++ virtual functions: declaring these here results in a smaller executable
79 Chare::Chare(void) {
80   thishandle.onPE=CkMyPe();
81   thishandle.objPtr=this;
82 #if CMK_OBJECT_QUEUE_AVAILABLE
83   if (_defaultObjectQ)  CkEnableObjQ();
84 #endif
85 }
86
87 Chare::Chare(CkMigrateMessage* m) {
88   thishandle.onPE=CkMyPe();
89   thishandle.objPtr=this;
90 #if CMK_OBJECT_QUEUE_AVAILABLE
91   if (_defaultObjectQ)  CkEnableObjQ();
92 #endif
93 }
94
95 void Chare::CkEnableObjQ()
96 {
97 #if CMK_OBJECT_QUEUE_AVAILABLE
98   objQ.create();
99 #endif
100 }
101
102 Chare::~Chare() {}
103
104 void Chare::pup(PUP::er &p)
105 {
106   p(thishandle.onPE);
107   thishandle.objPtr=(void *)this;
108 }
109
110 int Chare::ckGetChareType() const {
111   return -3;
112 }
113 char *Chare::ckDebugChareName(void) {
114   char buf[100];
115   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
116   return strdup(buf);
117 }
118 int Chare::ckDebugChareID(char *str, int limit) {
119   // pure chares for now do not have a valid ID
120   str[0] = 0;
121   return 1;
122 }
123 void Chare::ckDebugPup(PUP::er &p) {
124   pup(p);
125 }
126
127 /// This method is called before starting a [threaded] entry method.
128 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
129   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
130   traceAddThreadListeners(th, UsrToEnv(msg));
131 }
132
133
134 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
135   p.comment("Message has no debug pup routine.  Bytes:");
136   int ts=UsrToEnv(msg)->getTotalsize();
137   int msgLen=ts-sizeof(envelope);
138   if (msgLen>0)
139     p((char*)msg,msgLen);
140 }
141
142 IrrGroup::IrrGroup(void) {
143   thisgroup = CkpvAccess(_currentGroup);
144 }
145
146 IrrGroup::~IrrGroup() {
147   // remove the object pointer
148   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
149   CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
150   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
151 }
152
153 void IrrGroup::pup(PUP::er &p)
154 {
155   Chare::pup(p);
156   p|thisgroup;
157 }
158
159 int IrrGroup::ckGetChareType() const {
160   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
161 }
162
163 int IrrGroup::ckDebugChareID(char *str, int limit) {
164   if (limit<5) return -1;
165   str[0] = 1;
166   *((int*)&str[1]) = thisgroup.idx;
167   return 5;
168 }
169
170 char *IrrGroup::ckDebugChareName() {
171   return strdup(_chareTable[ckGetChareType()]->name);
172 }
173
174 void IrrGroup::ckJustMigrated(void)
175 {
176 }
177
178 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
179   /* FIXME: **CW** not entirely sure what we should do here yet */
180 }
181
182 void Group::CkAddThreadListeners(CthThread th, void *msg) {
183   Chare::CkAddThreadListeners(th, msg);
184   CthSetThreadID(th, thisgroup.idx, 0, 0);
185 }
186
187 void Group::pup(PUP::er &p)
188 {
189   CkReductionMgr::pup(p);
190   reductionInfo.pup(p);
191 }
192
193 /**** Delegation Manager Group */
194 CkDelegateMgr::~CkDelegateMgr() { }
195
196 //Default delegator implementation: do not delegate-- send directly
197 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
198   { CkSendMsg(ep,m,c); }
199 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
200   { CkSendMsgBranch(ep,m,onPE,g); }
201 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
202   { CkBroadcastMsgBranch(ep,m,g); }
203 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
204   { CkSendMsgNodeBranch(ep,m,onNode,g); }
205 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
206   { CkBroadcastMsgNodeBranch(ep,m,g); }
207 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,int onPE,CkArrayID a)
208 {
209         CProxyElement_ArrayBase ap(a,idx);
210         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
211 }
212 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndexMax &idx,CkArrayID a)
213 {
214         CProxyElement_ArrayBase ap(a,idx);
215         ap.ckSend((CkArrayMessage *)m,ep);
216 }
217 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
218 {
219         CProxy_ArrayBase ap(a);
220         ap.ckBroadcast((CkArrayMessage *)m,ep);
221 }
222
223 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, CkArrayID a,CkSectionID &s, int opts)
224 {
225         CmiAbort("ArraySectionSend is not implemented!\n");
226 /*
227         CProxyElement_ArrayBase ap(a,idx);
228         ap.ckSend((CkArrayMessage *)m,ep);
229 */
230 }
231
232 /*** Proxy <-> delegator communication */
233 CkDelegateData::~CkDelegateData() {}
234
235 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
236   return pd; // default implementation ignores pup call
237 }
238
239 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
240    this tricky manual reference counting business... */
241
242 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
243         if (dPtr) dPtr->ref();
244         ckUndelegate();
245         delegatedMgr = dTo;
246         delegatedPtr = dPtr;
247 }
248 void CProxy::ckUndelegate(void) {
249         delegatedMgr=NULL;
250         if (delegatedPtr) delegatedPtr->unref();
251         delegatedPtr=NULL;
252 }
253
254 /// Copy constructor
255 CProxy::CProxy(const CProxy &src)
256     :delegatedMgr(src.delegatedMgr)
257 {
258     delegatedPtr = NULL;
259     if(delegatedMgr != NULL && src.delegatedPtr != NULL)
260         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
261 }
262
263 /// Assignment operator
264 CProxy& CProxy::operator=(const CProxy &src) {
265         CkDelegateData *oldPtr=delegatedPtr;
266         ckUndelegate();
267         delegatedMgr=src.delegatedMgr;
268
269         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
270             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
271         else
272             delegatedPtr = NULL;
273
274         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
275         if (oldPtr) oldPtr->unref();
276         return *this;
277 }
278
279 void CProxy::pup(PUP::er &p) {
280       CkGroupID delegatedTo;
281       delegatedTo.setZero();
282       int isNodeGroup = 0;
283       if (!p.isUnpacking()) {
284         if (delegatedMgr) {
285           delegatedTo = delegatedMgr->CkGetGroupID();
286           isNodeGroup = delegatedMgr->isNodeGroup();
287         }
288       }
289       p|delegatedTo;
290       if (!delegatedTo.isZero()) {
291         p|isNodeGroup;
292         if (p.isUnpacking()) {
293           if (isNodeGroup)
294                 delegatedMgr=(CkDelegateMgr *)CkLocalNodeBranch(delegatedTo);
295           else
296                 delegatedMgr=(CkDelegateMgr *)CkLocalBranch(delegatedTo);
297         }
298
299         delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
300         if (p.isUnpacking() && delegatedPtr)
301             delegatedPtr->ref();
302       }
303 }
304
305 /**** Array sections */
306 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
307 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
308   _cookie.aid = aid;    \
309   _cookie.get_pe() = CkMyPe();  \
310   _elems = new CkArrayIndexMax[nElems]; \
311   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
312   pelist = NULL;        \
313   npes  = 0;    \
314 }
315
316 CKSECTIONID_CONSTRUCTOR_DEF(1D)
317 CKSECTIONID_CONSTRUCTOR_DEF(2D)
318 CKSECTIONID_CONSTRUCTOR_DEF(3D)
319 CKSECTIONID_CONSTRUCTOR_DEF(4D)
320 CKSECTIONID_CONSTRUCTOR_DEF(5D)
321 CKSECTIONID_CONSTRUCTOR_DEF(6D)
322 CKSECTIONID_CONSTRUCTOR_DEF(Max)
323
324 CkSectionID::CkSectionID(const CkSectionID &sid) {
325   _cookie = sid._cookie;
326   _nElems = sid._nElems;
327   _elems = new CkArrayIndexMax[_nElems];
328   for (int i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
329   pelist = NULL;
330   npes = 0;
331 }
332
333 void CkSectionID::operator=(const CkSectionID &sid) {
334   _cookie = sid._cookie;
335   _nElems = sid._nElems;
336   _elems = new CkArrayIndexMax[_nElems];
337   for (int i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
338 }
339
340 CkSectionID::~CkSectionID() {
341     delete [] _elems;
342     if(pelist != NULL)
343         delete [] pelist;
344 }
345
346 void CkSectionID::pup(PUP::er &p) {
347     p | _cookie;
348     p(_nElems);
349     if (p.isUnpacking()) _elems = new CkArrayIndexMax[_nElems];
350     for (int i=0; i< _nElems; i++) p | _elems[i];
351 }
352
353 /**** Tiny random API routines */
354
355 #ifdef CMK_CUDA
356 void CUDACallbackManager(void *fn) {
357   if (fn != NULL) {
358     CkCallback *cb = (CkCallback*) fn;
359     cb->send();
360   }
361 }
362
363 #endif
364
365 extern "C"
366 void CkSetRefNum(void *msg, int ref)
367 {
368   UsrToEnv(msg)->setRef(ref);
369 }
370
371 extern "C"
372 int CkGetRefNum(void *msg)
373 {
374   return UsrToEnv(msg)->getRef();
375 }
376
377 extern "C"
378 int CkGetSrcPe(void *msg)
379 {
380   return UsrToEnv(msg)->getSrcPe();
381 }
382
383 extern "C"
384 int CkGetSrcNode(void *msg)
385 {
386   return CmiNodeOf(CkGetSrcPe(msg));
387 }
388
389 extern "C"
390 void *CkLocalBranch(CkGroupID gID) {
391   return _localBranch(gID);
392 }
393
394 static
395 void *_ckLocalNodeBranch(CkGroupID groupID) {
396   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
397   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
398   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
399   return retval;
400 }
401
402 extern "C"
403 void *CkLocalNodeBranch(CkGroupID groupID)
404 {
405   void *retval;
406   // we are called in a constructor
407   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
408     return CkpvAccess(_currentNodeGroupObj);
409   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
410   { // Nodegroup hasn't finished being created yet-- schedule...
411     CsdScheduler(0);
412   }
413   return retval;
414 }
415
416 extern "C"
417 void *CkLocalChare(const CkChareID *pCid)
418 {
419         int pe=pCid->onPE;
420         if (pe<0) { //A virtual chare ID
421                 if (pe!=(-(CkMyPe()+1)))
422                         return NULL;//VID block not on this PE
423                 VidBlock *v=(VidBlock *)pCid->objPtr;
424                 return v->getLocalChare();
425         }
426         else
427         { //An ordinary chare ID
428                 if (pe!=CkMyPe())
429                         return NULL;//Chare not on this PE
430                 return pCid->objPtr;
431         }
432 }
433
434 CkpvDeclare(char **,Ck_argv);
435 extern "C" char **CkGetArgv(void) {
436         return CkpvAccess(Ck_argv);
437 }
438 extern "C" int CkGetArgc(void) {
439         return CmiGetArgc(CkpvAccess(Ck_argv));
440 }
441
442 /******************** Basic support *****************/
443 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
444 {
445 #ifndef CMK_OPTIMIZE
446   CpdBeforeEp(epIdx, obj);
447 #endif
448   _entryTable[epIdx]->call(msg, obj);
449 #ifndef CMK_OPTIMIZE
450   CpdAfterEp(epIdx);
451 #endif
452   if (_entryTable[epIdx]->noKeep)
453   { /* Method doesn't keep/delete the message, so we have to: */
454      CkFreeMsg(msg);
455   }
456 }
457 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
458 {
459   void *deliverMsg;
460   if (_entryTable[epIdx]->noKeep)
461   { /* Deliver a read-only copy of the message */
462     deliverMsg=(void *)msg;
463   } else
464   { /* Method needs a copy of the message to keep/delete */
465     void *oldMsg=(void *)msg;
466     deliverMsg=CkCopyMsg(&oldMsg);
467 #ifndef CMK_OPTIMIZE
468     if (oldMsg!=msg)
469       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
470 #endif
471   }
472 #ifndef CMK_OPTIMIZE
473   CpdBeforeEp(epIdx, obj);
474 #endif
475   _entryTable[epIdx]->call(deliverMsg, obj);
476 #ifndef CMK_OPTIMIZE
477   CpdAfterEp(epIdx);
478 #endif
479 }
480
481 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
482 {
483   register void *msg = EnvToUsr(env);
484   _SET_USED(env, 0);
485   CkDeliverMessageFree(epIdx,msg,obj);
486 }
487
488 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
489 {
490
491
492 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
493   // store the pointer to the currently executing msg
494   currentlyExecutingMsg = env; 
495   thisMethodSentAMessage = false;
496
497   // Increase the counts for the entry that we are about to execute
498   env->updateCounts();
499   
500   // Increase the reference count for the message so the user won't delete it
501   CmiReference(env);
502   timeEntryMethodStarted = CmiWallTimer();
503
504 #endif
505
506
507
508 #ifndef CMK_OPTIMIZE /* Consider tracing: */
509   if (_entryTable[epIdx]->traceEnabled) {
510     _TRACE_BEGIN_EXECUTE(env);
511     _invokeEntryNoTrace(epIdx,env,obj);
512     _TRACE_END_EXECUTE();
513   }
514   else
515 #endif
516     _invokeEntryNoTrace(epIdx,env,obj);
517
518
519 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
520  double timeEntryMethodEnded = CmiWallTimer();
521  env->pathHistory.incrementTotalTime(timeEntryMethodEnded-timeEntryMethodStarted);
522  if(!thisMethodSentAMessage){
523     registerTerminalEntryMethod();
524   }
525
526   CmiFree(env); // free the message, because we incremented its reference count above
527
528   // set to NULL the pointer to the currently executing msg
529   currentlyExecutingMsg = NULL; 
530   //  CkPrintf("This entry method is %s\n", (int)thisMethodSentAMessage?"non-terminal":"terminal");
531
532 #endif
533
534 }
535
536 /********************* Creation ********************/
537
538 extern "C"
539 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
540 {
541   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
542   envelope *env = UsrToEnv(msg);
543   _CHECK_USED(env);
544   if(pCid == 0) {
545     env->setMsgtype(NewChareMsg);
546   } else {
547     pCid->onPE = (-(CkMyPe()+1));
548     //  pCid->magic = _GETIDX(cIdx);
549     pCid->objPtr = (void *) new VidBlock();
550     _MEMCHECK(pCid->objPtr);
551     env->setMsgtype(NewVChareMsg);
552     env->setVidPtr(pCid->objPtr);
553   }
554   env->setEpIdx(eIdx);
555   env->setSrcPe(CkMyPe());
556   CmiSetHandler(env, _charmHandlerIdx);
557   _TRACE_CREATION_1(env);
558   CpvAccess(_qd)->create();
559   _STATS_RECORD_CREATE_CHARE_1();
560   _SET_USED(env, 1);
561   if(destPE == CK_PE_ANY)
562     env->setForAnyPE(1);
563   else
564     env->setForAnyPE(0);
565   CldEnqueue(destPE, env, _infoIdx);
566   _TRACE_CREATION_DONE(1);
567 }
568
569 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
570 {
571   register int gIdx = _entryTable[epIdx]->chareIdx;
572   register void *obj = malloc(_chareTable[gIdx]->size);
573   _MEMCHECK(obj);
574   setMemoryTypeChare(obj);
575   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
576   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
577   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
578   CkpvAccess(_groupIDTable)->push_back(groupID);
579   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
580   if(ptrq) {
581     void *pending;
582     while((pending=ptrq->deq())!=0)
583       CldEnqueue(CkMyPe(), pending, _infoIdx);
584 //    delete ptrq;
585       CkpvAccess(_groupTable)->find(groupID).clearPending();
586   }
587   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
588
589   CkpvAccess(_currentGroup) = groupID;
590   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
591   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
592   _STATS_RECORD_PROCESS_GROUP_1();
593 }
594
595 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
596 {
597   register int gIdx = _entryTable[epIdx]->chareIdx;
598   int objSize=_chareTable[gIdx]->size;
599   register void *obj = malloc(objSize);
600   _MEMCHECK(obj);
601   setMemoryTypeChare(obj);
602   CkpvAccess(_currentGroup) = groupID;
603
604 // Now that the NodeGroup is created, add it to the table.
605 //  NodeGroups can be accessed by multiple processors, so
606 //  this is in the opposite order from groups - invoking the constructor
607 //  before registering it.
608 // User may call CkLocalNodeBranch() inside the nodegroup constructor
609 //  store nodegroup into _currentNodeGroupObj
610   CkpvAccess(_currentNodeGroupObj) = obj;
611   _invokeEntryNoTrace(epIdx,env,obj);
612   CkpvAccess(_currentNodeGroupObj) = NULL;
613   _STATS_RECORD_PROCESS_NODE_GROUP_1();
614
615   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
616   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
617   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
618   CksvAccess(_nodeGroupIDTable).push_back(groupID);
619
620   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
621   if(ptrq) {
622     void *pending;
623     while((pending=ptrq->deq())!=0)
624       CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
625 //    delete ptrq;
626       CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
627   }
628   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
629 }
630
631 void _createGroup(CkGroupID groupID, envelope *env)
632 {
633   _CHECK_USED(env);
634   _SET_USED(env, 1);
635   register int epIdx = env->getEpIdx();
636   register int msgIdx = env->getMsgIdx();
637   int gIdx = _entryTable[epIdx]->chareIdx;
638   CkNodeGroupID rednMgr;
639   if(_chareTable[gIdx]->isIrr == 0){
640                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
641                 rednMgr = rednMgrProxy;
642 //              rednMgrProxy.setAttachedGroup(groupID);
643   }else{
644         rednMgr.setZero();
645   }
646   env->setGroupNum(groupID);
647   env->setSrcPe(CkMyPe());
648   env->setRednMgr(rednMgr);
649   env->setGroupEpoch(CkpvAccess(_charmEpoch));
650
651   if(CkNumPes()>1) {
652     CkPackMessage(&env);
653     CmiSetHandler(env, _bocHandlerIdx);
654     _numInitMsgs++;
655     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
656     CpvAccess(_qd)->create(CkNumPes()-1);
657     CkUnpackMessage(&env);
658   }
659   _STATS_RECORD_CREATE_GROUP_1();
660   CkCreateLocalGroup(groupID, epIdx, env);
661 }
662
663 void _createNodeGroup(CkGroupID groupID, envelope *env)
664 {
665   _CHECK_USED(env);
666   _SET_USED(env, 1);
667   register int epIdx = env->getEpIdx();
668   register int msgIdx = env->getMsgIdx();
669   env->setGroupNum(groupID);
670   env->setSrcPe(CkMyPe());
671   env->setGroupEpoch(CkpvAccess(_charmEpoch));
672   register void *msg =  EnvToUsr(env);
673   if(CkNumNodes()>1) {
674     CkPackMessage(&env);
675     CmiSetHandler(env, _bocHandlerIdx);
676     _numInitMsgs++;
677     CksvAccess(_numInitNodeMsgs)++;
678     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
679     CpvAccess(_qd)->create(CkNumNodes()-1);
680     CkUnpackMessage(&env);
681   }
682   _STATS_RECORD_CREATE_NODE_GROUP_1();
683   CkCreateLocalNodeGroup(groupID, epIdx, env);
684 }
685
686 // new _groupCreate
687
688 static CkGroupID _groupCreate(envelope *env)
689 {
690   register CkGroupID groupNum;
691
692   // check CkMyPe(). if it is 0 then idx is _numGroups++
693   // if not, then something else...
694   if(CkMyPe() == 0)
695      groupNum.idx = CkpvAccess(_numGroups)++;
696   else
697      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
698   _createGroup(groupNum, env);
699   return groupNum;
700 }
701
702 // new _nodeGroupCreate
703 static CkGroupID _nodeGroupCreate(envelope *env)
704 {
705   register CkGroupID groupNum;
706   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
707   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
708           groupNum.idx = CksvAccess(_numNodeGroups)++;
709    else
710           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
711   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
712   _createNodeGroup(groupNum, env);
713   return groupNum;
714 }
715
716 /**** generate the group idx when group is creator pe is not pe0
717  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
718  **** remaining bits contain the group creator processor number and
719  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
720
721 int _getGroupIdx(int numNodes,int myNode,int numGroups)
722 {
723         int idx;
724         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
725         int n = 32 - (x+1);                                     // number of bits remaining for the index
726         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
727                                                                 // then add the next available index
728         // of course this won't work when int is 8 bytes long on T3E
729         //idx |= 0x80000000;                                      // set the most significant bit to 1
730         idx = - idx;
731                                                                 // if int is not 32 bits, wouldn't this be wrong?
732         return idx;
733 }
734
735 extern "C"
736 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
737 {
738   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
739   register envelope *env = UsrToEnv(msg);
740   env->setMsgtype(BocInitMsg);
741   env->setEpIdx(eIdx);
742   env->setSrcPe(CkMyPe());
743   _TRACE_CREATION_N(env, CkNumPes());
744   CkGroupID gid = _groupCreate(env);
745   _TRACE_CREATION_DONE(1);
746   return gid;
747 }
748
749 extern "C"
750 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
751 {
752   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
753   register envelope *env = UsrToEnv(msg);
754   env->setMsgtype(NodeBocInitMsg);
755   env->setEpIdx(eIdx);
756   env->setSrcPe(CkMyPe());
757   _TRACE_CREATION_N(env, CkNumNodes());
758   CkGroupID gid = _nodeGroupCreate(env);
759   _TRACE_CREATION_DONE(1);
760   return gid;
761 }
762
763 static inline void *_allocNewChare(envelope *env)
764 {
765   void *tmp=malloc(_chareTable[_entryTable[env->getEpIdx()]->chareIdx]->size);
766   _MEMCHECK(tmp);
767   setMemoryTypeChare(tmp);
768   return tmp;
769 }
770
771 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
772 {
773   register void *obj = _allocNewChare(env);
774   _invokeEntry(env->getEpIdx(),env,obj);
775 }
776
777 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
778 {
779   register void *obj = _allocNewChare(env);
780   register CkChareID *pCid = (CkChareID *)
781       _allocMsg(FillVidMsg, sizeof(CkChareID));
782   pCid->onPE = CkMyPe();
783   pCid->objPtr = obj;
784   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
785   register envelope *ret = UsrToEnv(pCid);
786   ret->setVidPtr(env->getVidPtr());
787   register int srcPe = env->getSrcPe();
788   ret->setSrcPe(CkMyPe());
789   CmiSetHandler(ret, _charmHandlerIdx);
790   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
791   CpvAccess(_qd)->create();
792   _invokeEntry(env->getEpIdx(),env,obj);
793 }
794
795 /************** Receive: Chares *************/
796
797 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
798 {
799   register int epIdx = env->getEpIdx();
800   register void *obj = env->getObjPtr();
801   _invokeEntry(epIdx,env,obj);
802 }
803
804 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
805 {
806   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
807   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
808   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
809   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
810   vptr->fill(pcid->onPE, pcid->objPtr);
811   CmiFree(env);
812 }
813
814 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
815 {
816   VidBlock *vptr = (VidBlock *) env->getVidPtr();
817   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
818   _SET_USED(env, 1);
819   vptr->send(env);
820 }
821
822 /************** Receive: Groups ****************/
823
824 /**
825  This message is sent to this groupID--prepare to
826  handle this message by looking up the group,
827  and possibly stashing the message.
828 */
829 IrrGroup *_lookupGroup(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
830 {
831
832         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
833         IrrGroup *obj = ck->localBranch(groupID);
834         if (obj==NULL) { /* groupmember not yet created: stash message */
835                 ck->getGroupTable()->find(groupID).enqMsg(env);
836         }
837         else { /* will be able to process message */
838                 ck->process();
839         }
840         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
841         return obj;
842 }
843
844 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
845 {
846 #if CMK_LBDB_ON
847   // if there is a running obj being measured, stop it temporarily
848   LDObjHandle objHandle;
849   int objstopped = 0;
850   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
851   if (the_lbdb->RunningObject(&objHandle)) {
852     objstopped = 1;
853     the_lbdb->ObjectStop(objHandle);
854   }
855 #endif
856   _invokeEntry(epIdx,env,obj);
857 #if CMK_LBDB_ON
858   if (objstopped) the_lbdb->ObjectStart(objHandle);
859 #endif
860   _STATS_RECORD_PROCESS_BRANCH_1();
861 }
862
863 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
864 {
865   register CkGroupID groupID =  env->getGroupNum();
866   register IrrGroup *obj = _lookupGroup(ck,env,env->getGroupNum());
867   if(obj) {
868     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
869   }
870 }
871
872 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
873 {
874   env->setMsgtype(ForChareMsg);
875   env->setObjPtr(obj);
876   _processForChareMsg(ck,env);
877   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
878 }
879
880 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
881 {
882   env->setEpIdx(epIdx);
883   _deliverForNodeBocMsg(ck,env, obj);
884 }
885
886 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
887 {
888   register CkGroupID groupID = env->getGroupNum();
889   register void *obj;
890
891   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
892   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
893   if(!obj) { // groupmember not yet created
894 #if CMK_IMMEDIATE_MSG
895     if (CmiIsImmediate(env))     // buffer immediate message
896       CmiDelayImmediate();
897     else
898 #endif
899     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
900     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
901     return;
902   }
903   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
904 #if CMK_IMMEDIATE_MSG
905   if (!CmiIsImmediate(env))
906 #endif
907   ck->process();
908   env->setMsgtype(ForChareMsg);
909   env->setObjPtr(obj);
910   _processForChareMsg(ck,env);
911   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
912 }
913
914 void _processBocInitMsg(CkCoreState *ck,envelope *env)
915 {
916   register CkGroupID groupID = env->getGroupNum();
917   register int epIdx = env->getEpIdx();
918   CkCreateLocalGroup(groupID, epIdx, env);
919 }
920
921 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
922 {
923   register CkGroupID groupID = env->getGroupNum();
924   register int epIdx = env->getEpIdx();
925   CkCreateLocalNodeGroup(groupID, epIdx, env);
926 }
927
928 /************** Receive: Arrays *************/
929
930 static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
931   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
932   if (mgr) {
933     _SET_USED(env, 0);
934     mgr->insertElement((CkMessage *)EnvToUsr(env));
935   }
936 }
937 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
938   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
939   if (mgr) {
940     _SET_USED(env, 0);
941     mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
942   }
943 }
944
945 /**
946  * This is the main converse-level handler used by all of Charm++.
947  */
948 void _processHandler(void *converseMsg,CkCoreState *ck)
949 {
950   register envelope *env = (envelope *) converseMsg;
951 //#if CMK_RECORD_REPLAY
952   if (ck->watcher!=NULL) {
953     if (!ck->watcher->processMessage(env,ck)) return;
954   }
955 //#endif
956   switch(env->getMsgtype()) {
957 // Group support
958     case BocInitMsg :
959       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
960       _processBocInitMsg(ck,env);
961       break;
962     case NodeBocInitMsg :
963       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
964       _processNodeBocInitMsg(ck,env);
965       break;
966     case ForBocMsg :
967       // QD processing moved inside _processForBocMsg because it is conditional
968       if(env->isPacked()) CkUnpackMessage(&env);
969       _processForBocMsg(ck,env);
970       // stats record moved inside _processForBocMsg because it is conditional
971       break;
972     case ForNodeBocMsg :
973       // QD processing moved to _processForNodeBocMsg because it is conditional
974       if(env->isPacked()) CkUnpackMessage(&env);
975       _processForNodeBocMsg(ck,env);
976       // stats record moved to _processForNodeBocMsg because it is conditional
977       break;
978
979 // Array support
980     case ArrayEltInitMsg:
981       if(env->isPacked()) CkUnpackMessage(&env);
982       _processArrayEltInitMsg(ck,env);
983       break;
984     case ForArrayEltMsg:
985       if(env->isPacked()) CkUnpackMessage(&env);
986       _processArrayEltMsg(ck,env);
987       break;
988
989 // Chare support
990     case NewChareMsg :
991       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
992       _processNewChareMsg(ck,env);
993       _STATS_RECORD_PROCESS_CHARE_1();
994       break;
995     case NewVChareMsg :
996       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
997       _processNewVChareMsg(ck,env);
998       _STATS_RECORD_PROCESS_CHARE_1();
999       break;
1000     case ForChareMsg :
1001       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1002       _processForChareMsg(ck,env);
1003       _STATS_RECORD_PROCESS_MSG_1();
1004       break;
1005     case ForVidMsg   :
1006       ck->process();
1007       _processForVidMsg(ck,env);
1008       break;
1009     case FillVidMsg  :
1010       ck->process();
1011       _processFillVidMsg(ck,env);
1012       break;
1013
1014     default:
1015       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1016   }
1017 }
1018
1019
1020 /******************** Message Send **********************/
1021
1022 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1023              int *queueing, int *priobits, unsigned int **prioptr)
1024 {
1025   register envelope *env = (envelope *)converseMsg;
1026   *pfn = (CldPackFn)CkPackMessage;
1027   *len = env->getTotalsize();
1028   *queueing = env->getQueueing();
1029   *priobits = env->getPriobits();
1030   *prioptr = (unsigned int *) env->getPrioPtr();
1031 }
1032
1033 void CkPackMessage(envelope **pEnv)
1034 {
1035   register envelope *env = *pEnv;
1036   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1037     register void *msg = EnvToUsr(env);
1038     _TRACE_BEGIN_PACK();
1039     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1040     _TRACE_END_PACK();
1041     env=UsrToEnv(msg);
1042     env->setPacked(1);
1043     *pEnv = env;
1044   }
1045 }
1046
1047 void CkUnpackMessage(envelope **pEnv)
1048 {
1049   register envelope *env = *pEnv;
1050   register int msgIdx = env->getMsgIdx();
1051   if(env->isPacked()) {
1052     register void *msg = EnvToUsr(env);
1053     _TRACE_BEGIN_UNPACK();
1054     msg = _msgTable[msgIdx]->unpack(msg);
1055     _TRACE_END_UNPACK();
1056     env=UsrToEnv(msg);
1057     env->setPacked(0);
1058     *pEnv = env;
1059   }
1060 }
1061
1062 //There's no reason for most messages to go through the Cld--
1063 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1064 // Thus these accellerated versions of the Cld calls.
1065
1066 int index_objectQHandler;
1067 int index_tokenHandler;
1068 static int index_skipCldHandler;
1069
1070 static void _skipCldHandler(void *converseMsg)
1071 {
1072   register envelope *env = (envelope *)(converseMsg);
1073   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1074 #if CMK_GRID_QUEUE_AVAILABLE
1075   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1076     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1077                        env, env->getQueueing (), env->getPriobits (),
1078                        (unsigned int *) env->getPrioPtr ());
1079   } else {
1080     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1081                        env, env->getQueueing (), env->getPriobits (),
1082                        (unsigned int *) env->getPrioPtr ());
1083   }
1084 #else
1085   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1086         env, env->getQueueing(),env->getPriobits(),
1087         (unsigned int *)env->getPrioPtr());
1088 #endif
1089 }
1090
1091
1092 static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1093 {
1094   if(pe == CkMyPe() ){
1095     if(!CmiNodeAlive(CkMyPe())){
1096         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1097 //      return;
1098     }
1099   }
1100   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1101 #if CMK_OBJECT_QUEUE_AVAILABLE
1102     Chare *obj = CkFindObjectPtr(env);
1103     if (obj && obj->CkGetObjQueue().queue()) {
1104       _enqObjQueue(obj, env);
1105     }
1106     else
1107 #endif
1108     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1109         env, env->getQueueing(),env->getPriobits(),
1110         (unsigned int *)env->getPrioPtr());
1111   } else {
1112     CkPackMessage(&env);
1113     int len=env->getTotalsize();
1114     CmiSetXHandler(env,CmiGetHandler(env));
1115 #if CMK_OBJECT_QUEUE_AVAILABLE
1116     CmiSetHandler(env,index_objectQHandler);
1117 #else
1118     CmiSetHandler(env,index_skipCldHandler);
1119 #endif
1120     CmiSetInfo(env,infoFn);
1121     if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1122     else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1123     else{
1124                         CmiSyncSendAndFree(pe, len, (char *)env);
1125                 }
1126   }
1127 }
1128
1129 #if CMK_BLUEGENE_CHARM
1130 #   define  _skipCldEnqueue   CldEnqueue
1131 #endif
1132
1133 // by pass Charm++ priority queue, send as Converse message
1134 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1135 {
1136   CkPackMessage(&env);
1137   int len=env->getTotalsize();
1138   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1139 }
1140
1141 static void _noCldEnqueue(int pe, envelope *env)
1142 {
1143 /*
1144   if (pe == CkMyPe()) {
1145     CmiHandleMessage(env);
1146   } else
1147 */
1148   CkPackMessage(&env);
1149   int len=env->getTotalsize();
1150   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1151   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1152   else CmiSyncSendAndFree(pe, len, (char *)env);
1153 }
1154
1155 static void _noCldNodeEnqueue(int node, envelope *env)
1156 {
1157 /*
1158   if (node == CkMyNode()) {
1159     CmiHandleMessage(env);
1160   } else {
1161 */
1162   CkPackMessage(&env);
1163   int len=env->getTotalsize();
1164   if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, (char *)env); }
1165   else if (node==CLD_BROADCAST_ALL) { CmiSyncNodeBroadcastAllAndFree(len, (char *)env); }
1166   else CmiSyncNodeSendAndFree(node, len, (char *)env);
1167 }
1168
1169 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1170 {
1171   register envelope *env = UsrToEnv(msg);
1172   _CHECK_USED(env);
1173   _SET_USED(env, 1);
1174   env->setMsgtype(ForChareMsg);
1175   env->setEpIdx(eIdx);
1176   env->setSrcPe(CkMyPe());
1177 #ifndef CMK_OPTIMIZE
1178   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1179 #endif
1180 #if CMK_OBJECT_QUEUE_AVAILABLE
1181   CmiSetHandler(env, index_objectQHandler);
1182 #else
1183   CmiSetHandler(env, _charmHandlerIdx);
1184 #endif
1185   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1186     register int pe = -(pCid->onPE+1);
1187     if(pe==CkMyPe()) {
1188       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1189       void *objPtr;
1190       if (NULL!=(objPtr=vblk->getLocalChare()))
1191       { //A ready local chare
1192         env->setObjPtr(objPtr);
1193         return pe;
1194       }
1195       else { //The vidblock is not ready-- forget it
1196         vblk->send(env);
1197         return -1;
1198       }
1199     } else { //Valid vidblock for another PE:
1200       env->setMsgtype(ForVidMsg);
1201       env->setVidPtr(pCid->objPtr);
1202       return pe;
1203     }
1204   }
1205   else {
1206     env->setObjPtr(pCid->objPtr);
1207     return pCid->onPE;
1208   }
1209 }
1210
1211 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1212 {
1213   int destPE = _prepareMsg(eIdx, msg, pCid);
1214   if (destPE != -1) {
1215     register envelope *env = UsrToEnv(msg);
1216     CmiBecomeImmediate(env);
1217   }
1218   return destPE;
1219 }
1220
1221 extern "C"
1222 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1223 {
1224   if (opts & CK_MSG_INLINE) {
1225     CkSendMsgInline(entryIdx, msg, pCid, opts);
1226     return;
1227   }
1228 #ifndef CMK_OPTIMIZE
1229   if (opts & CK_MSG_IMMEDIATE) {
1230     CmiAbort("Immediate message is not allowed in Chare!");
1231   }
1232 #endif
1233   register envelope *env = UsrToEnv(msg);
1234   int destPE=_prepareMsg(entryIdx,msg,pCid);
1235   if (destPE!=-1) {
1236     _TRACE_CREATION_1(env);
1237     CpvAccess(_qd)->create();
1238     if (opts & CK_MSG_SKIP_OR_IMM)
1239       _noCldEnqueue(destPE, env);
1240     else
1241       CldEnqueue(destPE, env, _infoIdx);
1242     _TRACE_CREATION_DONE(1);
1243   }
1244 }
1245
1246 extern "C"
1247 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1248 {
1249   if (pCid->onPE==CkMyPe())
1250   {
1251     if(!CmiNodeAlive(CkMyPe())){
1252         return;
1253     }
1254 #ifndef CMK_OPTIMIZE
1255     //Just in case we need to breakpoint or use the envelope in some way
1256     _prepareMsg(entryIndex,msg,pCid);
1257 #endif
1258                 //Just directly call the chare (skip QD handling & scheduler)
1259     register envelope *env = UsrToEnv(msg);
1260     if (env->isPacked()) CkUnpackMessage(&env);
1261     _STATS_RECORD_PROCESS_MSG_1();
1262     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1263   }
1264   else {
1265     //No way to inline a cross-processor message:
1266     CkSendMsg(entryIndex,msg,pCid,opts&!CK_MSG_INLINE);
1267   }
1268 }
1269
1270 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1271 {
1272   register envelope *env = UsrToEnv(msg);
1273   _CHECK_USED(env);
1274   _SET_USED(env, 1);
1275   env->setMsgtype(type);
1276   env->setEpIdx(eIdx);
1277   env->setGroupNum(gID);
1278   env->setSrcPe(CkMyPe());
1279 #ifndef CMK_OPTIMIZE
1280   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1281 #endif
1282   CmiSetHandler(env, _charmHandlerIdx);
1283   return env;
1284 }
1285
1286 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1287 {
1288   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1289   CmiBecomeImmediate(env);
1290   return env;
1291 }
1292
1293 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1294                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1295 {
1296   int numPes;
1297   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1298   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1299   _TRACE_CREATION_N(env, numPes);
1300   if (opts & CK_MSG_SKIP_OR_IMM)
1301     _noCldEnqueue(pe, env);
1302   else
1303     _skipCldEnqueue(pe, env, _infoIdx);
1304   _TRACE_CREATION_DONE(1);
1305 }
1306
1307 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1308                            int npes, int *pes)
1309 {
1310   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1311   _TRACE_CREATION_MULTICAST(env, npes, pes);
1312   CldEnqueueMulti(npes, pes, env, _infoIdx);
1313   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1314 }
1315
1316 extern "C"
1317 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1318 {
1319 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1320   if (destPE==CkMyPe())
1321   {
1322     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1323     return;
1324   }
1325   //Can't inline-- send the usual way
1326   register envelope *env = UsrToEnv(msg);
1327   int numPes;
1328   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1329   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1330   _TRACE_CREATION_N(env, numPes);
1331   _noCldEnqueue(destPE, env);
1332   _STATS_RECORD_SEND_BRANCH_1();
1333   CkpvAccess(_coreState)->create();
1334   _TRACE_CREATION_DONE(1);
1335 #else
1336   // no support for immediate message, send inline
1337   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1338 #endif
1339 }
1340
1341 extern "C"
1342 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1343 {
1344   if (destPE==CkMyPe())
1345   {
1346     if(!CmiNodeAlive(CkMyPe())){
1347         return;
1348     }
1349     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1350     if (obj!=NULL)
1351     { //Just directly call the group:
1352 #ifndef CMK_OPTIMIZE
1353       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1354 #else
1355       envelope *env=UsrToEnv(msg);
1356 #endif
1357       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1358       return;
1359     }
1360   }
1361   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1362   CkSendMsgBranch(eIdx,msg,destPE,gID,opts&!CK_MSG_INLINE);
1363 }
1364
1365 extern "C"
1366 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1367 {
1368   if (opts & CK_MSG_INLINE) {
1369     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1370     return;
1371   }
1372   if (opts & CK_MSG_IMMEDIATE) {
1373     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1374     return;
1375   }
1376   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1377   _STATS_RECORD_SEND_BRANCH_1();
1378   CkpvAccess(_coreState)->create();
1379 }
1380
1381 extern "C"
1382 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,int npes,int *pes,CkGroupID gID)
1383 {
1384 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1385   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1386   _TRACE_CREATION_MULTICAST(env, npes, pes);
1387   _noCldEnqueueMulti(npes, pes, env);
1388   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1389 #else
1390   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1391   CpvAccess(_qd)->create(-npes);
1392 #endif
1393   _STATS_RECORD_SEND_BRANCH_N(npes);
1394   CpvAccess(_qd)->create(npes);
1395 }
1396
1397 extern "C"
1398 void CkSendMsgBranchMulti(int eIdx,void *msg,int npes,int *pes,CkGroupID gID, int opts)
1399 {
1400   if (opts & CK_MSG_IMMEDIATE) {
1401     CkSendMsgBranchMultiImmediate(eIdx,msg,npes,pes,gID);
1402     return;
1403   }
1404     // normal mesg
1405   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1406   _STATS_RECORD_SEND_BRANCH_N(npes);
1407   CpvAccess(_qd)->create(npes);
1408 }
1409
1410 extern "C"
1411 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1412 {
1413   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1414   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1415   CpvAccess(_qd)->create(CkNumPes());
1416 }
1417
1418 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1419                 int node=CLD_BROADCAST_ALL, int opts=0)
1420 {
1421   int numPes;
1422   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1423   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1424   _TRACE_CREATION_N(env, numPes);
1425   if (opts & CK_MSG_SKIP_OR_IMM) {
1426     _noCldNodeEnqueue(node, env);
1427     if (opts & CK_MSG_IMMEDIATE) {    // immediate msg is invisible to QD
1428       CkpvAccess(_coreState)->create(-numPes);
1429     }
1430   }
1431   else
1432     CldNodeEnqueue(node, env, _infoIdx);
1433   _TRACE_CREATION_DONE(1);
1434 }
1435
1436 extern "C"
1437 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1438 {
1439 #if CMK_IMMEDIATE_MSG
1440   if (node==CkMyNode())
1441   {
1442     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1443     return;
1444   }
1445   //Can't inline-- send the usual way
1446   register envelope *env = UsrToEnv(msg);
1447   int numPes;
1448   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1449   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1450   _TRACE_CREATION_N(env, numPes);
1451   _noCldNodeEnqueue(node, env);
1452   _STATS_RECORD_SEND_BRANCH_1();
1453   /* immeidate message is invisible to QD */
1454 //  CkpvAccess(_coreState)->create();
1455   _TRACE_CREATION_DONE(1);
1456 #else
1457   // no support for immediate message, send inline
1458   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1459 #endif
1460 }
1461
1462 extern "C"
1463 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1464 {
1465   if (node==CkMyNode())
1466   {
1467     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1468     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1469     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1470     if (obj!=NULL)
1471     { //Just directly call the group:
1472 #ifndef CMK_OPTIMIZE
1473       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1474 #else
1475       envelope *env=UsrToEnv(msg);
1476 #endif
1477       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1478       return;
1479     }
1480   }
1481   //Can't inline-- send the usual way
1482   CkSendMsgNodeBranch(eIdx,msg,node,gID,opts&!CK_MSG_INLINE);
1483 }
1484
1485 extern "C"
1486 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1487 {
1488   if (opts & CK_MSG_INLINE) {
1489     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1490     return;
1491   }
1492   if (opts & CK_MSG_IMMEDIATE) {
1493     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1494     return;
1495   }
1496   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1497   _STATS_RECORD_SEND_NODE_BRANCH_1();
1498   CkpvAccess(_coreState)->create();
1499 }
1500
1501 extern "C"
1502 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1503 {
1504   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1505   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
1506   CpvAccess(_qd)->create(CkNumNodes());
1507 }
1508
1509 //Needed by delegation manager:
1510 extern "C"
1511 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
1512 { return _prepareMsg(eIdx,msg,pCid); }
1513 extern "C"
1514 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1515 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
1516 extern "C"
1517 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
1518 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
1519
1520 void _ckModuleInit(void) {
1521         index_skipCldHandler = CkRegisterHandler((CmiHandler)_skipCldHandler);
1522         index_objectQHandler = CkRegisterHandler((CmiHandler)_ObjectQHandler);
1523         index_tokenHandler = CkRegisterHandler((CmiHandler)_TokenHandler);
1524         CkpvInitialize(TokenPool*, _tokenPool);
1525         CkpvAccess(_tokenPool) = new TokenPool;
1526 }
1527
1528
1529 /************** Send: Arrays *************/
1530
1531 extern void CkArrayManagerInsert(int onPe,void *msg);
1532 //extern void CkArrayManagerDeliver(int onPe,void *msg);
1533
1534 static void _prepareOutgoingArrayMsg(envelope *env,int type)
1535 {
1536   _CHECK_USED(env);
1537   _SET_USED(env, 1);
1538   env->setMsgtype(type);
1539 #ifndef CMK_OPTIMIZE
1540   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1541 #endif
1542   CmiSetHandler(env, _charmHandlerIdx);
1543   CpvAccess(_qd)->create();
1544 }
1545
1546 extern "C"
1547 void CkArrayManagerInsert(int pe,void *msg,CkGroupID aID) {
1548   register envelope *env = UsrToEnv(msg);
1549   env->getsetArrayMgr()=aID;
1550   _prepareOutgoingArrayMsg(env,ArrayEltInitMsg);
1551   CldEnqueue(pe, env, _infoIdx);
1552 }
1553
1554 extern "C"
1555 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
1556   register envelope *env = UsrToEnv(msg);
1557   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
1558   if (opts & CK_MSG_IMMEDIATE)
1559     CmiBecomeImmediate(env);
1560   if (opts & CK_MSG_SKIP_OR_IMM)
1561     _noCldEnqueue(pe, env);
1562   else
1563     _skipCldEnqueue(pe, env, _infoIdx);
1564 }
1565
1566 class ElementDestroyer : public CkLocIterator {
1567 private:
1568         CkLocMgr *locMgr;
1569 public:
1570         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
1571         void addLocation(CkLocation &loc) {
1572           loc.destroyAll();
1573         }
1574 };
1575
1576 void CkDeleteChares() {
1577   int i;
1578   int numGroups = CkpvAccess(_groupIDTable)->size();
1579
1580   // delete all array elements
1581   for(i=0;i<numGroups;i++) {
1582     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1583     if(obj && obj->isLocMgr())  {
1584       CkLocMgr *mgr = (CkLocMgr*)obj;
1585       ElementDestroyer destroyer(mgr);
1586       mgr->iterate(destroyer);
1587 printf("[%d] DELETE!\n", CkMyPe());
1588     }
1589   }
1590
1591   // delete all groups
1592   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1593   for(i=0;i<numGroups;i++) {
1594     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1595     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1596     if (obj) delete obj;
1597   }
1598   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1599
1600   // delete all node groups
1601   if (CkMyRank() == 0) {
1602     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1603     for(i=0;i<numNodeGroups;i++) {
1604       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
1605       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1606       if (obj) delete obj;
1607     }
1608   }
1609 }
1610
1611 //------------------- Message Watcher (record/replay) ----------------
1612
1613 CkMessageWatcher::~CkMessageWatcher() {}
1614
1615 class CkMessageRecorder : public CkMessageWatcher {
1616         FILE *f;
1617 public:
1618         CkMessageRecorder(FILE *f_) :f(f_) {}
1619         ~CkMessageRecorder() {
1620                 fprintf(f,"-1 -1 -1");
1621                 fclose(f);
1622         }
1623
1624         virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
1625                 if (env->getEvent())
1626                      fprintf(f,"%d %d %d %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg);
1627                 return CmiTrue;
1628         }
1629 };
1630
1631 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
1632 #define REPLAYDEBUG(args) /* empty */
1633
1634 class CkMessageReplay : public CkMessageWatcher {
1635         FILE *f;
1636         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
1637         /// Read the next message we need from the file:
1638         void getNext(void) {
1639                 if (4!=fscanf(f,"%d%d%d%d", &nextPE,&nextSize,&nextEvent,&nexttype)) {
1640                         // CkAbort("CkMessageReplay> Syntax error reading replay file");
1641                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
1642                 }
1643         }
1644         /// If this is the next message we need, advance and return CmiTrue.
1645         CmiBool isNext(envelope *env) {
1646                 if (nextPE!=env->getSrcPe()) return CmiFalse;
1647                 if (nextEvent!=env->getEvent()) return CmiFalse;
1648                 if (nextSize!=env->getTotalsize())
1649                 {
1650                         CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
1651                         return CmiFalse;
1652                 }
1653                 return CmiTrue;
1654         }
1655
1656         /// This is a (short) list of messages we aren't yet ready for:
1657         CkQ<envelope *> delayed;
1658
1659         /// Try to flush out any delayed messages
1660         void flush(void) {
1661                 int len=delayed.length();
1662                 for (int i=0;i<len;i++) {
1663                         envelope *env=delayed.deq();
1664                         if (isNext(env)) { /* this is the next message: process it */
1665                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1666                                 CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
1667                                 return;
1668                         }
1669                         else /* Not ready yet-- put it back in the
1670                                 queue */
1671                           {
1672                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1673                                 delayed.enq(env);
1674                           }
1675                 }
1676         }
1677
1678 public:
1679         CkMessageReplay(FILE *f_) :f(f_) { getNext();
1680         REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
1681                     }
1682         ~CkMessageReplay() {fclose(f);}
1683
1684         virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
1685           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx());
1686                 if (env->getEvent() == 0) return CmiTrue;
1687                 if (isNext(env)) { /* This is the message we were expecting */
1688                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
1689                         getNext(); /* Advance over this message */
1690                         flush(); /* try to process queued-up stuff */
1691                         return CmiTrue;
1692                 }
1693 #if CMK_SMP
1694                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
1695                          // try next rank, we can't just buffer the msg and left
1696                          // we need to keep unprocessed msg on the fly
1697                         int nextpe = CkMyPe()+1;
1698                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
1699                         nextpe = CkNodeFirst(CkMyNode());
1700                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
1701                         return CmiFalse;
1702                 }
1703 #endif
1704                 else /*!isNext(env) */ {
1705                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()
1706                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
1707                         delayed.enq(env);
1708                         flush();
1709                         return CmiFalse;
1710                 }
1711         }
1712 };
1713
1714 static FILE *openReplayFile(const char *permissions) {
1715
1716         char fName[200];
1717         sprintf(fName,"ckreplay_%06d.log",CkMyPe());
1718         FILE *f=fopen(fName,permissions);
1719         REPLAYDEBUG("openReplayfile "<<fName);
1720         if (f==NULL) {
1721                 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
1722                         CkMyPe(),fName,permissions);
1723                 CkAbort("openReplayFile> Could not open replay file");
1724         }
1725         return f;
1726 }
1727
1728 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
1729         REPLAYDEBUG("CkMessageWaterInit ");
1730         if (CmiGetArgFlagDesc(argv,"+record","Record message processing order"))
1731                 ck->watcher=new CkMessageRecorder(openReplayFile("w"));
1732         if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream"))
1733                 ck->watcher=new CkMessageReplay(openReplayFile("r"));
1734 }
1735
1736 extern "C"
1737 int CkMessageToEpIdx(void *msg) {
1738         envelope *env=UsrToEnv(msg);
1739         int ep=env->getEpIdx();
1740         if (ep==CkIndex_CkArray::recvBroadcast(0))
1741                 return env->getsetArrayBcastEp();
1742         else
1743                 return ep;
1744 }
1745
1746 extern "C"
1747 int getCharmEnvelopeSize() {
1748   return sizeof(envelope);
1749 }
1750
1751
1752 #include "CkMarshall.def.h"
1753