fixed a race condition when an insert element message arrives before CkArray is created
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55 #include "pathHistory.h"
56
57 #if CMK_LBDB_ON
58 #include "LBDatabase.h"
59 #endif // CMK_LBDB_ON
60
61 CpvDeclare(int ,serializer);
62
63 bool _isAnytimeMigration;
64 bool _isStaticInsertion;
65 bool _isNotifyChildInRed;
66
67 #define ARRAY_DEBUG_OUTPUT 0
68
69 #if ARRAY_DEBUG_OUTPUT 
70 #   define DEB(x) CkPrintf x  //General debug messages
71 #   define DEBI(x) CkPrintf x  //Index debug messages
72 #   define DEBC(x) CkPrintf x  //Construction debug messages
73 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
74 #   define DEBM(x) CkPrintf x  //Migration debug messages
75 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
76 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
77 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
78 #   define AA "ArrayBOC on %d: "
79 #   define AB ,CkMyPe()
80 #   define DEBUG(x) x
81 #else
82 #   define DEB(X) /*CkPrintf x*/
83 #   define DEBI(X) /*CkPrintf x*/
84 #   define DEBC(X) /*CkPrintf x*/
85 #   define DEBS(x) /*CkPrintf x*/
86 #   define DEBM(X) /*CkPrintf x*/
87 #   define DEBL(X) /*CkPrintf x*/
88 #   define DEBK(x) /*CkPrintf x*/
89 #   define DEBB(x) /*CkPrintf x*/
90 #   define str(x) /**/
91 #   define DEBUG(x)
92 #endif
93
94 ///This arrayListener is in charge of delivering broadcasts to the array.
95 class CkArrayBroadcaster : public CkArrayListener {
96   inline int &getData(ArrayElement *el) {return *ckGetData(el);}
97 public:
98   CkArrayBroadcaster(bool _stableLocations, bool _broadcastViaScheduler);
99   CkArrayBroadcaster(CkMigrateMessage *m);
100   virtual void pup(PUP::er &p);
101   virtual ~CkArrayBroadcaster();
102   PUPable_decl(CkArrayBroadcaster);
103
104   virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
105
106   ///Element was just created on this processor
107   /// Return false if the element migrated away or deleted itself.
108   virtual CmiBool ckElementCreated(ArrayElement *elt)
109     { return bringUpToDate(elt); }
110
111   ///Element just arrived on this processor (so just called pup)
112   /// Return false if the element migrated away or deleted itself.
113   virtual CmiBool ckElementArriving(ArrayElement *elt)
114     { return bringUpToDate(elt); }
115
116   void incoming(CkArrayMessage *msg);
117
118   CmiBool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
119
120   void springCleaning(void);
121
122   void flushState();
123 private:
124   int bcastNo;//Number of broadcasts received (also serial number)
125   int oldBcastNo;//Above value last spring cleaning
126   //This queue stores old broadcasts (in case a migrant arrives
127   // and needs to be brought up to date)
128   CkQ<CkArrayMessage *> oldBcasts;
129   bool stableLocations;
130   bool broadcastViaScheduler;
131
132   CmiBool bringUpToDate(ArrayElement *el);
133 };
134
135 ///This arrayListener is in charge of performing reductions on the array.
136 class CkArrayReducer : public CkArrayListener {
137   CkGroupID mgrID;
138   CkReductionMgr *mgr;
139   typedef  contributorInfo *I;
140   inline contributorInfo *getData(ArrayElement *el)
141     {return (I)ckGetData(el);}
142 public:
143   /// Attach this array to this CkReductionMgr
144   CkArrayReducer(CkGroupID mgrID_);
145   CkArrayReducer(CkMigrateMessage *m);
146   virtual void pup(PUP::er &p);
147   virtual ~CkArrayReducer();
148   PUPable_decl(CkArrayReducer);
149
150   void ckBeginInserting(void) {mgr->creatingContributors();}
151   void ckEndInserting(void) {mgr->doneCreatingContributors();}
152
153   void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
154
155   void ckElementCreating(ArrayElement *elt)
156     {mgr->contributorCreated(getData(elt));}
157   void ckElementDied(ArrayElement *elt)
158     {mgr->contributorDied(getData(elt));}
159
160   void ckElementLeaving(ArrayElement *elt)
161     {mgr->contributorLeaving(getData(elt));}
162   CmiBool ckElementArriving(ArrayElement *elt)
163     {mgr->contributorArriving(getData(elt)); return CmiTrue; }
164 };
165
166 /*
167 void 
168 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
169        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
170 }
171 */
172 void
173 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
174         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
175         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
176 }
177
178 /*********************** CkVerboseListener ******************/
179 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
180
181 CkVerboseListener::CkVerboseListener(void)
182   :CkArrayListener(0)
183 {
184   VL_PRINT<<"INIT  Creating listener"<<endl;
185 }
186
187 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
188 {
189   CkArrayListener::ckRegister(arrMgr,dataOffset_);
190   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
191 }
192 void CkVerboseListener::ckBeginInserting(void)
193 {
194   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
195 }
196 void CkVerboseListener::ckEndInserting(void)
197 {
198   VL_PRINT<<"INIT  Done inserting elements"<<endl;
199 }
200
201 void CkVerboseListener::ckElementStamp(int *eltInfo)
202 {
203   VL_PRINT<<"LIFE  Stamping element"<<endl;
204 }
205 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
206 {
207   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
208 }
209 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
210 {
211   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
212   return CmiTrue;
213 }
214 void CkVerboseListener::ckElementDied(ArrayElement *elt)
215 {
216   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
217 }
218
219 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
220 {
221   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
222 }
223 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
224 {
225   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
226   return CmiTrue;
227 }
228
229
230 /************************* ArrayElement *******************/
231 class ArrayElement_initInfo {
232 public:
233   CkArray *thisArray;
234   CkArrayID thisArrayID;
235   CkArrayIndex numInitial;
236   int listenerData[CK_ARRAYLISTENER_MAXLEN];
237   CmiBool fromMigration;
238 };
239
240 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
241
242 void ArrayElement::initBasics(void)
243 {
244 #if CMK_OUT_OF_CORE
245   if (CkpvAccess(CkSaveRestorePrefetch)) 
246     return; /* Just restoring from disk--don't try to set up anything. */
247 #endif
248 #if CMK_GRID_QUEUE_AVAILABLE
249         grid_queue_interval = 0;
250         grid_queue_threshold = 0;
251         msg_count = 0;
252         msg_count_grid = 0;
253         border_flag = 0;
254
255         grid_queue_interval = CmiGridQueueGetInterval ();
256         grid_queue_threshold = CmiGridQueueGetThreshold ();
257 #endif
258   ArrayElement_initInfo &info=CkpvAccess(initInfo);
259   thisArray=info.thisArray;
260   thisArrayID=info.thisArrayID;
261   numInitialElements=info.numInitial.getCombinedCount();
262   if (info.listenerData) {
263     memcpy(listenerData,info.listenerData,sizeof(listenerData));
264   }
265   if (!info.fromMigration) {
266     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
267                           l->ckElementCreating(this));
268   }
269 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
270         if(mlogData == NULL)
271                 mlogData = new ChareMlogData();
272         mlogData->objID.type = TypeArray;
273         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
274 #endif
275 #ifdef _PIPELINED_ALLREDUCE_
276         allredMgr = NULL;
277 #endif
278 }
279
280 ArrayElement::ArrayElement(void) 
281 {
282         initBasics();
283 #if CMK_MEM_CHECKPOINT
284         init_checkpt();
285 #endif
286 }
287
288 ArrayElement::ArrayElement(CkMigrateMessage *m) : CkMigratable(m)
289 {
290         initBasics();
291 }
292
293 //Called by the system just before and after migration to another processor:  
294 void ArrayElement::ckAboutToMigrate(void) {
295         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
296                                 l->ckElementLeaving(this));
297         CkMigratable::ckAboutToMigrate();
298 }
299 void ArrayElement::ckJustMigrated(void) {
300         CkMigratable::ckJustMigrated();
301         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
302               if (!l->ckElementArriving(this)) return;);
303 }
304
305 void ArrayElement::ckJustRestored(void) {
306     CkMigratable::ckJustRestored();
307     //empty for out-of-core emulation
308 }
309
310 #ifdef _PIPELINED_ALLREDUCE_
311 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
312                                         CMK_REFNUM_TYPE userFlag)
313 {
314         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
315         msg->setUserFlag(userFlag);
316         msg->setMigratableContributor(true);
317         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
318 }
319 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
320                                         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
321 {
322         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
323         msg->setUserFlag(userFlag);
324         msg->setCallback(cb);
325         msg->setMigratableContributor(true);
326         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
327 }
328 void ArrayElement::contribute2(CkReductionMsg *msg) 
329 {
330         msg->setMigratableContributor(true);
331         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
332 }
333 void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
334 {
335         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
336     msg->setUserFlag(userFlag);
337     msg->setCallback(cb);
338     msg->setMigratableContributor(true);
339     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
340 }
341 void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
342 {
343     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
344     msg->setUserFlag(userFlag);
345     msg->setMigratableContributor(true);
346     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
347 }
348
349 void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
350                                                           const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
351 {
352         // if it is a broadcast to myself and size is large
353         if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD) 
354         {
355                 if (!allredMgr) {
356                         allredMgr = new AllreduceMgr();
357                 }
358                 // number of fragments
359                 int fragNo = dataSize/FRAG_SIZE;
360                 int size = FRAG_SIZE;
361                 // for each fragment
362                 for (int i=0; i<fragNo; i++) {
363                         // callback to defragmentor
364                         CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
365                         if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
366                                 size = dataSize%FRAG_SIZE;
367                         }
368                         CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
369                         // initialize the new msg
370                         msg->reducer            = type;
371                         msg->nFrags             = fragNo;
372                         msg->fragNo             = i;
373                         msg->callback           = defrag_cb;
374                         msg->userFlag           = userFlag;
375                         allredMgr->cb           = cb;
376                         allredMgr->cb.type      = CkCallback::sendArray;
377                         allredMgr->cb.d.array.idx = myIndex;
378                         contribute2(msg);
379                 }
380                 return;
381         }
382         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
383         msg->setUserFlag(userFlag);
384         msg->setCallback(cb);
385         msg->setMigratableContributor(true);
386         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
387 }
388
389
390 #else
391 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
392    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
393 #endif
394 // _PIPELINED_ALLREDUCE_
395 void ArrayElement::defrag(CkReductionMsg *msg)
396 {
397 //      CkPrintf("in defrag\n");
398 #ifdef _PIPELINED_ALLREDUCE_
399         allredMgr->allreduce_recieve(msg);
400 #endif
401 }
402
403 /// Remote method: calls destructor
404 void ArrayElement::ckDestroy(void)
405 {
406         if(_BgOutOfCoreFlag!=1){ //in case of taking core out of memory
407             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
408                            l->ckElementDied(this));
409         }
410         CkMigratable::ckDestroy();
411 }
412
413 //Destructor (virtual)
414 ArrayElement::~ArrayElement()
415 {
416 #if CMK_OUT_OF_CORE
417   if (CkpvAccess(CkSaveRestorePrefetch)) 
418     return; /* Just saving to disk--don't trash anything. */
419 #endif
420   //To detect use-after-delete: 
421   thisArray=(CkArray *)0xDEADa7a1;
422 }
423
424 void ArrayElement::pup(PUP::er &p)
425 {
426   DEBM((AA"  ArrayElement::pup()\n"AB));
427   CkMigratable::pup(p);
428   thisArrayID.pup(p);
429   if (p.isUnpacking())
430         thisArray=thisArrayID.ckLocalBranch();
431   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
432 #if CMK_MEM_CHECKPOINT
433   p(budPEs, 2);
434 #endif
435   p.syncComment(PUP::sync_last_system,"ArrayElement");
436 #if CMK_GRID_QUEUE_AVAILABLE
437   p|grid_queue_interval;
438   p|grid_queue_threshold;
439   p|msg_count;
440   p|msg_count_grid;
441   p|border_flag;
442   if (p.isUnpacking ()) {
443     msg_count = 0;
444     msg_count_grid = 0;
445     border_flag = 0;
446   }
447 #endif
448 }
449
450 char *ArrayElement::ckDebugChareName(void) {
451         char buf[200];
452         const char *className=_chareTable[ckGetChareType()]->name;
453         const int *d=thisIndexMax.data();
454         const short int *s=(const short int*)d;
455         switch (thisIndexMax.dimension) {
456         case 0: sprintf(buf,"%s",className); break;
457         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
458         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
459         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
460     case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
461     case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
462     case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
463         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
464         };
465         return strdup(buf);
466 }
467
468 int ArrayElement::ckDebugChareID(char *str, int limit) {
469   if (limit<21) return -1;
470   str[0] = 2;
471   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
472   *((CkArrayIndex*)&str[5]) = thisIndexMax;
473   return 21;
474 }
475
476 /// A more verbose form of abort
477 void ArrayElement::CkAbort(const char *str) const
478 {
479         CkError("[%d] Array element at index %s aborting:\n",
480                 CkMyPe(), idx2str(thisIndexMax));
481         CkMigratable::CkAbort(str);
482 }
483
484 void ArrayElement::recvBroadcast(CkMessage *m){
485 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
486         CkArrayMessage *bcast = (CkArrayMessage *)m;
487     envelope *env = UsrToEnv(m);
488         int epIdx= env->piggyBcastIdx;
489     ckInvokeEntry(epIdx,bcast,CmiTrue);
490 #endif
491 }
492
493 /*********************** Spring Cleaning *****************
494 Periodically (every minute or so) remove expired broadcasts
495 from the queue.
496
497 This does not get called for arrays with stable locations (all
498 insertions done at creation, migration only at discrete points).
499 */
500
501 inline void CkArray::springCleaning(void)
502 {
503   DEBK((AA"Starting spring cleaning\n"AB));
504   broadcaster->springCleaning();
505 }
506
507 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
508         ((CkArray *)forArray)->springCleaning();
509 }
510
511 /********************* Little CkArray Utilities ******************/
512
513 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
514         :CProxy(), _aid(e->ckGetArrayID())
515         {}
516 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
517         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
518         {}
519
520 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
521         {return ckLocalBranch()->getLocMgr(); }
522
523 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
524
525 CkArrayOptions::CkArrayOptions(void) //Default: empty array
526         :numInitial(),map(_defaultArrayMapID)
527 {
528     init();
529 }
530
531 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
532         :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
533 {
534     init();
535 }
536
537 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
538         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
539 {
540     init();
541 }
542
543 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
544         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
545 {
546     init();
547 }
548
549 void CkArrayOptions::init()
550 {
551     locMgr.setZero();
552     anytimeMigration = _isAnytimeMigration;
553     staticInsertion = _isStaticInsertion;
554     reductionClient.type = CkCallback::invalid;
555     disableNotifyChildInRed = !_isNotifyChildInRed;
556     broadcastViaScheduler = false;
557 }
558
559 CkArrayOptions &CkArrayOptions::setStaticInsertion(bool b)
560 {
561     staticInsertion = b;
562     if (b && map == _defaultArrayMapID)
563         map = _fastArrayMapID;
564     return *this;
565 }
566
567 /// Bind our elements to this array
568 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
569 {
570         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
571         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
572         // not just initially.
573         //setNumInitial(arr->getNumInitial());
574         return setLocationManager(arr->getLocMgr()->getGroupID());
575 }
576 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
577 {
578         arrayListeners.push_back(listener);
579         return *this;
580 }
581
582 void CkArrayOptions::pup(PUP::er &p) {
583         p|numInitial;
584         p|map;
585         p|locMgr;
586         p|arrayListeners;
587         p|reductionClient;
588         p|anytimeMigration;
589         p|disableNotifyChildInRed;
590         p|staticInsertion;
591         p|broadcastViaScheduler;
592 }
593
594 CkArrayListener::CkArrayListener(int nInts_) 
595   :nInts(nInts_) 
596 {
597   dataOffset=-1;
598 }
599 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
600   nInts=-1; dataOffset=-1;
601 }
602 void CkArrayListener::pup(PUP::er &p) {
603   p|nInts;
604   p|dataOffset;
605 }
606
607 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
608 {
609   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
610   dataOffset=dataOffset_;
611 }
612
613 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
614                                           const CkArrayOptions &opts_)
615 {
616   CkArrayOptions opts(opts_);
617   CkGroupID locMgr = opts.getLocationManager();
618   if (locMgr.isZero())
619   { //Create a new location manager
620 #if !CMK_LBDB_ON
621     CkGroupID _lbdb;
622 #endif
623     CkEntryOptions  e_opts;
624     e_opts.setGroupDepID(opts.getMap());       // group creation dependence
625     locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,opts.getNumInitial(),&e_opts);
626     opts.setLocationManager(locMgr);
627   }
628   //Create the array manager
629   m->array_ep()=ctor;
630   CkMarshalledMessage marsh(m);
631   CkEntryOptions  e_opts;
632   e_opts.setGroupDepID(locMgr);       // group creation dependence
633 #if !GROUP_LEVEL_REDUCTION
634   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
635   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
636   nodereductionProxy.setAttachedGroup(ag);
637 #else
638   CkNodeGroupID dummyid;
639   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
640 #endif
641   return (CkArrayID)ag;
642 }
643
644 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
645 {
646   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
647 }
648
649 extern IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID);
650
651 struct CkInsertIdxMsg {
652   char core[CmiReservedHeaderSize];
653   CkArrayIndex idx;
654   CkArrayMessage *m;
655   int ctor;
656   int onPe;
657   CkArrayID _aid;
658 };
659
660 static int ckinsertIdxHdl;
661
662 void ckinsertIdxFunc(void *m)
663 {
664   CkInsertIdxMsg *msg = (CkInsertIdxMsg *)m;
665   CProxy_ArrayBase   ca(msg->_aid);
666   ca.ckInsertIdx(msg->m, msg->ctor, msg->onPe, msg->idx);
667   CmiFree(msg);
668 }
669
670 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
671         const CkArrayIndex &idx)
672 {
673   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
674   m->array_ep()=ctor;
675   CkArray *ca = ckLocalBranch();
676   if (ca == NULL) {
677       CkInsertIdxMsg *msg = (CkInsertIdxMsg *)CmiAlloc(sizeof(CkInsertIdxMsg));
678       msg->idx = idx;
679       msg->m = m;
680       msg->ctor = ctor;
681       msg->onPe = onPe;
682       msg->_aid = _aid;
683       CmiSetHandler(msg, ckinsertIdxHdl);
684       ca = (CkArray *)lookupGroupAndBufferIfNotThere(CkpvAccess(_coreState), (envelope*)msg,_aid);
685       if (ca == NULL) return;
686   }
687   ca->prepareCtorMsg(m,onPe,idx);
688   if (ckIsDelegated()) {
689         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
690         return;
691   }
692   
693   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
694   CkArrayManagerInsert(onPe,m,_aid);
695 }
696
697 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
698 {
699   ckInsertIdx(m,ctorIndex,onPe,_idx);
700 }
701
702 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
703 {
704   return ckLocalBranch()->lookup(_idx);
705 }
706
707 //pack-unpack method for CProxy_ArrayBase
708 void CProxy_ArrayBase::pup(PUP::er &p)
709 {
710   CProxy::pup(p);
711   _aid.pup(p);
712 }
713 void CProxyElement_ArrayBase::pup(PUP::er &p)
714 {
715   CProxy_ArrayBase::pup(p);
716   p|_idx.nInts;
717   p|_idx.dimension;
718   p(_idx.data(),_idx.nInts);
719 }
720
721 void CProxySection_ArrayBase::pup(PUP::er &p)
722 {
723   CProxy_ArrayBase::pup(p);
724   p | _nsid;
725   if (p.isUnpacking()) {
726     if (_nsid == 1) _sid = new CkSectionID;
727     else if (_nsid > 1) _sid = new CkSectionID[_nsid];
728     else _sid = NULL;
729   }
730   for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
731 }
732
733 /*********************** CkArray Creation *************************/
734 void _ckArrayInit(void)
735 {
736   CkpvInitialize(ArrayElement_initInfo,initInfo);
737   CkDisableTracing(CkIndex_CkArray::insertElement(0));
738   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
739     // disable because broadcast listener may deliver broadcast message
740   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
741   // by default anytime migration is allowed
742   ckinsertIdxHdl = CkRegisterHandler(ckinsertIdxFunc);
743 }
744
745 CkArray::CkArray(CkArrayOptions &opts,
746                  CkMarshalledMessage &initMsg,
747                  CkNodeGroupID nodereductionID)
748   : CkReductionMgr(),
749     locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
750     locMgrID(opts.getLocationManager()),
751     thisProxy(thisgroup),
752     // Register with our location manager
753     elements((ArrayElementList *)locMgr->addManager(thisgroup,this)),
754     stableLocations(opts.staticInsertion && !opts.anytimeMigration),
755     numInitial(opts.getNumInitial()), isInserting(CmiTrue)
756 {
757   if (!stableLocations)
758       CcdCallOnConditionKeep(CcdPERIODIC_1minute,
759                              staticSpringCleaning, (void *)this);
760   
761   //set the field in one my parent class (CkReductionMgr)
762   if(opts.disableNotifyChildInRed)
763           disableNotifyChildrenStart = CmiTrue; 
764   
765   //Find, register, and initialize the arrayListeners
766   listenerDataOffset=0;
767   broadcaster=new CkArrayBroadcaster(stableLocations, opts.broadcastViaScheduler);
768   addListener(broadcaster);
769   reducer=new CkArrayReducer(thisgroup);
770   addListener(reducer);
771
772   // COMLIB HACK
773   //calistener = new ComlibArrayListener();
774   //addListener(calistener,dataOffset);
775
776   int lNo,nL=opts.getListeners(); //User-added listeners
777   for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
778
779   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
780
781   ///Set up initial elements (if any)
782   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
783
784   ///adding code for Reduction using nodegroups
785
786 #if !GROUP_LEVEL_REDUCTION
787   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
788   nodeProxy = nodetemp;
789   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
790 #endif
791
792 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
793         // creating the spanning tree to be used for broadcast
794         children = (int *) CmiAlloc(sizeof(int) * _MLOG_BCAST_BFACTOR_);
795         numChildren = 0;
796         
797         // computing the level of the tree this pe is in
798         // we should use the geometric series formula, but now a quick and dirty code should suffice
799         // PE 0 is at level 0, PEs 1.._MLOG_BCAST_BFACTOR_ are at level 1 and so on
800         int level = 0;
801         int aux = CmiMyPe();
802         int max = CmiNumPes();
803         int factor = _MLOG_BCAST_BFACTOR_;
804         int startLevel = 0;
805         int startNextLevel = 1;
806         while(aux >= 0){
807                 level++;
808                 startLevel = startNextLevel;
809                 startNextLevel += factor;
810                 aux -= factor;
811                 factor *= _MLOG_BCAST_BFACTOR_;
812         }
813
814         // adding children to the tree
815         int first = startNextLevel + (CmiMyPe() - startLevel) * _MLOG_BCAST_BFACTOR_;
816         for(int i=0; i<_MLOG_BCAST_BFACTOR_; i++){
817                 if(first + i >= CmiNumPes())
818                         break;
819                 children[i] = first + i;
820                 numChildren++;
821         }
822  
823 #endif
824
825
826   if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
827       ckSetReductionClient(&opts.reductionClient);
828 }
829
830 CkArray::CkArray(CkMigrateMessage *m)
831         :CkReductionMgr(m), thisProxy(thisgroup)
832 {
833   locMgr=NULL;
834   isInserting=CmiTrue;
835 }
836
837 #if CMK_ERROR_CHECKING
838 inline void testPup(PUP::er &p,int shouldBe) {
839   int a=shouldBe;
840   p|a;
841   if (a!=shouldBe)
842     CkAbort("PUP direction mismatch!");
843 }
844 #else
845 inline void testPup(PUP::er &p,int shouldBe) {}
846 #endif
847
848 void CkArray::pup(PUP::er &p){
849         CkReductionMgr::pup(p);
850         p|numInitial;
851         p|locMgrID;
852         p|listeners;
853         p|listenerDataOffset;
854         p|stableLocations;
855         testPup(p,1234);
856         if(p.isUnpacking()){
857                 thisProxy=thisgroup;
858                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
859                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
860                 /// Restore our default listeners:
861                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
862                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
863                 /// set up broadcast cleaner
864                 if (!stableLocations)
865                     CcdCallOnConditionKeep(CcdPERIODIC_1minute,
866                                            staticSpringCleaning, (void *)this);
867         }
868 }
869
870 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
871   int dataOffset=0; \
872   for (int lNo=0;lNo<listeners.size();lNo++) { \
873     CkArrayListener *l=listeners[lNo]; \
874     l->ckElementStamp(&listenerData[dataOffset]); \
875     dataOffset+=l->ckGetLen(); \
876   } \
877 } while (0)
878
879 //Called on send side to prepare array constructor message
880 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
881 {
882   envelope *env=UsrToEnv((void *)m);
883   env->getsetArrayIndex()=idx;
884   int *listenerData=env->getsetArrayListenerData();
885   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
886   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
887   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
888         getLocMgr()->inform(idx,onPe);
889 }
890
891 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
892                         CkElementCreation_t type)
893 {
894         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
895         if (type==CkElementCreation_resume) 
896         { // HACK: Re-stamp elements on checkpoint resume--
897           //  this restores, e.g., reduction manager's gcount
898                 int *listenerData=ret->listenerData;
899                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
900         }
901         return ret;
902 }
903
904 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
905                      CkMessage *msg,CmiBool fromMigration) 
906 {
907         //Stash the element's initialization information in the global "initInfo"
908         ArrayElement_initInfo &init=CkpvAccess(initInfo);
909         init.numInitial=numInitial;
910         init.thisArray=this;
911         init.thisArrayID=thisgroup;
912         if (msg) /*Have to *copy* data because msg will be deleted*/
913           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
914                  sizeof(init.listenerData));
915         init.fromMigration=fromMigration;
916         
917         //Build the element
918         int elSize=_chareTable[elChareType]->size;
919         ArrayElement *elem = (ArrayElement *)malloc(elSize);
920 #ifndef CMK_OPTIMIZE
921         if (elem!=NULL) setMemoryTypeChare(elem);
922 #endif
923         return elem;
924 }
925
926 /// This method is called by ck.C or the user to add an element.
927 CmiBool CkArray::insertElement(CkMessage *me)
928 {
929   CK_MAGICNUMBER_CHECK
930   CkArrayMessage *m=(CkArrayMessage *)me;
931   const CkArrayIndex &idx=m->array_index();
932   int onPe;
933   if (locMgr->isRemote(idx,&onPe)) 
934   { /* element's sibling lives somewhere else, so insert there */
935         CkArrayManagerInsert(onPe,me,thisgroup);
936         return CmiFalse;
937   }
938   int ctorIdx=m->array_ep();
939   int chareType=_entryTable[ctorIdx]->chareIdx;
940   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
941 #ifndef CMK_CHARE_USE_PTR
942   ((Chare *)elt)->chareIdx = -1;
943 #endif
944   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
945   CK_ARRAYLISTENER_LOOP(listeners,
946       if (!l->ckElementCreated(elt)) return CmiFalse;);
947   return CmiTrue;
948 }
949
950 void CProxy_ArrayBase::doneInserting(void)
951 {
952   DEBC((AA"Broadcasting a doneInserting request\n"AB));
953   //Broadcast a DoneInserting
954   CProxy_CkArray(_aid).remoteDoneInserting();
955 }
956
957 void CkArray::doneInserting(void)
958 {
959   thisProxy[CkMyPe()].remoteDoneInserting();
960 }
961
962 /// This is called on every processor after the last array insertion.
963 void CkArray::remoteDoneInserting(void)
964 {
965   CK_MAGICNUMBER_CHECK
966   if (isInserting) {
967     isInserting=CmiFalse;
968     DEBC((AA"Done inserting objects\n"AB));
969     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
970     locMgr->doneInserting();
971   }
972 }
973
974 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
975         int onPe,int ctor,CkDeliver_t type)
976 {
977         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
978         prepareCtorMsg(m,onPe,idx);
979         m->array_ep()=ctor;
980         
981         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
982                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
983                 CkArrayManagerInsert(onPe,m,thisgroup);
984         } else /* local message, non-queued */ {
985                 //Call local constructor directly
986                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
987                 return insertElement(m);
988         }
989         return CmiTrue;
990 }
991
992 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
993 {
994         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
995         if (local) {
996           int onPe=CkMyPe();
997           prepareCtorMsg(m,onPe,idx);
998 #if CMK_BIGSIM_CHARM
999           BgEntrySplit("split-array-new");
1000 #endif
1001           insertElement(m);
1002         }
1003         else {
1004           int onPe=-1;
1005           prepareCtorMsg(m,onPe,idx);
1006           CkArrayManagerInsert(onPe,m,getGroupID());
1007         }
1008 }
1009
1010 /********************* CkArray Messaging ******************/
1011 /// Fill out a message's array fields before sending it
1012 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
1013 {
1014         envelope *env=UsrToEnv((void *)msg);
1015         env->getsetArrayMgr()=aid;
1016         env->getsetArraySrcPe()=CkMyPe();
1017         env->setEpIdx(ep);
1018         env->getsetArrayHops()=0;
1019 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1020         criticalPath_send(env);
1021         automaticallySetMessagePriority(env);
1022 #endif
1023 }
1024
1025
1026 /// Just a non-inlined version of msg_prepareSend()
1027 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
1028 {
1029         envelope *env=UsrToEnv((void *)msg);
1030         env->getsetArrayMgr()=aid;
1031         env->getsetArraySrcPe()=CkMyPe();
1032         env->setEpIdx(ep);
1033         env->getsetArrayHops()=0;
1034 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
1035         criticalPath_send(env);
1036         automaticallySetMessagePriority(env);
1037 #endif
1038 }
1039
1040 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
1041 {
1042 #if CMK_ERROR_CHECKING
1043         //Check our array index for validity
1044         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
1045         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
1046                 CkAbort("Array index length (nInts) is too long-- did you "
1047                         "use bytes instead of integers?\n");
1048 #endif
1049         msg_prepareSend(msg,ep,ckGetArrayID());
1050         msg->array_index()=_idx;//Insert array index
1051         if (ckIsDelegated()) //Just call our delegateMgr
1052           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
1053         else 
1054         { //Usual case: a direct send
1055           CkArray *localbranch = ckLocalBranch();
1056           if (localbranch == NULL) {             // array not created yet
1057             CkArrayManagerDeliver(CkMyPe(), msg, 0);
1058           }
1059           else {
1060             if (opts & CK_MSG_INLINE)
1061               localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
1062             else
1063               localbranch->deliver(msg, CkDeliver_queue, opts);
1064           }
1065         }
1066 }
1067
1068 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
1069 {
1070         CkFutureID f=CkCreateAttachedFuture(msg);
1071         ckSend(msg,ep);
1072         return CkWaitReleaseFuture(f);
1073 }
1074
1075 void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts     )
1076 {
1077         CProxySection_ArrayBase sp(sID);
1078         sp.ckSend((CkArrayMessage *)msg,entryIndex,opts);
1079 }
1080
1081 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
1082 {
1083         if (ckIsDelegated()) //Just call our delegateMgr
1084           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
1085         else {
1086           // send through all
1087           for (int k=0; k<_nsid; ++k) {
1088             for (int i=0; i< _sid[k]._nElems-1; i++) {
1089               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[i]);
1090               void *newMsg=CkCopyMsg((void **)&msg);
1091               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1092             }
1093             if (_sid[k]._nElems > 0) {
1094               void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
1095               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[_sid[k]._nElems-1]);
1096               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1097             }
1098           }
1099         }
1100 }
1101
1102 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1103 {
1104   CkArrayMessage *m=(CkArrayMessage *)msg;
1105   m->array_index()=idx;
1106   msg_prepareSend(m,entryIndex,aID);
1107   CkArray *a=(CkArray *)_localBranch(aID);
1108   if (a == NULL)
1109     CkArrayManagerDeliver(CkMyPe(), msg, 0);
1110   else
1111     a->deliver(m,CkDeliver_queue,opts);
1112 }
1113
1114 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1115 {
1116   CkArrayMessage *m=(CkArrayMessage *)msg;
1117   m->array_index()=idx;
1118   msg_prepareSend(m,entryIndex,aID);
1119   CkArray *a=(CkArray *)_localBranch(aID);
1120   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
1121   a->deliver(m,CkDeliver_inline,opts);
1122   if (oldStatus) CkEnableTracing(entryIndex);
1123 }
1124
1125
1126 /*********************** CkArray Reduction *******************/
1127 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
1128   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
1129    mgrID(mgrID_)
1130 {
1131   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1132 }
1133 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
1134   :CkArrayListener(m)
1135 {
1136   mgr=NULL;
1137 }
1138 void CkArrayReducer::pup(PUP::er &p) {
1139   CkArrayListener::pup(p);
1140   p|mgrID;
1141   if (p.isUnpacking())
1142     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1143 }
1144 CkArrayReducer::~CkArrayReducer() {}
1145
1146 /*********************** CkArray Broadcast ******************/
1147
1148 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_, bool broadcastViaScheduler_)
1149     :CkArrayListener(1), //Each array element carries a broadcast number
1150      bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_), broadcastViaScheduler(broadcastViaScheduler_)
1151 { }
1152
1153 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
1154     :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1), broadcastViaScheduler(false)
1155 { }
1156
1157 void CkArrayBroadcaster::pup(PUP::er &p) {
1158   CkArrayListener::pup(p);
1159   /* Assumption: no migrants during checkpoint, so no need to
1160      save old broadcasts. */
1161   p|bcastNo;
1162   p|stableLocations;
1163   p|broadcastViaScheduler;
1164   if (p.isUnpacking()) {
1165     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
1166   }
1167 }
1168
1169 CkArrayBroadcaster::~CkArrayBroadcaster()
1170 {
1171   CkArrayMessage *msg;
1172   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1173 }
1174
1175 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
1176 {
1177   bcastNo++;
1178   DEBB((AA"Received broadcast %d\n"AB,bcastNo));
1179
1180   if (stableLocations)
1181     return;
1182
1183   CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
1184   oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
1185 }
1186
1187 /// Deliver a copy of the given broadcast to the given local element
1188 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
1189                                     CmiBool doFree)
1190 {
1191   int &elBcastNo=getData(el);
1192   // if this array element already received this message, skip it
1193   if (elBcastNo >= bcastNo) return CmiFalse;
1194   elBcastNo++;
1195   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
1196   int epIdx=bcast->array_ep_bcast();
1197
1198 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1199   DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
1200   return CmiTrue;
1201 #else
1202   if (!broadcastViaScheduler)
1203     return el->ckInvokeEntry(epIdx, bcast, doFree);
1204   else {
1205     if (!doFree) {
1206       CkArrayMessage *newMsg = (CkArrayMessage *)CkCopyMsg((void **)&bcast);
1207       bcast = newMsg;
1208     }
1209     envelope *env = UsrToEnv(bcast);
1210     env->getsetArrayEp() = epIdx;
1211     env->getsetArrayMgr() = el->ckGetArrayID();
1212     env->getsetArrayIndex() = el->ckGetArrayIndex();
1213     CkArrayManagerDeliver(CkMyPe(), bcast, 0);
1214     return true;
1215   }
1216 #endif
1217 }
1218
1219 /// Deliver all needed broadcasts to the given local element
1220 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
1221 {
1222   if (stableLocations) return CmiTrue;
1223   int &elBcastNo=getData(el);
1224   if (elBcastNo<bcastNo)
1225   {//This element needs some broadcasts-- it must have
1226    //been migrating during the broadcast.
1227     int i,nDeliver=bcastNo-elBcastNo;
1228     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
1229
1230     //Skip the old junk at the front of the bcast queue
1231     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
1232       oldBcasts.enq(oldBcasts.deq());
1233
1234     //Deliver the newest messages, in old-to-new order
1235     for (i=nDeliver-1;i>=0;i--)
1236     {
1237       CkArrayMessage *msg=oldBcasts.deq();
1238                 if(msg == NULL)
1239                 continue;
1240       oldBcasts.enq(msg);
1241       if (!deliver(msg, el, CmiFalse))
1242         return CmiFalse; //Element migrated away
1243     }
1244   }
1245   //Otherwise, the element survived
1246   return CmiTrue;
1247 }
1248
1249
1250 void CkArrayBroadcaster::springCleaning(void)
1251 {
1252   //Remove old broadcast messages
1253   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
1254   if (nDelete>0) {
1255     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
1256     for (int i=0;i<nDelete;i++)
1257       delete oldBcasts.deq();
1258   }
1259   oldBcastNo=bcastNo;
1260 }
1261
1262 void CkArrayBroadcaster::flushState() 
1263
1264   bcastNo = oldBcastNo = 0; 
1265   CkArrayMessage *msg;
1266   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1267 }
1268
1269 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
1270 {
1271         CProxy_ArrayBase ap(aID);
1272         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
1273 }
1274
1275 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
1276 {
1277         msg->array_ep_bcast()=ep;
1278         if (ckIsDelegated()) //Just call our delegateMgr
1279           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
1280         else 
1281         { //Broadcast message via serializer node
1282           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
1283           int skipsched = opts & CK_MSG_EXPEDITED; 
1284           //int serializer=0;//1623802937%CkNumPes();
1285 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1286                 CProxy_CkArray ap(_aid);
1287                 ap[CpvAccess(serializer)].sendBroadcast(msg);
1288                 CkGroupID _id = _aid;
1289 //              printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
1290 #else
1291           if (CkMyPe()==CpvAccess(serializer))
1292           {
1293                 DEBB((AA"Sending array broadcast\n"AB));
1294                 if (skipsched)
1295                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
1296                 else
1297                         CProxy_CkArray(_aid).recvBroadcast(msg);
1298           } else {
1299                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
1300                 CProxy_CkArray ap(_aid);
1301                 if (skipsched)
1302                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
1303                 else
1304                         ap[CpvAccess(serializer)].sendBroadcast(msg);
1305           }
1306 #endif
1307         }
1308 }
1309
1310 /// Reflect a broadcast off this Pe:
1311 void CkArray::sendBroadcast(CkMessage *msg)
1312 {
1313         CK_MAGICNUMBER_CHECK
1314         if(CkMyPe() == CpvAccess(serializer)){
1315 #if _MLOG_BCAST_TREE_
1316                 // Using the spanning tree to broadcast the message
1317                 for(int i=0; i<numChildren; i++){
1318                         CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
1319                         thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
1320                 }
1321         
1322                 // delivering message locally
1323                 recvBroadcast(msg);     
1324 #else
1325                 //Broadcast the message to all processors
1326                 thisProxy.recvBroadcast(msg);
1327 #endif
1328         }else{
1329                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
1330         }
1331 }
1332 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
1333 {
1334         CK_MAGICNUMBER_CHECK
1335         //Broadcast the message to all processors
1336         thisProxy.recvExpeditedBroadcast(msg);
1337 }
1338
1339 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1340 int _tempBroadcastCount=0;
1341
1342 // Delivers a message using the spanning tree
1343 void CkArray::recvBroadcastViaTree(CkMessage *msg)
1344 {
1345         CK_MAGICNUMBER_CHECK
1346
1347         // Using the spanning tree to broadcast the message
1348         for(int i=0; i<numChildren; i++){
1349                 CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
1350                 thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
1351         }
1352
1353         // delivering message locally
1354         recvBroadcast(msg);     
1355 }
1356
1357 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
1358     if(homePe(*index)==CmiMyPe()){
1359         CkArrayMessage *bcast = (CkArrayMessage *)data;
1360     int epIdx=bcast->array_ep_bcast();
1361         DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
1362         CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
1363         envelope *env = UsrToEnv(copy);
1364         env->sender.data.group.onPE = CkMyPe();
1365         env->TN  = env->SN=0;
1366         env->piggyBcastIdx = epIdx;
1367         env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
1368         env->getsetArrayMgr() = thisgroup;
1369         env->getsetArrayIndex() = *index;
1370     env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
1371         env->setSrcPe(CkMyPe());
1372         rec->deliver(copy,CkDeliver_queue);
1373         _tempBroadcastCount++;
1374     }else{
1375         if(locMgr->homeElementCount != -1){
1376             DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
1377         }
1378     }
1379 }
1380
1381 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
1382     arr->broadcastHomeElements(data,rec,index);
1383 }
1384 #else
1385 void CkArray::recvBroadcastViaTree(CkMessage *msg){
1386 }
1387 #endif
1388
1389
1390 /// Increment broadcast count; deliver to all local elements
1391 void CkArray::recvBroadcast(CkMessage *m)
1392 {
1393         CK_MAGICNUMBER_CHECK
1394         CkArrayMessage *msg=(CkArrayMessage *)m;
1395         broadcaster->incoming(msg);
1396
1397 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1398         _tempBroadcastCount=0;
1399         locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
1400 #else
1401         //Run through the list of local elements
1402         int idx=0, len=0, count=0;
1403         if (stableLocations) {            /* remove all NULLs in the array */
1404           len = 0;
1405           while (elements->next(idx)!=NULL) len++;
1406           idx = 0;
1407         }
1408         ArrayElement *el;
1409 #if CMK_BIGSIM_CHARM
1410         void *root;
1411         _TRACE_BG_TLINE_END(&root);
1412         BgSetEntryName("start-broadcast", &root);
1413         CkVec<void *> logs;    // store all logs for each delivery
1414         extern void stopVTimer();
1415         extern void startVTimer();
1416 #endif
1417         while (NULL!=(el=elements->next(idx))) {
1418 #if CMK_BIGSIM_CHARM
1419                 //BgEntrySplit("split-broadcast");
1420                 stopVTimer();
1421                 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
1422                 logs.push_back(curlog);
1423                 startVTimer();
1424 #endif
1425                 CmiBool doFree = CmiFalse;
1426                 if (stableLocations && ++count == len) doFree = CmiTrue;
1427                 broadcaster->deliver(msg, el, doFree);
1428         }
1429 #endif
1430
1431 #if CMK_BIGSIM_CHARM
1432         //BgEntrySplit("end-broadcast");
1433         stopVTimer();
1434         BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
1435         startVTimer();
1436 #endif
1437
1438         // CkArrayBroadcaster doesn't have msg buffered, and there was
1439         // no last delivery to transfer ownership
1440 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1441         if (stableLocations)
1442           delete msg;
1443 #else
1444         if (stableLocations && len == 0)
1445           delete msg;
1446 #endif
1447 }
1448
1449 #include "CkArray.def.h"
1450
1451