add a command line option for staticInsertion
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55 #include "pathHistory.h"
56
57 #if CMK_LBDB_ON
58 #include "LBDatabase.h"
59 #endif // CMK_LBDB_ON
60
61 CpvDeclare(int ,serializer);
62
63 bool _isAnytimeMigration;
64 bool _isStaticInsertion;
65 bool _isNotifyChildInRed;
66
67 #define ARRAY_DEBUG_OUTPUT 0
68
69 #if ARRAY_DEBUG_OUTPUT 
70 #   define DEB(x) CkPrintf x  //General debug messages
71 #   define DEBI(x) CkPrintf x  //Index debug messages
72 #   define DEBC(x) CkPrintf x  //Construction debug messages
73 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
74 #   define DEBM(x) CkPrintf x  //Migration debug messages
75 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
76 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
77 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
78 #   define AA "ArrayBOC on %d: "
79 #   define AB ,CkMyPe()
80 #   define DEBUG(x) x
81 #else
82 #   define DEB(X) /*CkPrintf x*/
83 #   define DEBI(X) /*CkPrintf x*/
84 #   define DEBC(X) /*CkPrintf x*/
85 #   define DEBS(x) /*CkPrintf x*/
86 #   define DEBM(X) /*CkPrintf x*/
87 #   define DEBL(X) /*CkPrintf x*/
88 #   define DEBK(x) /*CkPrintf x*/
89 #   define DEBB(x) /*CkPrintf x*/
90 #   define str(x) /**/
91 #   define DEBUG(x)
92 #endif
93
94 ///This arrayListener is in charge of delivering broadcasts to the array.
95 class CkArrayBroadcaster : public CkArrayListener {
96   inline int &getData(ArrayElement *el) {return *ckGetData(el);}
97 public:
98   CkArrayBroadcaster(bool _stableLocations, bool _broadcastViaScheduler);
99   CkArrayBroadcaster(CkMigrateMessage *m);
100   virtual void pup(PUP::er &p);
101   virtual ~CkArrayBroadcaster();
102   PUPable_decl(CkArrayBroadcaster);
103
104   virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
105
106   ///Element was just created on this processor
107   /// Return false if the element migrated away or deleted itself.
108   virtual CmiBool ckElementCreated(ArrayElement *elt)
109     { return bringUpToDate(elt); }
110
111   ///Element just arrived on this processor (so just called pup)
112   /// Return false if the element migrated away or deleted itself.
113   virtual CmiBool ckElementArriving(ArrayElement *elt)
114     { return bringUpToDate(elt); }
115
116   void incoming(CkArrayMessage *msg);
117
118   CmiBool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
119
120   void springCleaning(void);
121
122   void flushState();
123 private:
124   int bcastNo;//Number of broadcasts received (also serial number)
125   int oldBcastNo;//Above value last spring cleaning
126   //This queue stores old broadcasts (in case a migrant arrives
127   // and needs to be brought up to date)
128   CkQ<CkArrayMessage *> oldBcasts;
129   bool stableLocations;
130   bool broadcastViaScheduler;
131
132   CmiBool bringUpToDate(ArrayElement *el);
133 };
134
135 ///This arrayListener is in charge of performing reductions on the array.
136 class CkArrayReducer : public CkArrayListener {
137   CkGroupID mgrID;
138   CkReductionMgr *mgr;
139   typedef  contributorInfo *I;
140   inline contributorInfo *getData(ArrayElement *el)
141     {return (I)ckGetData(el);}
142 public:
143   /// Attach this array to this CkReductionMgr
144   CkArrayReducer(CkGroupID mgrID_);
145   CkArrayReducer(CkMigrateMessage *m);
146   virtual void pup(PUP::er &p);
147   virtual ~CkArrayReducer();
148   PUPable_decl(CkArrayReducer);
149
150   void ckBeginInserting(void) {mgr->creatingContributors();}
151   void ckEndInserting(void) {mgr->doneCreatingContributors();}
152
153   void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
154
155   void ckElementCreating(ArrayElement *elt)
156     {mgr->contributorCreated(getData(elt));}
157   void ckElementDied(ArrayElement *elt)
158     {mgr->contributorDied(getData(elt));}
159
160   void ckElementLeaving(ArrayElement *elt)
161     {mgr->contributorLeaving(getData(elt));}
162   CmiBool ckElementArriving(ArrayElement *elt)
163     {mgr->contributorArriving(getData(elt)); return CmiTrue; }
164 };
165
166 /*
167 void 
168 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
169        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
170 }
171 */
172 void
173 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
174         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
175         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
176 }
177
178 /*********************** CkVerboseListener ******************/
179 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
180
181 CkVerboseListener::CkVerboseListener(void)
182   :CkArrayListener(0)
183 {
184   VL_PRINT<<"INIT  Creating listener"<<endl;
185 }
186
187 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
188 {
189   CkArrayListener::ckRegister(arrMgr,dataOffset_);
190   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
191 }
192 void CkVerboseListener::ckBeginInserting(void)
193 {
194   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
195 }
196 void CkVerboseListener::ckEndInserting(void)
197 {
198   VL_PRINT<<"INIT  Done inserting elements"<<endl;
199 }
200
201 void CkVerboseListener::ckElementStamp(int *eltInfo)
202 {
203   VL_PRINT<<"LIFE  Stamping element"<<endl;
204 }
205 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
206 {
207   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
208 }
209 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
210 {
211   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
212   return CmiTrue;
213 }
214 void CkVerboseListener::ckElementDied(ArrayElement *elt)
215 {
216   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
217 }
218
219 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
220 {
221   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
222 }
223 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
224 {
225   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
226   return CmiTrue;
227 }
228
229
230 /************************* ArrayElement *******************/
231 class ArrayElement_initInfo {
232 public:
233   CkArray *thisArray;
234   CkArrayID thisArrayID;
235   CkArrayIndex numInitial;
236   int listenerData[CK_ARRAYLISTENER_MAXLEN];
237   CmiBool fromMigration;
238 };
239
240 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
241
242 void ArrayElement::initBasics(void)
243 {
244 #if CMK_OUT_OF_CORE
245   if (CkpvAccess(CkSaveRestorePrefetch)) 
246     return; /* Just restoring from disk--don't try to set up anything. */
247 #endif
248 #if CMK_GRID_QUEUE_AVAILABLE
249         grid_queue_interval = 0;
250         grid_queue_threshold = 0;
251         msg_count = 0;
252         msg_count_grid = 0;
253         border_flag = 0;
254
255         grid_queue_interval = CmiGridQueueGetInterval ();
256         grid_queue_threshold = CmiGridQueueGetThreshold ();
257 #endif
258   ArrayElement_initInfo &info=CkpvAccess(initInfo);
259   thisArray=info.thisArray;
260   thisArrayID=info.thisArrayID;
261   numInitialElements=info.numInitial.getCombinedCount();
262   if (info.listenerData) {
263     memcpy(listenerData,info.listenerData,sizeof(listenerData));
264   }
265   if (!info.fromMigration) {
266     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
267                           l->ckElementCreating(this));
268   }
269 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
270         mlogData->objID.type = TypeArray;
271         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
272 #endif
273 #ifdef _PIPELINED_ALLREDUCE_
274         allredMgr = NULL;
275 #endif
276 }
277
278 ArrayElement::ArrayElement(void) 
279 {
280         initBasics();
281 #if CMK_MEM_CHECKPOINT
282         init_checkpt();
283 #endif
284 }
285
286 ArrayElement::ArrayElement(CkMigrateMessage *m) : CkMigratable(m)
287 {
288         initBasics();
289 }
290
291 //Called by the system just before and after migration to another processor:  
292 void ArrayElement::ckAboutToMigrate(void) {
293         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
294                                 l->ckElementLeaving(this));
295         CkMigratable::ckAboutToMigrate();
296 }
297 void ArrayElement::ckJustMigrated(void) {
298         CkMigratable::ckJustMigrated();
299         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
300               if (!l->ckElementArriving(this)) return;);
301 }
302
303 void ArrayElement::ckJustRestored(void) {
304     CkMigratable::ckJustRestored();
305     //empty for out-of-core emulation
306 }
307
308 #ifdef _PIPELINED_ALLREDUCE_
309 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
310                                         CMK_REFNUM_TYPE userFlag)
311 {
312         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
313         msg->setUserFlag(userFlag);
314         msg->setMigratableContributor(true);
315         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
316 }
317 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
318                                         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
319 {
320         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
321         msg->setUserFlag(userFlag);
322         msg->setCallback(cb);
323         msg->setMigratableContributor(true);
324         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
325 }
326 void ArrayElement::contribute2(CkReductionMsg *msg) 
327 {
328         msg->setMigratableContributor(true);
329         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
330 }
331 void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
332 {
333         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
334     msg->setUserFlag(userFlag);
335     msg->setCallback(cb);
336     msg->setMigratableContributor(true);
337     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
338 }
339 void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
340 {
341     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
342     msg->setUserFlag(userFlag);
343     msg->setMigratableContributor(true);
344     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
345 }
346
347 void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
348                                                           const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
349 {
350         // if it is a broadcast to myself and size is large
351         if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD) 
352         {
353                 if (!allredMgr) {
354                         allredMgr = new AllreduceMgr();
355                 }
356                 // number of fragments
357                 int fragNo = dataSize/FRAG_SIZE;
358                 int size = FRAG_SIZE;
359                 // for each fragment
360                 for (int i=0; i<fragNo; i++) {
361                         // callback to defragmentor
362                         CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
363                         if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
364                                 size = dataSize%FRAG_SIZE;
365                         }
366                         CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
367                         // initialize the new msg
368                         msg->reducer            = type;
369                         msg->nFrags             = fragNo;
370                         msg->fragNo             = i;
371                         msg->callback           = defrag_cb;
372                         msg->userFlag           = userFlag;
373                         allredMgr->cb           = cb;
374                         allredMgr->cb.type      = CkCallback::sendArray;
375                         allredMgr->cb.d.array.idx = myIndex;
376                         contribute2(msg);
377                 }
378                 return;
379         }
380         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
381         msg->setUserFlag(userFlag);
382         msg->setCallback(cb);
383         msg->setMigratableContributor(true);
384         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
385 }
386
387
388 #else
389 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
390    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
391 #endif
392 // _PIPELINED_ALLREDUCE_
393 void ArrayElement::defrag(CkReductionMsg *msg)
394 {
395 //      CkPrintf("in defrag\n");
396 #ifdef _PIPELINED_ALLREDUCE_
397         allredMgr->allreduce_recieve(msg);
398 #endif
399 }
400
401 /// Remote method: calls destructor
402 void ArrayElement::ckDestroy(void)
403 {
404         if(_BgOutOfCoreFlag!=1){ //in case of taking core out of memory
405             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
406                            l->ckElementDied(this));
407         }
408         CkMigratable::ckDestroy();
409 }
410
411 //Destructor (virtual)
412 ArrayElement::~ArrayElement()
413 {
414 #if CMK_OUT_OF_CORE
415   if (CkpvAccess(CkSaveRestorePrefetch)) 
416     return; /* Just saving to disk--don't trash anything. */
417 #endif
418   //To detect use-after-delete: 
419   thisArray=(CkArray *)0xDEADa7a1;
420 }
421
422 void ArrayElement::pup(PUP::er &p)
423 {
424   DEBM((AA"  ArrayElement::pup()\n"AB));
425   CkMigratable::pup(p);
426   thisArrayID.pup(p);
427   if (p.isUnpacking())
428         thisArray=thisArrayID.ckLocalBranch();
429   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
430 #if CMK_MEM_CHECKPOINT
431   p(budPEs, 2);
432 #endif
433   p.syncComment(PUP::sync_last_system,"ArrayElement");
434 #if CMK_GRID_QUEUE_AVAILABLE
435   p|grid_queue_interval;
436   p|grid_queue_threshold;
437   p|msg_count;
438   p|msg_count_grid;
439   p|border_flag;
440   if (p.isUnpacking ()) {
441     msg_count = 0;
442     msg_count_grid = 0;
443     border_flag = 0;
444   }
445 #endif
446 }
447
448 char *ArrayElement::ckDebugChareName(void) {
449         char buf[200];
450         const char *className=_chareTable[ckGetChareType()]->name;
451         const int *d=thisIndexMax.data();
452         const short int *s=(const short int*)d;
453         switch (thisIndexMax.dimension) {
454         case 0: sprintf(buf,"%s",className); break;
455         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
456         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
457         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
458     case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
459     case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
460     case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
461         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
462         };
463         return strdup(buf);
464 }
465
466 int ArrayElement::ckDebugChareID(char *str, int limit) {
467   if (limit<21) return -1;
468   str[0] = 2;
469   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
470   *((CkArrayIndex*)&str[5]) = thisIndexMax;
471   return 21;
472 }
473
474 /// A more verbose form of abort
475 void ArrayElement::CkAbort(const char *str) const
476 {
477         CkError("[%d] Array element at index %s aborting:\n",
478                 CkMyPe(), idx2str(thisIndexMax));
479         CkMigratable::CkAbort(str);
480 }
481
482 void ArrayElement::recvBroadcast(CkMessage *m){
483 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
484         CkArrayMessage *bcast = (CkArrayMessage *)m;
485     envelope *env = UsrToEnv(m);
486         int epIdx= env->piggyBcastIdx;
487     ckInvokeEntry(epIdx,bcast,CmiTrue);
488 #endif
489 }
490
491 /*********************** Spring Cleaning *****************
492 Periodically (every minute or so) remove expired broadcasts
493 from the queue.
494
495 This does not get called for arrays with stable locations (all
496 insertions done at creation, migration only at discrete points).
497 */
498
499 inline void CkArray::springCleaning(void)
500 {
501   DEBK((AA"Starting spring cleaning\n"AB));
502   broadcaster->springCleaning();
503 }
504
505 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
506         ((CkArray *)forArray)->springCleaning();
507 }
508
509 /********************* Little CkArray Utilities ******************/
510
511 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
512         :CProxy(), _aid(e->ckGetArrayID())
513         {}
514 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
515         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
516         {}
517
518 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
519         {return ckLocalBranch()->getLocMgr(); }
520
521 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
522
523 CkArrayOptions::CkArrayOptions(void) //Default: empty array
524         :numInitial(),map(_defaultArrayMapID)
525 {
526     init();
527 }
528
529 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
530         :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
531 {
532     init();
533 }
534
535 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
536         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
537 {
538     init();
539 }
540
541 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
542         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
543 {
544     init();
545 }
546
547 void CkArrayOptions::init()
548 {
549     locMgr.setZero();
550     anytimeMigration = _isAnytimeMigration;
551     staticInsertion = _isStaticInsertion;
552     reductionClient.type = CkCallback::invalid;
553     disableNotifyChildInRed = !_isNotifyChildInRed;
554     broadcastViaScheduler = false;
555 }
556
557 CkArrayOptions &CkArrayOptions::setStaticInsertion(bool b)
558 {
559     staticInsertion = b;
560     if (b && map == _defaultArrayMapID)
561         map = _fastArrayMapID;
562     return *this;
563 }
564
565 /// Bind our elements to this array
566 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
567 {
568         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
569         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
570         // not just initially.
571         //setNumInitial(arr->getNumInitial());
572         return setLocationManager(arr->getLocMgr()->getGroupID());
573 }
574 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
575 {
576         arrayListeners.push_back(listener);
577         return *this;
578 }
579
580 void CkArrayOptions::pup(PUP::er &p) {
581         p|numInitial;
582         p|map;
583         p|locMgr;
584         p|arrayListeners;
585         p|reductionClient;
586         p|anytimeMigration;
587         p|disableNotifyChildInRed;
588         p|staticInsertion;
589         p|broadcastViaScheduler;
590 }
591
592 CkArrayListener::CkArrayListener(int nInts_) 
593   :nInts(nInts_) 
594 {
595   dataOffset=-1;
596 }
597 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
598   nInts=-1; dataOffset=-1;
599 }
600 void CkArrayListener::pup(PUP::er &p) {
601   p|nInts;
602   p|dataOffset;
603 }
604
605 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
606 {
607   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
608   dataOffset=dataOffset_;
609 }
610
611 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
612                                           const CkArrayOptions &opts_)
613 {
614   CkArrayOptions opts(opts_);
615   CkGroupID locMgr = opts.getLocationManager();
616   if (locMgr.isZero())
617   { //Create a new location manager
618 #if !CMK_LBDB_ON
619     CkGroupID _lbdb;
620 #endif
621     CkEntryOptions  e_opts;
622     e_opts.setGroupDepID(opts.getMap());       // group creation dependence
623     locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,opts.getNumInitial(),&e_opts);
624     opts.setLocationManager(locMgr);
625   }
626   //Create the array manager
627   m->array_ep()=ctor;
628   CkMarshalledMessage marsh(m);
629   CkEntryOptions  e_opts;
630   e_opts.setGroupDepID(locMgr);       // group creation dependence
631 #if !GROUP_LEVEL_REDUCTION
632   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
633   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
634   nodereductionProxy.setAttachedGroup(ag);
635 #else
636   CkNodeGroupID dummyid;
637   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
638 #endif
639   return (CkArrayID)ag;
640 }
641
642 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
643 {
644   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
645 }
646
647 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
648         const CkArrayIndex &idx)
649 {
650   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
651   m->array_ep()=ctor;
652   ckLocalBranch()->prepareCtorMsg(m,onPe,idx);
653   if (ckIsDelegated()) {
654         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
655         return;
656   }
657   
658   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
659   CkArrayManagerInsert(onPe,m,_aid);
660 }
661
662 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
663 {
664   ckInsertIdx(m,ctorIndex,onPe,_idx);
665 }
666
667 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
668 {
669   return ckLocalBranch()->lookup(_idx);
670 }
671
672 //pack-unpack method for CProxy_ArrayBase
673 void CProxy_ArrayBase::pup(PUP::er &p)
674 {
675   CProxy::pup(p);
676   _aid.pup(p);
677 }
678 void CProxyElement_ArrayBase::pup(PUP::er &p)
679 {
680   CProxy_ArrayBase::pup(p);
681   p|_idx.nInts;
682   p|_idx.dimension;
683   p(_idx.data(),_idx.nInts);
684 }
685
686 void CProxySection_ArrayBase::pup(PUP::er &p)
687 {
688   CProxy_ArrayBase::pup(p);
689   p | _nsid;
690   if (p.isUnpacking()) {
691     if (_nsid == 1) _sid = new CkSectionID;
692     else if (_nsid > 1) _sid = new CkSectionID[_nsid];
693     else _sid = NULL;
694   }
695   for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
696 }
697
698 /*********************** CkArray Creation *************************/
699 void _ckArrayInit(void)
700 {
701   CkpvInitialize(ArrayElement_initInfo,initInfo);
702   CkDisableTracing(CkIndex_CkArray::insertElement(0));
703   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
704     // disable because broadcast listener may deliver broadcast message
705   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
706   // by default anytime migration is allowed
707 }
708
709 CkArray::CkArray(CkArrayOptions &opts,
710                  CkMarshalledMessage &initMsg,
711                  CkNodeGroupID nodereductionID)
712   : CkReductionMgr(),
713     locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
714     locMgrID(opts.getLocationManager()),
715     thisProxy(thisgroup),
716     // Register with our location manager
717     elements((ArrayElementList *)locMgr->addManager(thisgroup,this)),
718     stableLocations(opts.staticInsertion && !opts.anytimeMigration),
719     numInitial(opts.getNumInitial()), isInserting(CmiTrue)
720 {
721   if (!stableLocations)
722       CcdCallOnConditionKeep(CcdPERIODIC_1minute,
723                              staticSpringCleaning, (void *)this);
724   
725   //set the field in one my parent class (CkReductionMgr)
726   if(opts.disableNotifyChildInRed)
727           disableNotifyChildrenStart = CmiTrue; 
728   
729   //Find, register, and initialize the arrayListeners
730   listenerDataOffset=0;
731   broadcaster=new CkArrayBroadcaster(stableLocations, opts.broadcastViaScheduler);
732   addListener(broadcaster);
733   reducer=new CkArrayReducer(thisgroup);
734   addListener(reducer);
735
736   // COMLIB HACK
737   //calistener = new ComlibArrayListener();
738   //addListener(calistener,dataOffset);
739
740   int lNo,nL=opts.getListeners(); //User-added listeners
741   for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
742
743   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
744
745   ///Set up initial elements (if any)
746   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
747
748   ///adding code for Reduction using nodegroups
749
750 #if !GROUP_LEVEL_REDUCTION
751   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
752   nodeProxy = nodetemp;
753   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
754 #endif
755
756   if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
757       ckSetReductionClient(&opts.reductionClient);
758 }
759
760 CkArray::CkArray(CkMigrateMessage *m)
761         :CkReductionMgr(m), thisProxy(thisgroup)
762 {
763   locMgr=NULL;
764   isInserting=CmiTrue;
765 }
766
767 #if CMK_ERROR_CHECKING
768 inline void testPup(PUP::er &p,int shouldBe) {
769   int a=shouldBe;
770   p|a;
771   if (a!=shouldBe)
772     CkAbort("PUP direction mismatch!");
773 }
774 #else
775 inline void testPup(PUP::er &p,int shouldBe) {}
776 #endif
777
778 void CkArray::pup(PUP::er &p){
779         CkReductionMgr::pup(p);
780         p|numInitial;
781         p|locMgrID;
782         p|listeners;
783         p|listenerDataOffset;
784         p|stableLocations;
785         testPup(p,1234);
786         if(p.isUnpacking()){
787                 thisProxy=thisgroup;
788                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
789                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
790                 /// Restore our default listeners:
791                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
792                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
793                 /// set up broadcast cleaner
794                 if (!stableLocations)
795                     CcdCallOnConditionKeep(CcdPERIODIC_1minute,
796                                            staticSpringCleaning, (void *)this);
797         }
798 }
799
800 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
801   int dataOffset=0; \
802   for (int lNo=0;lNo<listeners.size();lNo++) { \
803     CkArrayListener *l=listeners[lNo]; \
804     l->ckElementStamp(&listenerData[dataOffset]); \
805     dataOffset+=l->ckGetLen(); \
806   } \
807 } while (0)
808
809 //Called on send side to prepare array constructor message
810 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
811 {
812   envelope *env=UsrToEnv((void *)m);
813   env->getsetArrayIndex()=idx;
814   int *listenerData=env->getsetArrayListenerData();
815   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
816   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
817   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
818         getLocMgr()->inform(idx,onPe);
819 }
820
821 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
822                         CkElementCreation_t type)
823 {
824         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
825         if (type==CkElementCreation_resume) 
826         { // HACK: Re-stamp elements on checkpoint resume--
827           //  this restores, e.g., reduction manager's gcount
828                 int *listenerData=ret->listenerData;
829                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
830         }
831         return ret;
832 }
833
834 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
835                      CkMessage *msg,CmiBool fromMigration) 
836 {
837         //Stash the element's initialization information in the global "initInfo"
838         ArrayElement_initInfo &init=CkpvAccess(initInfo);
839         init.numInitial=numInitial;
840         init.thisArray=this;
841         init.thisArrayID=thisgroup;
842         if (msg) /*Have to *copy* data because msg will be deleted*/
843           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
844                  sizeof(init.listenerData));
845         init.fromMigration=fromMigration;
846         
847         //Build the element
848         int elSize=_chareTable[elChareType]->size;
849         ArrayElement *elem = (ArrayElement *)malloc(elSize);
850 #ifndef CMK_OPTIMIZE
851         if (elem!=NULL) setMemoryTypeChare(elem);
852 #endif
853         return elem;
854 }
855
856 /// This method is called by ck.C or the user to add an element.
857 CmiBool CkArray::insertElement(CkMessage *me)
858 {
859   CK_MAGICNUMBER_CHECK
860   CkArrayMessage *m=(CkArrayMessage *)me;
861   const CkArrayIndex &idx=m->array_index();
862   int onPe;
863   if (locMgr->isRemote(idx,&onPe)) 
864   { /* element's sibling lives somewhere else, so insert there */
865         CkArrayManagerInsert(onPe,me,thisgroup);
866         return CmiFalse;
867   }
868   int ctorIdx=m->array_ep();
869   int chareType=_entryTable[ctorIdx]->chareIdx;
870   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
871 #ifndef CMK_CHARE_USE_PTR
872   ((Chare *)elt)->chareIdx = -1;
873 #endif
874   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
875   CK_ARRAYLISTENER_LOOP(listeners,
876       if (!l->ckElementCreated(elt)) return CmiFalse;);
877   return CmiTrue;
878 }
879
880 void CProxy_ArrayBase::doneInserting(void)
881 {
882   DEBC((AA"Broadcasting a doneInserting request\n"AB));
883   //Broadcast a DoneInserting
884   CProxy_CkArray(_aid).remoteDoneInserting();
885 }
886
887 void CkArray::doneInserting(void)
888 {
889   thisProxy[CkMyPe()].remoteDoneInserting();
890 }
891
892 /// This is called on every processor after the last array insertion.
893 void CkArray::remoteDoneInserting(void)
894 {
895   CK_MAGICNUMBER_CHECK
896   if (isInserting) {
897     isInserting=CmiFalse;
898     DEBC((AA"Done inserting objects\n"AB));
899     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
900     locMgr->doneInserting();
901   }
902 }
903
904 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
905         int onPe,int ctor,CkDeliver_t type)
906 {
907         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
908         prepareCtorMsg(m,onPe,idx);
909         m->array_ep()=ctor;
910         
911         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
912                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
913                 CkArrayManagerInsert(onPe,m,thisgroup);
914         } else /* local message, non-queued */ {
915                 //Call local constructor directly
916                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
917                 return insertElement(m);
918         }
919         return CmiTrue;
920 }
921
922 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
923 {
924         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
925         if (local) {
926           int onPe=CkMyPe();
927           prepareCtorMsg(m,onPe,idx);
928 #if CMK_BIGSIM_CHARM
929           BgEntrySplit("split-array-new");
930 #endif
931           insertElement(m);
932         }
933         else {
934           int onPe=-1;
935           prepareCtorMsg(m,onPe,idx);
936           CkArrayManagerInsert(onPe,m,getGroupID());
937         }
938 }
939
940 /********************* CkArray Messaging ******************/
941 /// Fill out a message's array fields before sending it
942 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
943 {
944         envelope *env=UsrToEnv((void *)msg);
945         env->getsetArrayMgr()=aid;
946         env->getsetArraySrcPe()=CkMyPe();
947         env->setEpIdx(ep);
948         env->getsetArrayHops()=0;
949 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
950         criticalPath_send(env);
951         automaticallySetMessagePriority(env);
952 #endif
953 }
954
955
956 /// Just a non-inlined version of msg_prepareSend()
957 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
958 {
959         envelope *env=UsrToEnv((void *)msg);
960         env->getsetArrayMgr()=aid;
961         env->getsetArraySrcPe()=CkMyPe();
962         env->setEpIdx(ep);
963         env->getsetArrayHops()=0;
964 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
965         criticalPath_send(env);
966         automaticallySetMessagePriority(env);
967 #endif
968 }
969
970 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
971 {
972 #if CMK_ERROR_CHECKING
973         //Check our array index for validity
974         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
975         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
976                 CkAbort("Array index length (nInts) is too long-- did you "
977                         "use bytes instead of integers?\n");
978 #endif
979         msg_prepareSend(msg,ep,ckGetArrayID());
980         msg->array_index()=_idx;//Insert array index
981         if (ckIsDelegated()) //Just call our delegateMgr
982           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
983         else 
984         { //Usual case: a direct send
985           CkArray *localbranch = ckLocalBranch();
986           if (localbranch == NULL) {             // array not created yet
987             CkArrayManagerDeliver(CkMyPe(), msg, 0);
988           }
989           else {
990             if (opts & CK_MSG_INLINE)
991               localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
992             else
993               localbranch->deliver(msg, CkDeliver_queue, opts);
994           }
995         }
996 }
997
998 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
999 {
1000         CkFutureID f=CkCreateAttachedFuture(msg);
1001         ckSend(msg,ep);
1002         return CkWaitReleaseFuture(f);
1003 }
1004
1005 void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts     )
1006 {
1007         CProxySection_ArrayBase sp(sID);
1008         sp.ckSend((CkArrayMessage *)msg,entryIndex,opts);
1009 }
1010
1011 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
1012 {
1013         if (ckIsDelegated()) //Just call our delegateMgr
1014           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
1015         else {
1016           // send through all
1017           for (int k=0; k<_nsid; ++k) {
1018             for (int i=0; i< _sid[k]._nElems-1; i++) {
1019               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[i]);
1020               void *newMsg=CkCopyMsg((void **)&msg);
1021               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1022             }
1023             if (_sid[k]._nElems > 0) {
1024               void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
1025               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[_sid[k]._nElems-1]);
1026               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1027             }
1028           }
1029         }
1030 }
1031
1032 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1033 {
1034   CkArrayMessage *m=(CkArrayMessage *)msg;
1035   m->array_index()=idx;
1036   msg_prepareSend(m,entryIndex,aID);
1037   CkArray *a=(CkArray *)_localBranch(aID);
1038   if (a == NULL)
1039     CkArrayManagerDeliver(CkMyPe(), msg, 0);
1040   else
1041     a->deliver(m,CkDeliver_queue,opts);
1042 }
1043
1044 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1045 {
1046   CkArrayMessage *m=(CkArrayMessage *)msg;
1047   m->array_index()=idx;
1048   msg_prepareSend(m,entryIndex,aID);
1049   CkArray *a=(CkArray *)_localBranch(aID);
1050   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
1051   a->deliver(m,CkDeliver_inline,opts);
1052   if (oldStatus) CkEnableTracing(entryIndex);
1053 }
1054
1055
1056 /*********************** CkArray Reduction *******************/
1057 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
1058   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
1059    mgrID(mgrID_)
1060 {
1061   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1062 }
1063 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
1064   :CkArrayListener(m)
1065 {
1066   mgr=NULL;
1067 }
1068 void CkArrayReducer::pup(PUP::er &p) {
1069   CkArrayListener::pup(p);
1070   p|mgrID;
1071   if (p.isUnpacking())
1072     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1073 }
1074 CkArrayReducer::~CkArrayReducer() {}
1075
1076 /*********************** CkArray Broadcast ******************/
1077
1078 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_, bool broadcastViaScheduler_)
1079     :CkArrayListener(1), //Each array element carries a broadcast number
1080      bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_), broadcastViaScheduler(broadcastViaScheduler_)
1081 { }
1082
1083 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
1084     :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1), broadcastViaScheduler(false)
1085 { }
1086
1087 void CkArrayBroadcaster::pup(PUP::er &p) {
1088   CkArrayListener::pup(p);
1089   /* Assumption: no migrants during checkpoint, so no need to
1090      save old broadcasts. */
1091   p|bcastNo;
1092   p|stableLocations;
1093   p|broadcastViaScheduler;
1094   if (p.isUnpacking()) {
1095     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
1096   }
1097 }
1098
1099 CkArrayBroadcaster::~CkArrayBroadcaster()
1100 {
1101   CkArrayMessage *msg;
1102   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1103 }
1104
1105 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
1106 {
1107   bcastNo++;
1108   DEBB((AA"Received broadcast %d\n"AB,bcastNo));
1109
1110   if (stableLocations)
1111     return;
1112
1113   CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
1114   oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
1115 }
1116
1117 /// Deliver a copy of the given broadcast to the given local element
1118 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
1119                                     CmiBool doFree)
1120 {
1121   int &elBcastNo=getData(el);
1122   // if this array element already received this message, skip it
1123   if (elBcastNo >= bcastNo) return CmiFalse;
1124   elBcastNo++;
1125   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
1126   int epIdx=bcast->array_ep_bcast();
1127
1128 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1129   DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
1130   return CmiTrue;
1131 #else
1132   if (!broadcastViaScheduler)
1133     return el->ckInvokeEntry(epIdx, bcast, doFree);
1134   else {
1135     if (!doFree) {
1136       CkArrayMessage *newMsg = (CkArrayMessage *)CkCopyMsg((void **)&bcast);
1137       bcast = newMsg;
1138     }
1139     envelope *env = UsrToEnv(bcast);
1140     env->getsetArrayEp() = epIdx;
1141     env->getsetArrayMgr() = el->ckGetArrayID();
1142     env->getsetArrayIndex() = el->ckGetArrayIndex();
1143     CkArrayManagerDeliver(CkMyPe(), bcast, 0);
1144     return true;
1145   }
1146 #endif
1147 }
1148
1149 /// Deliver all needed broadcasts to the given local element
1150 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
1151 {
1152   if (stableLocations) return CmiTrue;
1153   int &elBcastNo=getData(el);
1154   if (elBcastNo<bcastNo)
1155   {//This element needs some broadcasts-- it must have
1156    //been migrating during the broadcast.
1157     int i,nDeliver=bcastNo-elBcastNo;
1158     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
1159
1160     //Skip the old junk at the front of the bcast queue
1161     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
1162       oldBcasts.enq(oldBcasts.deq());
1163
1164     //Deliver the newest messages, in old-to-new order
1165     for (i=nDeliver-1;i>=0;i--)
1166     {
1167       CkArrayMessage *msg=oldBcasts.deq();
1168                 if(msg == NULL)
1169                 continue;
1170       oldBcasts.enq(msg);
1171       if (!deliver(msg, el, CmiFalse))
1172         return CmiFalse; //Element migrated away
1173     }
1174   }
1175   //Otherwise, the element survived
1176   return CmiTrue;
1177 }
1178
1179
1180 void CkArrayBroadcaster::springCleaning(void)
1181 {
1182   //Remove old broadcast messages
1183   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
1184   if (nDelete>0) {
1185     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
1186     for (int i=0;i<nDelete;i++)
1187       delete oldBcasts.deq();
1188   }
1189   oldBcastNo=bcastNo;
1190 }
1191
1192 void CkArrayBroadcaster::flushState() 
1193
1194   bcastNo = oldBcastNo = 0; 
1195   CkArrayMessage *msg;
1196   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1197 }
1198
1199 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
1200 {
1201         CProxy_ArrayBase ap(aID);
1202         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
1203 }
1204
1205 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
1206 {
1207         msg->array_ep_bcast()=ep;
1208         if (ckIsDelegated()) //Just call our delegateMgr
1209           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
1210         else 
1211         { //Broadcast message via serializer node
1212           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
1213           int skipsched = opts & CK_MSG_EXPEDITED; 
1214           //int serializer=0;//1623802937%CkNumPes();
1215 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1216                 CProxy_CkArray ap(_aid);
1217                 ap[CpvAccess(serializer)].sendBroadcast(msg);
1218                 CkGroupID _id = _aid;
1219 //              printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
1220 #else
1221           if (CkMyPe()==CpvAccess(serializer))
1222           {
1223                 DEBB((AA"Sending array broadcast\n"AB));
1224                 if (skipsched)
1225                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
1226                 else
1227                         CProxy_CkArray(_aid).recvBroadcast(msg);
1228           } else {
1229                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
1230                 CProxy_CkArray ap(_aid);
1231                 if (skipsched)
1232                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
1233                 else
1234                         ap[CpvAccess(serializer)].sendBroadcast(msg);
1235           }
1236 #endif
1237         }
1238 }
1239
1240 /// Reflect a broadcast off this Pe:
1241 void CkArray::sendBroadcast(CkMessage *msg)
1242 {
1243         CK_MAGICNUMBER_CHECK
1244         if(CkMyPe() == CpvAccess(serializer)){
1245                 //Broadcast the message to all processors
1246                 thisProxy.recvBroadcast(msg);
1247         }else{
1248                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
1249         }
1250 }
1251 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
1252 {
1253         CK_MAGICNUMBER_CHECK
1254         //Broadcast the message to all processors
1255         thisProxy.recvExpeditedBroadcast(msg);
1256 }
1257
1258 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1259 int _tempBroadcastCount=0;
1260
1261 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
1262     if(homePe(*index)==CmiMyPe()){
1263         CkArrayMessage *bcast = (CkArrayMessage *)data;
1264     int epIdx=bcast->array_ep_bcast();
1265         DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
1266         CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
1267         envelope *env = UsrToEnv(copy);
1268         env->sender.data.group.onPE = CkMyPe();
1269         env->TN  = env->SN=0;
1270         env->piggyBcastIdx = epIdx;
1271         env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
1272         env->getsetArrayMgr() = thisgroup;
1273         env->getsetArrayIndex() = *index;
1274     env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
1275         env->setSrcPe(CkMyPe());
1276         rec->deliver(copy,CkDeliver_queue);
1277         _tempBroadcastCount++;
1278     }else{
1279         if(locMgr->homeElementCount != -1){
1280             DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
1281         }
1282     }
1283 }
1284
1285 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
1286     arr->broadcastHomeElements(data,rec,index);
1287 }
1288 #endif
1289
1290
1291 /// Increment broadcast count; deliver to all local elements
1292 void CkArray::recvBroadcast(CkMessage *m)
1293 {
1294         CK_MAGICNUMBER_CHECK
1295         CkArrayMessage *msg=(CkArrayMessage *)m;
1296         broadcaster->incoming(msg);
1297
1298 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1299         _tempBroadcastCount=0;
1300         locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
1301 #else
1302         //Run through the list of local elements
1303         int idx=0, len=0;
1304         if (stableLocations) {            /* remove all NULLs in the array */
1305           len = 0;
1306           while (elements->next(idx)!=NULL) len++;
1307           idx = 0;
1308         }
1309         ArrayElement *el;
1310 #if CMK_BIGSIM_CHARM
1311         void *root;
1312         _TRACE_BG_TLINE_END(&root);
1313         BgSetEntryName("start-broadcast", &root);
1314         CkVec<void *> logs;    // store all logs for each delivery
1315         extern void stopVTimer();
1316         extern void startVTimer();
1317 #endif
1318         while (NULL!=(el=elements->next(idx))) {
1319 #if CMK_BIGSIM_CHARM
1320                 //BgEntrySplit("split-broadcast");
1321                 stopVTimer();
1322                 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
1323                 logs.push_back(curlog);
1324                 startVTimer();
1325 #endif
1326                 CmiBool doFree = CmiFalse;
1327                 if (stableLocations && idx == len) doFree = CmiTrue;
1328                 broadcaster->deliver(msg, el, doFree);
1329         }
1330 #endif
1331
1332 #if CMK_BIGSIM_CHARM
1333         //BgEntrySplit("end-broadcast");
1334         stopVTimer();
1335         BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
1336         startVTimer();
1337 #endif
1338
1339         // CkArrayBroadcaster doesn't have msg buffered, and there was
1340         // no last delivery to transfer ownership
1341         if (stableLocations && len == 0)
1342           delete msg;
1343 }
1344
1345 #include "CkArray.def.h"
1346
1347