recover from soft failure
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55 #include "pathHistory.h"
56
57 #if CMK_LBDB_ON
58 #include "LBDatabase.h"
59 #endif // CMK_LBDB_ON
60
61 CpvDeclare(int ,serializer);
62
63 bool _isAnytimeMigration;
64 bool _isStaticInsertion;
65 bool _isNotifyChildInRed;
66
67 #define ARRAY_DEBUG_OUTPUT 0
68
69 #if ARRAY_DEBUG_OUTPUT 
70 #   define DEB(x) CkPrintf x  //General debug messages
71 #   define DEBI(x) CkPrintf x  //Index debug messages
72 #   define DEBC(x) CkPrintf x  //Construction debug messages
73 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
74 #   define DEBM(x) CkPrintf x  //Migration debug messages
75 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
76 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
77 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
78 #   define AA "ArrayBOC on %d: "
79 #   define AB ,CkMyPe()
80 #   define DEBUG(x) x
81 #else
82 #   define DEB(X) /*CkPrintf x*/
83 #   define DEBI(X) /*CkPrintf x*/
84 #   define DEBC(X) /*CkPrintf x*/
85 #   define DEBS(x) /*CkPrintf x*/
86 #   define DEBM(X) /*CkPrintf x*/
87 #   define DEBL(X) /*CkPrintf x*/
88 #   define DEBK(x) /*CkPrintf x*/
89 #   define DEBB(x) //CkPrintf x
90 #   define str(x) /**/
91 #   define DEBUG(x)
92 #endif
93
94 ///This arrayListener is in charge of delivering broadcasts to the array.
95 class CkArrayBroadcaster : public CkArrayListener {
96   inline int &getData(ArrayElement *el) {return *ckGetData(el);}
97 public:
98   CkArrayBroadcaster(bool _stableLocations, bool _broadcastViaScheduler);
99   CkArrayBroadcaster(CkMigrateMessage *m);
100   virtual void pup(PUP::er &p);
101   virtual ~CkArrayBroadcaster();
102   PUPable_decl(CkArrayBroadcaster);
103
104   virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
105
106   ///Element was just created on this processor
107   /// Return false if the element migrated away or deleted itself.
108   virtual CmiBool ckElementCreated(ArrayElement *elt)
109     { return bringUpToDate(elt); }
110
111   ///Element just arrived on this processor (so just called pup)
112   /// Return false if the element migrated away or deleted itself.
113   virtual CmiBool ckElementArriving(ArrayElement *elt)
114     { return bringUpToDate(elt); }
115
116   void incoming(CkArrayMessage *msg);
117
118   CmiBool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
119
120   void springCleaning(void);
121
122   void flushState();
123 private:
124   int bcastNo;//Number of broadcasts received (also serial number)
125   int oldBcastNo;//Above value last spring cleaning
126   //This queue stores old broadcasts (in case a migrant arrives
127   // and needs to be brought up to date)
128   CkQ<CkArrayMessage *> oldBcasts;
129   bool stableLocations;
130   bool broadcastViaScheduler;
131
132   CmiBool bringUpToDate(ArrayElement *el);
133 };
134
135 ///This arrayListener is in charge of performing reductions on the array.
136 class CkArrayReducer : public CkArrayListener {
137   CkGroupID mgrID;
138   CkReductionMgr *mgr;
139   typedef  contributorInfo *I;
140   inline contributorInfo *getData(ArrayElement *el)
141     {return (I)ckGetData(el);}
142 public:
143   /// Attach this array to this CkReductionMgr
144   CkArrayReducer(CkGroupID mgrID_);
145   CkArrayReducer(CkMigrateMessage *m);
146   virtual void pup(PUP::er &p);
147   virtual ~CkArrayReducer();
148   PUPable_decl(CkArrayReducer);
149
150   void ckBeginInserting(void) {mgr->creatingContributors();}
151   void ckEndInserting(void) {mgr->doneCreatingContributors();}
152
153   void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
154
155   void ckElementCreating(ArrayElement *elt)
156     {mgr->contributorCreated(getData(elt));}
157   void ckElementDied(ArrayElement *elt)
158     {mgr->contributorDied(getData(elt));}
159
160   void ckElementLeaving(ArrayElement *elt)
161     {mgr->contributorLeaving(getData(elt));}
162   CmiBool ckElementArriving(ArrayElement *elt)
163     {mgr->contributorArriving(getData(elt)); return CmiTrue; }
164 };
165
166 /*
167 void 
168 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
169        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
170 }
171 */
172 void
173 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
174         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
175         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
176 }
177
178 /*********************** CkVerboseListener ******************/
179 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
180
181 CkVerboseListener::CkVerboseListener(void)
182   :CkArrayListener(0)
183 {
184   VL_PRINT<<"INIT  Creating listener"<<endl;
185 }
186
187 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
188 {
189   CkArrayListener::ckRegister(arrMgr,dataOffset_);
190   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
191 }
192 void CkVerboseListener::ckBeginInserting(void)
193 {
194   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
195 }
196 void CkVerboseListener::ckEndInserting(void)
197 {
198   VL_PRINT<<"INIT  Done inserting elements"<<endl;
199 }
200
201 void CkVerboseListener::ckElementStamp(int *eltInfo)
202 {
203   VL_PRINT<<"LIFE  Stamping element"<<endl;
204 }
205 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
206 {
207   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
208 }
209 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
210 {
211   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
212   return CmiTrue;
213 }
214 void CkVerboseListener::ckElementDied(ArrayElement *elt)
215 {
216   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
217 }
218
219 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
220 {
221   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
222 }
223 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
224 {
225   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
226   return CmiTrue;
227 }
228
229
230 /************************* ArrayElement *******************/
231 class ArrayElement_initInfo {
232 public:
233   CkArray *thisArray;
234   CkArrayID thisArrayID;
235   CkArrayIndex numInitial;
236   int listenerData[CK_ARRAYLISTENER_MAXLEN];
237   CmiBool fromMigration;
238 };
239
240 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
241
242 void ArrayElement::initBasics(void)
243 {
244 #if CMK_OUT_OF_CORE
245   if (CkpvAccess(CkSaveRestorePrefetch)) 
246     return; /* Just restoring from disk--don't try to set up anything. */
247 #endif
248 #if CMK_GRID_QUEUE_AVAILABLE
249         grid_queue_interval = 0;
250         grid_queue_threshold = 0;
251         msg_count = 0;
252         msg_count_grid = 0;
253         border_flag = 0;
254
255         grid_queue_interval = CmiGridQueueGetInterval ();
256         grid_queue_threshold = CmiGridQueueGetThreshold ();
257 #endif
258   ArrayElement_initInfo &info=CkpvAccess(initInfo);
259   thisArray=info.thisArray;
260   thisArrayID=info.thisArrayID;
261   numInitialElements=info.numInitial.getCombinedCount();
262   if (info.listenerData) {
263     memcpy(listenerData,info.listenerData,sizeof(listenerData));
264   }
265   if (!info.fromMigration) {
266     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
267                           l->ckElementCreating(this));
268   }
269 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
270         if(mlogData == NULL)
271                 mlogData = new ChareMlogData();
272         mlogData->objID.type = TypeArray;
273         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
274 #endif
275 #ifdef _PIPELINED_ALLREDUCE_
276         allredMgr = NULL;
277 #endif
278 }
279
280 ArrayElement::ArrayElement(void) 
281 {
282         initBasics();
283 #if CMK_MEM_CHECKPOINT
284         init_checkpt();
285 #endif
286 }
287
288 ArrayElement::ArrayElement(CkMigrateMessage *m) : CkMigratable(m)
289 {
290         initBasics();
291 }
292
293 //Called by the system just before and after migration to another processor:  
294 void ArrayElement::ckAboutToMigrate(void) {
295         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
296                                 l->ckElementLeaving(this));
297         CkMigratable::ckAboutToMigrate();
298 }
299 void ArrayElement::ckJustMigrated(void) {
300         CkMigratable::ckJustMigrated();
301         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
302               if (!l->ckElementArriving(this)) return;);
303 }
304
305 void ArrayElement::ckJustRestored(void) {
306     CkMigratable::ckJustRestored();
307     //empty for out-of-core emulation
308 }
309
310 #ifdef _PIPELINED_ALLREDUCE_
311 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
312                                         CMK_REFNUM_TYPE userFlag)
313 {
314         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
315         msg->setUserFlag(userFlag);
316         msg->setMigratableContributor(true);
317         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
318 }
319 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
320                                         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
321 {
322         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
323         msg->setUserFlag(userFlag);
324         msg->setCallback(cb);
325         msg->setMigratableContributor(true);
326         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
327 }
328 void ArrayElement::contribute2(CkReductionMsg *msg) 
329 {
330         msg->setMigratableContributor(true);
331         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
332 }
333 void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
334 {
335         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
336     msg->setUserFlag(userFlag);
337     msg->setCallback(cb);
338     msg->setMigratableContributor(true);
339     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
340 }
341 void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
342 {
343     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
344     msg->setUserFlag(userFlag);
345     msg->setMigratableContributor(true);
346     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
347 }
348
349 void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
350                                                           const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
351 {
352         // if it is a broadcast to myself and size is large
353         if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD) 
354         {
355                 if (!allredMgr) {
356                         allredMgr = new AllreduceMgr();
357                 }
358                 // number of fragments
359                 int fragNo = dataSize/FRAG_SIZE;
360                 int size = FRAG_SIZE;
361                 // for each fragment
362                 for (int i=0; i<fragNo; i++) {
363                         // callback to defragmentor
364                         CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
365                         if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
366                                 size = dataSize%FRAG_SIZE;
367                         }
368                         CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
369                         // initialize the new msg
370                         msg->reducer            = type;
371                         msg->nFrags             = fragNo;
372                         msg->fragNo             = i;
373                         msg->callback           = defrag_cb;
374                         msg->userFlag           = userFlag;
375                         allredMgr->cb           = cb;
376                         allredMgr->cb.type      = CkCallback::sendArray;
377                         allredMgr->cb.d.array.idx = myIndex;
378                         contribute2(msg);
379                 }
380                 return;
381         }
382         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
383         msg->setUserFlag(userFlag);
384         msg->setCallback(cb);
385         msg->setMigratableContributor(true);
386         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
387 }
388
389
390 #else
391 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
392    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
393 #endif
394 // _PIPELINED_ALLREDUCE_
395 void ArrayElement::defrag(CkReductionMsg *msg)
396 {
397 //      CkPrintf("in defrag\n");
398 #ifdef _PIPELINED_ALLREDUCE_
399         allredMgr->allreduce_recieve(msg);
400 #endif
401 }
402
403 /// Remote method: calls destructor
404 void ArrayElement::ckDestroy(void)
405 {
406         if(_BgOutOfCoreFlag!=1){ //in case of taking core out of memory
407             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
408                            l->ckElementDied(this));
409         }
410         CkMigratable::ckDestroy();
411 }
412
413 //Destructor (virtual)
414 ArrayElement::~ArrayElement()
415 {
416 #if CMK_OUT_OF_CORE
417   if (CkpvAccess(CkSaveRestorePrefetch)) 
418     return; /* Just saving to disk--don't trash anything. */
419 #endif
420   //To detect use-after-delete: 
421   thisArray=(CkArray *)0xDEADa7a1;
422 }
423
424 void ArrayElement::pup(PUP::er &p)
425 {
426   DEBM((AA"  ArrayElement::pup()\n"AB));
427   CkMigratable::pup(p);
428   thisArrayID.pup(p);
429   if (p.isUnpacking())
430         thisArray=thisArrayID.ckLocalBranch();
431   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
432 #if CMK_MEM_CHECKPOINT
433   p(budPEs, 2);
434 #endif
435   p.syncComment(PUP::sync_last_system,"ArrayElement");
436 #if CMK_GRID_QUEUE_AVAILABLE
437   p|grid_queue_interval;
438   p|grid_queue_threshold;
439   p|msg_count;
440   p|msg_count_grid;
441   p|border_flag;
442   if (p.isUnpacking ()) {
443     msg_count = 0;
444     msg_count_grid = 0;
445     border_flag = 0;
446   }
447 #endif
448 }
449
450 char *ArrayElement::ckDebugChareName(void) {
451         char buf[200];
452         const char *className=_chareTable[ckGetChareType()]->name;
453         const int *d=thisIndexMax.data();
454         const short int *s=(const short int*)d;
455         switch (thisIndexMax.dimension) {
456         case 0: sprintf(buf,"%s",className); break;
457         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
458         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
459         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
460     case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
461     case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
462     case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
463         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
464         };
465         return strdup(buf);
466 }
467
468 int ArrayElement::ckDebugChareID(char *str, int limit) {
469   if (limit<21) return -1;
470   str[0] = 2;
471   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
472   *((CkArrayIndex*)&str[5]) = thisIndexMax;
473   return 21;
474 }
475
476 /// A more verbose form of abort
477 void ArrayElement::CkAbort(const char *str) const
478 {
479         CkError("[%d] Array element at index %s aborting:\n",
480                 CkMyPe(), idx2str(thisIndexMax));
481         CkMigratable::CkAbort(str);
482 }
483
484 void ArrayElement::recvBroadcast(CkMessage *m){
485 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
486         CkArrayMessage *bcast = (CkArrayMessage *)m;
487     envelope *env = UsrToEnv(m);
488         int epIdx= env->piggyBcastIdx;
489     ckInvokeEntry(epIdx,bcast,CmiTrue);
490 #endif
491 }
492
493 /*********************** Spring Cleaning *****************
494 Periodically (every minute or so) remove expired broadcasts
495 from the queue.
496
497 This does not get called for arrays with stable locations (all
498 insertions done at creation, migration only at discrete points).
499 */
500
501 inline void CkArray::springCleaning(void)
502 {
503   DEBK((AA"Starting spring cleaning\n"AB));
504   broadcaster->springCleaning();
505 }
506
507 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
508         ((CkArray *)forArray)->springCleaning();
509 }
510
511 /********************* Little CkArray Utilities ******************/
512
513 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
514         :CProxy(), _aid(e->ckGetArrayID())
515         {}
516 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
517         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
518         {}
519
520 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
521         {return ckLocalBranch()->getLocMgr(); }
522
523 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
524
525 CkArrayOptions::CkArrayOptions(void) //Default: empty array
526         :numInitial(),map(_defaultArrayMapID)
527 {
528     init();
529 }
530
531 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
532         :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
533 {
534     init();
535 }
536
537 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
538         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
539 {
540     init();
541 }
542
543 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
544         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
545 {
546     init();
547 }
548
549 void CkArrayOptions::init()
550 {
551     locMgr.setZero();
552     anytimeMigration = _isAnytimeMigration;
553     staticInsertion = _isStaticInsertion;
554     reductionClient.type = CkCallback::invalid;
555     disableNotifyChildInRed = !_isNotifyChildInRed;
556     broadcastViaScheduler = false;
557 }
558
559 CkArrayOptions &CkArrayOptions::setStaticInsertion(bool b)
560 {
561     staticInsertion = b;
562     if (b && map == _defaultArrayMapID)
563         map = _fastArrayMapID;
564     return *this;
565 }
566
567 /// Bind our elements to this array
568 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
569 {
570         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
571         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
572         // not just initially.
573         //setNumInitial(arr->getNumInitial());
574         return setLocationManager(arr->getLocMgr()->getGroupID());
575 }
576 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
577 {
578         arrayListeners.push_back(listener);
579         return *this;
580 }
581
582 void CkArrayOptions::pup(PUP::er &p) {
583         p|numInitial;
584         p|map;
585         p|locMgr;
586         p|arrayListeners;
587         p|reductionClient;
588         p|anytimeMigration;
589         p|disableNotifyChildInRed;
590         p|staticInsertion;
591         p|broadcastViaScheduler;
592 }
593
594 CkArrayListener::CkArrayListener(int nInts_) 
595   :nInts(nInts_) 
596 {
597   dataOffset=-1;
598 }
599 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
600   nInts=-1; dataOffset=-1;
601 }
602 void CkArrayListener::pup(PUP::er &p) {
603   p|nInts;
604   p|dataOffset;
605 }
606
607 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
608 {
609   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
610   dataOffset=dataOffset_;
611 }
612
613 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
614                                           const CkArrayOptions &opts_)
615 {
616   CkArrayOptions opts(opts_);
617   CkGroupID locMgr = opts.getLocationManager();
618   if (locMgr.isZero())
619   { //Create a new location manager
620 #if !CMK_LBDB_ON
621     CkGroupID _lbdb;
622     CkGroupID _metalb;
623 #endif
624     CkEntryOptions  e_opts;
625     e_opts.setGroupDepID(opts.getMap());       // group creation dependence
626     locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,_metalb,opts.getNumInitial(),&e_opts);
627     opts.setLocationManager(locMgr);
628   }
629   //Create the array manager
630   m->array_ep()=ctor;
631   CkMarshalledMessage marsh(m);
632   CkEntryOptions  e_opts;
633   e_opts.setGroupDepID(locMgr);       // group creation dependence
634 #if !GROUP_LEVEL_REDUCTION
635   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
636   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
637   nodereductionProxy.setAttachedGroup(ag);
638 #else
639   CkNodeGroupID dummyid;
640   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
641 #endif
642   return (CkArrayID)ag;
643 }
644
645 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
646 {
647   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
648 }
649
650 extern IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID);
651
652 struct CkInsertIdxMsg {
653   char core[CmiReservedHeaderSize];
654   CkArrayIndex idx;
655   CkArrayMessage *m;
656   int ctor;
657   int onPe;
658   CkArrayID _aid;
659 };
660
661 static int ckinsertIdxHdl;
662
663 void ckinsertIdxFunc(void *m)
664 {
665   CkInsertIdxMsg *msg = (CkInsertIdxMsg *)m;
666   CProxy_ArrayBase   ca(msg->_aid);
667   ca.ckInsertIdx(msg->m, msg->ctor, msg->onPe, msg->idx);
668   CmiFree(msg);
669 }
670
671 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
672         const CkArrayIndex &idx)
673 {
674   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
675   m->array_ep()=ctor;
676   CkArray *ca = ckLocalBranch();
677   if (ca == NULL) {
678       CkInsertIdxMsg *msg = (CkInsertIdxMsg *)CmiAlloc(sizeof(CkInsertIdxMsg));
679       msg->idx = idx;
680       msg->m = m;
681       msg->ctor = ctor;
682       msg->onPe = onPe;
683       msg->_aid = _aid;
684       CmiSetHandler(msg, ckinsertIdxHdl);
685       ca = (CkArray *)lookupGroupAndBufferIfNotThere(CkpvAccess(_coreState), (envelope*)msg,_aid);
686       if (ca == NULL) return;
687   }
688   ca->prepareCtorMsg(m,onPe,idx);
689   if (ckIsDelegated()) {
690         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
691         return;
692   }
693   
694   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
695   CkArrayManagerInsert(onPe,m,_aid);
696 }
697
698 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
699 {
700   ckInsertIdx(m,ctorIndex,onPe,_idx);
701 }
702
703 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
704 {
705   return ckLocalBranch()->lookup(_idx);
706 }
707
708 //pack-unpack method for CProxy_ArrayBase
709 void CProxy_ArrayBase::pup(PUP::er &p)
710 {
711   CProxy::pup(p);
712   _aid.pup(p);
713 }
714 void CProxyElement_ArrayBase::pup(PUP::er &p)
715 {
716   CProxy_ArrayBase::pup(p);
717   p|_idx.nInts;
718   p|_idx.dimension;
719   p(_idx.data(),_idx.nInts);
720 }
721
722 void CProxySection_ArrayBase::pup(PUP::er &p)
723 {
724   CProxy_ArrayBase::pup(p);
725   p | _nsid;
726   if (p.isUnpacking()) {
727     if (_nsid == 1) _sid = new CkSectionID;
728     else if (_nsid > 1) _sid = new CkSectionID[_nsid];
729     else _sid = NULL;
730   }
731   for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
732 }
733
734 /*********************** CkArray Creation *************************/
735 void _ckArrayInit(void)
736 {
737   CkpvInitialize(ArrayElement_initInfo,initInfo);
738   CkDisableTracing(CkIndex_CkArray::insertElement(0));
739   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
740     // disable because broadcast listener may deliver broadcast message
741   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
742   // by default anytime migration is allowed
743   ckinsertIdxHdl = CkRegisterHandler(ckinsertIdxFunc);
744 }
745
746 CkArray::CkArray(CkArrayOptions &opts,
747                  CkMarshalledMessage &initMsg,
748                  CkNodeGroupID nodereductionID)
749   : CkReductionMgr(),
750     locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
751     locMgrID(opts.getLocationManager()),
752     thisProxy(thisgroup),
753     // Register with our location manager
754     elements((ArrayElementList *)locMgr->addManager(thisgroup,this)),
755     stableLocations(opts.staticInsertion && !opts.anytimeMigration),
756     numInitial(opts.getNumInitial()), isInserting(CmiTrue)
757 {
758   if (!stableLocations)
759       CcdCallOnConditionKeep(CcdPERIODIC_1minute,
760                              staticSpringCleaning, (void *)this);
761   
762   //set the field in one my parent class (CkReductionMgr)
763   if(opts.disableNotifyChildInRed)
764           disableNotifyChildrenStart = CmiTrue; 
765   
766   //Find, register, and initialize the arrayListeners
767   listenerDataOffset=0;
768   broadcaster=new CkArrayBroadcaster(stableLocations, opts.broadcastViaScheduler);
769   addListener(broadcaster);
770   reducer=new CkArrayReducer(thisgroup);
771   addListener(reducer);
772
773   // COMLIB HACK
774   //calistener = new ComlibArrayListener();
775   //addListener(calistener,dataOffset);
776
777   int lNo,nL=opts.getListeners(); //User-added listeners
778   for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
779
780   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
781
782   ///Set up initial elements (if any)
783   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
784
785   ///adding code for Reduction using nodegroups
786
787 #if !GROUP_LEVEL_REDUCTION
788   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
789   nodeProxy = nodetemp;
790   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
791 #endif
792
793 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
794         // creating the spanning tree to be used for broadcast
795         children = (int *) CmiAlloc(sizeof(int) * _MLOG_BCAST_BFACTOR_);
796         numChildren = 0;
797         
798         // computing the level of the tree this pe is in
799         // we should use the geometric series formula, but now a quick and dirty code should suffice
800         // PE 0 is at level 0, PEs 1.._MLOG_BCAST_BFACTOR_ are at level 1 and so on
801         int level = 0;
802         int aux = CmiMyPe();
803         int max = CmiNumPes();
804         int factor = _MLOG_BCAST_BFACTOR_;
805         int startLevel = 0;
806         int startNextLevel = 1;
807         while(aux >= 0){
808                 level++;
809                 startLevel = startNextLevel;
810                 startNextLevel += factor;
811                 aux -= factor;
812                 factor *= _MLOG_BCAST_BFACTOR_;
813         }
814
815         // adding children to the tree
816         int first = startNextLevel + (CmiMyPe() - startLevel) * _MLOG_BCAST_BFACTOR_;
817         for(int i=0; i<_MLOG_BCAST_BFACTOR_; i++){
818                 if(first + i >= CmiNumPes())
819                         break;
820                 children[i] = first + i;
821                 numChildren++;
822         }
823  
824 #endif
825
826
827   if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
828       ckSetReductionClient(&opts.reductionClient);
829 }
830
831 CkArray::CkArray(CkMigrateMessage *m)
832         :CkReductionMgr(m), thisProxy(thisgroup)
833 {
834   locMgr=NULL;
835   isInserting=CmiTrue;
836 }
837
838 #if CMK_ERROR_CHECKING
839 inline void testPup(PUP::er &p,int shouldBe) {
840   int a=shouldBe;
841   p|a;
842   if (a!=shouldBe)
843     CkAbort("PUP direction mismatch!");
844 }
845 #else
846 inline void testPup(PUP::er &p,int shouldBe) {}
847 #endif
848
849 void CkArray::pup(PUP::er &p){
850         CkReductionMgr::pup(p);
851         p|numInitial;
852         p|locMgrID;
853         p|listeners;
854         p|listenerDataOffset;
855         p|stableLocations;
856         testPup(p,1234);
857         if(p.isUnpacking()){
858                 thisProxy=thisgroup;
859                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
860                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
861                 /// Restore our default listeners:
862                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
863                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
864                 /// set up broadcast cleaner
865                 if (!stableLocations)
866                     CcdCallOnConditionKeep(CcdPERIODIC_1minute,
867                                            staticSpringCleaning, (void *)this);
868         }
869 }
870
871 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
872   int dataOffset=0; \
873   for (int lNo=0;lNo<listeners.size();lNo++) { \
874     CkArrayListener *l=listeners[lNo]; \
875     l->ckElementStamp(&listenerData[dataOffset]); \
876     dataOffset+=l->ckGetLen(); \
877   } \
878 } while (0)
879
880 //Called on send side to prepare array constructor message
881 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
882 {
883   envelope *env=UsrToEnv((void *)m);
884   env->getsetArrayIndex()=idx;
885   int *listenerData=env->getsetArrayListenerData();
886   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
887   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
888   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
889         getLocMgr()->inform(idx,onPe);
890 }
891
892 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
893                         CkElementCreation_t type)
894 {
895         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
896         if (type==CkElementCreation_resume) 
897         { // HACK: Re-stamp elements on checkpoint resume--
898           //  this restores, e.g., reduction manager's gcount
899                 int *listenerData=ret->listenerData;
900                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
901         }
902         return ret;
903 }
904
905 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
906                      CkMessage *msg,CmiBool fromMigration) 
907 {
908         //Stash the element's initialization information in the global "initInfo"
909         ArrayElement_initInfo &init=CkpvAccess(initInfo);
910         init.numInitial=numInitial;
911         init.thisArray=this;
912         init.thisArrayID=thisgroup;
913         if (msg) /*Have to *copy* data because msg will be deleted*/
914           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
915                  sizeof(init.listenerData));
916         init.fromMigration=fromMigration;
917         
918         //Build the element
919         int elSize=_chareTable[elChareType]->size;
920         ArrayElement *elem = (ArrayElement *)malloc(elSize);
921 #ifndef CMK_OPTIMIZE
922         if (elem!=NULL) setMemoryTypeChare(elem);
923 #endif
924         return elem;
925 }
926
927 /// This method is called by ck.C or the user to add an element.
928 CmiBool CkArray::insertElement(CkMessage *me)
929 {
930   CK_MAGICNUMBER_CHECK
931   CkArrayMessage *m=(CkArrayMessage *)me;
932   const CkArrayIndex &idx=m->array_index();
933   int onPe;
934   if (locMgr->isRemote(idx,&onPe)) 
935   { /* element's sibling lives somewhere else, so insert there */
936         CkArrayManagerInsert(onPe,me,thisgroup);
937         return CmiFalse;
938   }
939   int ctorIdx=m->array_ep();
940   int chareType=_entryTable[ctorIdx]->chareIdx;
941   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
942 #ifndef CMK_CHARE_USE_PTR
943   ((Chare *)elt)->chareIdx = -1;
944 #endif
945   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
946   CK_ARRAYLISTENER_LOOP(listeners,
947       if (!l->ckElementCreated(elt)) return CmiFalse;);
948   return CmiTrue;
949 }
950
951 void CProxy_ArrayBase::doneInserting(void)
952 {
953   DEBC((AA"Broadcasting a doneInserting request\n"AB));
954   //Broadcast a DoneInserting
955   CProxy_CkArray(_aid).remoteDoneInserting();
956 }
957
958 void CProxy_ArrayBase::beginInserting(void)
959 {
960   DEBC((AA"Broadcasting a beginInserting request\n"AB));
961   CProxy_CkArray(_aid).remoteBeginInserting();
962 }
963
964 void CkArray::doneInserting(void)
965 {
966   thisProxy[CkMyPe()].remoteDoneInserting();
967 }
968
969 void CkArray::beginInserting(void)
970 {
971   thisProxy[CkMyPe()].remoteBeginInserting();
972 }
973
974 /// This is called on every processor after the last array insertion.
975 void CkArray::remoteDoneInserting(void)
976 {
977   CK_MAGICNUMBER_CHECK
978   if (isInserting) {
979     isInserting=CmiFalse;
980     DEBC((AA"Done inserting objects\n"AB));
981     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
982     locMgr->doneInserting();
983   }
984 }
985
986 void CkArray::remoteBeginInserting(void)
987 {
988   CK_MAGICNUMBER_CHECK;
989
990   if (!isInserting) {
991     isInserting = CmiTrue;
992     DEBC((AA"Begin inserting objects\n"AB));
993     for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
994     locMgr->startInserting();
995   }
996 }
997
998 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
999         int onPe,int ctor,CkDeliver_t type)
1000 {
1001         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
1002         prepareCtorMsg(m,onPe,idx);
1003         m->array_ep()=ctor;
1004         
1005         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
1006                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
1007                 CkArrayManagerInsert(onPe,m,thisgroup);
1008         } else /* local message, non-queued */ {
1009                 //Call local constructor directly
1010                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
1011                 return insertElement(m);
1012         }
1013         return CmiTrue;
1014 }
1015
1016 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
1017 {
1018         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
1019         if (local) {
1020           int onPe=CkMyPe();
1021           prepareCtorMsg(m,onPe,idx);
1022 #if CMK_BIGSIM_CHARM
1023           BgEntrySplit("split-array-new");
1024 #endif
1025           insertElement(m);
1026         }
1027         else {
1028           int onPe=-1;
1029           prepareCtorMsg(m,onPe,idx);
1030           CkArrayManagerInsert(onPe,m,getGroupID());
1031         }
1032 }
1033
1034 /********************* CkArray Messaging ******************/
1035 /// Fill out a message's array fields before sending it
1036 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
1037 {
1038         envelope *env=UsrToEnv((void *)msg);
1039         env->getsetArrayMgr()=aid;
1040         env->getsetArraySrcPe()=CkMyPe();
1041         env->setEpIdx(ep);
1042         env->getsetArrayHops()=0;
1043 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1044         criticalPath_send(env);
1045         automaticallySetMessagePriority(env);
1046 #endif
1047 }
1048
1049
1050 /// Just a non-inlined version of msg_prepareSend()
1051 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
1052 {
1053         envelope *env=UsrToEnv((void *)msg);
1054         env->getsetArrayMgr()=aid;
1055         env->getsetArraySrcPe()=CkMyPe();
1056         env->setEpIdx(ep);
1057         env->getsetArrayHops()=0;
1058 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1059         criticalPath_send(env);
1060         automaticallySetMessagePriority(env);
1061 #endif
1062 }
1063
1064 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
1065 {
1066 #if CMK_ERROR_CHECKING
1067         //Check our array index for validity
1068         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
1069         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
1070                 CkAbort("Array index length (nInts) is too long-- did you "
1071                         "use bytes instead of integers?\n");
1072 #endif
1073         msg_prepareSend(msg,ep,ckGetArrayID());
1074         msg->array_index()=_idx;//Insert array index
1075         if (ckIsDelegated()) //Just call our delegateMgr
1076           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
1077         else 
1078         { //Usual case: a direct send
1079           CkArray *localbranch = ckLocalBranch();
1080           if (localbranch == NULL) {             // array not created yet
1081             CkArrayManagerDeliver(CkMyPe(), msg, 0);
1082           }
1083           else {
1084             if (opts & CK_MSG_INLINE)
1085               localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
1086             else
1087               localbranch->deliver(msg, CkDeliver_queue, opts);
1088           }
1089         }
1090 }
1091
1092 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
1093 {
1094         CkFutureID f=CkCreateAttachedFuture(msg);
1095         ckSend(msg,ep);
1096         return CkWaitReleaseFuture(f);
1097 }
1098
1099 void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts     )
1100 {
1101         CProxySection_ArrayBase sp(sID);
1102         sp.ckSend((CkArrayMessage *)msg,entryIndex,opts);
1103 }
1104
1105 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
1106 {
1107         if (ckIsDelegated()) //Just call our delegateMgr
1108           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
1109         else {
1110           // send through all
1111           for (int k=0; k<_nsid; ++k) {
1112             for (int i=0; i< _sid[k]._nElems-1; i++) {
1113               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[i]);
1114               void *newMsg=CkCopyMsg((void **)&msg);
1115               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1116             }
1117             if (_sid[k]._nElems > 0) {
1118               void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
1119               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[_sid[k]._nElems-1]);
1120               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1121             }
1122           }
1123         }
1124 }
1125
1126 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1127 {
1128   CkArrayMessage *m=(CkArrayMessage *)msg;
1129   m->array_index()=idx;
1130   msg_prepareSend(m,entryIndex,aID);
1131   CkArray *a=(CkArray *)_localBranch(aID);
1132   if (a == NULL)
1133     CkArrayManagerDeliver(CkMyPe(), msg, 0);
1134   else
1135     a->deliver(m,CkDeliver_queue,opts);
1136 }
1137
1138 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1139 {
1140   CkArrayMessage *m=(CkArrayMessage *)msg;
1141   m->array_index()=idx;
1142   msg_prepareSend(m,entryIndex,aID);
1143   CkArray *a=(CkArray *)_localBranch(aID);
1144   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
1145   a->deliver(m,CkDeliver_inline,opts);
1146   if (oldStatus) CkEnableTracing(entryIndex);
1147 }
1148
1149
1150 /*********************** CkArray Reduction *******************/
1151 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
1152   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
1153    mgrID(mgrID_)
1154 {
1155   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1156 }
1157 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
1158   :CkArrayListener(m)
1159 {
1160   mgr=NULL;
1161 }
1162 void CkArrayReducer::pup(PUP::er &p) {
1163   CkArrayListener::pup(p);
1164   p|mgrID;
1165   if (p.isUnpacking())
1166     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1167 }
1168 CkArrayReducer::~CkArrayReducer() {}
1169
1170 /*********************** CkArray Broadcast ******************/
1171
1172 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_, bool broadcastViaScheduler_)
1173     :CkArrayListener(1), //Each array element carries a broadcast number
1174      bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_), broadcastViaScheduler(broadcastViaScheduler_)
1175 { }
1176
1177 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
1178     :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1), broadcastViaScheduler(false)
1179 { }
1180
1181 void CkArrayBroadcaster::pup(PUP::er &p) {
1182   CkArrayListener::pup(p);
1183   /* Assumption: no migrants during checkpoint, so no need to
1184      save old broadcasts. */
1185   p|bcastNo;
1186   p|stableLocations;
1187   p|broadcastViaScheduler;
1188   if (p.isUnpacking()) {
1189     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
1190   }
1191 }
1192
1193 CkArrayBroadcaster::~CkArrayBroadcaster()
1194 {
1195   CkArrayMessage *msg;
1196   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1197 }
1198
1199 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
1200 {
1201   bcastNo++;
1202   DEBB((AA"Received broadcast %d\n"AB,bcastNo));
1203
1204   if (stableLocations)
1205     return;
1206
1207   CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
1208   oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
1209 }
1210
1211 /// Deliver a copy of the given broadcast to the given local element
1212 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
1213                                     CmiBool doFree)
1214 {
1215   int &elBcastNo=getData(el);
1216   // if this array element already received this message, skip it
1217   if (elBcastNo >= bcastNo) return CmiFalse;
1218   elBcastNo++;
1219   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
1220   int epIdx=bcast->array_ep_bcast();
1221
1222 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1223   DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
1224   return CmiTrue;
1225 #else
1226   if (!broadcastViaScheduler)
1227     return el->ckInvokeEntry(epIdx, bcast, doFree);
1228   else {
1229     if (!doFree) {
1230       CkArrayMessage *newMsg = (CkArrayMessage *)CkCopyMsg((void **)&bcast);
1231       bcast = newMsg;
1232     }
1233     envelope *env = UsrToEnv(bcast);
1234     env->getsetArrayEp() = epIdx;
1235     env->getsetArrayMgr() = el->ckGetArrayID();
1236     env->getsetArrayIndex() = el->ckGetArrayIndex();
1237     CkArrayManagerDeliver(CkMyPe(), bcast, 0);
1238     return true;
1239   }
1240 #endif
1241 }
1242
1243 /// Deliver all needed broadcasts to the given local element
1244 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
1245 {
1246   if (stableLocations) return CmiTrue;
1247   int &elBcastNo=getData(el);
1248   if (elBcastNo<bcastNo)
1249   {//This element needs some broadcasts-- it must have
1250    //been migrating during the broadcast.
1251     int i,nDeliver=bcastNo-elBcastNo;
1252     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
1253
1254     //Skip the old junk at the front of the bcast queue
1255     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
1256       oldBcasts.enq(oldBcasts.deq());
1257
1258     //Deliver the newest messages, in old-to-new order
1259     for (i=nDeliver-1;i>=0;i--)
1260     {
1261       CkArrayMessage *msg=oldBcasts.deq();
1262                 if(msg == NULL)
1263                 continue;
1264       oldBcasts.enq(msg);
1265       if (!deliver(msg, el, CmiFalse))
1266         return CmiFalse; //Element migrated away
1267     }
1268   }
1269   //Otherwise, the element survived
1270   return CmiTrue;
1271 }
1272
1273
1274 void CkArrayBroadcaster::springCleaning(void)
1275 {
1276   //Remove old broadcast messages
1277   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
1278   if (nDelete>0) {
1279     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
1280     for (int i=0;i<nDelete;i++)
1281       delete oldBcasts.deq();
1282   }
1283   oldBcastNo=bcastNo;
1284 }
1285
1286 void CkArrayBroadcaster::flushState() 
1287
1288   bcastNo = oldBcastNo = 0; 
1289   CkArrayMessage *msg;
1290   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1291 }
1292
1293 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
1294 {
1295         CProxy_ArrayBase ap(aID);
1296         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
1297 }
1298
1299 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
1300 {
1301         msg->array_ep_bcast()=ep;
1302         if (ckIsDelegated()) //Just call our delegateMgr
1303           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
1304         else 
1305         { //Broadcast message via serializer node
1306           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
1307           int skipsched = opts & CK_MSG_EXPEDITED; 
1308           //int serializer=0;//1623802937%CkNumPes();
1309 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1310                 CProxy_CkArray ap(_aid);
1311                 ap[CpvAccess(serializer)].sendBroadcast(msg);
1312                 CkGroupID _id = _aid;
1313                        printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
1314 #else
1315           if (CkMyPe()==CpvAccess(serializer))
1316           {
1317                 DEBB((AA"Sending array broadcast\n"AB));
1318                 printf("Sending array broadcast\n");
1319                 if (skipsched)
1320                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
1321                 else
1322                         CProxy_CkArray(_aid).recvBroadcast(msg);
1323           } else {
1324                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
1325                 CProxy_CkArray ap(_aid);
1326                 if (skipsched)
1327                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
1328                 else
1329                         ap[CpvAccess(serializer)].sendBroadcast(msg);
1330           }
1331 #endif
1332         }
1333 }
1334
1335 /// Reflect a broadcast off this Pe:
1336 void CkArray::sendBroadcast(CkMessage *msg)
1337 {
1338         CK_MAGICNUMBER_CHECK
1339         if(CkMyPe() == CpvAccess(serializer)){
1340 #if _MLOG_BCAST_TREE_
1341                 // Using the spanning tree to broadcast the message
1342                 for(int i=0; i<numChildren; i++){
1343                         CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
1344                         thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
1345                 }
1346         
1347                 // delivering message locally
1348                 recvBroadcast(msg);     
1349 #else
1350                 //Broadcast the message to all processors
1351                 thisProxy.recvBroadcast(msg);
1352 #endif
1353         }else{
1354                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
1355         }
1356 }
1357 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
1358 {
1359         CK_MAGICNUMBER_CHECK
1360         //Broadcast the message to all processors
1361         thisProxy.recvExpeditedBroadcast(msg);
1362 }
1363
1364 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1365 int _tempBroadcastCount=0;
1366
1367 // Delivers a message using the spanning tree
1368 void CkArray::recvBroadcastViaTree(CkMessage *msg)
1369 {
1370         CK_MAGICNUMBER_CHECK
1371
1372         // Using the spanning tree to broadcast the message
1373         for(int i=0; i<numChildren; i++){
1374                 CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
1375                 thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
1376         }
1377
1378         // delivering message locally
1379         recvBroadcast(msg);     
1380 }
1381
1382 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
1383     if(homePe(*index)==CmiMyPe()){
1384         CkArrayMessage *bcast = (CkArrayMessage *)data;
1385     int epIdx=bcast->array_ep_bcast();
1386         DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
1387         CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
1388         envelope *env = UsrToEnv(copy);
1389         env->sender.data.group.onPE = CkMyPe();
1390 #if defined(_FAULT_CAUSAL_)
1391         env->TN = 0;
1392 #endif
1393                 env->SN = 0;
1394         env->piggyBcastIdx = epIdx;
1395         env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
1396         env->getsetArrayMgr() = thisgroup;
1397         env->getsetArrayIndex() = *index;
1398     env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
1399         env->setSrcPe(CkMyPe());
1400         rec->deliver(copy,CkDeliver_queue);
1401         _tempBroadcastCount++;
1402     }else{
1403         if(locMgr->homeElementCount != -1){
1404             DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
1405         }
1406     }
1407 }
1408
1409 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
1410     arr->broadcastHomeElements(data,rec,index);
1411 }
1412 #else
1413 void CkArray::recvBroadcastViaTree(CkMessage *msg){
1414 }
1415 #endif
1416
1417
1418 /// Increment broadcast count; deliver to all local elements
1419 void CkArray::recvBroadcast(CkMessage *m)
1420 {
1421         CK_MAGICNUMBER_CHECK
1422         CkArrayMessage *msg=(CkArrayMessage *)m;
1423         broadcaster->incoming(msg);
1424
1425 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1426         _tempBroadcastCount=0;
1427         locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
1428 #else
1429         //Run through the list of local elements
1430         int idx=0, len=0, count=0;
1431         if (stableLocations) {            /* remove all NULLs in the array */
1432           len = 0;
1433           while (elements->next(idx)!=NULL) len++;
1434           idx = 0;
1435         }
1436         ArrayElement *el;
1437 #if CMK_BIGSIM_CHARM
1438         void *root;
1439         _TRACE_BG_TLINE_END(&root);
1440         BgSetEntryName("start-broadcast", &root);
1441         CkVec<void *> logs;    // store all logs for each delivery
1442         extern void stopVTimer();
1443         extern void startVTimer();
1444 #endif
1445         while (NULL!=(el=elements->next(idx))) {
1446 #if CMK_BIGSIM_CHARM
1447                 //BgEntrySplit("split-broadcast");
1448                 stopVTimer();
1449                 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
1450                 logs.push_back(curlog);
1451                 startVTimer();
1452 #endif
1453                 CmiBool doFree = CmiFalse;
1454                 if (stableLocations && ++count == len) doFree = CmiTrue;
1455                 broadcaster->deliver(msg, el, doFree);
1456         }
1457 #endif
1458
1459 #if CMK_BIGSIM_CHARM
1460         //BgEntrySplit("end-broadcast");
1461         stopVTimer();
1462         BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
1463         startVTimer();
1464 #endif
1465
1466         // CkArrayBroadcaster doesn't have msg buffered, and there was
1467         // no last delivery to transfer ownership
1468 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1469         if (stableLocations)
1470           delete msg;
1471 #else
1472         if (stableLocations && len == 0)
1473           delete msg;
1474 #endif
1475 }
1476
1477 #include "CkArray.def.h"
1478
1479