Chare Arrays: Add interface to signal online dynamic element insertion
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55 #include "pathHistory.h"
56
57 #if CMK_LBDB_ON
58 #include "LBDatabase.h"
59 #endif // CMK_LBDB_ON
60
61 CpvDeclare(int ,serializer);
62
63 bool _isAnytimeMigration;
64 bool _isStaticInsertion;
65 bool _isNotifyChildInRed;
66
67 #define ARRAY_DEBUG_OUTPUT 0
68
69 #if ARRAY_DEBUG_OUTPUT 
70 #   define DEB(x) CkPrintf x  //General debug messages
71 #   define DEBI(x) CkPrintf x  //Index debug messages
72 #   define DEBC(x) CkPrintf x  //Construction debug messages
73 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
74 #   define DEBM(x) CkPrintf x  //Migration debug messages
75 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
76 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
77 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
78 #   define AA "ArrayBOC on %d: "
79 #   define AB ,CkMyPe()
80 #   define DEBUG(x) x
81 #else
82 #   define DEB(X) /*CkPrintf x*/
83 #   define DEBI(X) /*CkPrintf x*/
84 #   define DEBC(X) /*CkPrintf x*/
85 #   define DEBS(x) /*CkPrintf x*/
86 #   define DEBM(X) /*CkPrintf x*/
87 #   define DEBL(X) /*CkPrintf x*/
88 #   define DEBK(x) /*CkPrintf x*/
89 #   define DEBB(x) /*CkPrintf x*/
90 #   define str(x) /**/
91 #   define DEBUG(x)
92 #endif
93
94 ///This arrayListener is in charge of delivering broadcasts to the array.
95 class CkArrayBroadcaster : public CkArrayListener {
96   inline int &getData(ArrayElement *el) {return *ckGetData(el);}
97 public:
98   CkArrayBroadcaster(bool _stableLocations, bool _broadcastViaScheduler);
99   CkArrayBroadcaster(CkMigrateMessage *m);
100   virtual void pup(PUP::er &p);
101   virtual ~CkArrayBroadcaster();
102   PUPable_decl(CkArrayBroadcaster);
103
104   virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
105
106   ///Element was just created on this processor
107   /// Return false if the element migrated away or deleted itself.
108   virtual CmiBool ckElementCreated(ArrayElement *elt)
109     { return bringUpToDate(elt); }
110
111   ///Element just arrived on this processor (so just called pup)
112   /// Return false if the element migrated away or deleted itself.
113   virtual CmiBool ckElementArriving(ArrayElement *elt)
114     { return bringUpToDate(elt); }
115
116   void incoming(CkArrayMessage *msg);
117
118   CmiBool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
119
120   void springCleaning(void);
121
122   void flushState();
123 private:
124   int bcastNo;//Number of broadcasts received (also serial number)
125   int oldBcastNo;//Above value last spring cleaning
126   //This queue stores old broadcasts (in case a migrant arrives
127   // and needs to be brought up to date)
128   CkQ<CkArrayMessage *> oldBcasts;
129   bool stableLocations;
130   bool broadcastViaScheduler;
131
132   CmiBool bringUpToDate(ArrayElement *el);
133 };
134
135 ///This arrayListener is in charge of performing reductions on the array.
136 class CkArrayReducer : public CkArrayListener {
137   CkGroupID mgrID;
138   CkReductionMgr *mgr;
139   typedef  contributorInfo *I;
140   inline contributorInfo *getData(ArrayElement *el)
141     {return (I)ckGetData(el);}
142 public:
143   /// Attach this array to this CkReductionMgr
144   CkArrayReducer(CkGroupID mgrID_);
145   CkArrayReducer(CkMigrateMessage *m);
146   virtual void pup(PUP::er &p);
147   virtual ~CkArrayReducer();
148   PUPable_decl(CkArrayReducer);
149
150   void ckBeginInserting(void) {mgr->creatingContributors();}
151   void ckEndInserting(void) {mgr->doneCreatingContributors();}
152
153   void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
154
155   void ckElementCreating(ArrayElement *elt)
156     {mgr->contributorCreated(getData(elt));}
157   void ckElementDied(ArrayElement *elt)
158     {mgr->contributorDied(getData(elt));}
159
160   void ckElementLeaving(ArrayElement *elt)
161     {mgr->contributorLeaving(getData(elt));}
162   CmiBool ckElementArriving(ArrayElement *elt)
163     {mgr->contributorArriving(getData(elt)); return CmiTrue; }
164 };
165
166 /*
167 void 
168 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
169        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
170 }
171 */
172 void
173 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
174         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
175         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
176 }
177
178 /*********************** CkVerboseListener ******************/
179 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
180
181 CkVerboseListener::CkVerboseListener(void)
182   :CkArrayListener(0)
183 {
184   VL_PRINT<<"INIT  Creating listener"<<endl;
185 }
186
187 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
188 {
189   CkArrayListener::ckRegister(arrMgr,dataOffset_);
190   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
191 }
192 void CkVerboseListener::ckBeginInserting(void)
193 {
194   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
195 }
196 void CkVerboseListener::ckEndInserting(void)
197 {
198   VL_PRINT<<"INIT  Done inserting elements"<<endl;
199 }
200
201 void CkVerboseListener::ckElementStamp(int *eltInfo)
202 {
203   VL_PRINT<<"LIFE  Stamping element"<<endl;
204 }
205 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
206 {
207   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
208 }
209 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
210 {
211   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
212   return CmiTrue;
213 }
214 void CkVerboseListener::ckElementDied(ArrayElement *elt)
215 {
216   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
217 }
218
219 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
220 {
221   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
222 }
223 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
224 {
225   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
226   return CmiTrue;
227 }
228
229
230 /************************* ArrayElement *******************/
231 class ArrayElement_initInfo {
232 public:
233   CkArray *thisArray;
234   CkArrayID thisArrayID;
235   CkArrayIndex numInitial;
236   int listenerData[CK_ARRAYLISTENER_MAXLEN];
237   CmiBool fromMigration;
238 };
239
240 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
241
242 void ArrayElement::initBasics(void)
243 {
244 #if CMK_OUT_OF_CORE
245   if (CkpvAccess(CkSaveRestorePrefetch)) 
246     return; /* Just restoring from disk--don't try to set up anything. */
247 #endif
248 #if CMK_GRID_QUEUE_AVAILABLE
249         grid_queue_interval = 0;
250         grid_queue_threshold = 0;
251         msg_count = 0;
252         msg_count_grid = 0;
253         border_flag = 0;
254
255         grid_queue_interval = CmiGridQueueGetInterval ();
256         grid_queue_threshold = CmiGridQueueGetThreshold ();
257 #endif
258   ArrayElement_initInfo &info=CkpvAccess(initInfo);
259   thisArray=info.thisArray;
260   thisArrayID=info.thisArrayID;
261   numInitialElements=info.numInitial.getCombinedCount();
262   if (info.listenerData) {
263     memcpy(listenerData,info.listenerData,sizeof(listenerData));
264   }
265   if (!info.fromMigration) {
266     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
267                           l->ckElementCreating(this));
268   }
269 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
270         if(mlogData == NULL)
271                 mlogData = new ChareMlogData();
272         mlogData->objID.type = TypeArray;
273         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
274 #endif
275 #ifdef _PIPELINED_ALLREDUCE_
276         allredMgr = NULL;
277 #endif
278 }
279
280 ArrayElement::ArrayElement(void) 
281 {
282         initBasics();
283 #if CMK_MEM_CHECKPOINT
284         init_checkpt();
285 #endif
286 }
287
288 ArrayElement::ArrayElement(CkMigrateMessage *m) : CkMigratable(m)
289 {
290         initBasics();
291 }
292
293 //Called by the system just before and after migration to another processor:  
294 void ArrayElement::ckAboutToMigrate(void) {
295         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
296                                 l->ckElementLeaving(this));
297         CkMigratable::ckAboutToMigrate();
298 }
299 void ArrayElement::ckJustMigrated(void) {
300         CkMigratable::ckJustMigrated();
301         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
302               if (!l->ckElementArriving(this)) return;);
303 }
304
305 void ArrayElement::ckJustRestored(void) {
306     CkMigratable::ckJustRestored();
307     //empty for out-of-core emulation
308 }
309
310 #ifdef _PIPELINED_ALLREDUCE_
311 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
312                                         CMK_REFNUM_TYPE userFlag)
313 {
314         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
315         msg->setUserFlag(userFlag);
316         msg->setMigratableContributor(true);
317         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
318 }
319 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
320                                         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
321 {
322         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
323         msg->setUserFlag(userFlag);
324         msg->setCallback(cb);
325         msg->setMigratableContributor(true);
326         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
327 }
328 void ArrayElement::contribute2(CkReductionMsg *msg) 
329 {
330         msg->setMigratableContributor(true);
331         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
332 }
333 void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
334 {
335         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
336     msg->setUserFlag(userFlag);
337     msg->setCallback(cb);
338     msg->setMigratableContributor(true);
339     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
340 }
341 void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
342 {
343     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
344     msg->setUserFlag(userFlag);
345     msg->setMigratableContributor(true);
346     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
347 }
348
349 void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
350                                                           const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
351 {
352         // if it is a broadcast to myself and size is large
353         if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD) 
354         {
355                 if (!allredMgr) {
356                         allredMgr = new AllreduceMgr();
357                 }
358                 // number of fragments
359                 int fragNo = dataSize/FRAG_SIZE;
360                 int size = FRAG_SIZE;
361                 // for each fragment
362                 for (int i=0; i<fragNo; i++) {
363                         // callback to defragmentor
364                         CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
365                         if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
366                                 size = dataSize%FRAG_SIZE;
367                         }
368                         CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
369                         // initialize the new msg
370                         msg->reducer            = type;
371                         msg->nFrags             = fragNo;
372                         msg->fragNo             = i;
373                         msg->callback           = defrag_cb;
374                         msg->userFlag           = userFlag;
375                         allredMgr->cb           = cb;
376                         allredMgr->cb.type      = CkCallback::sendArray;
377                         allredMgr->cb.d.array.idx = myIndex;
378                         contribute2(msg);
379                 }
380                 return;
381         }
382         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
383         msg->setUserFlag(userFlag);
384         msg->setCallback(cb);
385         msg->setMigratableContributor(true);
386         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
387 }
388
389
390 #else
391 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
392    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
393 #endif
394 // _PIPELINED_ALLREDUCE_
395 void ArrayElement::defrag(CkReductionMsg *msg)
396 {
397 //      CkPrintf("in defrag\n");
398 #ifdef _PIPELINED_ALLREDUCE_
399         allredMgr->allreduce_recieve(msg);
400 #endif
401 }
402
403 /// Remote method: calls destructor
404 void ArrayElement::ckDestroy(void)
405 {
406         if(_BgOutOfCoreFlag!=1){ //in case of taking core out of memory
407             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
408                            l->ckElementDied(this));
409         }
410         CkMigratable::ckDestroy();
411 }
412
413 //Destructor (virtual)
414 ArrayElement::~ArrayElement()
415 {
416 #if CMK_OUT_OF_CORE
417   if (CkpvAccess(CkSaveRestorePrefetch)) 
418     return; /* Just saving to disk--don't trash anything. */
419 #endif
420   //To detect use-after-delete: 
421   thisArray=(CkArray *)0xDEADa7a1;
422 }
423
424 void ArrayElement::pup(PUP::er &p)
425 {
426   DEBM((AA"  ArrayElement::pup()\n"AB));
427   CkMigratable::pup(p);
428   thisArrayID.pup(p);
429   if (p.isUnpacking())
430         thisArray=thisArrayID.ckLocalBranch();
431   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
432 #if CMK_MEM_CHECKPOINT
433   p(budPEs, 2);
434 #endif
435   p.syncComment(PUP::sync_last_system,"ArrayElement");
436 #if CMK_GRID_QUEUE_AVAILABLE
437   p|grid_queue_interval;
438   p|grid_queue_threshold;
439   p|msg_count;
440   p|msg_count_grid;
441   p|border_flag;
442   if (p.isUnpacking ()) {
443     msg_count = 0;
444     msg_count_grid = 0;
445     border_flag = 0;
446   }
447 #endif
448 }
449
450 char *ArrayElement::ckDebugChareName(void) {
451         char buf[200];
452         const char *className=_chareTable[ckGetChareType()]->name;
453         const int *d=thisIndexMax.data();
454         const short int *s=(const short int*)d;
455         switch (thisIndexMax.dimension) {
456         case 0: sprintf(buf,"%s",className); break;
457         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
458         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
459         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
460     case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
461     case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
462     case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
463         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
464         };
465         return strdup(buf);
466 }
467
468 int ArrayElement::ckDebugChareID(char *str, int limit) {
469   if (limit<21) return -1;
470   str[0] = 2;
471   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
472   *((CkArrayIndex*)&str[5]) = thisIndexMax;
473   return 21;
474 }
475
476 /// A more verbose form of abort
477 void ArrayElement::CkAbort(const char *str) const
478 {
479         CkError("[%d] Array element at index %s aborting:\n",
480                 CkMyPe(), idx2str(thisIndexMax));
481         CkMigratable::CkAbort(str);
482 }
483
484 void ArrayElement::recvBroadcast(CkMessage *m){
485 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
486         CkArrayMessage *bcast = (CkArrayMessage *)m;
487     envelope *env = UsrToEnv(m);
488         int epIdx= env->piggyBcastIdx;
489     ckInvokeEntry(epIdx,bcast,CmiTrue);
490 #endif
491 }
492
493 /*********************** Spring Cleaning *****************
494 Periodically (every minute or so) remove expired broadcasts
495 from the queue.
496
497 This does not get called for arrays with stable locations (all
498 insertions done at creation, migration only at discrete points).
499 */
500
501 inline void CkArray::springCleaning(void)
502 {
503   DEBK((AA"Starting spring cleaning\n"AB));
504   broadcaster->springCleaning();
505 }
506
507 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
508         ((CkArray *)forArray)->springCleaning();
509 }
510
511 /********************* Little CkArray Utilities ******************/
512
513 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
514         :CProxy(), _aid(e->ckGetArrayID())
515         {}
516 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
517         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
518         {}
519
520 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
521         {return ckLocalBranch()->getLocMgr(); }
522
523 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
524
525 CkArrayOptions::CkArrayOptions(void) //Default: empty array
526         :numInitial(),map(_defaultArrayMapID)
527 {
528     init();
529 }
530
531 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
532         :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
533 {
534     init();
535 }
536
537 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
538         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
539 {
540     init();
541 }
542
543 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
544         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
545 {
546     init();
547 }
548
549 void CkArrayOptions::init()
550 {
551     locMgr.setZero();
552     anytimeMigration = _isAnytimeMigration;
553     staticInsertion = _isStaticInsertion;
554     reductionClient.type = CkCallback::invalid;
555     disableNotifyChildInRed = !_isNotifyChildInRed;
556     broadcastViaScheduler = false;
557 }
558
559 CkArrayOptions &CkArrayOptions::setStaticInsertion(bool b)
560 {
561     staticInsertion = b;
562     if (b && map == _defaultArrayMapID)
563         map = _fastArrayMapID;
564     return *this;
565 }
566
567 /// Bind our elements to this array
568 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
569 {
570         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
571         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
572         // not just initially.
573         //setNumInitial(arr->getNumInitial());
574         return setLocationManager(arr->getLocMgr()->getGroupID());
575 }
576 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
577 {
578         arrayListeners.push_back(listener);
579         return *this;
580 }
581
582 void CkArrayOptions::pup(PUP::er &p) {
583         p|numInitial;
584         p|map;
585         p|locMgr;
586         p|arrayListeners;
587         p|reductionClient;
588         p|anytimeMigration;
589         p|disableNotifyChildInRed;
590         p|staticInsertion;
591         p|broadcastViaScheduler;
592 }
593
594 CkArrayListener::CkArrayListener(int nInts_) 
595   :nInts(nInts_) 
596 {
597   dataOffset=-1;
598 }
599 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
600   nInts=-1; dataOffset=-1;
601 }
602 void CkArrayListener::pup(PUP::er &p) {
603   p|nInts;
604   p|dataOffset;
605 }
606
607 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
608 {
609   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
610   dataOffset=dataOffset_;
611 }
612
613 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
614                                           const CkArrayOptions &opts_)
615 {
616   CkArrayOptions opts(opts_);
617   CkGroupID locMgr = opts.getLocationManager();
618   if (locMgr.isZero())
619   { //Create a new location manager
620 #if !CMK_LBDB_ON
621     CkGroupID _lbdb;
622 #endif
623     CkEntryOptions  e_opts;
624     e_opts.setGroupDepID(opts.getMap());       // group creation dependence
625     locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,opts.getNumInitial(),&e_opts);
626     opts.setLocationManager(locMgr);
627   }
628   //Create the array manager
629   m->array_ep()=ctor;
630   CkMarshalledMessage marsh(m);
631   CkEntryOptions  e_opts;
632   e_opts.setGroupDepID(locMgr);       // group creation dependence
633 #if !GROUP_LEVEL_REDUCTION
634   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
635   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
636   nodereductionProxy.setAttachedGroup(ag);
637 #else
638   CkNodeGroupID dummyid;
639   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
640 #endif
641   return (CkArrayID)ag;
642 }
643
644 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
645 {
646   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
647 }
648
649 extern IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID);
650
651 struct CkInsertIdxMsg {
652   char core[CmiReservedHeaderSize];
653   CkArrayIndex idx;
654   CkArrayMessage *m;
655   int ctor;
656   int onPe;
657   CkArrayID _aid;
658 };
659
660 static int ckinsertIdxHdl;
661
662 void ckinsertIdxFunc(void *m)
663 {
664   CkInsertIdxMsg *msg = (CkInsertIdxMsg *)m;
665   CProxy_ArrayBase   ca(msg->_aid);
666   ca.ckInsertIdx(msg->m, msg->ctor, msg->onPe, msg->idx);
667   CmiFree(msg);
668 }
669
670 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
671         const CkArrayIndex &idx)
672 {
673   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
674   m->array_ep()=ctor;
675   CkArray *ca = ckLocalBranch();
676   if (ca == NULL) {
677       CkInsertIdxMsg *msg = (CkInsertIdxMsg *)CmiAlloc(sizeof(CkInsertIdxMsg));
678       msg->idx = idx;
679       msg->m = m;
680       msg->ctor = ctor;
681       msg->onPe = onPe;
682       msg->_aid = _aid;
683       CmiSetHandler(msg, ckinsertIdxHdl);
684       ca = (CkArray *)lookupGroupAndBufferIfNotThere(CkpvAccess(_coreState), (envelope*)msg,_aid);
685       if (ca == NULL) return;
686   }
687   ca->prepareCtorMsg(m,onPe,idx);
688   if (ckIsDelegated()) {
689         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
690         return;
691   }
692   
693   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
694   CkArrayManagerInsert(onPe,m,_aid);
695 }
696
697 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
698 {
699   ckInsertIdx(m,ctorIndex,onPe,_idx);
700 }
701
702 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
703 {
704   return ckLocalBranch()->lookup(_idx);
705 }
706
707 //pack-unpack method for CProxy_ArrayBase
708 void CProxy_ArrayBase::pup(PUP::er &p)
709 {
710   CProxy::pup(p);
711   _aid.pup(p);
712 }
713 void CProxyElement_ArrayBase::pup(PUP::er &p)
714 {
715   CProxy_ArrayBase::pup(p);
716   p|_idx.nInts;
717   p|_idx.dimension;
718   p(_idx.data(),_idx.nInts);
719 }
720
721 void CProxySection_ArrayBase::pup(PUP::er &p)
722 {
723   CProxy_ArrayBase::pup(p);
724   p | _nsid;
725   if (p.isUnpacking()) {
726     if (_nsid == 1) _sid = new CkSectionID;
727     else if (_nsid > 1) _sid = new CkSectionID[_nsid];
728     else _sid = NULL;
729   }
730   for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
731 }
732
733 /*********************** CkArray Creation *************************/
734 void _ckArrayInit(void)
735 {
736   CkpvInitialize(ArrayElement_initInfo,initInfo);
737   CkDisableTracing(CkIndex_CkArray::insertElement(0));
738   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
739     // disable because broadcast listener may deliver broadcast message
740   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
741   // by default anytime migration is allowed
742   ckinsertIdxHdl = CkRegisterHandler(ckinsertIdxFunc);
743 }
744
745 CkArray::CkArray(CkArrayOptions &opts,
746                  CkMarshalledMessage &initMsg,
747                  CkNodeGroupID nodereductionID)
748   : CkReductionMgr(),
749     locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
750     locMgrID(opts.getLocationManager()),
751     thisProxy(thisgroup),
752     // Register with our location manager
753     elements((ArrayElementList *)locMgr->addManager(thisgroup,this)),
754     stableLocations(opts.staticInsertion && !opts.anytimeMigration),
755     numInitial(opts.getNumInitial()), isInserting(CmiTrue)
756 {
757   if (!stableLocations)
758       CcdCallOnConditionKeep(CcdPERIODIC_1minute,
759                              staticSpringCleaning, (void *)this);
760   
761   //set the field in one my parent class (CkReductionMgr)
762   if(opts.disableNotifyChildInRed)
763           disableNotifyChildrenStart = CmiTrue; 
764   
765   //Find, register, and initialize the arrayListeners
766   listenerDataOffset=0;
767   broadcaster=new CkArrayBroadcaster(stableLocations, opts.broadcastViaScheduler);
768   addListener(broadcaster);
769   reducer=new CkArrayReducer(thisgroup);
770   addListener(reducer);
771
772   // COMLIB HACK
773   //calistener = new ComlibArrayListener();
774   //addListener(calistener,dataOffset);
775
776   int lNo,nL=opts.getListeners(); //User-added listeners
777   for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
778
779   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
780
781   ///Set up initial elements (if any)
782   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
783
784   ///adding code for Reduction using nodegroups
785
786 #if !GROUP_LEVEL_REDUCTION
787   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
788   nodeProxy = nodetemp;
789   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
790 #endif
791
792 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
793         // creating the spanning tree to be used for broadcast
794         children = (int *) CmiAlloc(sizeof(int) * _MLOG_BCAST_BFACTOR_);
795         numChildren = 0;
796         
797         // computing the level of the tree this pe is in
798         // we should use the geometric series formula, but now a quick and dirty code should suffice
799         // PE 0 is at level 0, PEs 1.._MLOG_BCAST_BFACTOR_ are at level 1 and so on
800         int level = 0;
801         int aux = CmiMyPe();
802         int max = CmiNumPes();
803         int factor = _MLOG_BCAST_BFACTOR_;
804         int startLevel = 0;
805         int startNextLevel = 1;
806         while(aux >= 0){
807                 level++;
808                 startLevel = startNextLevel;
809                 startNextLevel += factor;
810                 aux -= factor;
811                 factor *= _MLOG_BCAST_BFACTOR_;
812         }
813
814         // adding children to the tree
815         int first = startNextLevel + (CmiMyPe() - startLevel) * _MLOG_BCAST_BFACTOR_;
816         for(int i=0; i<_MLOG_BCAST_BFACTOR_; i++){
817                 if(first + i >= CmiNumPes())
818                         break;
819                 children[i] = first + i;
820                 numChildren++;
821         }
822  
823 #endif
824
825
826   if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
827       ckSetReductionClient(&opts.reductionClient);
828 }
829
830 CkArray::CkArray(CkMigrateMessage *m)
831         :CkReductionMgr(m), thisProxy(thisgroup)
832 {
833   locMgr=NULL;
834   isInserting=CmiTrue;
835 }
836
837 #if CMK_ERROR_CHECKING
838 inline void testPup(PUP::er &p,int shouldBe) {
839   int a=shouldBe;
840   p|a;
841   if (a!=shouldBe)
842     CkAbort("PUP direction mismatch!");
843 }
844 #else
845 inline void testPup(PUP::er &p,int shouldBe) {}
846 #endif
847
848 void CkArray::pup(PUP::er &p){
849         CkReductionMgr::pup(p);
850         p|numInitial;
851         p|locMgrID;
852         p|listeners;
853         p|listenerDataOffset;
854         p|stableLocations;
855         testPup(p,1234);
856         if(p.isUnpacking()){
857                 thisProxy=thisgroup;
858                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
859                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
860                 /// Restore our default listeners:
861                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
862                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
863                 /// set up broadcast cleaner
864                 if (!stableLocations)
865                     CcdCallOnConditionKeep(CcdPERIODIC_1minute,
866                                            staticSpringCleaning, (void *)this);
867         }
868 }
869
870 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
871   int dataOffset=0; \
872   for (int lNo=0;lNo<listeners.size();lNo++) { \
873     CkArrayListener *l=listeners[lNo]; \
874     l->ckElementStamp(&listenerData[dataOffset]); \
875     dataOffset+=l->ckGetLen(); \
876   } \
877 } while (0)
878
879 //Called on send side to prepare array constructor message
880 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
881 {
882   envelope *env=UsrToEnv((void *)m);
883   env->getsetArrayIndex()=idx;
884   int *listenerData=env->getsetArrayListenerData();
885   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
886   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
887   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
888         getLocMgr()->inform(idx,onPe);
889 }
890
891 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
892                         CkElementCreation_t type)
893 {
894         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
895         if (type==CkElementCreation_resume) 
896         { // HACK: Re-stamp elements on checkpoint resume--
897           //  this restores, e.g., reduction manager's gcount
898                 int *listenerData=ret->listenerData;
899                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
900         }
901         return ret;
902 }
903
904 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
905                      CkMessage *msg,CmiBool fromMigration) 
906 {
907         //Stash the element's initialization information in the global "initInfo"
908         ArrayElement_initInfo &init=CkpvAccess(initInfo);
909         init.numInitial=numInitial;
910         init.thisArray=this;
911         init.thisArrayID=thisgroup;
912         if (msg) /*Have to *copy* data because msg will be deleted*/
913           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
914                  sizeof(init.listenerData));
915         init.fromMigration=fromMigration;
916         
917         //Build the element
918         int elSize=_chareTable[elChareType]->size;
919         ArrayElement *elem = (ArrayElement *)malloc(elSize);
920 #ifndef CMK_OPTIMIZE
921         if (elem!=NULL) setMemoryTypeChare(elem);
922 #endif
923         return elem;
924 }
925
926 /// This method is called by ck.C or the user to add an element.
927 CmiBool CkArray::insertElement(CkMessage *me)
928 {
929   CK_MAGICNUMBER_CHECK
930   CkArrayMessage *m=(CkArrayMessage *)me;
931   const CkArrayIndex &idx=m->array_index();
932   int onPe;
933   if (locMgr->isRemote(idx,&onPe)) 
934   { /* element's sibling lives somewhere else, so insert there */
935         CkArrayManagerInsert(onPe,me,thisgroup);
936         return CmiFalse;
937   }
938   int ctorIdx=m->array_ep();
939   int chareType=_entryTable[ctorIdx]->chareIdx;
940   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
941 #ifndef CMK_CHARE_USE_PTR
942   ((Chare *)elt)->chareIdx = -1;
943 #endif
944   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
945   CK_ARRAYLISTENER_LOOP(listeners,
946       if (!l->ckElementCreated(elt)) return CmiFalse;);
947   return CmiTrue;
948 }
949
950 void CProxy_ArrayBase::doneInserting(void)
951 {
952   DEBC((AA"Broadcasting a doneInserting request\n"AB));
953   //Broadcast a DoneInserting
954   CProxy_CkArray(_aid).remoteDoneInserting();
955 }
956
957 void CProxy_ArrayBase::beginInserting(void)
958 {
959   DEBC((AA"Broadcasting a beginInserting request\n"AB));
960   CProxy_CkArray(_aid).remoteBeginInserting();
961 }
962
963 void CkArray::doneInserting(void)
964 {
965   thisProxy[CkMyPe()].remoteDoneInserting();
966 }
967
968 void CkArray::beginInserting(void)
969 {
970   thisProxy[CkMyPe()].remoteBeginInserting();
971 }
972
973 /// This is called on every processor after the last array insertion.
974 void CkArray::remoteDoneInserting(void)
975 {
976   CK_MAGICNUMBER_CHECK
977   if (isInserting) {
978     isInserting=CmiFalse;
979     DEBC((AA"Done inserting objects\n"AB));
980     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
981     locMgr->doneInserting();
982   }
983 }
984
985 void CkArray::remoteBeginInserting(void)
986 {
987   CK_MAGICNUMBER_CHECK;
988
989   if (!isInserting) {
990     isInserting = CmiTrue;
991     DEBC((AA"Begin inserting objects\n"AB));
992     for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
993     locMgr->startInserting();
994   }
995 }
996
997 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
998         int onPe,int ctor,CkDeliver_t type)
999 {
1000         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
1001         prepareCtorMsg(m,onPe,idx);
1002         m->array_ep()=ctor;
1003         
1004         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
1005                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
1006                 CkArrayManagerInsert(onPe,m,thisgroup);
1007         } else /* local message, non-queued */ {
1008                 //Call local constructor directly
1009                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
1010                 return insertElement(m);
1011         }
1012         return CmiTrue;
1013 }
1014
1015 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
1016 {
1017         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
1018         if (local) {
1019           int onPe=CkMyPe();
1020           prepareCtorMsg(m,onPe,idx);
1021 #if CMK_BIGSIM_CHARM
1022           BgEntrySplit("split-array-new");
1023 #endif
1024           insertElement(m);
1025         }
1026         else {
1027           int onPe=-1;
1028           prepareCtorMsg(m,onPe,idx);
1029           CkArrayManagerInsert(onPe,m,getGroupID());
1030         }
1031 }
1032
1033 /********************* CkArray Messaging ******************/
1034 /// Fill out a message's array fields before sending it
1035 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
1036 {
1037         envelope *env=UsrToEnv((void *)msg);
1038         env->getsetArrayMgr()=aid;
1039         env->getsetArraySrcPe()=CkMyPe();
1040         env->setEpIdx(ep);
1041         env->getsetArrayHops()=0;
1042 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1043         criticalPath_send(env);
1044         automaticallySetMessagePriority(env);
1045 #endif
1046 }
1047
1048
1049 /// Just a non-inlined version of msg_prepareSend()
1050 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
1051 {
1052         envelope *env=UsrToEnv((void *)msg);
1053         env->getsetArrayMgr()=aid;
1054         env->getsetArraySrcPe()=CkMyPe();
1055         env->setEpIdx(ep);
1056         env->getsetArrayHops()=0;
1057 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1058         criticalPath_send(env);
1059         automaticallySetMessagePriority(env);
1060 #endif
1061 }
1062
1063 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
1064 {
1065 #if CMK_ERROR_CHECKING
1066         //Check our array index for validity
1067         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
1068         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
1069                 CkAbort("Array index length (nInts) is too long-- did you "
1070                         "use bytes instead of integers?\n");
1071 #endif
1072         msg_prepareSend(msg,ep,ckGetArrayID());
1073         msg->array_index()=_idx;//Insert array index
1074         if (ckIsDelegated()) //Just call our delegateMgr
1075           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
1076         else 
1077         { //Usual case: a direct send
1078           CkArray *localbranch = ckLocalBranch();
1079           if (localbranch == NULL) {             // array not created yet
1080             CkArrayManagerDeliver(CkMyPe(), msg, 0);
1081           }
1082           else {
1083             if (opts & CK_MSG_INLINE)
1084               localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
1085             else
1086               localbranch->deliver(msg, CkDeliver_queue, opts);
1087           }
1088         }
1089 }
1090
1091 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
1092 {
1093         CkFutureID f=CkCreateAttachedFuture(msg);
1094         ckSend(msg,ep);
1095         return CkWaitReleaseFuture(f);
1096 }
1097
1098 void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts     )
1099 {
1100         CProxySection_ArrayBase sp(sID);
1101         sp.ckSend((CkArrayMessage *)msg,entryIndex,opts);
1102 }
1103
1104 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
1105 {
1106         if (ckIsDelegated()) //Just call our delegateMgr
1107           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
1108         else {
1109           // send through all
1110           for (int k=0; k<_nsid; ++k) {
1111             for (int i=0; i< _sid[k]._nElems-1; i++) {
1112               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[i]);
1113               void *newMsg=CkCopyMsg((void **)&msg);
1114               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1115             }
1116             if (_sid[k]._nElems > 0) {
1117               void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
1118               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[_sid[k]._nElems-1]);
1119               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1120             }
1121           }
1122         }
1123 }
1124
1125 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1126 {
1127   CkArrayMessage *m=(CkArrayMessage *)msg;
1128   m->array_index()=idx;
1129   msg_prepareSend(m,entryIndex,aID);
1130   CkArray *a=(CkArray *)_localBranch(aID);
1131   if (a == NULL)
1132     CkArrayManagerDeliver(CkMyPe(), msg, 0);
1133   else
1134     a->deliver(m,CkDeliver_queue,opts);
1135 }
1136
1137 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1138 {
1139   CkArrayMessage *m=(CkArrayMessage *)msg;
1140   m->array_index()=idx;
1141   msg_prepareSend(m,entryIndex,aID);
1142   CkArray *a=(CkArray *)_localBranch(aID);
1143   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
1144   a->deliver(m,CkDeliver_inline,opts);
1145   if (oldStatus) CkEnableTracing(entryIndex);
1146 }
1147
1148
1149 /*********************** CkArray Reduction *******************/
1150 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
1151   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
1152    mgrID(mgrID_)
1153 {
1154   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1155 }
1156 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
1157   :CkArrayListener(m)
1158 {
1159   mgr=NULL;
1160 }
1161 void CkArrayReducer::pup(PUP::er &p) {
1162   CkArrayListener::pup(p);
1163   p|mgrID;
1164   if (p.isUnpacking())
1165     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1166 }
1167 CkArrayReducer::~CkArrayReducer() {}
1168
1169 /*********************** CkArray Broadcast ******************/
1170
1171 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_, bool broadcastViaScheduler_)
1172     :CkArrayListener(1), //Each array element carries a broadcast number
1173      bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_), broadcastViaScheduler(broadcastViaScheduler_)
1174 { }
1175
1176 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
1177     :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1), broadcastViaScheduler(false)
1178 { }
1179
1180 void CkArrayBroadcaster::pup(PUP::er &p) {
1181   CkArrayListener::pup(p);
1182   /* Assumption: no migrants during checkpoint, so no need to
1183      save old broadcasts. */
1184   p|bcastNo;
1185   p|stableLocations;
1186   p|broadcastViaScheduler;
1187   if (p.isUnpacking()) {
1188     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
1189   }
1190 }
1191
1192 CkArrayBroadcaster::~CkArrayBroadcaster()
1193 {
1194   CkArrayMessage *msg;
1195   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1196 }
1197
1198 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
1199 {
1200   bcastNo++;
1201   DEBB((AA"Received broadcast %d\n"AB,bcastNo));
1202
1203   if (stableLocations)
1204     return;
1205
1206   CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
1207   oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
1208 }
1209
1210 /// Deliver a copy of the given broadcast to the given local element
1211 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
1212                                     CmiBool doFree)
1213 {
1214   int &elBcastNo=getData(el);
1215   // if this array element already received this message, skip it
1216   if (elBcastNo >= bcastNo) return CmiFalse;
1217   elBcastNo++;
1218   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
1219   int epIdx=bcast->array_ep_bcast();
1220
1221 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1222   DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
1223   return CmiTrue;
1224 #else
1225   if (!broadcastViaScheduler)
1226     return el->ckInvokeEntry(epIdx, bcast, doFree);
1227   else {
1228     if (!doFree) {
1229       CkArrayMessage *newMsg = (CkArrayMessage *)CkCopyMsg((void **)&bcast);
1230       bcast = newMsg;
1231     }
1232     envelope *env = UsrToEnv(bcast);
1233     env->getsetArrayEp() = epIdx;
1234     env->getsetArrayMgr() = el->ckGetArrayID();
1235     env->getsetArrayIndex() = el->ckGetArrayIndex();
1236     CkArrayManagerDeliver(CkMyPe(), bcast, 0);
1237     return true;
1238   }
1239 #endif
1240 }
1241
1242 /// Deliver all needed broadcasts to the given local element
1243 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
1244 {
1245   if (stableLocations) return CmiTrue;
1246   int &elBcastNo=getData(el);
1247   if (elBcastNo<bcastNo)
1248   {//This element needs some broadcasts-- it must have
1249    //been migrating during the broadcast.
1250     int i,nDeliver=bcastNo-elBcastNo;
1251     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
1252
1253     //Skip the old junk at the front of the bcast queue
1254     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
1255       oldBcasts.enq(oldBcasts.deq());
1256
1257     //Deliver the newest messages, in old-to-new order
1258     for (i=nDeliver-1;i>=0;i--)
1259     {
1260       CkArrayMessage *msg=oldBcasts.deq();
1261                 if(msg == NULL)
1262                 continue;
1263       oldBcasts.enq(msg);
1264       if (!deliver(msg, el, CmiFalse))
1265         return CmiFalse; //Element migrated away
1266     }
1267   }
1268   //Otherwise, the element survived
1269   return CmiTrue;
1270 }
1271
1272
1273 void CkArrayBroadcaster::springCleaning(void)
1274 {
1275   //Remove old broadcast messages
1276   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
1277   if (nDelete>0) {
1278     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
1279     for (int i=0;i<nDelete;i++)
1280       delete oldBcasts.deq();
1281   }
1282   oldBcastNo=bcastNo;
1283 }
1284
1285 void CkArrayBroadcaster::flushState() 
1286
1287   bcastNo = oldBcastNo = 0; 
1288   CkArrayMessage *msg;
1289   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1290 }
1291
1292 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
1293 {
1294         CProxy_ArrayBase ap(aID);
1295         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
1296 }
1297
1298 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
1299 {
1300         msg->array_ep_bcast()=ep;
1301         if (ckIsDelegated()) //Just call our delegateMgr
1302           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
1303         else 
1304         { //Broadcast message via serializer node
1305           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
1306           int skipsched = opts & CK_MSG_EXPEDITED; 
1307           //int serializer=0;//1623802937%CkNumPes();
1308 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1309                 CProxy_CkArray ap(_aid);
1310                 ap[CpvAccess(serializer)].sendBroadcast(msg);
1311                 CkGroupID _id = _aid;
1312 //              printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
1313 #else
1314           if (CkMyPe()==CpvAccess(serializer))
1315           {
1316                 DEBB((AA"Sending array broadcast\n"AB));
1317                 if (skipsched)
1318                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
1319                 else
1320                         CProxy_CkArray(_aid).recvBroadcast(msg);
1321           } else {
1322                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
1323                 CProxy_CkArray ap(_aid);
1324                 if (skipsched)
1325                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
1326                 else
1327                         ap[CpvAccess(serializer)].sendBroadcast(msg);
1328           }
1329 #endif
1330         }
1331 }
1332
1333 /// Reflect a broadcast off this Pe:
1334 void CkArray::sendBroadcast(CkMessage *msg)
1335 {
1336         CK_MAGICNUMBER_CHECK
1337         if(CkMyPe() == CpvAccess(serializer)){
1338 #if _MLOG_BCAST_TREE_
1339                 // Using the spanning tree to broadcast the message
1340                 for(int i=0; i<numChildren; i++){
1341                         CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
1342                         thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
1343                 }
1344         
1345                 // delivering message locally
1346                 recvBroadcast(msg);     
1347 #else
1348                 //Broadcast the message to all processors
1349                 thisProxy.recvBroadcast(msg);
1350 #endif
1351         }else{
1352                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
1353         }
1354 }
1355 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
1356 {
1357         CK_MAGICNUMBER_CHECK
1358         //Broadcast the message to all processors
1359         thisProxy.recvExpeditedBroadcast(msg);
1360 }
1361
1362 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1363 int _tempBroadcastCount=0;
1364
1365 // Delivers a message using the spanning tree
1366 void CkArray::recvBroadcastViaTree(CkMessage *msg)
1367 {
1368         CK_MAGICNUMBER_CHECK
1369
1370         // Using the spanning tree to broadcast the message
1371         for(int i=0; i<numChildren; i++){
1372                 CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
1373                 thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
1374         }
1375
1376         // delivering message locally
1377         recvBroadcast(msg);     
1378 }
1379
1380 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
1381     if(homePe(*index)==CmiMyPe()){
1382         CkArrayMessage *bcast = (CkArrayMessage *)data;
1383     int epIdx=bcast->array_ep_bcast();
1384         DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
1385         CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
1386         envelope *env = UsrToEnv(copy);
1387         env->sender.data.group.onPE = CkMyPe();
1388         env->TN  = env->SN=0;
1389         env->piggyBcastIdx = epIdx;
1390         env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
1391         env->getsetArrayMgr() = thisgroup;
1392         env->getsetArrayIndex() = *index;
1393     env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
1394         env->setSrcPe(CkMyPe());
1395         rec->deliver(copy,CkDeliver_queue);
1396         _tempBroadcastCount++;
1397     }else{
1398         if(locMgr->homeElementCount != -1){
1399             DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
1400         }
1401     }
1402 }
1403
1404 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
1405     arr->broadcastHomeElements(data,rec,index);
1406 }
1407 #else
1408 void CkArray::recvBroadcastViaTree(CkMessage *msg){
1409 }
1410 #endif
1411
1412
1413 /// Increment broadcast count; deliver to all local elements
1414 void CkArray::recvBroadcast(CkMessage *m)
1415 {
1416         CK_MAGICNUMBER_CHECK
1417         CkArrayMessage *msg=(CkArrayMessage *)m;
1418         broadcaster->incoming(msg);
1419
1420 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1421         _tempBroadcastCount=0;
1422         locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
1423 #else
1424         //Run through the list of local elements
1425         int idx=0, len=0, count=0;
1426         if (stableLocations) {            /* remove all NULLs in the array */
1427           len = 0;
1428           while (elements->next(idx)!=NULL) len++;
1429           idx = 0;
1430         }
1431         ArrayElement *el;
1432 #if CMK_BIGSIM_CHARM
1433         void *root;
1434         _TRACE_BG_TLINE_END(&root);
1435         BgSetEntryName("start-broadcast", &root);
1436         CkVec<void *> logs;    // store all logs for each delivery
1437         extern void stopVTimer();
1438         extern void startVTimer();
1439 #endif
1440         while (NULL!=(el=elements->next(idx))) {
1441 #if CMK_BIGSIM_CHARM
1442                 //BgEntrySplit("split-broadcast");
1443                 stopVTimer();
1444                 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
1445                 logs.push_back(curlog);
1446                 startVTimer();
1447 #endif
1448                 CmiBool doFree = CmiFalse;
1449                 if (stableLocations && ++count == len) doFree = CmiTrue;
1450                 broadcaster->deliver(msg, el, doFree);
1451         }
1452 #endif
1453
1454 #if CMK_BIGSIM_CHARM
1455         //BgEntrySplit("end-broadcast");
1456         stopVTimer();
1457         BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
1458         startVTimer();
1459 #endif
1460
1461         // CkArrayBroadcaster doesn't have msg buffered, and there was
1462         // no last delivery to transfer ownership
1463 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1464         if (stableLocations)
1465           delete msg;
1466 #else
1467         if (stableLocations && len == 0)
1468           delete msg;
1469 #endif
1470 }
1471
1472 #include "CkArray.def.h"
1473
1474