5b65d126375a48d430fdc120bbaa5a2c4c1ef575
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55 #include "pathHistory.h"
56
57 #if CMK_LBDB_ON
58 #include "LBDatabase.h"
59 #endif // CMK_LBDB_ON
60
61 CpvDeclare(int ,serializer);
62
63 bool _isAnytimeMigration;
64 bool _isNotifyChildInRed;
65
66 #define ARRAY_DEBUG_OUTPUT 0
67
68 #if ARRAY_DEBUG_OUTPUT 
69 #   define DEB(x) CkPrintf x  //General debug messages
70 #   define DEBI(x) CkPrintf x  //Index debug messages
71 #   define DEBC(x) CkPrintf x  //Construction debug messages
72 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
73 #   define DEBM(x) CkPrintf x  //Migration debug messages
74 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
75 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
76 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
77 #   define AA "ArrayBOC on %d: "
78 #   define AB ,CkMyPe()
79 #   define DEBUG(x) x
80 #else
81 #   define DEB(X) /*CkPrintf x*/
82 #   define DEBI(X) /*CkPrintf x*/
83 #   define DEBC(X) /*CkPrintf x*/
84 #   define DEBS(x) /*CkPrintf x*/
85 #   define DEBM(X) /*CkPrintf x*/
86 #   define DEBL(X) /*CkPrintf x*/
87 #   define DEBK(x) /*CkPrintf x*/
88 #   define DEBB(x) /*CkPrintf x*/
89 #   define str(x) /**/
90 #   define DEBUG(x)
91 #endif
92
93 ///This arrayListener is in charge of delivering broadcasts to the array.
94 class CkArrayBroadcaster : public CkArrayListener {
95   inline int &getData(ArrayElement *el) {return *ckGetData(el);}
96 public:
97   CkArrayBroadcaster(bool _stableLocations, bool _broadcastViaScheduler);
98   CkArrayBroadcaster(CkMigrateMessage *m);
99   virtual void pup(PUP::er &p);
100   virtual ~CkArrayBroadcaster();
101   PUPable_decl(CkArrayBroadcaster);
102
103   virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
104
105   ///Element was just created on this processor
106   /// Return false if the element migrated away or deleted itself.
107   virtual CmiBool ckElementCreated(ArrayElement *elt)
108     { return bringUpToDate(elt); }
109
110   ///Element just arrived on this processor (so just called pup)
111   /// Return false if the element migrated away or deleted itself.
112   virtual CmiBool ckElementArriving(ArrayElement *elt)
113     { return bringUpToDate(elt); }
114
115   void incoming(CkArrayMessage *msg);
116
117   CmiBool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
118
119   void springCleaning(void);
120
121   void flushState();
122 private:
123   int bcastNo;//Number of broadcasts received (also serial number)
124   int oldBcastNo;//Above value last spring cleaning
125   //This queue stores old broadcasts (in case a migrant arrives
126   // and needs to be brought up to date)
127   CkQ<CkArrayMessage *> oldBcasts;
128   bool stableLocations;
129   bool broadcastViaScheduler;
130
131   CmiBool bringUpToDate(ArrayElement *el);
132 };
133
134 ///This arrayListener is in charge of performing reductions on the array.
135 class CkArrayReducer : public CkArrayListener {
136   CkGroupID mgrID;
137   CkReductionMgr *mgr;
138   typedef  contributorInfo *I;
139   inline contributorInfo *getData(ArrayElement *el)
140     {return (I)ckGetData(el);}
141 public:
142   /// Attach this array to this CkReductionMgr
143   CkArrayReducer(CkGroupID mgrID_);
144   CkArrayReducer(CkMigrateMessage *m);
145   virtual void pup(PUP::er &p);
146   virtual ~CkArrayReducer();
147   PUPable_decl(CkArrayReducer);
148
149   void ckBeginInserting(void) {mgr->creatingContributors();}
150   void ckEndInserting(void) {mgr->doneCreatingContributors();}
151
152   void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
153
154   void ckElementCreating(ArrayElement *elt)
155     {mgr->contributorCreated(getData(elt));}
156   void ckElementDied(ArrayElement *elt)
157     {mgr->contributorDied(getData(elt));}
158
159   void ckElementLeaving(ArrayElement *elt)
160     {mgr->contributorLeaving(getData(elt));}
161   CmiBool ckElementArriving(ArrayElement *elt)
162     {mgr->contributorArriving(getData(elt)); return CmiTrue; }
163 };
164
165 /*
166 void 
167 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
168        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
169 }
170 */
171 void
172 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
173         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
174         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
175 }
176
177 /*********************** CkVerboseListener ******************/
178 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
179
180 CkVerboseListener::CkVerboseListener(void)
181   :CkArrayListener(0)
182 {
183   VL_PRINT<<"INIT  Creating listener"<<endl;
184 }
185
186 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
187 {
188   CkArrayListener::ckRegister(arrMgr,dataOffset_);
189   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
190 }
191 void CkVerboseListener::ckBeginInserting(void)
192 {
193   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
194 }
195 void CkVerboseListener::ckEndInserting(void)
196 {
197   VL_PRINT<<"INIT  Done inserting elements"<<endl;
198 }
199
200 void CkVerboseListener::ckElementStamp(int *eltInfo)
201 {
202   VL_PRINT<<"LIFE  Stamping element"<<endl;
203 }
204 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
205 {
206   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
207 }
208 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
209 {
210   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
211   return CmiTrue;
212 }
213 void CkVerboseListener::ckElementDied(ArrayElement *elt)
214 {
215   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
216 }
217
218 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
219 {
220   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
221 }
222 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
223 {
224   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
225   return CmiTrue;
226 }
227
228
229 /************************* ArrayElement *******************/
230 class ArrayElement_initInfo {
231 public:
232   CkArray *thisArray;
233   CkArrayID thisArrayID;
234   CkArrayIndex numInitial;
235   int listenerData[CK_ARRAYLISTENER_MAXLEN];
236   CmiBool fromMigration;
237 };
238
239 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
240
241 void ArrayElement::initBasics(void)
242 {
243 #if CMK_OUT_OF_CORE
244   if (CkpvAccess(CkSaveRestorePrefetch)) 
245     return; /* Just restoring from disk--don't try to set up anything. */
246 #endif
247 #if CMK_GRID_QUEUE_AVAILABLE
248         grid_queue_interval = 0;
249         grid_queue_threshold = 0;
250         msg_count = 0;
251         msg_count_grid = 0;
252         border_flag = 0;
253
254         grid_queue_interval = CmiGridQueueGetInterval ();
255         grid_queue_threshold = CmiGridQueueGetThreshold ();
256 #endif
257   ArrayElement_initInfo &info=CkpvAccess(initInfo);
258   thisArray=info.thisArray;
259   thisArrayID=info.thisArrayID;
260   numInitialElements=info.numInitial.getCombinedCount();
261   if (info.listenerData) {
262     memcpy(listenerData,info.listenerData,sizeof(listenerData));
263   }
264   if (!info.fromMigration) {
265     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
266                           l->ckElementCreating(this));
267   }
268 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
269         mlogData->objID.type = TypeArray;
270         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
271 #endif
272 #ifdef _PIPELINED_ALLREDUCE_
273         allredMgr = NULL;
274 #endif
275 }
276
277 ArrayElement::ArrayElement(void) 
278 {
279         initBasics();
280 #if CMK_MEM_CHECKPOINT
281         init_checkpt();
282 #endif
283 }
284
285 ArrayElement::ArrayElement(CkMigrateMessage *m) : CkMigratable(m)
286 {
287         initBasics();
288 }
289
290 //Called by the system just before and after migration to another processor:  
291 void ArrayElement::ckAboutToMigrate(void) {
292         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
293                                 l->ckElementLeaving(this));
294         CkMigratable::ckAboutToMigrate();
295 }
296 void ArrayElement::ckJustMigrated(void) {
297         CkMigratable::ckJustMigrated();
298         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
299               if (!l->ckElementArriving(this)) return;);
300 }
301
302 void ArrayElement::ckJustRestored(void) {
303     CkMigratable::ckJustRestored();
304     //empty for out-of-core emulation
305 }
306
307 #ifdef _PIPELINED_ALLREDUCE_
308 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
309                                         CMK_REFNUM_TYPE userFlag)
310 {
311         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
312         msg->setUserFlag(userFlag);
313         msg->setMigratableContributor(true);
314         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
315 }
316 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
317                                         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
318 {
319         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
320         msg->setUserFlag(userFlag);
321         msg->setCallback(cb);
322         msg->setMigratableContributor(true);
323         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
324 }
325 void ArrayElement::contribute2(CkReductionMsg *msg) 
326 {
327         msg->setMigratableContributor(true);
328         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
329 }
330 void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
331 {
332         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
333     msg->setUserFlag(userFlag);
334     msg->setCallback(cb);
335     msg->setMigratableContributor(true);
336     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
337 }
338 void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
339 {
340     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
341     msg->setUserFlag(userFlag);
342     msg->setMigratableContributor(true);
343     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
344 }
345
346 void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
347                                                           const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
348 {
349         // if it is a broadcast to myself and size is large
350         if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD) 
351         {
352                 if (!allredMgr) {
353                         allredMgr = new AllreduceMgr();
354                 }
355                 // number of fragments
356                 int fragNo = dataSize/FRAG_SIZE;
357                 int size = FRAG_SIZE;
358                 // for each fragment
359                 for (int i=0; i<fragNo; i++) {
360                         // callback to defragmentor
361                         CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
362                         if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
363                                 size = dataSize%FRAG_SIZE;
364                         }
365                         CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
366                         // initialize the new msg
367                         msg->reducer            = type;
368                         msg->nFrags             = fragNo;
369                         msg->fragNo             = i;
370                         msg->callback           = defrag_cb;
371                         msg->userFlag           = userFlag;
372                         allredMgr->cb           = cb;
373                         allredMgr->cb.type      = CkCallback::sendArray;
374                         allredMgr->cb.d.array.idx = myIndex;
375                         contribute2(msg);
376                 }
377                 return;
378         }
379         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
380         msg->setUserFlag(userFlag);
381         msg->setCallback(cb);
382         msg->setMigratableContributor(true);
383         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
384 }
385
386
387 #else
388 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
389    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
390 #endif
391 // _PIPELINED_ALLREDUCE_
392 void ArrayElement::defrag(CkReductionMsg *msg)
393 {
394 //      CkPrintf("in defrag\n");
395 #ifdef _PIPELINED_ALLREDUCE_
396         allredMgr->allreduce_recieve(msg);
397 #endif
398 }
399
400 /// Remote method: calls destructor
401 void ArrayElement::ckDestroy(void)
402 {
403         if(_BgOutOfCoreFlag!=1){ //in case of taking core out of memory
404             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
405                            l->ckElementDied(this));
406         }
407         CkMigratable::ckDestroy();
408 }
409
410 //Destructor (virtual)
411 ArrayElement::~ArrayElement()
412 {
413 #if CMK_OUT_OF_CORE
414   if (CkpvAccess(CkSaveRestorePrefetch)) 
415     return; /* Just saving to disk--don't trash anything. */
416 #endif
417   //To detect use-after-delete: 
418   thisArray=(CkArray *)0xDEADa7a1;
419 }
420
421 void ArrayElement::pup(PUP::er &p)
422 {
423   DEBM((AA"  ArrayElement::pup()\n"AB));
424   CkMigratable::pup(p);
425   thisArrayID.pup(p);
426   if (p.isUnpacking())
427         thisArray=thisArrayID.ckLocalBranch();
428   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
429 #if CMK_MEM_CHECKPOINT
430   p(budPEs, 2);
431 #endif
432   p.syncComment(PUP::sync_last_system,"ArrayElement");
433 #if CMK_GRID_QUEUE_AVAILABLE
434   p|grid_queue_interval;
435   p|grid_queue_threshold;
436   p|msg_count;
437   p|msg_count_grid;
438   p|border_flag;
439   if (p.isUnpacking ()) {
440     msg_count = 0;
441     msg_count_grid = 0;
442     border_flag = 0;
443   }
444 #endif
445 }
446
447 char *ArrayElement::ckDebugChareName(void) {
448         char buf[200];
449         const char *className=_chareTable[ckGetChareType()]->name;
450         const int *d=thisIndexMax.data();
451         const short int *s=(const short int*)d;
452         switch (thisIndexMax.dimension) {
453         case 0: sprintf(buf,"%s",className); break;
454         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
455         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
456         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
457     case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
458     case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
459     case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
460         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
461         };
462         return strdup(buf);
463 }
464
465 int ArrayElement::ckDebugChareID(char *str, int limit) {
466   if (limit<21) return -1;
467   str[0] = 2;
468   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
469   *((CkArrayIndex*)&str[5]) = thisIndexMax;
470   return 21;
471 }
472
473 /// A more verbose form of abort
474 void ArrayElement::CkAbort(const char *str) const
475 {
476         CkError("[%d] Array element at index %s aborting:\n",
477                 CkMyPe(), idx2str(thisIndexMax));
478         CkMigratable::CkAbort(str);
479 }
480
481 void ArrayElement::recvBroadcast(CkMessage *m){
482 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
483         CkArrayMessage *bcast = (CkArrayMessage *)m;
484     envelope *env = UsrToEnv(m);
485         int epIdx= env->piggyBcastIdx;
486     ckInvokeEntry(epIdx,bcast,CmiTrue);
487 #endif
488 }
489
490 /*********************** Spring Cleaning *****************
491 Periodically (every minute or so) remove expired broadcasts
492 from the queue.
493
494 This does not get called for arrays with stable locations (all
495 insertions done at creation, migration only at discrete points).
496 */
497
498 inline void CkArray::springCleaning(void)
499 {
500   DEBK((AA"Starting spring cleaning\n"AB));
501   broadcaster->springCleaning();
502 }
503
504 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
505         ((CkArray *)forArray)->springCleaning();
506 }
507
508 /********************* Little CkArray Utilities ******************/
509
510 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
511         :CProxy(), _aid(e->ckGetArrayID())
512         {}
513 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
514         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
515         {}
516
517 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
518         {return ckLocalBranch()->getLocMgr(); }
519
520 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
521
522 CkArrayOptions::CkArrayOptions(void) //Default: empty array
523         :numInitial(),map(_defaultArrayMapID)
524 {
525     init();
526 }
527
528 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
529         :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
530 {
531     init();
532 }
533
534 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
535         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
536 {
537     init();
538 }
539
540 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
541         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
542 {
543     init();
544 }
545
546 void CkArrayOptions::init()
547 {
548     locMgr.setZero();
549     anytimeMigration = _isAnytimeMigration;
550     staticInsertion = false;
551     reductionClient.type = CkCallback::invalid;
552     disableNotifyChildInRed = !_isNotifyChildInRed;
553     broadcastViaScheduler = false;
554 }
555
556 CkArrayOptions &CkArrayOptions::setStaticInsertion(bool b)
557 {
558     staticInsertion = b;
559     if (b && map == _defaultArrayMapID)
560         map = _fastArrayMapID;
561     return *this;
562 }
563
564 /// Bind our elements to this array
565 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
566 {
567         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
568         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
569         // not just initially.
570         //setNumInitial(arr->getNumInitial());
571         return setLocationManager(arr->getLocMgr()->getGroupID());
572 }
573 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
574 {
575         arrayListeners.push_back(listener);
576         return *this;
577 }
578
579 void CkArrayOptions::pup(PUP::er &p) {
580         p|numInitial;
581         p|map;
582         p|locMgr;
583         p|arrayListeners;
584         p|reductionClient;
585         p|anytimeMigration;
586         p|disableNotifyChildInRed;
587         p|staticInsertion;
588         p|broadcastViaScheduler;
589 }
590
591 CkArrayListener::CkArrayListener(int nInts_) 
592   :nInts(nInts_) 
593 {
594   dataOffset=-1;
595 }
596 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
597   nInts=-1; dataOffset=-1;
598 }
599 void CkArrayListener::pup(PUP::er &p) {
600   p|nInts;
601   p|dataOffset;
602 }
603
604 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
605 {
606   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
607   dataOffset=dataOffset_;
608 }
609
610 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
611                                           const CkArrayOptions &opts_)
612 {
613   CkArrayOptions opts(opts_);
614   CkGroupID locMgr = opts.getLocationManager();
615   if (locMgr.isZero())
616   { //Create a new location manager
617 #if !CMK_LBDB_ON
618     CkGroupID _lbdb;
619 #endif
620     CkEntryOptions  e_opts;
621     e_opts.setGroupDepID(opts.getMap());       // group creation dependence
622     locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,opts.getNumInitial(),&e_opts);
623     opts.setLocationManager(locMgr);
624   }
625   //Create the array manager
626   m->array_ep()=ctor;
627   CkMarshalledMessage marsh(m);
628   CkEntryOptions  e_opts;
629   e_opts.setGroupDepID(locMgr);       // group creation dependence
630 #if !GROUP_LEVEL_REDUCTION
631   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
632   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
633   nodereductionProxy.setAttachedGroup(ag);
634 #else
635   CkNodeGroupID dummyid;
636   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
637 #endif
638   return (CkArrayID)ag;
639 }
640
641 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
642 {
643   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
644 }
645
646 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
647         const CkArrayIndex &idx)
648 {
649   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
650   m->array_ep()=ctor;
651   ckLocalBranch()->prepareCtorMsg(m,onPe,idx);
652   if (ckIsDelegated()) {
653         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
654         return;
655   }
656   
657   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
658   CkArrayManagerInsert(onPe,m,_aid);
659 }
660
661 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
662 {
663   ckInsertIdx(m,ctorIndex,onPe,_idx);
664 }
665
666 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
667 {
668   return ckLocalBranch()->lookup(_idx);
669 }
670
671 //pack-unpack method for CProxy_ArrayBase
672 void CProxy_ArrayBase::pup(PUP::er &p)
673 {
674   CProxy::pup(p);
675   _aid.pup(p);
676 }
677 void CProxyElement_ArrayBase::pup(PUP::er &p)
678 {
679   CProxy_ArrayBase::pup(p);
680   p|_idx.nInts;
681   p|_idx.dimension;
682   p(_idx.data(),_idx.nInts);
683 }
684
685 void CProxySection_ArrayBase::pup(PUP::er &p)
686 {
687   CProxy_ArrayBase::pup(p);
688   p | _nsid;
689   if (p.isUnpacking()) {
690     if (_nsid == 1) _sid = new CkSectionID;
691     else if (_nsid > 1) _sid = new CkSectionID[_nsid];
692     else _sid = NULL;
693   }
694   for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
695 }
696
697 /*********************** CkArray Creation *************************/
698 void _ckArrayInit(void)
699 {
700   CkpvInitialize(ArrayElement_initInfo,initInfo);
701   CkDisableTracing(CkIndex_CkArray::insertElement(0));
702   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
703     // disable because broadcast listener may deliver broadcast message
704   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
705   // by default anytime migration is allowed
706 }
707
708 CkArray::CkArray(CkArrayOptions &opts,
709                  CkMarshalledMessage &initMsg,
710                  CkNodeGroupID nodereductionID)
711   : CkReductionMgr(),
712     locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
713     locMgrID(opts.getLocationManager()),
714     thisProxy(thisgroup),
715     // Register with our location manager
716     elements((ArrayElementList *)locMgr->addManager(thisgroup,this)),
717     stableLocations(opts.staticInsertion && !opts.anytimeMigration),
718     numInitial(opts.getNumInitial()), isInserting(CmiTrue)
719 {
720   if (!stableLocations)
721       CcdCallOnConditionKeep(CcdPERIODIC_1minute,
722                              staticSpringCleaning, (void *)this);
723   
724   //set the field in one my parent class (CkReductionMgr)
725   if(opts.disableNotifyChildInRed)
726           disableNotifyChildrenStart = CmiTrue; 
727   
728   //Find, register, and initialize the arrayListeners
729   listenerDataOffset=0;
730   broadcaster=new CkArrayBroadcaster(stableLocations, opts.broadcastViaScheduler);
731   addListener(broadcaster);
732   reducer=new CkArrayReducer(thisgroup);
733   addListener(reducer);
734
735   // COMLIB HACK
736   //calistener = new ComlibArrayListener();
737   //addListener(calistener,dataOffset);
738
739   int lNo,nL=opts.getListeners(); //User-added listeners
740   for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
741
742   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
743
744   ///Set up initial elements (if any)
745   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
746
747   ///adding code for Reduction using nodegroups
748
749 #if !GROUP_LEVEL_REDUCTION
750   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
751   nodeProxy = nodetemp;
752   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
753 #endif
754
755   if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
756       ckSetReductionClient(&opts.reductionClient);
757 }
758
759 CkArray::CkArray(CkMigrateMessage *m)
760         :CkReductionMgr(m), thisProxy(thisgroup)
761 {
762   locMgr=NULL;
763   isInserting=CmiTrue;
764 }
765
766 #if CMK_ERROR_CHECKING
767 inline void testPup(PUP::er &p,int shouldBe) {
768   int a=shouldBe;
769   p|a;
770   if (a!=shouldBe)
771     CkAbort("PUP direction mismatch!");
772 }
773 #else
774 inline void testPup(PUP::er &p,int shouldBe) {}
775 #endif
776
777 void CkArray::pup(PUP::er &p){
778         CkReductionMgr::pup(p);
779         p|numInitial;
780         p|locMgrID;
781         p|listeners;
782         p|listenerDataOffset;
783         p|stableLocations;
784         testPup(p,1234);
785         if(p.isUnpacking()){
786                 thisProxy=thisgroup;
787                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
788                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
789                 /// Restore our default listeners:
790                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
791                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
792                 /// set up broadcast cleaner
793                 if (!stableLocations)
794                     CcdCallOnConditionKeep(CcdPERIODIC_1minute,
795                                            staticSpringCleaning, (void *)this);
796         }
797 }
798
799 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
800   int dataOffset=0; \
801   for (int lNo=0;lNo<listeners.size();lNo++) { \
802     CkArrayListener *l=listeners[lNo]; \
803     l->ckElementStamp(&listenerData[dataOffset]); \
804     dataOffset+=l->ckGetLen(); \
805   } \
806 } while (0)
807
808 //Called on send side to prepare array constructor message
809 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
810 {
811   envelope *env=UsrToEnv((void *)m);
812   env->getsetArrayIndex()=idx;
813   int *listenerData=env->getsetArrayListenerData();
814   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
815   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
816   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
817         getLocMgr()->inform(idx,onPe);
818 }
819
820 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
821                         CkElementCreation_t type)
822 {
823         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
824         if (type==CkElementCreation_resume) 
825         { // HACK: Re-stamp elements on checkpoint resume--
826           //  this restores, e.g., reduction manager's gcount
827                 int *listenerData=ret->listenerData;
828                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
829         }
830         return ret;
831 }
832
833 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
834                      CkMessage *msg,CmiBool fromMigration) 
835 {
836         //Stash the element's initialization information in the global "initInfo"
837         ArrayElement_initInfo &init=CkpvAccess(initInfo);
838         init.numInitial=numInitial;
839         init.thisArray=this;
840         init.thisArrayID=thisgroup;
841         if (msg) /*Have to *copy* data because msg will be deleted*/
842           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
843                  sizeof(init.listenerData));
844         init.fromMigration=fromMigration;
845         
846         //Build the element
847         int elSize=_chareTable[elChareType]->size;
848         ArrayElement *elem = (ArrayElement *)malloc(elSize);
849 #ifndef CMK_OPTIMIZE
850         if (elem!=NULL) setMemoryTypeChare(elem);
851 #endif
852         return elem;
853 }
854
855 /// This method is called by ck.C or the user to add an element.
856 CmiBool CkArray::insertElement(CkMessage *me)
857 {
858   CK_MAGICNUMBER_CHECK
859   CkArrayMessage *m=(CkArrayMessage *)me;
860   const CkArrayIndex &idx=m->array_index();
861   int onPe;
862   if (locMgr->isRemote(idx,&onPe)) 
863   { /* element's sibling lives somewhere else, so insert there */
864         CkArrayManagerInsert(onPe,me,thisgroup);
865         return CmiFalse;
866   }
867   int ctorIdx=m->array_ep();
868   int chareType=_entryTable[ctorIdx]->chareIdx;
869   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
870 #ifndef CMK_CHARE_USE_PTR
871   ((Chare *)elt)->chareIdx = -1;
872 #endif
873   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
874   CK_ARRAYLISTENER_LOOP(listeners,
875       if (!l->ckElementCreated(elt)) return CmiFalse;);
876   return CmiTrue;
877 }
878
879 void CProxy_ArrayBase::doneInserting(void)
880 {
881   DEBC((AA"Broadcasting a doneInserting request\n"AB));
882   //Broadcast a DoneInserting
883   CProxy_CkArray(_aid).remoteDoneInserting();
884 }
885
886 void CkArray::doneInserting(void)
887 {
888   thisProxy[CkMyPe()].remoteDoneInserting();
889 }
890
891 /// This is called on every processor after the last array insertion.
892 void CkArray::remoteDoneInserting(void)
893 {
894   CK_MAGICNUMBER_CHECK
895   if (isInserting) {
896     isInserting=CmiFalse;
897     DEBC((AA"Done inserting objects\n"AB));
898     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
899     locMgr->doneInserting();
900   }
901 }
902
903 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
904         int onPe,int ctor,CkDeliver_t type)
905 {
906         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
907         prepareCtorMsg(m,onPe,idx);
908         m->array_ep()=ctor;
909         
910         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
911                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
912                 CkArrayManagerInsert(onPe,m,thisgroup);
913         } else /* local message, non-queued */ {
914                 //Call local constructor directly
915                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
916                 return insertElement(m);
917         }
918         return CmiTrue;
919 }
920
921 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
922 {
923         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
924         if (local) {
925           int onPe=CkMyPe();
926           prepareCtorMsg(m,onPe,idx);
927 #if CMK_BIGSIM_CHARM
928           BgEntrySplit("split-array-new");
929 #endif
930           insertElement(m);
931         }
932         else {
933           int onPe=-1;
934           prepareCtorMsg(m,onPe,idx);
935           CkArrayManagerInsert(onPe,m,getGroupID());
936         }
937 }
938
939 /********************* CkArray Messaging ******************/
940 /// Fill out a message's array fields before sending it
941 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
942 {
943         envelope *env=UsrToEnv((void *)msg);
944         env->getsetArrayMgr()=aid;
945         env->getsetArraySrcPe()=CkMyPe();
946         env->setEpIdx(ep);
947         env->getsetArrayHops()=0;
948 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
949         criticalPath_send(env);
950         automaticallySetMessagePriority(env);
951 #endif
952 }
953
954
955 /// Just a non-inlined version of msg_prepareSend()
956 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
957 {
958         envelope *env=UsrToEnv((void *)msg);
959         env->getsetArrayMgr()=aid;
960         env->getsetArraySrcPe()=CkMyPe();
961         env->setEpIdx(ep);
962         env->getsetArrayHops()=0;
963 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
964         criticalPath_send(env);
965         automaticallySetMessagePriority(env);
966 #endif
967 }
968
969 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
970 {
971 #if CMK_ERROR_CHECKING
972         //Check our array index for validity
973         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
974         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
975                 CkAbort("Array index length (nInts) is too long-- did you "
976                         "use bytes instead of integers?\n");
977 #endif
978         msg_prepareSend(msg,ep,ckGetArrayID());
979         msg->array_index()=_idx;//Insert array index
980         if (ckIsDelegated()) //Just call our delegateMgr
981           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
982         else 
983         { //Usual case: a direct send
984           CkArray *localbranch = ckLocalBranch();
985           if (localbranch == NULL) {             // array not created yet
986             CkArrayManagerDeliver(CkMyPe(), msg, 0);
987           }
988           else {
989             if (opts & CK_MSG_INLINE)
990               localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
991             else
992               localbranch->deliver(msg, CkDeliver_queue, opts);
993           }
994         }
995 }
996
997 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
998 {
999         CkFutureID f=CkCreateAttachedFuture(msg);
1000         ckSend(msg,ep);
1001         return CkWaitReleaseFuture(f);
1002 }
1003
1004 void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts     )
1005 {
1006         CProxySection_ArrayBase sp(sID);
1007         sp.ckSend((CkArrayMessage *)msg,entryIndex,opts);
1008 }
1009
1010 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
1011 {
1012         if (ckIsDelegated()) //Just call our delegateMgr
1013           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
1014         else {
1015           // send through all
1016           for (int k=0; k<_nsid; ++k) {
1017             for (int i=0; i< _sid[k]._nElems-1; i++) {
1018               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[i]);
1019               void *newMsg=CkCopyMsg((void **)&msg);
1020               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1021             }
1022             if (_sid[k]._nElems > 0) {
1023               void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
1024               CProxyElement_ArrayBase ap(_sid[k]._cookie.get_aid(), _sid[k]._elems[_sid[k]._nElems-1]);
1025               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1026             }
1027           }
1028         }
1029 }
1030
1031 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1032 {
1033   CkArrayMessage *m=(CkArrayMessage *)msg;
1034   m->array_index()=idx;
1035   msg_prepareSend(m,entryIndex,aID);
1036   CkArray *a=(CkArray *)_localBranch(aID);
1037   if (a == NULL)
1038     CkArrayManagerDeliver(CkMyPe(), msg, 0);
1039   else
1040     a->deliver(m,CkDeliver_queue,opts);
1041 }
1042
1043 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1044 {
1045   CkArrayMessage *m=(CkArrayMessage *)msg;
1046   m->array_index()=idx;
1047   msg_prepareSend(m,entryIndex,aID);
1048   CkArray *a=(CkArray *)_localBranch(aID);
1049   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
1050   a->deliver(m,CkDeliver_inline,opts);
1051   if (oldStatus) CkEnableTracing(entryIndex);
1052 }
1053
1054
1055 /*********************** CkArray Reduction *******************/
1056 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
1057   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
1058    mgrID(mgrID_)
1059 {
1060   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1061 }
1062 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
1063   :CkArrayListener(m)
1064 {
1065   mgr=NULL;
1066 }
1067 void CkArrayReducer::pup(PUP::er &p) {
1068   CkArrayListener::pup(p);
1069   p|mgrID;
1070   if (p.isUnpacking())
1071     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1072 }
1073 CkArrayReducer::~CkArrayReducer() {}
1074
1075 /*********************** CkArray Broadcast ******************/
1076
1077 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_, bool broadcastViaScheduler_)
1078     :CkArrayListener(1), //Each array element carries a broadcast number
1079      bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_), broadcastViaScheduler(broadcastViaScheduler_)
1080 { }
1081
1082 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
1083     :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1), broadcastViaScheduler(false)
1084 { }
1085
1086 void CkArrayBroadcaster::pup(PUP::er &p) {
1087   CkArrayListener::pup(p);
1088   /* Assumption: no migrants during checkpoint, so no need to
1089      save old broadcasts. */
1090   p|bcastNo;
1091   p|stableLocations;
1092   p|broadcastViaScheduler;
1093   if (p.isUnpacking()) {
1094     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
1095   }
1096 }
1097
1098 CkArrayBroadcaster::~CkArrayBroadcaster()
1099 {
1100   CkArrayMessage *msg;
1101   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1102 }
1103
1104 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
1105 {
1106   bcastNo++;
1107   DEBB((AA"Received broadcast %d\n"AB,bcastNo));
1108
1109   if (stableLocations)
1110     return;
1111
1112   CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
1113   oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
1114 }
1115
1116 /// Deliver a copy of the given broadcast to the given local element
1117 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
1118                                     CmiBool doFree)
1119 {
1120   int &elBcastNo=getData(el);
1121   // if this array element already received this message, skip it
1122   if (elBcastNo >= bcastNo) return CmiFalse;
1123   elBcastNo++;
1124   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
1125   int epIdx=bcast->array_ep_bcast();
1126
1127 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1128   DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
1129   return CmiTrue;
1130 #else
1131   if (!broadcastViaScheduler)
1132     return el->ckInvokeEntry(epIdx, bcast, doFree);
1133   else {
1134     if (!doFree) {
1135       CkArrayMessage *newMsg = (CkArrayMessage *)CkCopyMsg((void **)&bcast);
1136       bcast = newMsg;
1137     }
1138     envelope *env = UsrToEnv(bcast);
1139     env->getsetArrayEp() = epIdx;
1140     env->getsetArrayMgr() = el->ckGetArrayID();
1141     env->getsetArrayIndex() = el->ckGetArrayIndex();
1142     CkArrayManagerDeliver(CkMyPe(), bcast, 0);
1143     return true;
1144   }
1145 #endif
1146 }
1147
1148 /// Deliver all needed broadcasts to the given local element
1149 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
1150 {
1151   if (stableLocations) return CmiTrue;
1152   int &elBcastNo=getData(el);
1153   if (elBcastNo<bcastNo)
1154   {//This element needs some broadcasts-- it must have
1155    //been migrating during the broadcast.
1156     int i,nDeliver=bcastNo-elBcastNo;
1157     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
1158
1159     //Skip the old junk at the front of the bcast queue
1160     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
1161       oldBcasts.enq(oldBcasts.deq());
1162
1163     //Deliver the newest messages, in old-to-new order
1164     for (i=nDeliver-1;i>=0;i--)
1165     {
1166       CkArrayMessage *msg=oldBcasts.deq();
1167                 if(msg == NULL)
1168                 continue;
1169       oldBcasts.enq(msg);
1170       if (!deliver(msg, el, CmiFalse))
1171         return CmiFalse; //Element migrated away
1172     }
1173   }
1174   //Otherwise, the element survived
1175   return CmiTrue;
1176 }
1177
1178
1179 void CkArrayBroadcaster::springCleaning(void)
1180 {
1181   //Remove old broadcast messages
1182   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
1183   if (nDelete>0) {
1184     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
1185     for (int i=0;i<nDelete;i++)
1186       delete oldBcasts.deq();
1187   }
1188   oldBcastNo=bcastNo;
1189 }
1190
1191 void CkArrayBroadcaster::flushState() 
1192
1193   bcastNo = oldBcastNo = 0; 
1194   CkArrayMessage *msg;
1195   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1196 }
1197
1198 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
1199 {
1200         CProxy_ArrayBase ap(aID);
1201         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
1202 }
1203
1204 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
1205 {
1206         msg->array_ep_bcast()=ep;
1207         if (ckIsDelegated()) //Just call our delegateMgr
1208           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
1209         else 
1210         { //Broadcast message via serializer node
1211           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
1212           int skipsched = opts & CK_MSG_EXPEDITED; 
1213           //int serializer=0;//1623802937%CkNumPes();
1214 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1215                 CProxy_CkArray ap(_aid);
1216                 ap[CpvAccess(serializer)].sendBroadcast(msg);
1217                 CkGroupID _id = _aid;
1218 //              printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
1219 #else
1220           if (CkMyPe()==CpvAccess(serializer))
1221           {
1222                 DEBB((AA"Sending array broadcast\n"AB));
1223                 if (skipsched)
1224                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
1225                 else
1226                         CProxy_CkArray(_aid).recvBroadcast(msg);
1227           } else {
1228                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
1229                 CProxy_CkArray ap(_aid);
1230                 if (skipsched)
1231                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
1232                 else
1233                         ap[CpvAccess(serializer)].sendBroadcast(msg);
1234           }
1235 #endif
1236         }
1237 }
1238
1239 /// Reflect a broadcast off this Pe:
1240 void CkArray::sendBroadcast(CkMessage *msg)
1241 {
1242         CK_MAGICNUMBER_CHECK
1243         if(CkMyPe() == CpvAccess(serializer)){
1244                 //Broadcast the message to all processors
1245                 thisProxy.recvBroadcast(msg);
1246         }else{
1247                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
1248         }
1249 }
1250 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
1251 {
1252         CK_MAGICNUMBER_CHECK
1253         //Broadcast the message to all processors
1254         thisProxy.recvExpeditedBroadcast(msg);
1255 }
1256
1257 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1258 int _tempBroadcastCount=0;
1259
1260 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
1261     if(homePe(*index)==CmiMyPe()){
1262         CkArrayMessage *bcast = (CkArrayMessage *)data;
1263     int epIdx=bcast->array_ep_bcast();
1264         DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
1265         CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
1266         envelope *env = UsrToEnv(copy);
1267         env->sender.data.group.onPE = CkMyPe();
1268         env->TN  = env->SN=0;
1269         env->piggyBcastIdx = epIdx;
1270         env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
1271         env->getsetArrayMgr() = thisgroup;
1272         env->getsetArrayIndex() = *index;
1273     env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
1274         env->setSrcPe(CkMyPe());
1275         rec->deliver(copy,CkDeliver_queue);
1276         _tempBroadcastCount++;
1277     }else{
1278         if(locMgr->homeElementCount != -1){
1279             DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
1280         }
1281     }
1282 }
1283
1284 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
1285     arr->broadcastHomeElements(data,rec,index);
1286 }
1287 #endif
1288
1289
1290 /// Increment broadcast count; deliver to all local elements
1291 void CkArray::recvBroadcast(CkMessage *m)
1292 {
1293         CK_MAGICNUMBER_CHECK
1294         CkArrayMessage *msg=(CkArrayMessage *)m;
1295         broadcaster->incoming(msg);
1296
1297 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1298         _tempBroadcastCount=0;
1299         locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
1300 #else
1301         //Run through the list of local elements
1302         int idx=0, len=0;
1303         if (stableLocations) {            /* remove all NULLs in the array */
1304           len = 0;
1305           while (elements->next(idx)!=NULL) len++;
1306           idx = 0;
1307         }
1308         ArrayElement *el;
1309 #if CMK_BIGSIM_CHARM
1310         void *root;
1311         _TRACE_BG_TLINE_END(&root);
1312         BgSetEntryName("start-broadcast", &root);
1313         CkVec<void *> logs;    // store all logs for each delivery
1314         extern void stopVTimer();
1315         extern void startVTimer();
1316 #endif
1317         while (NULL!=(el=elements->next(idx))) {
1318 #if CMK_BIGSIM_CHARM
1319                 //BgEntrySplit("split-broadcast");
1320                 stopVTimer();
1321                 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
1322                 logs.push_back(curlog);
1323                 startVTimer();
1324 #endif
1325                 CmiBool doFree = CmiFalse;
1326                 if (stableLocations && idx == len) doFree = CmiTrue;
1327                 broadcaster->deliver(msg, el, doFree);
1328         }
1329 #endif
1330
1331 #if CMK_BIGSIM_CHARM
1332         //BgEntrySplit("end-broadcast");
1333         stopVTimer();
1334         BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
1335         startVTimer();
1336 #endif
1337
1338         // CkArrayBroadcaster doesn't have msg buffered, and there was
1339         // no last delivery to transfer ownership
1340         if (stableLocations && len == 0)
1341           delete msg;
1342 }
1343
1344 #include "CkArray.def.h"
1345
1346