Adding support for causal message logging.
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55 #include "pathHistory.h"
56
57 #if CMK_LBDB_ON
58 #include "LBDatabase.h"
59 #endif // CMK_LBDB_ON
60
61 CpvDeclare(int ,serializer);
62
63 bool _isAnytimeMigration;
64 bool _isStaticInsertion;
65 bool _isNotifyChildInRed;
66
67 #define ARRAY_DEBUG_OUTPUT 0
68
69 #if ARRAY_DEBUG_OUTPUT 
70 #   define DEB(x) CkPrintf x  //General debug messages
71 #   define DEBI(x) CkPrintf x  //Index debug messages
72 #   define DEBC(x) CkPrintf x  //Construction debug messages
73 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
74 #   define DEBM(x) CkPrintf x  //Migration debug messages
75 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
76 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
77 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
78 #   define AA "ArrayBOC on %d: "
79 #   define AB ,CkMyPe()
80 #   define DEBUG(x) x
81 #else
82 #   define DEB(X) /*CkPrintf x*/
83 #   define DEBI(X) /*CkPrintf x*/
84 #   define DEBC(X) /*CkPrintf x*/
85 #   define DEBS(x) /*CkPrintf x*/
86 #   define DEBM(X) /*CkPrintf x*/
87 #   define DEBL(X) /*CkPrintf x*/
88 #   define DEBK(x) /*CkPrintf x*/
89 #   define DEBB(x) /*CkPrintf x*/
90 #   define str(x) /**/
91 #   define DEBUG(x)
92 #endif
93
94 ///This arrayListener is in charge of delivering broadcasts to the array.
95 class CkArrayBroadcaster : public CkArrayListener {
96   inline int &getData(ArrayElement *el) {return *ckGetData(el);}
97 public:
98   CkArrayBroadcaster(bool _stableLocations, bool _broadcastViaScheduler);
99   CkArrayBroadcaster(CkMigrateMessage *m);
100   virtual void pup(PUP::er &p);
101   virtual ~CkArrayBroadcaster();
102   PUPable_decl(CkArrayBroadcaster);
103
104   virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
105
106   ///Element was just created on this processor
107   /// Return false if the element migrated away or deleted itself.
108   virtual CmiBool ckElementCreated(ArrayElement *elt)
109     { return bringUpToDate(elt); }
110
111   ///Element just arrived on this processor (so just called pup)
112   /// Return false if the element migrated away or deleted itself.
113   virtual CmiBool ckElementArriving(ArrayElement *elt)
114     { return bringUpToDate(elt); }
115
116   void incoming(CkArrayMessage *msg);
117
118   CmiBool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
119
120   void springCleaning(void);
121
122   void flushState();
123 private:
124   int bcastNo;//Number of broadcasts received (also serial number)
125   int oldBcastNo;//Above value last spring cleaning
126   //This queue stores old broadcasts (in case a migrant arrives
127   // and needs to be brought up to date)
128   CkQ<CkArrayMessage *> oldBcasts;
129   bool stableLocations;
130   bool broadcastViaScheduler;
131
132   CmiBool bringUpToDate(ArrayElement *el);
133 };
134
135 ///This arrayListener is in charge of performing reductions on the array.
136 class CkArrayReducer : public CkArrayListener {
137   CkGroupID mgrID;
138   CkReductionMgr *mgr;
139   typedef  contributorInfo *I;
140   inline contributorInfo *getData(ArrayElement *el)
141     {return (I)ckGetData(el);}
142 public:
143   /// Attach this array to this CkReductionMgr
144   CkArrayReducer(CkGroupID mgrID_);
145   CkArrayReducer(CkMigrateMessage *m);
146   virtual void pup(PUP::er &p);
147   virtual ~CkArrayReducer();
148   PUPable_decl(CkArrayReducer);
149
150   void ckBeginInserting(void) {mgr->creatingContributors();}
151   void ckEndInserting(void) {mgr->doneCreatingContributors();}
152
153   void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
154
155   void ckElementCreating(ArrayElement *elt)
156     {mgr->contributorCreated(getData(elt));}
157   void ckElementDied(ArrayElement *elt)
158     {mgr->contributorDied(getData(elt));}
159
160   void ckElementLeaving(ArrayElement *elt)
161     {mgr->contributorLeaving(getData(elt));}
162   CmiBool ckElementArriving(ArrayElement *elt)
163     {mgr->contributorArriving(getData(elt)); return CmiTrue; }
164 };
165
166 /*
167 void 
168 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
169        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
170 }
171 */
172 void
173 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
174         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
175         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
176 }
177
178 /*********************** CkVerboseListener ******************/
179 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
180
181 CkVerboseListener::CkVerboseListener(void)
182   :CkArrayListener(0)
183 {
184   VL_PRINT<<"INIT  Creating listener"<<endl;
185 }
186
187 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
188 {
189   CkArrayListener::ckRegister(arrMgr,dataOffset_);
190   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
191 }
192 void CkVerboseListener::ckBeginInserting(void)
193 {
194   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
195 }
196 void CkVerboseListener::ckEndInserting(void)
197 {
198   VL_PRINT<<"INIT  Done inserting elements"<<endl;
199 }
200
201 void CkVerboseListener::ckElementStamp(int *eltInfo)
202 {
203   VL_PRINT<<"LIFE  Stamping element"<<endl;
204 }
205 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
206 {
207   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
208 }
209 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
210 {
211   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
212   return CmiTrue;
213 }
214 void CkVerboseListener::ckElementDied(ArrayElement *elt)
215 {
216   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
217 }
218
219 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
220 {
221   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
222 }
223 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
224 {
225   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
226   return CmiTrue;
227 }
228
229
230 /************************* ArrayElement *******************/
231 class ArrayElement_initInfo {
232 public:
233   CkArray *thisArray;
234   CkArrayID thisArrayID;
235   CkArrayIndex numInitial;
236   int listenerData[CK_ARRAYLISTENER_MAXLEN];
237   CmiBool fromMigration;
238 };
239
240 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
241
242 void ArrayElement::initBasics(void)
243 {
244 #if CMK_OUT_OF_CORE
245   if (CkpvAccess(CkSaveRestorePrefetch)) 
246     return; /* Just restoring from disk--don't try to set up anything. */
247 #endif
248 #if CMK_GRID_QUEUE_AVAILABLE
249         grid_queue_interval = 0;
250         grid_queue_threshold = 0;
251         msg_count = 0;
252         msg_count_grid = 0;
253         border_flag = 0;
254
255         grid_queue_interval = CmiGridQueueGetInterval ();
256         grid_queue_threshold = CmiGridQueueGetThreshold ();
257 #endif
258   ArrayElement_initInfo &info=CkpvAccess(initInfo);
259   thisArray=info.thisArray;
260   thisArrayID=info.thisArrayID;
261   numInitialElements=info.numInitial.getCombinedCount();
262   if (info.listenerData) {
263     memcpy(listenerData,info.listenerData,sizeof(listenerData));
264   }
265   if (!info.fromMigration) {
266     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
267                           l->ckElementCreating(this));
268   }
269 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
270         mlogData->objID.type = TypeArray;
271         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
272 #endif
273 #ifdef _PIPELINED_ALLREDUCE_
274         allredMgr = NULL;
275 #endif
276 }
277
278 ArrayElement::ArrayElement(void) 
279 {
280         initBasics();
281 #if CMK_MEM_CHECKPOINT
282         init_checkpt();
283 #endif
284 }
285
286 ArrayElement::ArrayElement(CkMigrateMessage *m) : CkMigratable(m)
287 {
288         initBasics();
289 }
290
291 //Called by the system just before and after migration to another processor:  
292 void ArrayElement::ckAboutToMigrate(void) {
293         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
294                                 l->ckElementLeaving(this));
295         CkMigratable::ckAboutToMigrate();
296 }
297 void ArrayElement::ckJustMigrated(void) {
298         CkMigratable::ckJustMigrated();
299         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
300               if (!l->ckElementArriving(this)) return;);
301 }
302
303 void ArrayElement::ckJustRestored(void) {
304     CkMigratable::ckJustRestored();
305     //empty for out-of-core emulation
306 }
307
308 #ifdef _PIPELINED_ALLREDUCE_
309 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
310                                         CMK_REFNUM_TYPE userFlag)
311 {
312         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
313         msg->setUserFlag(userFlag);
314         msg->setMigratableContributor(true);
315         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
316 }
317 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
318                                         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
319 {
320         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
321         msg->setUserFlag(userFlag);
322         msg->setCallback(cb);
323         msg->setMigratableContributor(true);
324         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
325 }
326 void ArrayElement::contribute2(CkReductionMsg *msg) 
327 {
328         msg->setMigratableContributor(true);
329         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
330 }
331 void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
332 {
333         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
334     msg->setUserFlag(userFlag);
335     msg->setCallback(cb);
336     msg->setMigratableContributor(true);
337     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
338 }
339 void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
340 {
341     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
342     msg->setUserFlag(userFlag);
343     msg->setMigratableContributor(true);
344     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
345 }
346
347 void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
348                                                           const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
349 {
350         // if it is a broadcast to myself and size is large
351         if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD) 
352         {
353                 if (!allredMgr) {
354                         allredMgr = new AllreduceMgr();
355                 }
356                 // number of fragments
357                 int fragNo = dataSize/FRAG_SIZE;
358                 int size = FRAG_SIZE;
359                 // for each fragment
360                 for (int i=0; i<fragNo; i++) {
361                         // callback to defragmentor
362                         CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
363                         if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
364                                 size = dataSize%FRAG_SIZE;
365                         }
366                         CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
367                         // initialize the new msg
368                         msg->reducer            = type;
369                         msg->nFrags             = fragNo;
370                         msg->fragNo             = i;
371                         msg->callback           = defrag_cb;
372                         msg->userFlag           = userFlag;
373                         allredMgr->cb           = cb;
374                         allredMgr->cb.type      = CkCallback::sendArray;
375                         allredMgr->cb.d.array.idx = myIndex;
376                         contribute2(msg);
377                 }
378                 return;
379         }
380         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
381         msg->setUserFlag(userFlag);
382         msg->setCallback(cb);
383         msg->setMigratableContributor(true);
384         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
385 }
386
387
388 #else
389 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
390    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
391 #endif
392 // _PIPELINED_ALLREDUCE_
393 void ArrayElement::defrag(CkReductionMsg *msg)
394 {
395 //      CkPrintf("in defrag\n");
396 #ifdef _PIPELINED_ALLREDUCE_
397         allredMgr->allreduce_recieve(msg);
398 #endif
399 }
400
401 /// Remote method: calls destructor
402 void ArrayElement::ckDestroy(void)
403 {
404         if(_BgOutOfCoreFlag!=1){ //in case of taking core out of memory
405             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
406                            l->ckElementDied(this));
407         }
408         CkMigratable::ckDestroy();
409 }
410
411 //Destructor (virtual)
412 ArrayElement::~ArrayElement()
413 {
414 #if CMK_OUT_OF_CORE
415   if (CkpvAccess(CkSaveRestorePrefetch)) 
416     return; /* Just saving to disk--don't trash anything. */
417 #endif
418   //To detect use-after-delete: 
419   thisArray=(CkArray *)0xDEADa7a1;
420 }
421
422 void ArrayElement::pup(PUP::er &p)
423 {
424   DEBM((AA"  ArrayElement::pup()\n"AB));
425   CkMigratable::pup(p);
426   thisArrayID.pup(p);
427   if (p.isUnpacking())
428         thisArray=thisArrayID.ckLocalBranch();
429   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
430 #if CMK_MEM_CHECKPOINT
431   p(budPEs, 2);
432 #endif
433   p.syncComment(PUP::sync_last_system,"ArrayElement");
434 #if CMK_GRID_QUEUE_AVAILABLE
435   p|grid_queue_interval;
436   p|grid_queue_threshold;
437   p|msg_count;
438   p|msg_count_grid;
439   p|border_flag;
440   if (p.isUnpacking ()) {
441     msg_count = 0;
442     msg_count_grid = 0;
443     border_flag = 0;
444   }
445 #endif
446 }
447
448 char *ArrayElement::ckDebugChareName(void) {
449         char buf[200];
450         const char *className=_chareTable[ckGetChareType()]->name;
451         const int *d=thisIndexMax.data();
452         const short int *s=(const short int*)d;
453         switch (thisIndexMax.dimension) {
454         case 0: sprintf(buf,"%s",className); break;
455         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
456         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
457         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
458     case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
459     case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
460     case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
461         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
462         };
463         return strdup(buf);
464 }
465
466 int ArrayElement::ckDebugChareID(char *str, int limit) {
467   if (limit<21) return -1;
468   str[0] = 2;
469   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
470   *((CkArrayIndex*)&str[5]) = thisIndexMax;
471   return 21;
472 }
473
474 /// A more verbose form of abort
475 void ArrayElement::CkAbort(const char *str) const
476 {
477         CkError("[%d] Array element at index %s aborting:\n",
478                 CkMyPe(), idx2str(thisIndexMax));
479         CkMigratable::CkAbort(str);
480 }
481
482 void ArrayElement::recvBroadcast(CkMessage *m){
483 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
484         CkArrayMessage *bcast = (CkArrayMessage *)m;
485     envelope *env = UsrToEnv(m);
486         int epIdx= env->piggyBcastIdx;
487     ckInvokeEntry(epIdx,bcast,CmiTrue);
488 #endif
489 }
490
491 /*********************** Spring Cleaning *****************
492 Periodically (every minute or so) remove expired broadcasts
493 from the queue.
494
495 This does not get called for arrays with stable locations (all
496 insertions done at creation, migration only at discrete points).
497 */
498
499 inline void CkArray::springCleaning(void)
500 {
501   DEBK((AA"Starting spring cleaning\n"AB));
502   broadcaster->springCleaning();
503 }
504
505 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
506         ((CkArray *)forArray)->springCleaning();
507 }
508
509 /********************* Little CkArray Utilities ******************/
510
511 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
512         :CProxy(), _aid(e->ckGetArrayID())
513         {}
514 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
515         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
516         {}
517
518 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
519         {return ckLocalBranch()->getLocMgr(); }
520
521 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
522
523 CkArrayOptions::CkArrayOptions(void) //Default: empty array
524         :numInitial(),map(_defaultArrayMapID)
525 {
526     init();
527 }
528
529 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
530         :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
531 {
532     init();
533 }
534
535 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
536         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
537 {
538     init();
539 }
540
541 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
542         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
543 {
544     init();
545 }
546
547 void CkArrayOptions::init()
548 {
549     locMgr.setZero();
550     anytimeMigration = _isAnytimeMigration;
551     staticInsertion = _isStaticInsertion;
552     reductionClient.type = CkCallback::invalid;
553     disableNotifyChildInRed = !_isNotifyChildInRed;
554     broadcastViaScheduler = false;
555 }
556
557 CkArrayOptions &CkArrayOptions::setStaticInsertion(bool b)
558 {
559     staticInsertion = b;
560     if (b && map == _defaultArrayMapID)
561         map = _fastArrayMapID;
562     return *this;
563 }
564
565 /// Bind our elements to this array
566 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
567 {
568         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
569         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
570         // not just initially.
571         //setNumInitial(arr->getNumInitial());
572         return setLocationManager(arr->getLocMgr()->getGroupID());
573 }
574 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
575 {
576         arrayListeners.push_back(listener);
577         return *this;
578 }
579
580 void CkArrayOptions::pup(PUP::er &p) {
581         p|numInitial;
582         p|map;
583         p|locMgr;
584         p|arrayListeners;
585         p|reductionClient;
586         p|anytimeMigration;
587         p|disableNotifyChildInRed;
588         p|staticInsertion;
589         p|broadcastViaScheduler;
590 }
591
592 CkArrayListener::CkArrayListener(int nInts_) 
593   :nInts(nInts_) 
594 {
595   dataOffset=-1;
596 }
597 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
598   nInts=-1; dataOffset=-1;
599 }
600 void CkArrayListener::pup(PUP::er &p) {
601   p|nInts;
602   p|dataOffset;
603 }
604
605 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
606 {
607   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
608   dataOffset=dataOffset_;
609 }
610
611 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
612                                           const CkArrayOptions &opts_)
613 {
614   CkArrayOptions opts(opts_);
615   CkGroupID locMgr = opts.getLocationManager();
616   if (locMgr.isZero())
617   { //Create a new location manager
618 #if !CMK_LBDB_ON
619     CkGroupID _lbdb;
620 #endif
621     CkEntryOptions  e_opts;
622     e_opts.setGroupDepID(opts.getMap());       // group creation dependence
623     locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,opts.getNumInitial(),&e_opts);
624     opts.setLocationManager(locMgr);
625   }
626   //Create the array manager
627   m->array_ep()=ctor;
628   CkMarshalledMessage marsh(m);
629   CkEntryOptions  e_opts;
630   e_opts.setGroupDepID(locMgr);       // group creation dependence
631 #if !GROUP_LEVEL_REDUCTION
632   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
633   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
634   nodereductionProxy.setAttachedGroup(ag);
635 #else
636   CkNodeGroupID dummyid;
637   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
638 #endif
639   return (CkArrayID)ag;
640 }
641
642 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
643 {
644   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
645 }
646
647 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
648         const CkArrayIndex &idx)
649 {
650   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
651   m->array_ep()=ctor;
652   ckLocalBranch()->prepareCtorMsg(m,onPe,idx);
653   if (ckIsDelegated()) {
654         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
655         return;
656   }
657   
658   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
659   CkArrayManagerInsert(onPe,m,_aid);
660 }
661
662 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
663 {
664   ckInsertIdx(m,ctorIndex,onPe,_idx);
665 }
666
667 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
668 {
669   return ckLocalBranch()->lookup(_idx);
670 }
671
672 //pack-unpack method for CProxy_ArrayBase
673 void CProxy_ArrayBase::pup(PUP::er &p)
674 {
675   CProxy::pup(p);
676   _aid.pup(p);
677 }
678 void CProxyElement_ArrayBase::pup(PUP::er &p)
679 {
680   CProxy_ArrayBase::pup(p);
681   p|_idx.nInts;
682   p|_idx.dimension;
683   p(_idx.data(),_idx.nInts);
684 }
685
686 void CProxySection_ArrayBase::pup(PUP::er &p)
687 {
688   CProxy_ArrayBase::pup(p);
689   p | _nsid;
690   if (p.isUnpacking()) {
691     if (_nsid == 1) _sid = new CkSectionID;
692     else if (_nsid > 1) _sid = new CkSectionID[_nsid];
693     else _sid = NULL;
694   }
695   for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
696 }
697
698 /*********************** CkArray Creation *************************/
699 void _ckArrayInit(void)
700 {
701   CkpvInitialize(ArrayElement_initInfo,initInfo);
702   CkDisableTracing(CkIndex_CkArray::insertElement(0));
703   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
704     // disable because broadcast listener may deliver broadcast message
705   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
706   // by default anytime migration is allowed
707 }
708
709 CkArray::CkArray(CkArrayOptions &opts,
710                  CkMarshalledMessage &initMsg,
711                  CkNodeGroupID nodereductionID)
712   : CkReductionMgr(),
713     locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
714     locMgrID(opts.getLocationManager()),
715     thisProxy(thisgroup),
716     // Register with our location manager
717     elements((ArrayElementList *)locMgr->addManager(thisgroup,this)),
718     stableLocations(opts.staticInsertion && !opts.anytimeMigration),
719     numInitial(opts.getNumInitial()), isInserting(CmiTrue)
720 {
721   if (!stableLocations)
722       CcdCallOnConditionKeep(CcdPERIODIC_1minute,
723                              staticSpringCleaning, (void *)this);
724   
725   //set the field in one my parent class (CkReductionMgr)
726   if(opts.disableNotifyChildInRed)
727           disableNotifyChildrenStart = CmiTrue; 
728   
729   //Find, register, and initialize the arrayListeners
730   listenerDataOffset=0;
731   broadcaster=new CkArrayBroadcaster(stableLocations, opts.broadcastViaScheduler);
732   addListener(broadcaster);
733   reducer=new CkArrayReducer(thisgroup);
734   addListener(reducer);
735
736   // COMLIB HACK
737   //calistener = new ComlibArrayListener();
738   //addListener(calistener,dataOffset);
739
740   int lNo,nL=opts.getListeners(); //User-added listeners
741   for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
742
743   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
744
745   ///Set up initial elements (if any)
746   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
747
748   ///adding code for Reduction using nodegroups
749
750 #if !GROUP_LEVEL_REDUCTION
751   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
752   nodeProxy = nodetemp;
753   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
754 #endif
755
756 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
757         // creating the spanning tree to be used for broadcast
758         children = (int *) CmiAlloc(sizeof(int) * _MLOG_BCAST_BFACTOR_);
759         numChildren = 0;
760         
761         // computing the level of the tree this pe is in
762         // we should use the geometric series formula, but now a quick and dirty code should suffice
763         // PE 0 is at level 0, PEs 1.._MLOG_BCAST_BFACTOR_ are at level 1 and so on
764         int level = 0;
765         int aux = CmiMyPe();
766         int max = CmiNumPes();
767         int factor = _MLOG_BCAST_BFACTOR_;
768         int startLevel = 0;
769         int startNextLevel = 1;
770         while(aux >= 0){
771                 level++;
772                 startLevel = startNextLevel;
773                 startNextLevel += factor;
774                 aux -= factor;
775                 factor *= _MLOG_BCAST_BFACTOR_;
776         }
777
778         // adding children to the tree
779         int first = startNextLevel + (CmiMyPe() - startLevel) * _MLOG_BCAST_BFACTOR_;
780         for(int i=0; i<_MLOG_BCAST_BFACTOR_; i++){
781                 if(first + i >= CmiNumPes())
782                         break;
783                 children[i] = first + i;
784                 numChildren++;
785         }
786  
787 #endif
788
789
790   if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
791       ckSetReductionClient(&opts.reductionClient);
792 }
793
794 CkArray::CkArray(CkMigrateMessage *m)
795         :CkReductionMgr(m), thisProxy(thisgroup)
796 {
797   locMgr=NULL;
798   isInserting=CmiTrue;
799 }
800
801 #if CMK_ERROR_CHECKING
802 inline void testPup(PUP::er &p,int shouldBe) {
803   int a=shouldBe;
804   p|a;
805   if (a!=shouldBe)
806     CkAbort("PUP direction mismatch!");
807 }
808 #else
809 inline void testPup(PUP::er &p,int shouldBe) {}
810 #endif
811
812 void CkArray::pup(PUP::er &p){
813         CkReductionMgr::pup(p);
814         p|numInitial;
815         p|locMgrID;
816         p|listeners;
817         p|listenerDataOffset;
818         p|stableLocations;
819         testPup(p,1234);
820         if(p.isUnpacking()){
821                 thisProxy=thisgroup;
822                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
823                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
824                 /// Restore our default listeners:
825                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
826                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
827                 /// set up broadcast cleaner
828                 if (!stableLocations)
829                     CcdCallOnConditionKeep(CcdPERIODIC_1minute,
830                                            staticSpringCleaning, (void *)this);
831         }
832 }
833
834 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
835   int dataOffset=0; \
836   for (int lNo=0;lNo<listeners.size();lNo++) { \
837     CkArrayListener *l=listeners[lNo]; \
838     l->ckElementStamp(&listenerData[dataOffset]); \
839     dataOffset+=l->ckGetLen(); \
840   } \
841 } while (0)
842
843 //Called on send side to prepare array constructor message
844 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
845 {
846   envelope *env=UsrToEnv((void *)m);
847   env->getsetArrayIndex()=idx;
848   int *listenerData=env->getsetArrayListenerData();
849   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
850   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
851   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
852         getLocMgr()->inform(idx,onPe);
853 }
854
855 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
856                         CkElementCreation_t type)
857 {
858         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
859         if (type==CkElementCreation_resume) 
860         { // HACK: Re-stamp elements on checkpoint resume--
861           //  this restores, e.g., reduction manager's gcount
862                 int *listenerData=ret->listenerData;
863                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
864         }
865         return ret;
866 }
867
868 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
869                      CkMessage *msg,CmiBool fromMigration) 
870 {
871         //Stash the element's initialization information in the global "initInfo"
872         ArrayElement_initInfo &init=CkpvAccess(initInfo);
873         init.numInitial=numInitial;
874         init.thisArray=this;
875         init.thisArrayID=thisgroup;
876         if (msg) /*Have to *copy* data because msg will be deleted*/
877           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
878                  sizeof(init.listenerData));
879         init.fromMigration=fromMigration;
880         
881         //Build the element
882         int elSize=_chareTable[elChareType]->size;
883         ArrayElement *elem = (ArrayElement *)malloc(elSize);
884 #ifndef CMK_OPTIMIZE
885         if (elem!=NULL) setMemoryTypeChare(elem);
886 #endif
887         return elem;
888 }
889
890 /// This method is called by ck.C or the user to add an element.
891 CmiBool CkArray::insertElement(CkMessage *me)
892 {
893   CK_MAGICNUMBER_CHECK
894   CkArrayMessage *m=(CkArrayMessage *)me;
895   const CkArrayIndex &idx=m->array_index();
896   int onPe;
897   if (locMgr->isRemote(idx,&onPe)) 
898   { /* element's sibling lives somewhere else, so insert there */
899         CkArrayManagerInsert(onPe,me,thisgroup);
900         return CmiFalse;
901   }
902   int ctorIdx=m->array_ep();
903   int chareType=_entryTable[ctorIdx]->chareIdx;
904   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
905 #ifndef CMK_CHARE_USE_PTR
906   ((Chare *)elt)->chareIdx = -1;
907 #endif
908   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
909   CK_ARRAYLISTENER_LOOP(listeners,
910       if (!l->ckElementCreated(elt)) return CmiFalse;);
911   return CmiTrue;
912 }
913
914 void CProxy_ArrayBase::doneInserting(void)
915 {
916   DEBC((AA"Broadcasting a doneInserting request\n"AB));
917   //Broadcast a DoneInserting
918   CProxy_CkArray(_aid).remoteDoneInserting();
919 }
920
921 void CkArray::doneInserting(void)
922 {
923   thisProxy[CkMyPe()].remoteDoneInserting();
924 }
925
926 /// This is called on every processor after the last array insertion.
927 void CkArray::remoteDoneInserting(void)
928 {
929   CK_MAGICNUMBER_CHECK
930   if (isInserting) {
931     isInserting=CmiFalse;
932     DEBC((AA"Done inserting objects\n"AB));
933     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
934     locMgr->doneInserting();
935   }
936 }
937
938 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
939         int onPe,int ctor,CkDeliver_t type)
940 {
941         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
942         prepareCtorMsg(m,onPe,idx);
943         m->array_ep()=ctor;
944         
945         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
946                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
947                 CkArrayManagerInsert(onPe,m,thisgroup);
948         } else /* local message, non-queued */ {
949                 //Call local constructor directly
950                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
951                 return insertElement(m);
952         }
953         return CmiTrue;
954 }
955
956 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
957 {
958         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
959         if (local) {
960           int onPe=CkMyPe();
961           prepareCtorMsg(m,onPe,idx);
962 #if CMK_BIGSIM_CHARM
963           BgEntrySplit("split-array-new");
964 #endif
965           insertElement(m);
966         }
967         else {
968           int onPe=-1;
969           prepareCtorMsg(m,onPe,idx);
970           CkArrayManagerInsert(onPe,m,getGroupID());
971         }
972 }
973
974 /********************* CkArray Messaging ******************/
975 /// Fill out a message's array fields before sending it
976 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
977 {
978         envelope *env=UsrToEnv((void *)msg);
979         env->getsetArrayMgr()=aid;
980         env->getsetArraySrcPe()=CkMyPe();
981         env->setEpIdx(ep);
982         env->getsetArrayHops()=0;
983 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
984         criticalPath_send(env);
985         automaticallySetMessagePriority(env);
986 #endif
987 }
988
989
990 /// Just a non-inlined version of msg_prepareSend()
991 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
992 {
993         envelope *env=UsrToEnv((void *)msg);
994         env->getsetArrayMgr()=aid;
995         env->getsetArraySrcPe()=CkMyPe();
996         env->setEpIdx(ep);
997         env->getsetArrayHops()=0;
998 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
999         criticalPath_send(env);
1000         automaticallySetMessagePriority(env);
1001 #endif
1002 }
1003
1004 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
1005 {
1006 #if CMK_ERROR_CHECKING
1007         //Check our array index for validity
1008         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
1009         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
1010                 CkAbort("Array index length (nInts) is too long-- did you "
1011                         "use bytes instead of integers?\n");
1012 #endif
1013         msg_prepareSend(msg,ep,ckGetArrayID());
1014         msg->array_index()=_idx;//Insert array index
1015         if (ckIsDelegated()) //Just call our delegateMgr
1016           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
1017         else 
1018         { //Usual case: a direct send
1019           CkArray *localbranch = ckLocalBranch();
1020           if (localbranch == NULL) {             // array not created yet
1021             CkArrayManagerDeliver(CkMyPe(), msg, 0);
1022           }
1023           else {
1024             if (opts & CK_MSG_INLINE)
1025               localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
1026             else
1027               localbranch->deliver(msg, CkDeliver_queue, opts);
1028           }
1029         }
1030 }
1031
1032 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
1033 {
1034         CkFutureID f=CkCreateAttachedFuture(msg);
1035         ckSend(msg,ep);
1036         return CkWaitReleaseFuture(f);
1037 }
1038
1039 void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts     )
1040 {
1041         CProxySection_ArrayBase sp(sID);
1042         sp.ckSend((CkArrayMessage *)msg,entryIndex,opts);
1043 }
1044
1045 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
1046 {
1047         if (ckIsDelegated()) //Just call our delegateMgr
1048           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
1049         else {
1050           // send through all
1051           for (int k=0; k<_nsid; ++k) {
1052             for (int i=0; i< _sid[k]._nElems-1; i++) {
1053               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[i]);
1054               void *newMsg=CkCopyMsg((void **)&msg);
1055               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1056             }
1057             if (_sid[k]._nElems > 0) {
1058               void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
1059               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[_sid[k]._nElems-1]);
1060               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1061             }
1062           }
1063         }
1064 }
1065
1066 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1067 {
1068   CkArrayMessage *m=(CkArrayMessage *)msg;
1069   m->array_index()=idx;
1070   msg_prepareSend(m,entryIndex,aID);
1071   CkArray *a=(CkArray *)_localBranch(aID);
1072   if (a == NULL)
1073     CkArrayManagerDeliver(CkMyPe(), msg, 0);
1074   else
1075     a->deliver(m,CkDeliver_queue,opts);
1076 }
1077
1078 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1079 {
1080   CkArrayMessage *m=(CkArrayMessage *)msg;
1081   m->array_index()=idx;
1082   msg_prepareSend(m,entryIndex,aID);
1083   CkArray *a=(CkArray *)_localBranch(aID);
1084   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
1085   a->deliver(m,CkDeliver_inline,opts);
1086   if (oldStatus) CkEnableTracing(entryIndex);
1087 }
1088
1089
1090 /*********************** CkArray Reduction *******************/
1091 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
1092   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
1093    mgrID(mgrID_)
1094 {
1095   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1096 }
1097 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
1098   :CkArrayListener(m)
1099 {
1100   mgr=NULL;
1101 }
1102 void CkArrayReducer::pup(PUP::er &p) {
1103   CkArrayListener::pup(p);
1104   p|mgrID;
1105   if (p.isUnpacking())
1106     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1107 }
1108 CkArrayReducer::~CkArrayReducer() {}
1109
1110 /*********************** CkArray Broadcast ******************/
1111
1112 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_, bool broadcastViaScheduler_)
1113     :CkArrayListener(1), //Each array element carries a broadcast number
1114      bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_), broadcastViaScheduler(broadcastViaScheduler_)
1115 { }
1116
1117 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
1118     :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1), broadcastViaScheduler(false)
1119 { }
1120
1121 void CkArrayBroadcaster::pup(PUP::er &p) {
1122   CkArrayListener::pup(p);
1123   /* Assumption: no migrants during checkpoint, so no need to
1124      save old broadcasts. */
1125   p|bcastNo;
1126   p|stableLocations;
1127   p|broadcastViaScheduler;
1128   if (p.isUnpacking()) {
1129     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
1130   }
1131 }
1132
1133 CkArrayBroadcaster::~CkArrayBroadcaster()
1134 {
1135   CkArrayMessage *msg;
1136   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1137 }
1138
1139 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
1140 {
1141   bcastNo++;
1142   DEBB((AA"Received broadcast %d\n"AB,bcastNo));
1143
1144   if (stableLocations)
1145     return;
1146
1147   CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
1148   oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
1149 }
1150
1151 /// Deliver a copy of the given broadcast to the given local element
1152 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
1153                                     CmiBool doFree)
1154 {
1155   int &elBcastNo=getData(el);
1156   // if this array element already received this message, skip it
1157   if (elBcastNo >= bcastNo) return CmiFalse;
1158   elBcastNo++;
1159   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
1160   int epIdx=bcast->array_ep_bcast();
1161
1162 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1163   DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
1164   return CmiTrue;
1165 #else
1166   if (!broadcastViaScheduler)
1167     return el->ckInvokeEntry(epIdx, bcast, doFree);
1168   else {
1169     if (!doFree) {
1170       CkArrayMessage *newMsg = (CkArrayMessage *)CkCopyMsg((void **)&bcast);
1171       bcast = newMsg;
1172     }
1173     envelope *env = UsrToEnv(bcast);
1174     env->getsetArrayEp() = epIdx;
1175     env->getsetArrayMgr() = el->ckGetArrayID();
1176     env->getsetArrayIndex() = el->ckGetArrayIndex();
1177     CkArrayManagerDeliver(CkMyPe(), bcast, 0);
1178     return true;
1179   }
1180 #endif
1181 }
1182
1183 /// Deliver all needed broadcasts to the given local element
1184 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
1185 {
1186   if (stableLocations) return CmiTrue;
1187   int &elBcastNo=getData(el);
1188   if (elBcastNo<bcastNo)
1189   {//This element needs some broadcasts-- it must have
1190    //been migrating during the broadcast.
1191     int i,nDeliver=bcastNo-elBcastNo;
1192     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
1193
1194     //Skip the old junk at the front of the bcast queue
1195     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
1196       oldBcasts.enq(oldBcasts.deq());
1197
1198     //Deliver the newest messages, in old-to-new order
1199     for (i=nDeliver-1;i>=0;i--)
1200     {
1201       CkArrayMessage *msg=oldBcasts.deq();
1202                 if(msg == NULL)
1203                 continue;
1204       oldBcasts.enq(msg);
1205       if (!deliver(msg, el, CmiFalse))
1206         return CmiFalse; //Element migrated away
1207     }
1208   }
1209   //Otherwise, the element survived
1210   return CmiTrue;
1211 }
1212
1213
1214 void CkArrayBroadcaster::springCleaning(void)
1215 {
1216   //Remove old broadcast messages
1217   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
1218   if (nDelete>0) {
1219     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
1220     for (int i=0;i<nDelete;i++)
1221       delete oldBcasts.deq();
1222   }
1223   oldBcastNo=bcastNo;
1224 }
1225
1226 void CkArrayBroadcaster::flushState() 
1227
1228   bcastNo = oldBcastNo = 0; 
1229   CkArrayMessage *msg;
1230   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1231 }
1232
1233 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
1234 {
1235         CProxy_ArrayBase ap(aID);
1236         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
1237 }
1238
1239 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
1240 {
1241         msg->array_ep_bcast()=ep;
1242         if (ckIsDelegated()) //Just call our delegateMgr
1243           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
1244         else 
1245         { //Broadcast message via serializer node
1246           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
1247           int skipsched = opts & CK_MSG_EXPEDITED; 
1248           //int serializer=0;//1623802937%CkNumPes();
1249 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1250                 CProxy_CkArray ap(_aid);
1251                 ap[CpvAccess(serializer)].sendBroadcast(msg);
1252                 CkGroupID _id = _aid;
1253 //              printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
1254 #else
1255           if (CkMyPe()==CpvAccess(serializer))
1256           {
1257                 DEBB((AA"Sending array broadcast\n"AB));
1258                 if (skipsched)
1259                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
1260                 else
1261                         CProxy_CkArray(_aid).recvBroadcast(msg);
1262           } else {
1263                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
1264                 CProxy_CkArray ap(_aid);
1265                 if (skipsched)
1266                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
1267                 else
1268                         ap[CpvAccess(serializer)].sendBroadcast(msg);
1269           }
1270 #endif
1271         }
1272 }
1273
1274 /// Reflect a broadcast off this Pe:
1275 void CkArray::sendBroadcast(CkMessage *msg)
1276 {
1277         CK_MAGICNUMBER_CHECK
1278         if(CkMyPe() == CpvAccess(serializer)){
1279 #if _MLOG_BCAST_TREE_
1280                 // Using the spanning tree to broadcast the message
1281                 for(int i=0; i<numChildren; i++){
1282                         CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
1283                         thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
1284                 }
1285         
1286                 // delivering message locally
1287                 recvBroadcast(msg);     
1288 #else
1289                 //Broadcast the message to all processors
1290                 thisProxy.recvBroadcast(msg);
1291 #endif
1292         }else{
1293                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
1294         }
1295 }
1296 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
1297 {
1298         CK_MAGICNUMBER_CHECK
1299         //Broadcast the message to all processors
1300         thisProxy.recvExpeditedBroadcast(msg);
1301 }
1302
1303 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1304 int _tempBroadcastCount=0;
1305
1306 // Delivers a message using the spanning tree
1307 void CkArray::recvBroadcastViaTree(CkMessage *msg)
1308 {
1309         CK_MAGICNUMBER_CHECK
1310
1311         // Using the spanning tree to broadcast the message
1312         for(int i=0; i<numChildren; i++){
1313                 CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
1314                 thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
1315         }
1316
1317         // delivering message locally
1318         recvBroadcast(msg);     
1319 }
1320
1321 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
1322     if(homePe(*index)==CmiMyPe()){
1323         CkArrayMessage *bcast = (CkArrayMessage *)data;
1324     int epIdx=bcast->array_ep_bcast();
1325         DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
1326         CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
1327         envelope *env = UsrToEnv(copy);
1328         env->sender.data.group.onPE = CkMyPe();
1329         env->TN  = env->SN=0;
1330         env->piggyBcastIdx = epIdx;
1331         env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
1332         env->getsetArrayMgr() = thisgroup;
1333         env->getsetArrayIndex() = *index;
1334     env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
1335         env->setSrcPe(CkMyPe());
1336         rec->deliver(copy,CkDeliver_queue);
1337         _tempBroadcastCount++;
1338     }else{
1339         if(locMgr->homeElementCount != -1){
1340             DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
1341         }
1342     }
1343 }
1344
1345 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
1346     arr->broadcastHomeElements(data,rec,index);
1347 }
1348 #else
1349 void CkArray::recvBroadcastViaTree(CkMessage *msg){
1350 }
1351 #endif
1352
1353
1354 /// Increment broadcast count; deliver to all local elements
1355 void CkArray::recvBroadcast(CkMessage *m)
1356 {
1357         CK_MAGICNUMBER_CHECK
1358         CkArrayMessage *msg=(CkArrayMessage *)m;
1359         broadcaster->incoming(msg);
1360
1361 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1362         _tempBroadcastCount=0;
1363         locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
1364 #else
1365         //Run through the list of local elements
1366         int idx=0, len=0, count=0;
1367         if (stableLocations) {            /* remove all NULLs in the array */
1368           len = 0;
1369           while (elements->next(idx)!=NULL) len++;
1370           idx = 0;
1371         }
1372         ArrayElement *el;
1373 #if CMK_BIGSIM_CHARM
1374         void *root;
1375         _TRACE_BG_TLINE_END(&root);
1376         BgSetEntryName("start-broadcast", &root);
1377         CkVec<void *> logs;    // store all logs for each delivery
1378         extern void stopVTimer();
1379         extern void startVTimer();
1380 #endif
1381         while (NULL!=(el=elements->next(idx))) {
1382 #if CMK_BIGSIM_CHARM
1383                 //BgEntrySplit("split-broadcast");
1384                 stopVTimer();
1385                 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
1386                 logs.push_back(curlog);
1387                 startVTimer();
1388 #endif
1389                 CmiBool doFree = CmiFalse;
1390                 if (stableLocations && ++count == len) doFree = CmiTrue;
1391                 broadcaster->deliver(msg, el, doFree);
1392         }
1393 #endif
1394
1395 #if CMK_BIGSIM_CHARM
1396         //BgEntrySplit("end-broadcast");
1397         stopVTimer();
1398         BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
1399         startVTimer();
1400 #endif
1401
1402         // CkArrayBroadcaster doesn't have msg buffered, and there was
1403         // no last delivery to transfer ownership
1404 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1405         if (stableLocations)
1406           delete msg;
1407 #else
1408         if (stableLocations && len == 0)
1409           delete msg;
1410 #endif
1411 }
1412
1413 #include "CkArray.def.h"
1414
1415