add section callback
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55 #include "pathHistory.h"
56
57 #if CMK_LBDB_ON
58 #include "LBDatabase.h"
59 #endif // CMK_LBDB_ON
60
61 CpvDeclare(int ,serializer);
62
63 bool _isAnytimeMigration;
64 bool _isNotifyChildInRed;
65
66 #define ARRAY_DEBUG_OUTPUT 0
67
68 #if ARRAY_DEBUG_OUTPUT 
69 #   define DEB(x) CkPrintf x  //General debug messages
70 #   define DEBI(x) CkPrintf x  //Index debug messages
71 #   define DEBC(x) CkPrintf x  //Construction debug messages
72 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
73 #   define DEBM(x) CkPrintf x  //Migration debug messages
74 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
75 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
76 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
77 #   define AA "ArrayBOC on %d: "
78 #   define AB ,CkMyPe()
79 #   define DEBUG(x) x
80 #else
81 #   define DEB(X) /*CkPrintf x*/
82 #   define DEBI(X) /*CkPrintf x*/
83 #   define DEBC(X) /*CkPrintf x*/
84 #   define DEBS(x) /*CkPrintf x*/
85 #   define DEBM(X) /*CkPrintf x*/
86 #   define DEBL(X) /*CkPrintf x*/
87 #   define DEBK(x) /*CkPrintf x*/
88 #   define DEBB(x) /*CkPrintf x*/
89 #   define str(x) /**/
90 #   define DEBUG(x)
91 #endif
92
93 ///This arrayListener is in charge of delivering broadcasts to the array.
94 class CkArrayBroadcaster : public CkArrayListener {
95   inline int &getData(ArrayElement *el) {return *ckGetData(el);}
96 public:
97   CkArrayBroadcaster(bool _stableLocations);
98   CkArrayBroadcaster(CkMigrateMessage *m);
99   virtual void pup(PUP::er &p);
100   virtual ~CkArrayBroadcaster();
101   PUPable_decl(CkArrayBroadcaster);
102
103   virtual void ckElementStamp(int *eltInfo) {*eltInfo=bcastNo;}
104
105   ///Element was just created on this processor
106   /// Return false if the element migrated away or deleted itself.
107   virtual CmiBool ckElementCreated(ArrayElement *elt)
108     { return bringUpToDate(elt); }
109
110   ///Element just arrived on this processor (so just called pup)
111   /// Return false if the element migrated away or deleted itself.
112   virtual CmiBool ckElementArriving(ArrayElement *elt)
113     { return bringUpToDate(elt); }
114
115   void incoming(CkArrayMessage *msg);
116
117   CmiBool deliver(CkArrayMessage *bcast, ArrayElement *el, bool doFree);
118
119   void springCleaning(void);
120
121   void flushState();
122 private:
123   int bcastNo;//Number of broadcasts received (also serial number)
124   int oldBcastNo;//Above value last spring cleaning
125   //This queue stores old broadcasts (in case a migrant arrives
126   // and needs to be brought up to date)
127   CkQ<CkArrayMessage *> oldBcasts;
128   bool stableLocations;
129
130   CmiBool bringUpToDate(ArrayElement *el);
131 };
132
133 ///This arrayListener is in charge of performing reductions on the array.
134 class CkArrayReducer : public CkArrayListener {
135   CkGroupID mgrID;
136   CkReductionMgr *mgr;
137   typedef  contributorInfo *I;
138   inline contributorInfo *getData(ArrayElement *el)
139     {return (I)ckGetData(el);}
140 public:
141   /// Attach this array to this CkReductionMgr
142   CkArrayReducer(CkGroupID mgrID_);
143   CkArrayReducer(CkMigrateMessage *m);
144   virtual void pup(PUP::er &p);
145   virtual ~CkArrayReducer();
146   PUPable_decl(CkArrayReducer);
147
148   void ckBeginInserting(void) {mgr->creatingContributors();}
149   void ckEndInserting(void) {mgr->doneCreatingContributors();}
150
151   void ckElementStamp(int *eltInfo) {mgr->contributorStamped((I)eltInfo);}
152
153   void ckElementCreating(ArrayElement *elt)
154     {mgr->contributorCreated(getData(elt));}
155   void ckElementDied(ArrayElement *elt)
156     {mgr->contributorDied(getData(elt));}
157
158   void ckElementLeaving(ArrayElement *elt)
159     {mgr->contributorLeaving(getData(elt));}
160   CmiBool ckElementArriving(ArrayElement *elt)
161     {mgr->contributorArriving(getData(elt)); return CmiTrue; }
162 };
163
164 /*
165 void 
166 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
167        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
168 }
169 */
170 void
171 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndex _idx, void *m, int ep, int opts) {
172         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
173         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
174 }
175
176 /*********************** CkVerboseListener ******************/
177 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
178
179 CkVerboseListener::CkVerboseListener(void)
180   :CkArrayListener(0)
181 {
182   VL_PRINT<<"INIT  Creating listener"<<endl;
183 }
184
185 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
186 {
187   CkArrayListener::ckRegister(arrMgr,dataOffset_);
188   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
189 }
190 void CkVerboseListener::ckBeginInserting(void)
191 {
192   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
193 }
194 void CkVerboseListener::ckEndInserting(void)
195 {
196   VL_PRINT<<"INIT  Done inserting elements"<<endl;
197 }
198
199 void CkVerboseListener::ckElementStamp(int *eltInfo)
200 {
201   VL_PRINT<<"LIFE  Stamping element"<<endl;
202 }
203 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
204 {
205   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
206 }
207 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
208 {
209   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
210   return CmiTrue;
211 }
212 void CkVerboseListener::ckElementDied(ArrayElement *elt)
213 {
214   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
215 }
216
217 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
218 {
219   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
220 }
221 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
222 {
223   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
224   return CmiTrue;
225 }
226
227
228 /************************* ArrayElement *******************/
229 class ArrayElement_initInfo {
230 public:
231   CkArray *thisArray;
232   CkArrayID thisArrayID;
233   CkArrayIndex numInitial;
234   int listenerData[CK_ARRAYLISTENER_MAXLEN];
235   CmiBool fromMigration;
236 };
237
238 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
239
240 void ArrayElement::initBasics(void)
241 {
242 #if CMK_OUT_OF_CORE
243   if (CkpvAccess(CkSaveRestorePrefetch)) 
244     return; /* Just restoring from disk--don't try to set up anything. */
245 #endif
246 #if CMK_GRID_QUEUE_AVAILABLE
247         grid_queue_interval = 0;
248         grid_queue_threshold = 0;
249         msg_count = 0;
250         msg_count_grid = 0;
251         border_flag = 0;
252
253         grid_queue_interval = CmiGridQueueGetInterval ();
254         grid_queue_threshold = CmiGridQueueGetThreshold ();
255 #endif
256   ArrayElement_initInfo &info=CkpvAccess(initInfo);
257   thisArray=info.thisArray;
258   thisArrayID=info.thisArrayID;
259   numInitialElements=info.numInitial.getCombinedCount();
260   if (info.listenerData) {
261     memcpy(listenerData,info.listenerData,sizeof(listenerData));
262   }
263   if (!info.fromMigration) {
264     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
265                           l->ckElementCreating(this));
266   }
267 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
268         mlogData->objID.type = TypeArray;
269         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
270 #endif
271 #ifdef _PIPELINED_ALLREDUCE_
272         allredMgr = NULL;
273 #endif
274 }
275
276 ArrayElement::ArrayElement(void) 
277 {
278         initBasics();
279 #if CMK_MEM_CHECKPOINT
280         init_checkpt();
281 #endif
282 }
283
284 ArrayElement::ArrayElement(CkMigrateMessage *m) 
285 {
286         initBasics();
287 }
288
289 //Called by the system just before and after migration to another processor:  
290 void ArrayElement::ckAboutToMigrate(void) {
291         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
292                                 l->ckElementLeaving(this));
293         CkMigratable::ckAboutToMigrate();
294 }
295 void ArrayElement::ckJustMigrated(void) {
296         CkMigratable::ckJustMigrated();
297         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
298               if (!l->ckElementArriving(this)) return;);
299 }
300
301 void ArrayElement::ckJustRestored(void) {
302     CkMigratable::ckJustRestored();
303     //empty for out-of-core emulation
304 }
305
306 #ifdef _PIPELINED_ALLREDUCE_
307 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
308                                         CMK_REFNUM_TYPE userFlag)
309 {
310         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
311         msg->setUserFlag(userFlag);
312         msg->setMigratableContributor(true);
313         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
314 }
315 void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
316                                         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
317 {
318         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
319         msg->setUserFlag(userFlag);
320         msg->setCallback(cb);
321         msg->setMigratableContributor(true);
322         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
323 }
324 void ArrayElement::contribute2(CkReductionMsg *msg) 
325 {
326         msg->setMigratableContributor(true);
327         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
328 }
329 void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
330 {
331         CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
332     msg->setUserFlag(userFlag);
333     msg->setCallback(cb);
334     msg->setMigratableContributor(true);
335     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
336 }
337 void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
338 {
339     CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
340     msg->setUserFlag(userFlag);
341     msg->setMigratableContributor(true);
342     thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
343 }
344
345 void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
346                                                           const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
347 {
348         // if it is a broadcast to myself and size is large
349         if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD) 
350         {
351                 if (!allredMgr) {
352                         allredMgr = new AllreduceMgr();
353                 }
354                 // number of fragments
355                 int fragNo = dataSize/FRAG_SIZE;
356                 int size = FRAG_SIZE;
357                 // for each fragment
358                 for (int i=0; i<fragNo; i++) {
359                         // callback to defragmentor
360                         CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
361                         if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
362                                 size = dataSize%FRAG_SIZE;
363                         }
364                         CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
365                         // initialize the new msg
366                         msg->reducer            = type;
367                         msg->nFrags             = fragNo;
368                         msg->fragNo             = i;
369                         msg->callback           = defrag_cb;
370                         msg->userFlag           = userFlag;
371                         allredMgr->cb           = cb;
372                         allredMgr->cb.type      = CkCallback::sendArray;
373                         allredMgr->cb.d.array.idx = myIndex;
374                         contribute2(msg);
375                 }
376                 return;
377         }
378         CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
379         msg->setUserFlag(userFlag);
380         msg->setCallback(cb);
381         msg->setMigratableContributor(true);
382         thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
383 }
384
385
386 #else
387 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
388    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
389 #endif
390 // _PIPELINED_ALLREDUCE_
391 void ArrayElement::defrag(CkReductionMsg *msg)
392 {
393 //      CkPrintf("in defrag\n");
394 #ifdef _PIPELINED_ALLREDUCE_
395         allredMgr->allreduce_recieve(msg);
396 #endif
397 }
398
399 /// Remote method: calls destructor
400 void ArrayElement::ckDestroy(void)
401 {
402         if(_BgOutOfCoreFlag!=1){ //in case of taking core out of memory
403             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
404                            l->ckElementDied(this));
405         }
406         CkMigratable::ckDestroy();
407 }
408
409 //Destructor (virtual)
410 ArrayElement::~ArrayElement()
411 {
412 #if CMK_OUT_OF_CORE
413   if (CkpvAccess(CkSaveRestorePrefetch)) 
414     return; /* Just saving to disk--don't trash anything. */
415 #endif
416   //To detect use-after-delete: 
417   thisArray=(CkArray *)0xDEADa7a1;
418 }
419
420 void ArrayElement::pup(PUP::er &p)
421 {
422   DEBM((AA"  ArrayElement::pup()\n"AB));
423   CkMigratable::pup(p);
424   thisArrayID.pup(p);
425   if (p.isUnpacking())
426         thisArray=thisArrayID.ckLocalBranch();
427   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
428 #if CMK_MEM_CHECKPOINT
429   p(budPEs, 2);
430 #endif
431   p.syncComment(PUP::sync_last_system,"ArrayElement");
432 #if CMK_GRID_QUEUE_AVAILABLE
433   p|grid_queue_interval;
434   p|grid_queue_threshold;
435   p|msg_count;
436   p|msg_count_grid;
437   p|border_flag;
438   if (p.isUnpacking ()) {
439     msg_count = 0;
440     msg_count_grid = 0;
441     border_flag = 0;
442   }
443 #endif
444 }
445
446 char *ArrayElement::ckDebugChareName(void) {
447         char buf[200];
448         const char *className=_chareTable[ckGetChareType()]->name;
449         const int *d=thisIndexMax.data();
450         const short int *s=(const short int*)d;
451         switch (thisIndexMax.dimension) {
452         case 0: sprintf(buf,"%s",className); break;
453         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
454         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
455         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
456     case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
457     case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
458     case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
459         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
460         };
461         return strdup(buf);
462 }
463
464 int ArrayElement::ckDebugChareID(char *str, int limit) {
465   if (limit<21) return -1;
466   str[0] = 2;
467   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
468   *((CkArrayIndex*)&str[5]) = thisIndexMax;
469   return 21;
470 }
471
472 /// A more verbose form of abort
473 void ArrayElement::CkAbort(const char *str) const
474 {
475         CkError("[%d] Array element at index %s aborting:\n",
476                 CkMyPe(), idx2str(thisIndexMax));
477         CkMigratable::CkAbort(str);
478 }
479
480 void ArrayElement::recvBroadcast(CkMessage *m){
481 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
482         CkArrayMessage *bcast = (CkArrayMessage *)m;
483     envelope *env = UsrToEnv(m);
484         int epIdx= env->piggyBcastIdx;
485     ckInvokeEntry(epIdx,bcast,CmiTrue);
486 #endif
487 }
488
489 /*********************** Spring Cleaning *****************
490 Periodically (every minute or so) remove expired broadcasts
491 from the queue.
492
493 This does not get called for arrays with stable locations (all
494 insertions done at creation, migration only at discrete points).
495 */
496
497 inline void CkArray::springCleaning(void)
498 {
499   DEBK((AA"Starting spring cleaning\n"AB));
500   broadcaster->springCleaning();
501 }
502
503 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
504         ((CkArray *)forArray)->springCleaning();
505 }
506
507 /********************* Little CkArray Utilities ******************/
508
509 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
510         :CProxy(), _aid(e->ckGetArrayID())
511         {}
512 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
513         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
514         {}
515
516 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
517         {return ckLocalBranch()->getLocMgr(); }
518
519 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
520
521 CkArrayOptions::CkArrayOptions(void) //Default: empty array
522         :numInitial(),map(_defaultArrayMapID)
523 {
524     init();
525 }
526
527 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
528         :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
529 {
530     init();
531 }
532
533 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
534         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
535 {
536     init();
537 }
538
539 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
540         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
541 {
542     init();
543 }
544
545 void CkArrayOptions::init()
546 {
547     locMgr.setZero();
548     anytimeMigration = _isAnytimeMigration;
549     staticInsertion = false;
550     reductionClient.type = CkCallback::invalid;
551     disableNotifyChildInRed = !_isNotifyChildInRed;
552 }
553
554 CkArrayOptions &CkArrayOptions::setStaticInsertion(bool b)
555 {
556     staticInsertion = b;
557     if (b && map == _defaultArrayMapID)
558         map = _fastArrayMapID;
559     return *this;
560 }
561
562 /// Bind our elements to this array
563 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
564 {
565         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
566         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
567         // not just initially.
568         //setNumInitial(arr->getNumInitial());
569         return setLocationManager(arr->getLocMgr()->getGroupID());
570 }
571 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
572 {
573         arrayListeners.push_back(listener);
574         return *this;
575 }
576
577 void CkArrayOptions::pup(PUP::er &p) {
578         p|numInitial;
579         p|map;
580         p|locMgr;
581         p|arrayListeners;
582         p|reductionClient;
583         p|anytimeMigration;
584         p|disableNotifyChildInRed;
585         p|staticInsertion;
586 }
587
588 CkArrayListener::CkArrayListener(int nInts_) 
589   :nInts(nInts_) 
590 {
591   dataOffset=-1;
592 }
593 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
594   nInts=-1; dataOffset=-1;
595 }
596 void CkArrayListener::pup(PUP::er &p) {
597   p|nInts;
598   p|dataOffset;
599 }
600
601 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
602 {
603   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
604   dataOffset=dataOffset_;
605 }
606
607 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
608                                           const CkArrayOptions &opts_)
609 {
610   CkArrayOptions opts(opts_);
611   CkGroupID locMgr = opts.getLocationManager();
612   if (locMgr.isZero())
613   { //Create a new location manager
614 #if !CMK_LBDB_ON
615     CkGroupID _lbdb;
616 #endif
617     CkEntryOptions  e_opts;
618     e_opts.setGroupDepID(opts.getMap());       // group creation dependence
619     locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,opts.getNumInitial(),&e_opts);
620     opts.setLocationManager(locMgr);
621   }
622   //Create the array manager
623   m->array_ep()=ctor;
624   CkMarshalledMessage marsh(m);
625   CkEntryOptions  e_opts;
626   e_opts.setGroupDepID(locMgr);       // group creation dependence
627 #if !GROUP_LEVEL_REDUCTION
628   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
629   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
630   nodereductionProxy.setAttachedGroup(ag);
631 #else
632   CkNodeGroupID dummyid;
633   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
634 #endif
635   return (CkArrayID)ag;
636 }
637
638 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
639 {
640   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
641 }
642
643 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
644         const CkArrayIndex &idx)
645 {
646   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
647   m->array_ep()=ctor;
648   ckLocalBranch()->prepareCtorMsg(m,onPe,idx);
649   if (ckIsDelegated()) {
650         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
651         return;
652   }
653   
654   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
655   CkArrayManagerInsert(onPe,m,_aid);
656 }
657
658 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
659 {
660   ckInsertIdx(m,ctorIndex,onPe,_idx);
661 }
662
663 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
664 {
665   return ckLocalBranch()->lookup(_idx);
666 }
667
668 //pack-unpack method for CProxy_ArrayBase
669 void CProxy_ArrayBase::pup(PUP::er &p)
670 {
671   CProxy::pup(p);
672   _aid.pup(p);
673 }
674 void CProxyElement_ArrayBase::pup(PUP::er &p)
675 {
676   CProxy_ArrayBase::pup(p);
677   p|_idx.nInts;
678   p|_idx.dimension;
679   p(_idx.data(),_idx.nInts);
680 }
681
682 void CProxySection_ArrayBase::pup(PUP::er &p)
683 {
684   CProxy_ArrayBase::pup(p);
685   p | _nsid;
686   if (p.isUnpacking()) {
687     if (_nsid == 1) _sid = new CkSectionID;
688     else if (_nsid > 1) _sid = new CkSectionID[_nsid];
689     else _sid = NULL;
690   }
691   for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
692 }
693
694 /*********************** CkArray Creation *************************/
695 void _ckArrayInit(void)
696 {
697   CkpvInitialize(ArrayElement_initInfo,initInfo);
698   CkDisableTracing(CkIndex_CkArray::insertElement(0));
699   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
700     // disable because broadcast listener may deliver broadcast message
701   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
702   // by default anytime migration is allowed
703 }
704
705 CkArray::CkArray(CkArrayOptions &opts,
706                  CkMarshalledMessage &initMsg,
707                  CkNodeGroupID nodereductionID)
708   : CkReductionMgr(),
709     locMgr(CProxy_CkLocMgr::ckLocalBranch(opts.getLocationManager())),
710     locMgrID(opts.getLocationManager()),
711     thisProxy(thisgroup),
712     // Register with our location manager
713     elements((ArrayElementList *)locMgr->addManager(thisgroup,this)),
714     stableLocations(opts.staticInsertion && !opts.anytimeMigration),
715     numInitial(opts.getNumInitial()), isInserting(CmiTrue)
716 {
717   if (!stableLocations)
718       CcdCallOnConditionKeep(CcdPERIODIC_1minute,
719                              staticSpringCleaning, (void *)this);
720   
721   //set the field in one my parent class (CkReductionMgr)
722   if(opts.disableNotifyChildInRed)
723           disableNotifyChildrenStart = CmiTrue; 
724   
725   //Find, register, and initialize the arrayListeners
726   listenerDataOffset=0;
727   broadcaster=new CkArrayBroadcaster(stableLocations);
728   addListener(broadcaster);
729   reducer=new CkArrayReducer(thisgroup);
730   addListener(reducer);
731
732   // COMLIB HACK
733   //calistener = new ComlibArrayListener();
734   //addListener(calistener,dataOffset);
735
736   int lNo,nL=opts.getListeners(); //User-added listeners
737   for (lNo=0;lNo<nL;lNo++) addListener(opts.getListener(lNo));
738
739   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
740
741   ///Set up initial elements (if any)
742   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
743
744   ///adding code for Reduction using nodegroups
745
746 #if !GROUP_LEVEL_REDUCTION
747   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
748   nodeProxy = nodetemp;
749   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
750 #endif
751
752   if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
753       ckSetReductionClient(&opts.reductionClient);
754 }
755
756 CkArray::CkArray(CkMigrateMessage *m)
757         :CkReductionMgr(m), thisProxy(thisgroup)
758 {
759   locMgr=NULL;
760   isInserting=CmiTrue;
761 }
762
763 #if CMK_ERROR_CHECKING
764 inline void testPup(PUP::er &p,int shouldBe) {
765   int a=shouldBe;
766   p|a;
767   if (a!=shouldBe)
768     CkAbort("PUP direction mismatch!");
769 }
770 #else
771 inline void testPup(PUP::er &p,int shouldBe) {}
772 #endif
773
774 void CkArray::pup(PUP::er &p){
775         CkReductionMgr::pup(p);
776         p|numInitial;
777         p|locMgrID;
778         p|listeners;
779         p|listenerDataOffset;
780         testPup(p,1234);
781         if(p.isUnpacking()){
782                 thisProxy=thisgroup;
783                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
784                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
785                 /// Restore our default listeners:
786                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
787                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
788         }
789 }
790
791 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
792   int dataOffset=0; \
793   for (int lNo=0;lNo<listeners.size();lNo++) { \
794     CkArrayListener *l=listeners[lNo]; \
795     l->ckElementStamp(&listenerData[dataOffset]); \
796     dataOffset+=l->ckGetLen(); \
797   } \
798 } while (0)
799
800 //Called on send side to prepare array constructor message
801 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
802 {
803   envelope *env=UsrToEnv((void *)m);
804   env->getsetArrayIndex()=idx;
805   int *listenerData=env->getsetArrayListenerData();
806   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
807   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
808   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
809         getLocMgr()->inform(idx,onPe);
810 }
811
812 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
813                         CkElementCreation_t type)
814 {
815         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
816         if (type==CkElementCreation_resume) 
817         { // HACK: Re-stamp elements on checkpoint resume--
818           //  this restores, e.g., reduction manager's gcount
819                 int *listenerData=ret->listenerData;
820                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
821         }
822         return ret;
823 }
824
825 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
826                      CkMessage *msg,CmiBool fromMigration) 
827 {
828         //Stash the element's initialization information in the global "initInfo"
829         ArrayElement_initInfo &init=CkpvAccess(initInfo);
830         init.numInitial=numInitial;
831         init.thisArray=this;
832         init.thisArrayID=thisgroup;
833         if (msg) /*Have to *copy* data because msg will be deleted*/
834           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
835                  sizeof(init.listenerData));
836         init.fromMigration=fromMigration;
837         
838         //Build the element
839         int elSize=_chareTable[elChareType]->size;
840         ArrayElement *elem = (ArrayElement *)malloc(elSize);
841 #ifndef CMK_OPTIMIZE
842         if (elem!=NULL) setMemoryTypeChare(elem);
843 #endif
844         return elem;
845 }
846
847 /// This method is called by ck.C or the user to add an element.
848 CmiBool CkArray::insertElement(CkMessage *me)
849 {
850   CK_MAGICNUMBER_CHECK
851   CkArrayMessage *m=(CkArrayMessage *)me;
852   const CkArrayIndex &idx=m->array_index();
853   int onPe;
854   if (locMgr->isRemote(idx,&onPe)) 
855   { /* element's sibling lives somewhere else, so insert there */
856         CkArrayManagerInsert(onPe,me,thisgroup);
857         return CmiFalse;
858   }
859   int ctorIdx=m->array_ep();
860   int chareType=_entryTable[ctorIdx]->chareIdx;
861   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
862 #ifndef CMK_CHARE_USE_PTR
863   ((Chare *)elt)->chareIdx = -1;
864 #endif
865   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
866   CK_ARRAYLISTENER_LOOP(listeners,
867       if (!l->ckElementCreated(elt)) return CmiFalse;);
868   return CmiTrue;
869 }
870
871 void CProxy_ArrayBase::doneInserting(void)
872 {
873   DEBC((AA"Broadcasting a doneInserting request\n"AB));
874   //Broadcast a DoneInserting
875   CProxy_CkArray(_aid).remoteDoneInserting();
876 }
877
878 void CkArray::doneInserting(void)
879 {
880   thisProxy[CkMyPe()].remoteDoneInserting();
881 }
882
883 /// This is called on every processor after the last array insertion.
884 void CkArray::remoteDoneInserting(void)
885 {
886   CK_MAGICNUMBER_CHECK
887   if (isInserting) {
888     isInserting=CmiFalse;
889     DEBC((AA"Done inserting objects\n"AB));
890     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
891     locMgr->doneInserting();
892   }
893 }
894
895 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
896         int onPe,int ctor,CkDeliver_t type)
897 {
898         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
899         prepareCtorMsg(m,onPe,idx);
900         m->array_ep()=ctor;
901         
902         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
903                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
904                 CkArrayManagerInsert(onPe,m,thisgroup);
905         } else /* local message, non-queued */ {
906                 //Call local constructor directly
907                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
908                 return insertElement(m);
909         }
910         return CmiTrue;
911 }
912
913 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
914 {
915         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
916         if (local) {
917           int onPe=CkMyPe();
918           prepareCtorMsg(m,onPe,idx);
919 #if CMK_BLUEGENE_CHARM
920           BgEntrySplit("split-array-new");
921 #endif
922           insertElement(m);
923         }
924         else {
925           int onPe=-1;
926           prepareCtorMsg(m,onPe,idx);
927           CkArrayManagerInsert(onPe,m,getGroupID());
928         }
929 }
930
931 /********************* CkArray Messaging ******************/
932 /// Fill out a message's array fields before sending it
933 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
934 {
935         envelope *env=UsrToEnv((void *)msg);
936         env->getsetArrayMgr()=aid;
937         env->getsetArraySrcPe()=CkMyPe();
938         env->setEpIdx(ep);
939         env->getsetArrayHops()=0;
940 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
941         criticalPath_send(env);
942         automaticallySetMessagePriority(env);
943 #endif
944 }
945
946
947 /// Just a non-inlined version of msg_prepareSend()
948 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
949 {
950         envelope *env=UsrToEnv((void *)msg);
951         env->getsetArrayMgr()=aid;
952         env->getsetArraySrcPe()=CkMyPe();
953         env->setEpIdx(ep);
954         env->getsetArrayHops()=0;
955 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
956         criticalPath_send(env);
957         automaticallySetMessagePriority(env);
958 #endif
959 }
960
961 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
962 {
963 #if CMK_ERROR_CHECKING
964         //Check our array index for validity
965         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
966         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
967                 CkAbort("Array index length (nInts) is too long-- did you "
968                         "use bytes instead of integers?\n");
969 #endif
970         msg_prepareSend(msg,ep,ckGetArrayID());
971         msg->array_index()=_idx;//Insert array index
972         if (ckIsDelegated()) //Just call our delegateMgr
973           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
974         else 
975         { //Usual case: a direct send
976           CkArray *localbranch = ckLocalBranch();
977           if (localbranch == NULL) {             // array not created yet
978             CkArrayManagerDeliver(CkMyPe(), msg, 0);
979           }
980           else {
981             if (opts & CK_MSG_INLINE)
982               localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
983             else
984               localbranch->deliver(msg, CkDeliver_queue, opts);
985           }
986         }
987 }
988
989 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
990 {
991         CkFutureID f=CkCreateAttachedFuture(msg);
992         ckSend(msg,ep);
993         return CkWaitReleaseFuture(f);
994 }
995
996 void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts     )
997 {
998         CProxySection_ArrayBase sp(sID);
999         sp.ckSend((CkArrayMessage *)msg,entryIndex,opts);
1000 }
1001
1002 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
1003 {
1004         if (ckIsDelegated()) //Just call our delegateMgr
1005           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
1006         else {
1007           // send through all
1008           for (int k=0; k<_nsid; ++k) {
1009             for (int i=0; i< _sid[k]._nElems-1; i++) {
1010               CProxyElement_ArrayBase ap(_sid[k]._cookie.aid, _sid[k]._elems[i]);
1011               void *newMsg=CkCopyMsg((void **)&msg);
1012               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1013             }
1014             if (_sid[k]._nElems > 0) {
1015               void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
1016               CProxyElement_ArrayBase ap(_sid[k]._cookie.aid, _sid[k]._elems[_sid[k]._nElems-1]);
1017               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
1018             }
1019           }
1020         }
1021 }
1022
1023 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1024 {
1025   CkArrayMessage *m=(CkArrayMessage *)msg;
1026   m->array_index()=idx;
1027   msg_prepareSend(m,entryIndex,aID);
1028   CkArray *a=(CkArray *)_localBranch(aID);
1029   if (a == NULL)
1030     CkArrayManagerDeliver(CkMyPe(), msg, 0);
1031   else
1032     a->deliver(m,CkDeliver_queue,opts);
1033 }
1034
1035 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
1036 {
1037   CkArrayMessage *m=(CkArrayMessage *)msg;
1038   m->array_index()=idx;
1039   msg_prepareSend(m,entryIndex,aID);
1040   CkArray *a=(CkArray *)_localBranch(aID);
1041   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
1042   a->deliver(m,CkDeliver_inline,opts);
1043   if (oldStatus) CkEnableTracing(entryIndex);
1044 }
1045
1046
1047 /*********************** CkArray Reduction *******************/
1048 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
1049   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
1050    mgrID(mgrID_)
1051 {
1052   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1053 }
1054 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
1055   :CkArrayListener(m)
1056 {
1057   mgr=NULL;
1058 }
1059 void CkArrayReducer::pup(PUP::er &p) {
1060   CkArrayListener::pup(p);
1061   p|mgrID;
1062   if (p.isUnpacking())
1063     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
1064 }
1065 CkArrayReducer::~CkArrayReducer() {}
1066
1067 /*********************** CkArray Broadcast ******************/
1068
1069 CkArrayBroadcaster::CkArrayBroadcaster(bool stableLocations_)
1070     :CkArrayListener(1), //Each array element carries a broadcast number
1071      bcastNo(0), oldBcastNo(0), stableLocations(stableLocations_)
1072 { }
1073 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
1074     :CkArrayListener(m), bcastNo(-1), oldBcastNo(-1)
1075 { }
1076
1077 void CkArrayBroadcaster::pup(PUP::er &p) {
1078   CkArrayListener::pup(p);
1079   /* Assumption: no migrants during checkpoint, so no need to
1080      save old broadcasts. */
1081   p|bcastNo;
1082   p|stableLocations;
1083   if (p.isUnpacking()) {
1084     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
1085   }
1086 }
1087
1088 CkArrayBroadcaster::~CkArrayBroadcaster()
1089 {
1090   CkArrayMessage *msg;
1091   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1092 }
1093
1094 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
1095 {
1096   bcastNo++;
1097   DEBB((AA"Received broadcast %d\n"AB,bcastNo));
1098
1099   if (stableLocations)
1100     return;
1101
1102   CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
1103   oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
1104 }
1105
1106 /// Deliver a copy of the given broadcast to the given local element
1107 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast, ArrayElement *el,
1108                                     CmiBool doFree)
1109 {
1110   int &elBcastNo=getData(el);
1111   // if this array element already received this message, skip it
1112   if (elBcastNo >= bcastNo) return CmiFalse;
1113   elBcastNo++;
1114   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
1115   int epIdx=bcast->array_ep_bcast();
1116
1117 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1118   DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
1119   return CmiTrue;
1120 #else
1121   return el->ckInvokeEntry(epIdx, bcast, doFree);
1122 #endif
1123 }
1124
1125 /// Deliver all needed broadcasts to the given local element
1126 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
1127 {
1128   if (stableLocations) return CmiTrue;
1129   int &elBcastNo=getData(el);
1130   if (elBcastNo<bcastNo)
1131   {//This element needs some broadcasts-- it must have
1132    //been migrating during the broadcast.
1133     int i,nDeliver=bcastNo-elBcastNo;
1134     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
1135
1136     //Skip the old junk at the front of the bcast queue
1137     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
1138       oldBcasts.enq(oldBcasts.deq());
1139
1140     //Deliver the newest messages, in old-to-new order
1141     for (i=nDeliver-1;i>=0;i--)
1142     {
1143       CkArrayMessage *msg=oldBcasts.deq();
1144                 if(msg == NULL)
1145                 continue;
1146       oldBcasts.enq(msg);
1147       if (!deliver(msg, el, CmiFalse))
1148         return CmiFalse; //Element migrated away
1149     }
1150   }
1151   //Otherwise, the element survived
1152   return CmiTrue;
1153 }
1154
1155
1156 void CkArrayBroadcaster::springCleaning(void)
1157 {
1158   //Remove old broadcast messages
1159   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
1160   if (nDelete>0) {
1161     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
1162     for (int i=0;i<nDelete;i++)
1163       delete oldBcasts.deq();
1164   }
1165   oldBcastNo=bcastNo;
1166 }
1167
1168 void CkArrayBroadcaster::flushState() 
1169
1170   bcastNo = oldBcastNo = 0; 
1171   CkArrayMessage *msg;
1172   while (NULL!=(msg=oldBcasts.deq())) delete msg;
1173 }
1174
1175 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
1176 {
1177         CProxy_ArrayBase ap(aID);
1178         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
1179 }
1180
1181 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
1182 {
1183         msg->array_ep_bcast()=ep;
1184         if (ckIsDelegated()) //Just call our delegateMgr
1185           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
1186         else 
1187         { //Broadcast message via serializer node
1188           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
1189           int skipsched = opts & CK_MSG_EXPEDITED; 
1190           //int serializer=0;//1623802937%CkNumPes();
1191 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1192                 CProxy_CkArray ap(_aid);
1193                 ap[CpvAccess(serializer)].sendBroadcast(msg);
1194                 CkGroupID _id = _aid;
1195 //              printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
1196 #else
1197           if (CkMyPe()==CpvAccess(serializer))
1198           {
1199                 DEBB((AA"Sending array broadcast\n"AB));
1200                 if (skipsched)
1201                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
1202                 else
1203                         CProxy_CkArray(_aid).recvBroadcast(msg);
1204           } else {
1205                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
1206                 CProxy_CkArray ap(_aid);
1207                 if (skipsched)
1208                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
1209                 else
1210                         ap[CpvAccess(serializer)].sendBroadcast(msg);
1211           }
1212 #endif
1213         }
1214 }
1215
1216 /// Reflect a broadcast off this Pe:
1217 void CkArray::sendBroadcast(CkMessage *msg)
1218 {
1219         CK_MAGICNUMBER_CHECK
1220         if(CkMyPe() == CpvAccess(serializer)){
1221                 //Broadcast the message to all processors
1222                 thisProxy.recvBroadcast(msg);
1223         }else{
1224                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
1225         }
1226 }
1227 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
1228 {
1229         CK_MAGICNUMBER_CHECK
1230         //Broadcast the message to all processors
1231         thisProxy.recvExpeditedBroadcast(msg);
1232 }
1233
1234 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1235 int _tempBroadcastCount=0;
1236
1237 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
1238     if(homePe(*index)==CmiMyPe()){
1239         CkArrayMessage *bcast = (CkArrayMessage *)data;
1240     int epIdx=bcast->array_ep_bcast();
1241         DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
1242         CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
1243         envelope *env = UsrToEnv(copy);
1244         env->sender.data.group.onPE = CkMyPe();
1245         env->TN  = env->SN=0;
1246         env->piggyBcastIdx = epIdx;
1247         env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
1248         env->getsetArrayMgr() = thisgroup;
1249         env->getsetArrayIndex() = *index;
1250     env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
1251         env->setSrcPe(CkMyPe());
1252         rec->deliver(copy,CkDeliver_queue);
1253         _tempBroadcastCount++;
1254     }else{
1255         if(locMgr->homeElementCount != -1){
1256             DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
1257         }
1258     }
1259 }
1260
1261 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
1262     arr->broadcastHomeElements(data,rec,index);
1263 }
1264 #endif
1265
1266
1267 /// Increment broadcast count; deliver to all local elements
1268 void CkArray::recvBroadcast(CkMessage *m)
1269 {
1270         CK_MAGICNUMBER_CHECK
1271         CkArrayMessage *msg=(CkArrayMessage *)m;
1272         broadcaster->incoming(msg);
1273
1274 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1275         _tempBroadcastCount=0;
1276         locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
1277 #else
1278         //Run through the list of local elements
1279         int idx=0, len=0;
1280         if (stableLocations) {            /* remove all NULLs in the array */
1281           len = 0;
1282           while (elements->next(idx)!=NULL) len++;
1283           idx = 0;
1284         }
1285         ArrayElement *el;
1286 #if CMK_BLUEGENE_CHARM
1287         void *root;
1288         _TRACE_BG_TLINE_END(&root);
1289         BgSetEntryName("start-broadcast", &root);
1290         CkVec<void *> logs;    // store all logs for each delivery
1291         extern void stopVTimer();
1292         extern void startVTimer();
1293 #endif
1294         while (NULL!=(el=elements->next(idx))) {
1295 #if CMK_BLUEGENE_CHARM
1296                 //BgEntrySplit("split-broadcast");
1297                 stopVTimer();
1298                 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
1299                 logs.push_back(curlog);
1300                 startVTimer();
1301 #endif
1302                 CmiBool doFree = CmiFalse;
1303                 if (stableLocations && idx == len) doFree = CmiTrue;
1304                 broadcaster->deliver(msg, el, doFree);
1305         }
1306 #endif
1307
1308 #if CMK_BLUEGENE_CHARM
1309         //BgEntrySplit("end-broadcast");
1310         stopVTimer();
1311         BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
1312         startVTimer();
1313 #endif
1314
1315         // CkArrayBroadcaster doesn't have msg buffered, and there was
1316         // no last delivery to transfer ownership
1317         if (stableLocations && len == 0)
1318           delete msg;
1319 }
1320
1321 #include "CkArray.def.h"
1322
1323