CkMulticast: Support multicasts to cross-array sections
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55 #include "pathHistory.h"
56
57 #if CMK_LBDB_ON
58 #include "LBDatabase.h"
59 #endif // CMK_LBDB_ON
60
61 CpvDeclare(int ,serializer);
62
63 CmiBool _isAnytimeMigration;
64
65 #define ARRAY_DEBUG_OUTPUT 0
66
67 #if ARRAY_DEBUG_OUTPUT 
68 #   define DEB(x) CkPrintf x  //General debug messages
69 #   define DEBI(x) CkPrintf x  //Index debug messages
70 #   define DEBC(x) CkPrintf x  //Construction debug messages
71 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
72 #   define DEBM(x) CkPrintf x  //Migration debug messages
73 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
74 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
75 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
76 #   define AA "ArrayBOC on %d: "
77 #   define AB ,CkMyPe()
78 #   define DEBUG(x) x
79 #else
80 #   define DEB(X) /*CkPrintf x*/
81 #   define DEBI(X) /*CkPrintf x*/
82 #   define DEBC(X) /*CkPrintf x*/
83 #   define DEBS(x) /*CkPrintf x*/
84 #   define DEBM(X) /*CkPrintf x*/
85 #   define DEBL(X) /*CkPrintf x*/
86 #   define DEBK(x) /*CkPrintf x*/
87 #   define DEBB(x) /*CkPrintf x*/
88 #   define str(x) /**/
89 #   define DEBUG(x)
90 #endif
91
92 /*
93 void 
94 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
95        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
96 }
97 */
98 void
99 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndexMax _idx, void *m, int ep, int opts) {
100         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
101         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
102 }
103
104 /*********************** CkVerboseListener ******************/
105 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
106
107 CkVerboseListener::CkVerboseListener(void)
108   :CkArrayListener(0)
109 {
110   VL_PRINT<<"INIT  Creating listener"<<endl;
111 }
112
113 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
114 {
115   CkArrayListener::ckRegister(arrMgr,dataOffset_);
116   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
117 }
118 void CkVerboseListener::ckBeginInserting(void)
119 {
120   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
121 }
122 void CkVerboseListener::ckEndInserting(void)
123 {
124   VL_PRINT<<"INIT  Done inserting elements"<<endl;
125 }
126
127 void CkVerboseListener::ckElementStamp(int *eltInfo)
128 {
129   VL_PRINT<<"LIFE  Stamping element"<<endl;
130 }
131 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
132 {
133   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
134 }
135 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
136 {
137   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
138   return CmiTrue;
139 }
140 void CkVerboseListener::ckElementDied(ArrayElement *elt)
141 {
142   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
143 }
144
145 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
146 {
147   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
148 }
149 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
150 {
151   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
152   return CmiTrue;
153 }
154
155
156 /************************* ArrayElement *******************/
157 class ArrayElement_initInfo {
158 public:
159   CkArray *thisArray;
160   CkArrayID thisArrayID;
161   CkArrayIndexMax numInitial;
162   int listenerData[CK_ARRAYLISTENER_MAXLEN];
163   CmiBool fromMigration;
164 };
165
166 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
167
168 void ArrayElement::initBasics(void)
169 {
170 #if CMK_OUT_OF_CORE
171   if (CkpvAccess(CkSaveRestorePrefetch)) 
172     return; /* Just restoring from disk--don't try to set up anything. */
173 #endif
174 #if CMK_GRID_QUEUE_AVAILABLE
175         grid_queue_interval = 0;
176         grid_queue_threshold = 0;
177         msg_count = 0;
178         msg_count_grid = 0;
179         border_flag = 0;
180
181         grid_queue_interval = CmiGridQueueGetInterval ();
182         grid_queue_threshold = CmiGridQueueGetThreshold ();
183 #endif
184   ArrayElement_initInfo &info=CkpvAccess(initInfo);
185   thisArray=info.thisArray;
186   thisArrayID=info.thisArrayID;
187   numInitialElements=info.numInitial.getCombinedCount();
188   if (info.listenerData) {
189     memcpy(listenerData,info.listenerData,sizeof(listenerData));
190   }
191   if (!info.fromMigration) {
192     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
193                           l->ckElementCreating(this));
194   }
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196         mlogData->objID.type = TypeArray;
197         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
198 #endif
199 }
200
201 ArrayElement::ArrayElement(void) 
202 {
203         initBasics();
204 #if CMK_MEM_CHECKPOINT
205         init_checkpt();
206 #endif
207 }
208
209 ArrayElement::ArrayElement(CkMigrateMessage *m) 
210 {
211         initBasics();
212 }
213
214 //Called by the system just before and after migration to another processor:  
215 void ArrayElement::ckAboutToMigrate(void) {
216         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
217                                 l->ckElementLeaving(this));
218         CkMigratable::ckAboutToMigrate();
219 }
220 void ArrayElement::ckJustMigrated(void) {
221         CkMigratable::ckJustMigrated();
222         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
223               if (!l->ckElementArriving(this)) return;);
224 }
225
226 void ArrayElement::ckJustRestored(void) {
227     CkMigratable::ckJustRestored();
228     //empty for out-of-core emulation
229 }
230
231 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
232    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
233
234 /// Remote method: calls destructor
235 void ArrayElement::ckDestroy(void)
236 {
237         if(_BgOutOfCoreFlag!=1){ //in case of taking core out of memory
238             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
239                            l->ckElementDied(this));
240         }
241         CkMigratable::ckDestroy();
242 }
243
244 //Destructor (virtual)
245 ArrayElement::~ArrayElement()
246 {
247 #if CMK_OUT_OF_CORE
248   if (CkpvAccess(CkSaveRestorePrefetch)) 
249     return; /* Just saving to disk--don't trash anything. */
250 #endif
251   //To detect use-after-delete: 
252   thisArray=(CkArray *)0xDEADa7a1;
253 }
254
255 void ArrayElement::pup(PUP::er &p)
256 {
257   DEBM((AA"  ArrayElement::pup()\n"AB));
258   CkMigratable::pup(p);
259   thisArrayID.pup(p);
260   if (p.isUnpacking())
261         thisArray=thisArrayID.ckLocalBranch();
262   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
263 #if CMK_MEM_CHECKPOINT
264   p(budPEs, 2);
265 #endif
266   p.syncComment(PUP::sync_last_system,"ArrayElement");
267 #if CMK_GRID_QUEUE_AVAILABLE
268   p|grid_queue_interval;
269   p|grid_queue_threshold;
270   p|msg_count;
271   p|msg_count_grid;
272   p|border_flag;
273   if (p.isUnpacking ()) {
274     msg_count = 0;
275     msg_count_grid = 0;
276     border_flag = 0;
277   }
278 #endif
279 }
280
281 char *ArrayElement::ckDebugChareName(void) {
282         char buf[200];
283         const char *className=_chareTable[ckGetChareType()]->name;
284         const int *d=thisIndexMax.data();
285         const short int *s=(const short int*)d;
286         switch (thisIndexMax.dimension) {
287         case 0: sprintf(buf,"%s",className); break;
288         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
289         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
290         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
291     case 4: sprintf(buf,"%s(%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3]); break;
292     case 5: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4]); break;
293     case 6: sprintf(buf,"%s(%hd,%hd,%hd,%hd,%hd,%hd)",className,s[0],s[1],s[2],s[3],s[4],s[5]); break;
294         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
295         };
296         return strdup(buf);
297 }
298
299 int ArrayElement::ckDebugChareID(char *str, int limit) {
300   if (limit<21) return -1;
301   str[0] = 2;
302   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
303   *((CkArrayIndexMax*)&str[5]) = thisIndexMax;
304   return 21;
305 }
306
307 /// A more verbose form of abort
308 void ArrayElement::CkAbort(const char *str) const
309 {
310         CkError("[%d] Array element at index %s aborting:\n",
311                 CkMyPe(), idx2str(thisIndexMax));
312         CkMigratable::CkAbort(str);
313 }
314
315 void ArrayElement::recvBroadcast(CkMessage *m){
316 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
317         CkArrayMessage *bcast = (CkArrayMessage *)m;
318     envelope *env = UsrToEnv(m);
319         int epIdx= env->piggyBcastIdx;
320     ckInvokeEntry(epIdx,bcast,CmiTrue);
321 #endif
322 }
323
324 /*********************** Spring Cleaning *****************
325 Periodically (every minute or so) remove expired broadcasts
326 from the queue.
327 */
328
329 inline void CkArray::springCleaning(void)
330 {
331   DEBK((AA"Starting spring cleaning\n"AB));
332   broadcaster->springCleaning();
333 }
334
335 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
336         ((CkArray *)forArray)->springCleaning();
337 }
338
339 /********************* Little CkArray Utilities ******************/
340
341 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
342         :CProxy(), _aid(e->ckGetArrayID())
343         {}
344 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
345         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
346         {}
347
348 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
349         {return ckLocalBranch()->getLocMgr(); }
350
351 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch())
352
353 CkArrayOptions::CkArrayOptions(void) //Default: empty array
354         :numInitial(0),map(_defaultArrayMapID)
355 {
356         locMgr.setZero();
357 }
358
359 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
360         :numInitial(CkArrayIndex1D(ni1)),map(_defaultArrayMapID)
361 {
362         locMgr.setZero();
363 }
364
365 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
366         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_defaultArrayMapID)
367 {
368         locMgr.setZero();
369 }
370
371 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
372         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_defaultArrayMapID)
373 {
374         locMgr.setZero();
375 }
376
377 /// Bind our elements to this array
378 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
379 {
380         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
381         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
382         // not just initially.
383         //setNumInitial(arr->getNumInitial());
384         return setLocationManager(arr->getLocMgr()->getGroupID());
385 }
386 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
387 {
388         arrayListeners.push_back(listener);
389         return *this;
390 }
391
392 void CkArrayOptions::pup(PUP::er &p) {
393         p|numInitial;
394         p|locMgr;
395         p|map;
396         p|arrayListeners;
397 }
398
399 CkArrayListener::CkArrayListener(int nInts_) 
400   :nInts(nInts_) 
401 {
402   dataOffset=-1;
403 }
404 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
405   nInts=-1; dataOffset=-1;
406 }
407 void CkArrayListener::pup(PUP::er &p) {
408   p|nInts;
409   p|dataOffset;
410 }
411
412 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
413 {
414   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
415   dataOffset=dataOffset_;
416 }
417
418 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
419                                           const CkArrayOptions &opts_)
420 {
421   CkArrayOptions opts(opts_);
422   CkGroupID locMgr = opts.getLocationManager();
423   if (locMgr.isZero())
424   { //Create a new location manager
425 #if !CMK_LBDB_ON
426     CkGroupID _lbdb;
427 #endif
428     locMgr = CProxy_CkLocMgr::ckNew(opts.getMap(),_lbdb,opts.getNumInitial());
429     opts.setLocationManager(locMgr);
430   }
431   //Create the array manager
432   m->array_ep()=ctor;
433   CkMarshalledMessage marsh(m);
434   CkEntryOptions  e_opts;
435   e_opts.setGroupDepID(locMgr);       // group creation dependence
436 #if !GROUP_LEVEL_REDUCTION
437   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
438   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy,&e_opts);
439   nodereductionProxy.setAttachedGroup(ag);
440 #else
441   CkNodeGroupID dummyid;
442   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,dummyid,&e_opts);
443 #endif
444   return (CkArrayID)ag;
445 }
446
447 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
448 {
449   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
450 }
451
452 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
453         const CkArrayIndex &idx)
454 {
455   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
456   m->array_ep()=ctor;
457   ckLocalBranch()->prepareCtorMsg(m,onPe,idx);
458   if (ckIsDelegated()) {
459         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
460         return;
461   }
462   
463   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
464   CkArrayManagerInsert(onPe,m,_aid);
465 }
466
467 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
468 {
469   ckInsertIdx(m,ctorIndex,onPe,_idx);
470 }
471
472 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
473 {
474   return ckLocalBranch()->lookup(_idx);
475 }
476
477 //pack-unpack method for CProxy_ArrayBase
478 void CProxy_ArrayBase::pup(PUP::er &p)
479 {
480   CProxy::pup(p);
481   _aid.pup(p);
482 }
483 void CProxyElement_ArrayBase::pup(PUP::er &p)
484 {
485   CProxy_ArrayBase::pup(p);
486   p|_idx.nInts;
487   p|_idx.dimension;
488   p(_idx.data(),_idx.nInts);
489 }
490
491 void CProxySection_ArrayBase::pup(PUP::er &p)
492 {
493   CProxy_ArrayBase::pup(p);
494   p | _nsid;
495   if (p.isUnpacking()) {
496     if (_nsid == 1) _sid = new CkSectionID;
497     else if (_nsid > 1) _sid = new CkSectionID[_nsid];
498     else _sid = NULL;
499   }
500   for (int i=0; i<_nsid; ++i) _sid[i].pup(p);
501 }
502
503 /*********************** CkArray Creation *************************/
504 void _ckArrayInit(void)
505 {
506   CkpvInitialize(ArrayElement_initInfo,initInfo);
507   CkDisableTracing(CkIndex_CkArray::insertElement(0));
508   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
509     // disable because broadcast listener may deliver broadcast message
510   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
511   // by default anytime migration is allowed
512 }
513
514 CkArray::CkArray(CkArrayOptions &c,CkMarshalledMessage &initMsg,CkNodeGroupID nodereductionID)
515   : CkReductionMgr(),
516   locMgr(CProxy_CkLocMgr::ckLocalBranch(c.getLocationManager())),locMgrID(c.getLocationManager()),
517   thisProxy(thisgroup)
518 {
519   //Registration
520   elements=(ArrayElementList *)locMgr->addManager(thisgroup,this);
521 //  moved to _ckArrayInit()
522 //  CkpvInitialize(ArrayElement_initInfo,initInfo);
523   CcdCallOnConditionKeep(CcdPERIODIC_1minute,staticSpringCleaning,(void *)this);
524
525   //Set class variables
526   numInitial=c.getNumInitial();
527   isInserting=CmiTrue;
528
529   //Find, register, and initialize the arrayListeners
530   listenerDataOffset=0;
531   broadcaster=new CkArrayBroadcaster();
532   addListener(broadcaster);
533   reducer=new CkArrayReducer(thisgroup);
534   addListener(reducer);
535
536   // COMLIB HACK
537   //calistener = new ComlibArrayListener();
538   //addListener(calistener,dataOffset);
539
540   int lNo,nL=c.getListeners(); //User-added listeners
541   for (lNo=0;lNo<nL;lNo++) addListener(c.getListener(lNo));
542
543   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
544
545   ///Set up initial elements (if any)
546   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
547
548   ///adding code for Reduction using nodegroups
549
550 #if !GROUP_LEVEL_REDUCTION
551   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
552   nodeProxy = nodetemp;
553   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
554 #endif
555 }
556
557 CkArray::CkArray(CkMigrateMessage *m)
558         :CkReductionMgr(m), thisProxy(thisgroup)
559 {
560   locMgr=NULL;
561   isInserting=CmiTrue;
562 }
563
564 #ifndef CMK_OPTIMIZE
565 inline void testPup(PUP::er &p,int shouldBe) {
566   int a=shouldBe;
567   p|a;
568   if (a!=shouldBe)
569     CkAbort("PUP direction mismatch!");
570 }
571 #else
572 inline void testPup(PUP::er &p,int shouldBe) {}
573 #endif
574
575 void CkArray::pup(PUP::er &p){
576         CkReductionMgr::pup(p);
577         p|numInitial;
578         p|locMgrID;
579         p|listeners;
580         p|listenerDataOffset;
581         testPup(p,1234);
582         if(p.isUnpacking()){
583                 thisProxy=thisgroup;
584                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
585                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
586                 /// Restore our default listeners:
587                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
588                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
589         }
590 }
591
592 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
593   int dataOffset=0; \
594   for (int lNo=0;lNo<listeners.size();lNo++) { \
595     CkArrayListener *l=listeners[lNo]; \
596     l->ckElementStamp(&listenerData[dataOffset]); \
597     dataOffset+=l->ckGetLen(); \
598   } \
599 } while (0)
600
601 //Called on send side to prepare array constructor message
602 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
603 {
604   envelope *env=UsrToEnv((void *)m);
605   env->getsetArrayIndex()=idx;
606   int *listenerData=env->getsetArrayListenerData();
607   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
608   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
609   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
610         getLocMgr()->inform(idx,onPe);
611 }
612
613 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
614                         CkElementCreation_t type)
615 {
616         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
617         if (type==CkElementCreation_resume) 
618         { // HACK: Re-stamp elements on checkpoint resume--
619           //  this restores, e.g., reduction manager's gcount
620                 int *listenerData=ret->listenerData;
621                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
622         }
623         return ret;
624 }
625
626 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
627                      CkMessage *msg,CmiBool fromMigration) 
628 {
629         //Stash the element's initialization information in the global "initInfo"
630         ArrayElement_initInfo &init=CkpvAccess(initInfo);
631         init.numInitial=numInitial;
632         init.thisArray=this;
633         init.thisArrayID=thisgroup;
634         if (msg) /*Have to *copy* data because msg will be deleted*/
635           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
636                  sizeof(init.listenerData));
637         init.fromMigration=fromMigration;
638         
639         //Build the element
640         int elSize=_chareTable[elChareType]->size;
641         ArrayElement *elem = (ArrayElement *)malloc(elSize);
642 #ifndef CMK_OPTIMIZE
643         if (elem!=NULL) setMemoryTypeChare(elem);
644 #endif
645         return elem;
646 }
647
648 /// This method is called by ck.C or the user to add an element.
649 CmiBool CkArray::insertElement(CkMessage *me)
650 {
651   CK_MAGICNUMBER_CHECK
652   CkArrayMessage *m=(CkArrayMessage *)me;
653   const CkArrayIndex &idx=m->array_index();
654   int onPe;
655   if (locMgr->isRemote(idx,&onPe)) 
656   { /* element's sibling lives somewhere else, so insert there */
657         CkArrayManagerInsert(onPe,me,thisgroup);
658         return CmiFalse;
659   }
660   int ctorIdx=m->array_ep();
661   int chareType=_entryTable[ctorIdx]->chareIdx;
662   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
663 #ifndef CMK_CHARE_USE_PTR
664   ((Chare *)elt)->chareIdx = -1;
665 #endif
666   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
667   CK_ARRAYLISTENER_LOOP(listeners,
668       if (!l->ckElementCreated(elt)) return CmiFalse;);
669   return CmiTrue;
670 }
671
672 void CProxy_ArrayBase::doneInserting(void)
673 {
674   DEBC((AA"Broadcasting a doneInserting request\n"AB));
675   //Broadcast a DoneInserting
676   CProxy_CkArray(_aid).remoteDoneInserting();
677 }
678
679 void CkArray::doneInserting(void)
680 {
681   thisProxy[CkMyPe()].remoteDoneInserting();
682 }
683
684 /// This is called on every processor after the last array insertion.
685 void CkArray::remoteDoneInserting(void)
686 {
687   CK_MAGICNUMBER_CHECK
688   if (isInserting) {
689     isInserting=CmiFalse;
690     DEBC((AA"Done inserting objects\n"AB));
691     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
692     locMgr->doneInserting();
693   }
694 }
695
696 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
697         int onPe,int ctor,CkDeliver_t type)
698 {
699         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
700         prepareCtorMsg(m,onPe,idx);
701         m->array_ep()=ctor;
702         
703         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
704                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
705                 CkArrayManagerInsert(onPe,m,thisgroup);
706         } else /* local message, non-queued */ {
707                 //Call local constructor directly
708                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
709                 return insertElement(m);
710         }
711         return CmiTrue;
712 }
713
714 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
715 {
716         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
717         if (local) {
718           int onPe=CkMyPe();
719           prepareCtorMsg(m,onPe,idx);
720 #if CMK_BLUEGENE_CHARM
721           BgEntrySplit("split-array-new");
722 #endif
723           insertElement(m);
724         }
725         else {
726           int onPe=-1;
727           prepareCtorMsg(m,onPe,idx);
728           CkArrayManagerInsert(onPe,m,getGroupID());
729         }
730 }
731
732 /********************* CkArray Messaging ******************/
733 /// Fill out a message's array fields before sending it
734 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
735 {
736         envelope *env=UsrToEnv((void *)msg);
737         env->getsetArrayMgr()=aid;
738         env->getsetArraySrcPe()=CkMyPe();
739         env->setEpIdx(ep);
740         env->getsetArrayHops()=0;
741 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
742         criticalPath_send(env);
743         automaticallySetMessagePriority(env);
744 #endif
745 }
746
747
748 /// Just a non-inlined version of msg_prepareSend()
749 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid)
750 {
751         envelope *env=UsrToEnv((void *)msg);
752         env->getsetArrayMgr()=aid;
753         env->getsetArraySrcPe()=CkMyPe();
754         env->setEpIdx(ep);
755         env->getsetArrayHops()=0;
756 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
757         criticalPath_send(env);
758         automaticallySetMessagePriority(env);
759 #endif
760 }
761
762 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
763 {
764 #ifndef CMK_OPTIMIZE
765         //Check our array index for validity
766         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
767         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
768                 CkAbort("Array index length (nInts) is too long-- did you "
769                         "use bytes instead of integers?\n");
770 #endif
771         msg_prepareSend(msg,ep,ckGetArrayID());
772         msg->array_index()=_idx;//Insert array index
773         if (ckIsDelegated()) //Just call our delegateMgr
774           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
775         else 
776         { //Usual case: a direct send
777           CkArray *localbranch = ckLocalBranch();
778           if (localbranch == NULL) {             // array not created yet
779             CkArrayManagerDeliver(CkMyPe(), msg, 0);
780           }
781           else {
782             if (opts & CK_MSG_INLINE)
783               localbranch->deliver(msg, CkDeliver_inline, opts & (~CK_MSG_INLINE));
784             else
785               localbranch->deliver(msg, CkDeliver_queue, opts);
786           }
787         }
788 }
789
790 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
791 {
792         CkFutureID f=CkCreateAttachedFuture(msg);
793         ckSend(msg,ep);
794         return CkWaitReleaseFuture(f);
795 }
796
797 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
798 {
799         if (ckIsDelegated()) //Just call our delegateMgr
800           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(), ep, msg, _nsid, _sid, opts);
801         else {
802           // send through all
803           for (int k=0; k<_nsid; ++k) {
804             for (int i=0; i< _sid[k]._nElems-1; i++) {
805               CProxyElement_ArrayBase ap(_sid[k]._cookie.aid, _sid[k]._elems[i]);
806               void *newMsg=CkCopyMsg((void **)&msg);
807               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
808             }
809             if (_sid[k]._nElems > 0) {
810               void *newMsg= (k<_nsid-1) ? CkCopyMsg((void **)&msg) : msg;
811               CProxyElement_ArrayBase ap(_sid[k]._cookie.aid, _sid[k]._elems[_sid[k]._nElems-1]);
812               ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
813             }
814           }
815         }
816 }
817
818 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
819 {
820   CkArrayMessage *m=(CkArrayMessage *)msg;
821   m->array_index()=idx;
822   msg_prepareSend(m,entryIndex,aID);
823   CkArray *a=(CkArray *)_localBranch(aID);
824   if (a == NULL)
825     CkArrayManagerDeliver(CkMyPe(), msg, 0);
826   else
827     a->deliver(m,CkDeliver_queue,opts);
828 }
829
830 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
831 {
832   CkArrayMessage *m=(CkArrayMessage *)msg;
833   m->array_index()=idx;
834   msg_prepareSend(m,entryIndex,aID);
835   CkArray *a=(CkArray *)_localBranch(aID);
836   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
837   a->deliver(m,CkDeliver_inline,opts);
838   if (oldStatus) CkEnableTracing(entryIndex);
839 }
840
841
842 /*********************** CkArray Reduction *******************/
843 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
844   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
845    mgrID(mgrID_)
846 {
847   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
848 }
849 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
850   :CkArrayListener(m)
851 {
852   mgr=NULL;
853 }
854 void CkArrayReducer::pup(PUP::er &p) {
855   CkArrayListener::pup(p);
856   p|mgrID;
857   if (p.isUnpacking())
858     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
859 }
860 CkArrayReducer::~CkArrayReducer() {}
861
862 /*********************** CkArray Broadcast ******************/
863
864 CkArrayBroadcaster::CkArrayBroadcaster(void)
865   :CkArrayListener(1) //Each array element carries a broadcast number
866 {
867   bcastNo=oldBcastNo=0;
868 }
869 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
870         :CkArrayListener(m) { bcastNo=-1; oldBcastNo=-1; }
871 void CkArrayBroadcaster::pup(PUP::er &p) {
872   CkArrayListener::pup(p);
873   /* Assumption: no migrants during checkpoint, so no need to
874      save old broadcasts. */
875   p|bcastNo;
876   if (p.isUnpacking()) {
877     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
878   }
879 }
880 CkArrayBroadcaster::~CkArrayBroadcaster()
881 {
882   CkArrayMessage *msg;
883   while (NULL!=(msg=oldBcasts.deq())) delete msg;
884 }
885
886 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
887 {
888   bcastNo++;
889   if (_isAnytimeMigration) {
890     DEBB((AA"Received broadcast %d\n"AB,bcastNo));
891     CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
892     oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
893   }
894 }
895
896 /// Deliver a copy of the given broadcast to the given local element
897 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast,ArrayElement *el)
898 {
899   int &elBcastNo=getData(el);
900   // if this array element already received this message, skip it
901   if (elBcastNo >= bcastNo) return CmiFalse;
902   elBcastNo++;
903   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
904   int epIdx=bcast->array_ep_bcast();
905
906 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
907         DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),bcastNo));
908         return true;
909 #else
910   return el->ckInvokeEntry(epIdx,bcast,CmiFalse);
911 #endif
912 }
913
914 /// Deliver all needed broadcasts to the given local element
915 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
916 {
917   if (! _isAnytimeMigration) return CmiTrue;
918   int &elBcastNo=getData(el);
919   if (elBcastNo<bcastNo)
920   {//This element needs some broadcasts-- it must have
921    //been migrating during the broadcast.
922     int i,nDeliver=bcastNo-elBcastNo;
923     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
924
925     //Skip the old junk at the front of the bcast queue
926     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
927       oldBcasts.enq(oldBcasts.deq());
928
929     //Deliver the newest messages, in old-to-new order
930     for (i=nDeliver-1;i>=0;i--)
931     {
932       CkArrayMessage *msg=oldBcasts.deq();
933                 if(msg == NULL)
934                 continue;
935       oldBcasts.enq(msg);
936       if (!deliver(msg,el))
937         return CmiFalse; //Element migrated away
938     }
939   }
940   //Otherwise, the element survived
941   return CmiTrue;
942 }
943
944
945 void CkArrayBroadcaster::springCleaning(void)
946 {
947   if (! _isAnytimeMigration) return;
948   //Remove old broadcast messages
949   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
950   if (nDelete>0) {
951     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
952     for (int i=0;i<nDelete;i++)
953       delete oldBcasts.deq();
954   }
955   oldBcastNo=bcastNo;
956 }
957
958 void CkArrayBroadcaster::flushState() 
959
960   bcastNo = oldBcastNo = 0; 
961   CkArrayMessage *msg;
962   while (NULL!=(msg=oldBcasts.deq())) delete msg;
963 }
964
965 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
966 {
967         CProxy_ArrayBase ap(aID);
968         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
969 }
970
971 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
972 {
973         msg->array_ep_bcast()=ep;
974         if (ckIsDelegated()) //Just call our delegateMgr
975           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
976         else 
977         { //Broadcast message via serializer node
978           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
979           int skipsched = opts & CK_MSG_EXPEDITED; 
980           //int serializer=0;//1623802937%CkNumPes();
981 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
982                 CProxy_CkArray ap(_aid);
983                 ap[CpvAccess(serializer)].sendBroadcast(msg);
984                 CkGroupID _id = _aid;
985 //              printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
986 #else
987           if (CkMyPe()==CpvAccess(serializer))
988           {
989                 DEBB((AA"Sending array broadcast\n"AB));
990                 if (skipsched)
991                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
992                 else
993                         CProxy_CkArray(_aid).recvBroadcast(msg);
994           } else {
995                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
996                 CProxy_CkArray ap(_aid);
997                 if (skipsched)
998                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
999                 else
1000                         ap[CpvAccess(serializer)].sendBroadcast(msg);
1001           }
1002 #endif
1003         }
1004 }
1005
1006 /// Reflect a broadcast off this Pe:
1007 void CkArray::sendBroadcast(CkMessage *msg)
1008 {
1009         CK_MAGICNUMBER_CHECK
1010         if(CkMyPe() == CpvAccess(serializer)){
1011                 //Broadcast the message to all processors
1012                 thisProxy.recvBroadcast(msg);
1013         }else{
1014                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
1015         }
1016 }
1017 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
1018 {
1019         CK_MAGICNUMBER_CHECK
1020         //Broadcast the message to all processors
1021         thisProxy.recvExpeditedBroadcast(msg);
1022 }
1023
1024 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1025 int _tempBroadcastCount=0;
1026
1027 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
1028     if(homePe(*index)==CmiMyPe()){
1029         CkArrayMessage *bcast = (CkArrayMessage *)data;
1030     int epIdx=bcast->array_ep_bcast();
1031         DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
1032         CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
1033         envelope *env = UsrToEnv(copy);
1034         env->sender.data.group.onPE = CkMyPe();
1035         env->TN  = env->SN=0;
1036         env->piggyBcastIdx = epIdx;
1037         env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
1038         env->getsetArrayMgr() = thisgroup;
1039         env->getsetArrayIndex() = *index;
1040     env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
1041         env->setSrcPe(CkMyPe());
1042         rec->deliver(copy,CkDeliver_queue);
1043         _tempBroadcastCount++;
1044     }else{
1045         if(locMgr->homeElementCount != -1){
1046             DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
1047         }
1048     }
1049 }
1050
1051 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
1052     arr->broadcastHomeElements(data,rec,index);
1053 }
1054 #endif
1055
1056
1057 /// Increment broadcast count; deliver to all local elements
1058 void CkArray::recvBroadcast(CkMessage *m)
1059 {
1060         CK_MAGICNUMBER_CHECK
1061         CkArrayMessage *msg=(CkArrayMessage *)m;
1062         broadcaster->incoming(msg);
1063 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1064         _tempBroadcastCount=0;
1065         locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
1066 #else
1067
1068         //Run through the list of local elements
1069         int idx=0;
1070         ArrayElement *el;
1071 #if CMK_BLUEGENE_CHARM
1072         void *root;
1073         _TRACE_BG_TLINE_END(&root);
1074         BgSetEntryName("start-broadcast", &root);
1075         CkVec<void *> logs;    // store all logs for each delivery
1076         extern void stopVTimer();
1077         extern void startVTimer();
1078 #endif
1079         while (NULL!=(el=elements->next(idx))) {
1080 #if CMK_BLUEGENE_CHARM
1081 //                BgEntrySplit("split-broadcast");
1082                 stopVTimer();
1083                 void *curlog = BgSplitEntry("split-broadcast", &root, 1);
1084                 logs.push_back(curlog);
1085                 startVTimer();
1086 #endif
1087                 broadcaster->deliver(msg,el);
1088         }
1089 #endif
1090
1091 #if CMK_BLUEGENE_CHARM
1092 //                BgEntrySplit("end-broadcast");
1093                 stopVTimer();
1094                 BgSplitEntry("end-broadcast", logs.getVec(), logs.size());
1095                 startVTimer();
1096 #endif
1097         if (! _isAnytimeMigration) {
1098           delete msg;
1099         }
1100 }
1101
1102 #include "CkArray.def.h"
1103
1104