optimization for stableLocation did not work due to miscalculated length of elements.
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55 #include "pathHistory.h"
56
57 #if CMK_LBDB_ON
58 #include "LBDatabase.h"
59 #endif // CMK_LBDB_ON
60
61 CpvDeclare(int ,serializer);
62
63 bool _isAnytimeMigration;
64 bool _isNotifyChildInRed;
65
66 #define ARRAY_DEBUG_OUTPUT 0
67
68 #if ARRAY_DEBUG_OUTPUT 
69 #   define DEB(x) CkPrintf x  //General debug messages
70 #   define DEBI(x) CkPrintf x  //Index debug messages
71 #   define DEBC(x) CkPrintf x  //Construction debug messages
72 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
73 #   define DEBM(x) CkPrintf x  //Migration debug messages
74 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
75 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
76 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
77 #   define AA "ArrayBOC on %d: "
78 #   define AB ,CkMyPe()
79 #   define DEBUG(x) x
80 #else
81 #   define DEB(X) /*CkPrintf x*/
82 #   define DEBI(X) /*CkPrintf x*/
83 #   define DEBC(X) /*CkPrintf x*/
84 #   define DEBS(x) /*CkPrintf x*/
85 #   define DEBM(X) /*CkPrintf x*/
86 #   define DEBL(X) /*CkPrintf x*/
87 #   define DEBK(x) /*CkPrintf x*/
88 #   define DEBB(x) /*CkPrintf x*/
89 #   define str(x) /**/
90 #   define DEBUG(x)
91 #endif
92
93 ///This arrayListener is in charge of delivering broadcasts to the array.
94 class CkArrayBroadcaster : public CkArrayListener {
95   inline int &getData(ArrayElement *el) {return *ckGetData(el);}
96 public:
97   CkArrayBroadcaster(bool _stableLocations);
98   CkArrayBroadcaster(CkMigrateMessage *m);
99   virtual void pup(PUP::er &p);
100   virtual ~CkArrayBroadcaster();
101   PUPable_decl(CkArrayBroadcaster);
102
103   virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
104
105   ///Element was just created on this processor
106   /// Return false if the element migrated away or deleted itself.
107   virtual CmiBool ckElementCreated(ArrayElement *elt)
108     { return bringUpToDate(elt); }
109
110   ///Element just arrived on this processor (so just called pup)
111   /// Return false if the element migrated away or deleted itself.
112   virtual CmiBool ckElementArriving(ArrayElement *elt)
113     { return bringUpToDate(elt); }
114
115   void incoming(CkArrayMessage *msg);
116
117   CmiBool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
118
119   void springCleaning(void);
120
121   void flushState();
122 private:
123   int bcastNo;//Number of broadcasts received (also serial number)
124   int oldBcastNo;//Above value last spring cleaning
125   //This queue stores old broadcasts (in case a migrant arrives
126   // and needs to be brought up to date)
127   CkQ<CkArrayMessage *> oldBcasts;
128   bool stableLocations;
129
130   CmiBool bringUpToDate(ArrayElement *el);
131 };
132
133 ///This arrayListener is in charge of performing reductions on the array.
134 class CkArrayReducer : public CkArrayListener {
135   CkGroupID mgrID;
136   CkReductionMgr *mgr;
137   typedef  contributorInfo *I;
138   inline contributorInfo *getData(ArrayElement *el)
139     {return (I)ckGetData(el);}
140 public:
141   /// Attach this array to this CkReductionMgr
142   CkArrayReducer(CkGroupID mgrID_);
143   CkArrayReducer(CkMigrateMessage *m);
144   virtual void pup(PUP::er &p);
145   virtual ~CkArrayReducer();
146   PUPable_decl(CkArrayReducer);
147
148   void ckBeginInserting(void) {mgr->creatingContributors();}
149   void ckEndInserting(void) {mgr->doneCreatingContributors();}
150
151   void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
152
153   void ckElementCreating(ArrayElement *elt)
154     {mgr->contributorCreated(getData(elt));}
155   void ckElementDied(ArrayElement *elt)
156     {mgr->contributorDied(getData(elt));}
157
158   void ckElementLeaving(ArrayElement *elt)
159     {mgr->contributorLeaving(getData(elt));}
160   CmiBool ckElementArriving(ArrayElement *elt)
161     {mgr->contributorArriving(getData(elt)); return CmiTrue; }
162 };
163
164 /*
165 void 
166 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
167        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
168 }
169 */
170 void
171 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
172         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
173         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
174 }
175
176 /*********************** CkVerboseListener ******************/
177 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
178
179 CkVerboseListener::CkVerboseListener(void)
180   :CkArrayListener(0)
181 {
182   VL_PRINT<<"INIT  Creating listener"<<endl;
183 }
184
185 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
186 {
187   CkArrayListener::ckRegister(arrMgr,dataOffset_);
188   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
189 }
190 void CkVerboseListener::ckBeginInserting(void)
191 {
192   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
193 }
194 void CkVerboseListener::ckEndInserting(void)
195 {
196   VL_PRINT<<"INIT  Done inserting elements"<<endl;
197 }
198
199 void CkVerboseListener::ckElementStamp(int *eltInfo)
200 {
201   VL_PRINT<<"LIFE  Stamping element"<<endl;
202 }
203 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
204 {
205   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
206 }
207 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
208 {
209   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
210   return CmiTrue;
211 }
212 void CkVerboseListener::ckElementDied(ArrayElement *elt)
213 {
214   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
215 }
216
217 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
218 {
219   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
220 }
221 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
222 {
223   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
224   return CmiTrue;
225 }
226
227
228 /************************* ArrayElement *******************/
229 class ArrayElement_initInfo {
230 public:
231   CkArray *thisArray;
232   CkArrayID thisArrayID;
233   CkArrayIndex numInitial;
234   int listenerData[CK_ARRAYLISTENER_MAXLEN];
235   CmiBool fromMigration;
236 };
237
238 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
239
240 void ArrayElement::initBasics(void)
241 {
242 #if CMK_OUT_OF_CORE
243   if (CkpvAccess(CkSaveRestorePrefetch)) 
244     return; /* Just restoring from disk--don't try to set up anything. */
245 #endif
246 #if CMK_GRID_QUEUE_AVAILABLE
247         grid_queue_interval = 0;
248         grid_queue_threshold = 0;
249         msg_count = 0;
250         msg_count_grid = 0;
251         border_flag = 0;
252
253         grid_queue_interval = CmiGridQueueGetInterval ();
254         grid_queue_threshold = CmiGridQueueGetThreshold ();
255 #endif
256   ArrayElement_initInfo &info=CkpvAccess(initInfo);
257   thisArray=info.thisArray;
258   thisArrayID=info.thisArrayID;
259   numInitialElements=info.numInitial.getCombinedCount();
260   if (info.listenerData) {
261     memcpy(listenerData,info.listenerData,sizeof(listenerData));
262   }
263   if (!info.fromMigration) {
264     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
265                           l->ckElementCreating(this));
266   }
267 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
268         mlogData->objID.type = TypeArray;
269         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
270 #endif
271 }
272
273 ArrayElement::ArrayElement(void) 
274 {
275         initBasics();
276 #if CMK_MEM_CHECKPOINT
277         init_checkpt();
278 #endif
279 }
280
281 ArrayElement::ArrayElement(CkMigrateMessage *m) 
282 {
283         initBasics();
284 }
285
286 //Called by the system just before and after migration to another processor:  
287 void ArrayElement::ckAboutToMigrate(void) {
288         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
289                                 l->ckElementLeaving(this));
290         CkMigratable::ckAboutToMigrate();
291 }
292 void ArrayElement::ckJustMigrated(void) {
293         CkMigratable::ckJustMigrated();
294         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
295               if (!l->ckElementArriving(this)) return;);
296 }
297
298 void ArrayElement::ckJustRestored(void) {
299     CkMigratable::ckJustRestored();
300     //empty for out-of-core emulation
301 }
302
303 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
304    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
305
306 /// Remote method: calls destructor
307 void ArrayElement::ckDestroy(void)
308 {
309         if(_BgOutOfCoreFlag!=1){ //in case of taking core out of memory
310             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
311                            l->ckElementDied(this));
312         }
313         CkMigratable::ckDestroy();
314 }
315
316 //Destructor (virtual)
317 ArrayElement::~ArrayElement()
318 {
319 #if CMK_OUT_OF_CORE
320   if (CkpvAccess(CkSaveRestorePrefetch)) 
321     return; /* Just saving to disk--don't trash anything. */
322 #endif
323   //To detect use-after-delete: 
324   thisArray=(CkArray *)0xDEADa7a1;
325 }
326
327 void ArrayElement::pup(PUP::er &p)
328 {
329   DEBM((AA"  ArrayElement::pup()\n"AB));
330   CkMigratable::pup(p);
331   thisArrayID.pup(p);
332   if (p.isUnpacking())
333         thisArray=thisArrayID.ckLocalBranch();
334   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
335 #if CMK_MEM_CHECKPOINT
336   p(budPEs, 2);
337 #endif
338   p.syncComment(PUP::sync_last_system,"ArrayElement");
339 #if CMK_GRID_QUEUE_AVAILABLE
340   p|grid_queue_interval;
341   p|grid_queue_threshold;
342   p|msg_count;
343   p|msg_count_grid;
344   p|border_flag;
345   if (p.isUnpacking ()) {
346     msg_count = 0;
347     msg_count_grid = 0;
348     border_flag = 0;
349   }
350 #endif
351 }
352
353 char *ArrayElement::ckDebugChareName(void) {
354         char buf[200];
355         const char *className=_chareTable[ckGetChareType()]->name;
356         const int *d=thisIndexMax.data();
357         const short int *s=(const short int*)d;
358         switch (thisIndexMax.dimension) {
359         case 0: sprintf(buf,"%s",className); break;
360         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
361         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
362         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
363     case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
364     case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
365     case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
366         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
367         };
368         return strdup(buf);
369 }
370
371 int ArrayElement::ckDebugChareID(char *str, int limit) {
372   if (limit<21) return -1;
373   str[0] = 2;
374   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
375   *((CkArrayIndex*)&str[5]) = thisIndexMax;
376   return 21;
377 }
378
379 /// A more verbose form of abort
380 void ArrayElement::CkAbort(const char *str) const
381 {
382         CkError("[%d] Array element at index %s aborting:\n",
383                 CkMyPe(), idx2str(thisIndexMax));
384         CkMigratable::CkAbort(str);
385 }
386
387 void ArrayElement::recvBroadcast(CkMessage *m){
388 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
389         CkArrayMessage *bcast = (CkArrayMessage *)m;
390     envelope *env = UsrToEnv(m);
391         int epIdx= env->piggyBcastIdx;
392     ckInvokeEntry(epIdx,bcast,CmiTrue);
393 #endif
394 }
395
396 /*********************** Spring Cleaning *****************
397 Periodically (every minute or so) remove expired broadcasts
398 from the queue.
399
400 This does not get called for arrays with stable locations (all
401 insertions done at creation, migration only at discrete points).
402 */
403
404 inline void CkArray::springCleaning(void)
405 {
406   DEBK((AA"Starting spring cleaning\n"AB));
407   broadcaster->springCleaning();
408 }
409
410 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
411         ((CkArray *)forArray)->springCleaning();
412 }
413
414 /********************* Little CkArray Utilities ******************/
415
416 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
417         :CProxy(), _aid(e->ckGetArrayID())
418         {}
419 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
420         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
421         {}
422
423 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
424         {return ckLocalBranch()->getLocMgr(); }
425
426 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
427
428 CkArrayOptions::CkArrayOptions(void) //Default: empty array
429         :numInitial(),map(_defaultArrayMapID)
430 {
431     init();
432 }
433
434 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
435         :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
436 {
437     init();
438 }
439
440 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
441         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
442 {
443     init();
444 }
445
446 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
447         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
448 {
449     init();
450 }
451
452 void CkArrayOptions::init()
453 {
454     locMgr.setZero();
455     anytimeMigration = _isAnytimeMigration;
456     staticInsertion = false;
457     reductionClient.type = CkCallback::invalid;
458     disableNotifyChildInRed = !_isNotifyChildInRed;
459 }
460
461 CkArrayOptions &CkArrayOptions::setStaticInsertion(bool b)
462 {
463     staticInsertion = b;
464     if (b && map == _defaultArrayMapID)
465         map = _fastArrayMapID;
466     return *this;
467 }
468
469 /// Bind our elements to this array
470 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
471 {
472         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
473         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
474         // not just initially.
475         //setNumInitial(arr->getNumInitial());
476         return setLocationManager(arr->getLocMgr()->getGroupID());
477 }
478 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
479 {
480         arrayListeners.push_back(listener);
481         return *this;
482 }
483
484 void CkArrayOptions::pup(PUP::er &p) {
485         p|numInitial;
486         p|map;
487         p|locMgr;
488         p|arrayListeners;
489         p|reductionClient;
490         p|anytimeMigration;
491         p|disableNotifyChildInRed;
492         p|staticInsertion;
493 }
494
495 CkArrayListener::CkArrayListener(int nInts_) 
496   :nInts(nInts_) 
497 {
498   dataOffset=-1;
499 }
500 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
501   nInts=-1; dataOffset=-1;
502 }
503 void CkArrayListener::pup(PUP::er &p) {
504   p|nInts;
505   p|dataOffset;
506 }
507
508 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
509 {
510   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
511   dataOffset=dataOffset_;
512 }
513
514 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
515                                           const CkArrayOptions &opts_)
516 {
517   CkArrayOptions opts(opts_);
518   CkGroupID locMgr = opts.getLocationManager();
519   if (locMgr.isZero())
520   { //Create a new location manager
521 #if !CMK_LBDB_ON
522     CkGroupID _lbdb;
523 #endif
524     CkEntryOptions  e_opts;
525     e_opts.setGroupDepID(opts.getMap());       // group creation dependence
526     locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,opts.getNumInitial(),&e_opts);
527     opts.setLocationManager(locMgr);
528   }
529   //Create the array manager
530   m->array_ep()=ctor;
531   CkMarshalledMessage marsh(m);
532   CkEntryOptions  e_opts;
533   e_opts.setGroupDepID(locMgr);       // group creation dependence
534 #if !GROUP_LEVEL_REDUCTION
535   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
536   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
537   nodereductionProxy.setAttachedGroup(ag);
538 #else
539   CkNodeGroupID dummyid;
540   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
541 #endif
542   return (CkArrayID)ag;
543 }
544
545 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
546 {
547   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
548 }
549
550 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
551         const CkArrayIndex &idx)
552 {
553   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
554   m->array_ep()=ctor;
555   ckLocalBranch()->prepareCtorMsg(m,onPe,idx);
556   if (ckIsDelegated()) {
557         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
558         return;
559   }
560   
561   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
562   CkArrayManagerInsert(onPe,m,_aid);
563 }
564
565 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
566 {
567   ckInsertIdx(m,ctorIndex,onPe,_idx);
568 }
569
570 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
571 {
572   return ckLocalBranch()->lookup(_idx);
573 }
574
575 //pack-unpack method for CProxy_ArrayBase
576 void CProxy_ArrayBase::pup(PUP::er &p)
577 {
578   CProxy::pup(p);
579   _aid.pup(p);
580 }
581 void CProxyElement_ArrayBase::pup(PUP::er &p)
582 {
583   CProxy_ArrayBase::pup(p);
584   p|_idx.nInts;
585   p|_idx.dimension;
586   p(_idx.data(),_idx.nInts);
587 }
588
589 void CProxySection_ArrayBase::pup(PUP::er &p)
590 {
591   CProxy_ArrayBase::pup(p);
592   p | _nsid;
593   if (p.isUnpacking()) {
594     if (_nsid == 1) _sid = new CkSectionID;
595     else if (_nsid > 1) _sid = new CkSectionID[_nsid];
596     else _sid = NULL;
597   }
598   for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
599 }
600
601 /*********************** CkArray Creation *************************/
602 void _ckArrayInit(void)
603 {
604   CkpvInitialize(ArrayElement_initInfo,initInfo);
605   CkDisableTracing(CkIndex_CkArray::insertElement(0));
606   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
607     // disable because broadcast listener may deliver broadcast message
608   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
609   // by default anytime migration is allowed
610 }
611
612 CkArray::CkArray(CkArrayOptions &opts,
613                  CkMarshalledMessage &initMsg,
614                  CkNodeGroupID nodereductionID)
615   : CkReductionMgr(),
616     locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
617     locMgrID(opts.getLocationManager()),
618     thisProxy(thisgroup),
619     // Register with our location manager
620     elements((ArrayElementList *)locMgr->addManager(thisgroup,this)),
621     stableLocations(opts.staticInsertion && !opts.anytimeMigration),
622     numInitial(opts.getNumInitial()), isInserting(CmiTrue)
623 {
624   if (!stableLocations)
625       CcdCallOnConditionKeep(CcdPERIODIC_1minute,
626                              staticSpringCleaning, (void *)this);
627   
628   //set the field in one my parent class (CkReductionMgr)
629   if(opts.disableNotifyChildInRed)
630           disableNotifyChildrenStart = CmiTrue; 
631   
632   //Find, register, and initialize the arrayListeners
633   listenerDataOffset=0;
634   broadcaster=new CkArrayBroadcaster(stableLocations);
635   addListener(broadcaster);
636   reducer=new CkArrayReducer(thisgroup);
637   addListener(reducer);
638
639   // COMLIB HACK
640   //calistener = new ComlibArrayListener();
641   //addListener(calistener,dataOffset);
642
643   int lNo,nL=opts.getListeners(); //User-added listeners
644   for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
645
646   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
647
648   ///Set up initial elements (if any)
649   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
650
651   ///adding code for Reduction using nodegroups
652
653 #if !GROUP_LEVEL_REDUCTION
654   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
655   nodeProxy = nodetemp;
656   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
657 #endif
658
659   if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
660       ckSetReductionClient(&opts.reductionClient);
661 }
662
663 CkArray::CkArray(CkMigrateMessage *m)
664         :CkReductionMgr(m), thisProxy(thisgroup)
665 {
666   locMgr=NULL;
667   isInserting=CmiTrue;
668 }
669
670 #if CMK_ERROR_CHECKING
671 inline void testPup(PUP::er &p,int shouldBe) {
672   int a=shouldBe;
673   p|a;
674   if (a!=shouldBe)
675     CkAbort("PUP direction mismatch!");
676 }
677 #else
678 inline void testPup(PUP::er &p,int shouldBe) {}
679 #endif
680
681 void CkArray::pup(PUP::er &p){
682         CkReductionMgr::pup(p);
683         p|numInitial;
684         p|locMgrID;
685         p|listeners;
686         p|listenerDataOffset;
687         testPup(p,1234);
688         if(p.isUnpacking()){
689                 thisProxy=thisgroup;
690                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
691                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
692                 /// Restore our default listeners:
693                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
694                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
695         }
696 }
697
698 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
699   int dataOffset=0; \
700   for (int lNo=0;lNo<listeners.size();lNo++) { \
701     CkArrayListener *l=listeners[lNo]; \
702     l->ckElementStamp(&listenerData[dataOffset]); \
703     dataOffset+=l->ckGetLen(); \
704   } \
705 } while (0)
706
707 //Called on send side to prepare array constructor message
708 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
709 {
710   envelope *env=UsrToEnv((void *)m);
711   env->getsetArrayIndex()=idx;
712   int *listenerData=env->getsetArrayListenerData();
713   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
714   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
715   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
716         getLocMgr()->inform(idx,onPe);
717 }
718
719 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
720                         CkElementCreation_t type)
721 {
722         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
723         if (type==CkElementCreation_resume) 
724         { // HACK: Re-stamp elements on checkpoint resume--
725           //  this restores, e.g., reduction manager's gcount
726                 int *listenerData=ret->listenerData;
727                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
728         }
729         return ret;
730 }
731
732 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
733                      CkMessage *msg,CmiBool fromMigration) 
734 {
735         //Stash the element's initialization information in the global "initInfo"
736         ArrayElement_initInfo &init=CkpvAccess(initInfo);
737         init.numInitial=numInitial;
738         init.thisArray=this;
739         init.thisArrayID=thisgroup;
740         if (msg) /*Have to *copy* data because msg will be deleted*/
741           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
742                  sizeof(init.listenerData));
743         init.fromMigration=fromMigration;
744         
745         //Build the element
746         int elSize=_chareTable[elChareType]->size;
747         ArrayElement *elem = (ArrayElement *)malloc(elSize);
748 #ifndef CMK_OPTIMIZE
749         if (elem!=NULL) setMemoryTypeChare(elem);
750 #endif
751         return elem;
752 }
753
754 /// This method is called by ck.C or the user to add an element.
755 CmiBool CkArray::insertElement(CkMessage *me)
756 {
757   CK_MAGICNUMBER_CHECK
758   CkArrayMessage *m=(CkArrayMessage *)me;
759   const CkArrayIndex &idx=m->array_index();
760   int onPe;
761   if (locMgr->isRemote(idx,&onPe)) 
762   { /* element's sibling lives somewhere else, so insert there */
763         CkArrayManagerInsert(onPe,me,thisgroup);
764         return CmiFalse;
765   }
766   int ctorIdx=m->array_ep();
767   int chareType=_entryTable[ctorIdx]->chareIdx;
768   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
769 #ifndef CMK_CHARE_USE_PTR
770   ((Chare *)elt)->chareIdx = -1;
771 #endif
772   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
773   CK_ARRAYLISTENER_LOOP(listeners,
774       if (!l->ckElementCreated(elt)) return CmiFalse;);
775   return CmiTrue;
776 }
777
778 void CProxy_ArrayBase::doneInserting(void)
779 {
780   DEBC((AA"Broadcasting a doneInserting request\n"AB));
781   //Broadcast a DoneInserting
782   CProxy_CkArray(_aid).remoteDoneInserting();
783 }
784
785 void CkArray::doneInserting(void)
786 {
787   thisProxy[CkMyPe()].remoteDoneInserting();
788 }
789
790 /// This is called on every processor after the last array insertion.
791 void CkArray::remoteDoneInserting(void)
792 {
793   CK_MAGICNUMBER_CHECK
794   if (isInserting) {
795     isInserting=CmiFalse;
796     DEBC((AA"Done inserting objects\n"AB));
797     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
798     locMgr->doneInserting();
799   }
800 }
801
802 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
803         int onPe,int ctor,CkDeliver_t type)
804 {
805         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
806         prepareCtorMsg(m,onPe,idx);
807         m->array_ep()=ctor;
808         
809         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
810                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
811                 CkArrayManagerInsert(onPe,m,thisgroup);
812         } else /* local message, non-queued */ {
813                 //Call local constructor directly
814                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
815                 return insertElement(m);
816         }
817         return CmiTrue;
818 }
819
820 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
821 {
822         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
823         if (local) {
824           int onPe=CkMyPe();
825           prepareCtorMsg(m,onPe,idx);
826 #if CMK_BLUEGENE_CHARM
827           BgEntrySplit("split-array-new");
828 #endif
829           insertElement(m);
830         }
831         else {
832           int onPe=-1;
833           prepareCtorMsg(m,onPe,idx);
834           CkArrayManagerInsert(onPe,m,getGroupID());
835         }
836 }
837
838 /********************* CkArray Messaging ******************/
839 /// Fill out a message's array fields before sending it
840 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
841 {
842         envelope *env=UsrToEnv((void *)msg);
843         env->getsetArrayMgr()=aid;
844         env->getsetArraySrcPe()=CkMyPe();
845         env->setEpIdx(ep);
846         env->getsetArrayHops()=0;
847 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
848         criticalPath_send(env);
849         automaticallySetMessagePriority(env);
850 #endif
851 }
852
853
854 /// Just a non-inlined version of msg_prepareSend()
855 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
856 {
857         envelope *env=UsrToEnv((void *)msg);
858         env->getsetArrayMgr()=aid;
859         env->getsetArraySrcPe()=CkMyPe();
860         env->setEpIdx(ep);
861         env->getsetArrayHops()=0;
862 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
863         criticalPath_send(env);
864         automaticallySetMessagePriority(env);
865 #endif
866 }
867
868 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
869 {
870 #if CMK_ERROR_CHECKING
871         //Check our array index for validity
872         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
873         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
874                 CkAbort("Array index length (nInts) is too long-- did you "
875                         "use bytes instead of integers?\n");
876 #endif
877         msg_prepareSend(msg,ep,ckGetArrayID());
878         msg->array_index()=_idx;//Insert array index
879         if (ckIsDelegated()) //Just call our delegateMgr
880           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
881         else 
882         { //Usual case: a direct send
883           CkArray *localbranch = ckLocalBranch();
884           if (localbranch == NULL) {             // array not created yet
885             CkArrayManagerDeliver(CkMyPe(), msg, 0);
886           }
887           else {
888             if (opts & CK_MSG_INLINE)
889               localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
890             else
891               localbranch->deliver(msg, CkDeliver_queue, opts);
892           }
893         }
894 }
895
896 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
897 {
898         CkFutureID f=CkCreateAttachedFuture(msg);
899         ckSend(msg,ep);
900         return CkWaitReleaseFuture(f);
901 }
902
903 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
904 {
905         if (ckIsDelegated()) //Just call our delegateMgr
906           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
907         else {
908           // send through all
909           for (int k=0; k<_nsid; ++k) {
910             for (int i=0; i< _sid[k]._nElems-1; i++) {
911               CProxyElement_ArrayBase ap(_sid[k]._cookie.aid, _sid[k]._elems[i]);
912               void *newMsg=CkCopyMsg((void **)&msg);
913               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
914             }
915             if (_sid[k]._nElems > 0) {
916               void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
917               CProxyElement_ArrayBase ap(_sid[k]._cookie.aid, _sid[k]._elems[_sid[k]._nElems-1]);
918               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
919             }
920           }
921         }
922 }
923
924 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
925 {
926   CkArrayMessage *m=(CkArrayMessage *)msg;
927   m->array_index()=idx;
928   msg_prepareSend(m,entryIndex,aID);
929   CkArray *a=(CkArray *)_localBranch(aID);
930   if (a == NULL)
931     CkArrayManagerDeliver(CkMyPe(), msg, 0);
932   else
933     a->deliver(m,CkDeliver_queue,opts);
934 }
935
936 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
937 {
938   CkArrayMessage *m=(CkArrayMessage *)msg;
939   m->array_index()=idx;
940   msg_prepareSend(m,entryIndex,aID);
941   CkArray *a=(CkArray *)_localBranch(aID);
942   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
943   a->deliver(m,CkDeliver_inline,opts);
944   if (oldStatus) CkEnableTracing(entryIndex);
945 }
946
947
948 /*********************** CkArray Reduction *******************/
949 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
950   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
951    mgrID(mgrID_)
952 {
953   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
954 }
955 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
956   :CkArrayListener(m)
957 {
958   mgr=NULL;
959 }
960 void CkArrayReducer::pup(PUP::er &p) {
961   CkArrayListener::pup(p);
962   p|mgrID;
963   if (p.isUnpacking())
964     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
965 }
966 CkArrayReducer::~CkArrayReducer() {}
967
968 /*********************** CkArray Broadcast ******************/
969
970 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_)
971     :CkArrayListener(1), //Each array element carries a broadcast number
972      bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_)
973 { }
974 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
975     :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1)
976 { }
977
978 void CkArrayBroadcaster::pup(PUP::er &p) {
979   CkArrayListener::pup(p);
980   /* Assumption: no migrants during checkpoint, so no need to
981      save old broadcasts. */
982   p|bcastNo;
983   p|stableLocations;
984   if (p.isUnpacking()) {
985     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
986   }
987 }
988
989 CkArrayBroadcaster::~CkArrayBroadcaster()
990 {
991   CkArrayMessage *msg;
992   while (NULL!=(msg=oldBcasts.deq())) delete msg;
993 }
994
995 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
996 {
997   bcastNo++;
998   DEBB((AA"Received broadcast %d\n"AB,bcastNo));
999
1000   if (stableLocations)
1001     return;
1002
1003   CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
1004   oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
1005 }
1006
1007 /// Deliver a copy of the given broadcast to the given local element
1008 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
1009                                     CmiBool doFree)
1010 {
1011   int &elBcastNo=getData(el);
1012   // if this array element already received this message, skip it
1013   if (elBcastNo >= bcastNo) return CmiFalse;
1014   elBcastNo++;
1015   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
1016   int epIdx=bcast->array_ep_bcast();
1017
1018 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1019   DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
1020   return CmiTrue;
1021 #else
1022   return el->ckInvokeEntry(epIdx, bcast, doFree);
1023 #endif
1024 }
1025
1026 /// Deliver all needed broadcasts to the given local element
1027 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
1028 {
1029   if (stableLocations) return CmiTrue;
1030   int &elBcastNo=getData(el);
1031   if (elBcastNo<bcastNo)
1032   {//This element needs some broadcasts-- it must have
1033    //been migrating during the broadcast.
1034     int i,nDeliver=bcastNo-elBcastNo;
1035     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
1036
1037     //Skip the old junk at the front of the bcast queue
1038     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
1039       oldBcasts.enq(oldBcasts.deq());
1040
1041     //Deliver the newest messages, in old-to-new order
1042     for (i=nDeliver-1;i>=0;i--)
1043     {
1044       CkArrayMessage *msg=oldBcasts.deq();
1045                 if(msg == NULL)
1046                 continue;
1047       oldBcasts.enq(msg);
1048       if (!deliver(msg, el, CmiFalse))
1049         return CmiFalse; //Element migrated away
1050     }
1051   }
1052   //Otherwise, the element survived
1053   return CmiTrue;
1054 }
1055
1056
1057 void CkArrayBroadcaster::springCleaning(void)
1058 {
1059   //Remove old broadcast messages
1060   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
1061   if (nDelete>0) {
1062     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
1063     for (int i=0;i<nDelete;i++)
1064       delete oldBcasts.deq();
1065   }
1066   oldBcastNo=bcastNo;
1067 }
1068
1069 void CkArrayBroadcaster::flushState() 
1070
1071   bcastNo = oldBcastNo = 0; 
1072   CkArrayMessage *msg;
1073   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1074 }
1075
1076 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
1077 {
1078         CProxy_ArrayBase ap(aID);
1079         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
1080 }
1081
1082 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
1083 {
1084         msg->array_ep_bcast()=ep;
1085         if (ckIsDelegated()) //Just call our delegateMgr
1086           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
1087         else 
1088         { //Broadcast message via serializer node
1089           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
1090           int skipsched = opts & CK_MSG_EXPEDITED; 
1091           //int serializer=0;//1623802937%CkNumPes();
1092 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1093                 CProxy_CkArray ap(_aid);
1094                 ap[CpvAccess(serializer)].sendBroadcast(msg);
1095                 CkGroupID _id = _aid;
1096 //              printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
1097 #else
1098           if (CkMyPe()==CpvAccess(serializer))
1099           {
1100                 DEBB((AA"Sending array broadcast\n"AB));
1101                 if (skipsched)
1102                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
1103                 else
1104                         CProxy_CkArray(_aid).recvBroadcast(msg);
1105           } else {
1106                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
1107                 CProxy_CkArray ap(_aid);
1108                 if (skipsched)
1109                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
1110                 else
1111                         ap[CpvAccess(serializer)].sendBroadcast(msg);
1112           }
1113 #endif
1114         }
1115 }
1116
1117 /// Reflect a broadcast off this Pe:
1118 void CkArray::sendBroadcast(CkMessage *msg)
1119 {
1120         CK_MAGICNUMBER_CHECK
1121         if(CkMyPe() == CpvAccess(serializer)){
1122                 //Broadcast the message to all processors
1123                 thisProxy.recvBroadcast(msg);
1124         }else{
1125                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
1126         }
1127 }
1128 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
1129 {
1130         CK_MAGICNUMBER_CHECK
1131         //Broadcast the message to all processors
1132         thisProxy.recvExpeditedBroadcast(msg);
1133 }
1134
1135 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1136 int _tempBroadcastCount=0;
1137
1138 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
1139     if(homePe(*index)==CmiMyPe()){
1140         CkArrayMessage *bcast = (CkArrayMessage *)data;
1141     int epIdx=bcast->array_ep_bcast();
1142         DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
1143         CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
1144         envelope *env = UsrToEnv(copy);
1145         env->sender.data.group.onPE = CkMyPe();
1146         env->TN  = env->SN=0;
1147         env->piggyBcastIdx = epIdx;
1148         env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
1149         env->getsetArrayMgr() = thisgroup;
1150         env->getsetArrayIndex() = *index;
1151     env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
1152         env->setSrcPe(CkMyPe());
1153         rec->deliver(copy,CkDeliver_queue);
1154         _tempBroadcastCount++;
1155     }else{
1156         if(locMgr->homeElementCount != -1){
1157             DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
1158         }
1159     }
1160 }
1161
1162 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
1163     arr->broadcastHomeElements(data,rec,index);
1164 }
1165 #endif
1166
1167
1168 /// Increment broadcast count; deliver to all local elements
1169 void CkArray::recvBroadcast(CkMessage *m)
1170 {
1171         CK_MAGICNUMBER_CHECK
1172         CkArrayMessage *msg=(CkArrayMessage *)m;
1173         broadcaster->incoming(msg);
1174
1175 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1176         _tempBroadcastCount=0;
1177         locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
1178 #else
1179         //Run through the list of local elements
1180         int idx=0, len=0;
1181         if (stableLocations) {            /* remove all NULLs in the array */
1182           len = 0;
1183           while (elements->next(idx)!=NULL) len++;
1184           idx = 0;
1185         }
1186         ArrayElement *el;
1187 #if CMK_BLUEGENE_CHARM
1188         void *root;
1189         _TRACE_BG_TLINE_END(&root);
1190         BgSetEntryName("start-broadcast", &root);
1191         CkVec<void *> logs;    // store all logs for each delivery
1192         extern void stopVTimer();
1193         extern void startVTimer();
1194 #endif
1195         while (NULL!=(el=elements->next(idx))) {
1196 #if CMK_BLUEGENE_CHARM
1197                 //BgEntrySplit("split-broadcast");
1198                 stopVTimer();
1199                 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
1200                 logs.push_back(curlog);
1201                 startVTimer();
1202 #endif
1203                 CmiBool doFree = CmiFalse;
1204                 if (stableLocations && idx == len) doFree = CmiTrue;
1205                 broadcaster->deliver(msg, el, doFree);
1206         }
1207 #endif
1208
1209 #if CMK_BLUEGENE_CHARM
1210         //BgEntrySplit("end-broadcast");
1211         stopVTimer();
1212         BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
1213         startVTimer();
1214 #endif
1215
1216         // CkArrayBroadcaster doesn't have msg buffered, and there was
1217         // no last delivery to transfer ownership
1218         if (stableLocations && len == 0)
1219           delete msg;
1220 }
1221
1222 #include "CkArray.def.h"
1223
1224