Changes for out-of-core emulation in BigSim. Details could be referred to Chao Mei...
[charm.git] / src / ck-core / ckarray.C
1 /**
2 \file
3 \addtogroup CkArray
4
5 An Array is a collection of array elements (Chares) which
6 can be indexed by an arbitary run of bytes (a CkArrayIndex).
7 Elements can be inserted or removed from the array,
8 or migrated between processors.  Arrays are integrated with
9 the run-time load balancer.
10 Elements can also receive broadcasts and participate in
11 reductions.
12
13 Here's a list, valid in 2003/12, of all the different 
14 code paths used to create array elements:
15
16 1.) Initial inserts: all at once
17 CProxy_foo::ckNew(msg,n);
18  CProxy_ArrayBase::ckCreateArray
19   CkArray::CkArray
20    CkLocMgr::populateInitial(numInitial)
21     for (idx=...)
22      if (map->procNum(idx)==thisPe) 
23       CkArray::insertInitial
24        CkArray::prepareCtorMsg
25        CkArray::insertElement
26
27 2.) Initial inserts: one at a time
28 fooProxy[idx].insert(msg,n);
29  CProxy_ArrayBase::ckInsertIdx
30   CkArray::prepareCtorMsg
31   CkArrayManagerInsert
32    CkArray::insertElement
33
34 3.) Demand creation (receive side)
35 CkLocMgr::deliver
36  CkLocMgr::deliverUnknown
37   CkLocMgr::demandCreateElement
38    CkArray::demandCreateElement
39     CkArray::prepareCtorMsg
40     CkArrayManagerInsert or direct CkArray::insertElement
41
42 4.) Migration (receive side)
43 CkLocMgr::migrateIncoming
44  CkLocMgr::pupElementsFor
45   CkArray::allocateMigrated
46
47
48
49 Converted from 1-D arrays 2/27/2000 by
50 Orion Sky Lawlor, olawlor@acm.org
51 */
52 #include "charm++.h"
53 #include "register.h"
54 #include "ck.h"
55
56 #if CMK_LBDB_ON
57 #include "LBDatabase.h"
58 #endif // CMK_LBDB_ON
59
60 CpvDeclare(int ,serializer);
61
62 CmiBool isAnytimeMigration;
63
64 /************************** Debugging Utilities **************/
65
66 //For debugging: convert given index to a string (NOT threadsafe)
67 static const char *idx2str(const CkArrayIndex &ind)
68 {
69   static char retBuf[80];
70   retBuf[0]=0;
71   for (int i=0;i<ind.nInts;i++)
72   {
73         if (i>0) strcat(retBuf,";");
74         sprintf(&retBuf[strlen(retBuf)],"%d",ind.data()[i]);
75   }
76   return retBuf;
77 }
78 static const char *idx2str(const ArrayElement *el)
79   {return idx2str(el->thisIndexMax);}
80
81 #define ARRAY_DEBUG_OUTPUT 0
82
83 #if ARRAY_DEBUG_OUTPUT 
84 #   define DEB(x) CkPrintf x  //General debug messages
85 #   define DEBI(x) CkPrintf x  //Index debug messages
86 #   define DEBC(x) CkPrintf x  //Construction debug messages
87 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
88 #   define DEBM(x) CkPrintf x  //Migration debug messages
89 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
90 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
91 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
92 #   define AA "ArrayBOC on %d: "
93 #   define AB ,CkMyPe()
94 #else
95 #   define DEB(X) /*CkPrintf x*/
96 #   define DEBI(X) /*CkPrintf x*/
97 #   define DEBC(X) /*CkPrintf x*/
98 #   define DEBS(x) /*CkPrintf x*/
99 #   define DEBM(X) /*CkPrintf x*/
100 #   define DEBL(X) /*CkPrintf x*/
101 #   define DEBK(x) /*CkPrintf x*/
102 #   define DEBB(x) /*CkPrintf x*/
103 #   define str(x) /**/
104 #endif
105
106 inline CkArrayIndexMax &CkArrayMessage::array_index(void)
107 {
108         return UsrToEnv((void *)this)->getsetArrayIndex();
109 }
110
111 /*
112 void 
113 CProxyElement_ArrayBase::ckSendWrapper(void *me, void *m, int ep, int opts){
114        ((CProxyElement_ArrayBase*)me)->ckSend((CkArrayMessage*)m,ep,opts);
115 }
116 */
117 void
118 CProxyElement_ArrayBase::ckSendWrapper(CkArrayID _aid, CkArrayIndexMax _idx, void *m, int ep, int opts) {
119         CProxyElement_ArrayBase me = CProxyElement_ArrayBase(_aid,_idx);
120         ((CProxyElement_ArrayBase)me).ckSend((CkArrayMessage*)m,ep,opts);
121 }
122
123 /*********************** CkVerboseListener ******************/
124 #define VL_PRINT ckout<<"VerboseListener on PE "<<CkMyPe()<<" > "
125
126 CkVerboseListener::CkVerboseListener(void)
127   :CkArrayListener(0)
128 {
129   VL_PRINT<<"INIT  Creating listener"<<endl;
130 }
131
132 void CkVerboseListener::ckRegister(CkArray *arrMgr,int dataOffset_)
133 {
134   CkArrayListener::ckRegister(arrMgr,dataOffset_);
135   VL_PRINT<<"INIT  Registering array manager at offset "<<dataOffset_<<endl;
136 }
137 void CkVerboseListener::ckBeginInserting(void)
138 {
139   VL_PRINT<<"INIT  Begin inserting elements"<<endl;
140 }
141 void CkVerboseListener::ckEndInserting(void)
142 {
143   VL_PRINT<<"INIT  Done inserting elements"<<endl;
144 }
145
146 void CkVerboseListener::ckElementStamp(int *eltInfo)
147 {
148   VL_PRINT<<"LIFE  Stamping element"<<endl;
149 }
150 void CkVerboseListener::ckElementCreating(ArrayElement *elt)
151 {
152   VL_PRINT<<"LIFE  About to create element "<<idx2str(elt)<<endl;
153 }
154 CmiBool CkVerboseListener::ckElementCreated(ArrayElement *elt)
155 {
156   VL_PRINT<<"LIFE  Created element "<<idx2str(elt)<<endl;
157   return CmiTrue;
158 }
159 void CkVerboseListener::ckElementDied(ArrayElement *elt)
160 {
161   VL_PRINT<<"LIFE  Deleting element "<<idx2str(elt)<<endl;
162 }
163
164 void CkVerboseListener::ckElementLeaving(ArrayElement *elt)
165 {
166   VL_PRINT<<"MIG  Leaving: element "<<idx2str(elt)<<endl;
167 }
168 CmiBool CkVerboseListener::ckElementArriving(ArrayElement *elt)
169 {
170   VL_PRINT<<"MIG  Arriving: element "<<idx2str(elt)<<endl;
171   return CmiTrue;
172 }
173
174
175 /************************* ArrayElement *******************/
176 class ArrayElement_initInfo {
177 public:
178   CkArray *thisArray;
179   CkArrayID thisArrayID;
180   CkArrayIndexMax numInitial;
181   int listenerData[CK_ARRAYLISTENER_MAXLEN];
182   CmiBool fromMigration;
183 };
184
185 CkpvStaticDeclare(ArrayElement_initInfo,initInfo);
186
187 void ArrayElement::initBasics(void)
188 {
189 #if CMK_OUT_OF_CORE
190   if (CkpvAccess(CkSaveRestorePrefetch)) 
191     return; /* Just restoring from disk--don't try to set up anything. */
192 #endif
193 #if CMK_GRID_QUEUE_AVAILABLE
194         grid_queue_interval = 0;
195         grid_queue_threshold = 0;
196         msg_count = 0;
197         msg_count_grid = 0;
198         border_flag = 0;
199
200         grid_queue_interval = CmiGridQueueGetInterval ();
201         grid_queue_threshold = CmiGridQueueGetThreshold ();
202 #endif
203   ArrayElement_initInfo &info=CkpvAccess(initInfo);
204   thisArray=info.thisArray;
205   thisArrayID=info.thisArrayID;
206   numElements=info.numInitial.getCombinedCount();
207   if (info.listenerData) {
208     memcpy(listenerData,info.listenerData,sizeof(listenerData));
209   }
210   if (!info.fromMigration) {
211     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
212                           l->ckElementCreating(this));
213   }
214 }
215
216 ArrayElement::ArrayElement(void) 
217 {
218         initBasics();
219 #if CMK_MEM_CHECKPOINT
220         init_checkpt();
221 #endif
222 }
223
224 ArrayElement::ArrayElement(CkMigrateMessage *m) 
225 {
226         initBasics();
227 }
228
229 //Called by the system just before and after migration to another processor:  
230 void ArrayElement::ckAboutToMigrate(void) {
231         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
232                                 l->ckElementLeaving(this));
233         CkMigratable::ckAboutToMigrate();
234 }
235 void ArrayElement::ckJustMigrated(void) {
236         CkMigratable::ckJustMigrated();
237         CK_ARRAYLISTENER_LOOP(thisArray->listeners,
238               if (!l->ckElementArriving(this)) return;);
239 }
240
241 void ArrayElement::ckJustRestored(void) {
242     CkMigratable::ckJustRestored();
243     //empty for out-of-core emulation
244 }
245
246 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
247    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true);
248
249 /// Remote method: calls destructor
250 void ArrayElement::ckDestroy(void)
251 {
252         if(BgOutOfCoreFlag!=1){ //in case of taking core out of memory
253             CK_ARRAYLISTENER_LOOP(thisArray->listeners,
254                            l->ckElementDied(this));
255         }
256         CkMigratable::ckDestroy();
257 }
258
259 //Destructor (virtual)
260 ArrayElement::~ArrayElement()
261 {
262 #if CMK_OUT_OF_CORE
263   if (CkpvAccess(CkSaveRestorePrefetch)) 
264     return; /* Just saving to disk--don't trash anything. */
265 #endif
266   //To detect use-after-delete: 
267   thisArray=(CkArray *)0xDEADa7a1;
268 }
269
270 void ArrayElement::pup(PUP::er &p)
271 {
272   DEBM((AA"  ArrayElement::pup()\n"AB));
273   CkMigratable::pup(p);
274   thisArrayID.pup(p);
275   if (p.isUnpacking())
276         thisArray=thisArrayID.ckLocalBranch();
277   p(listenerData,CK_ARRAYLISTENER_MAXLEN);
278 #if CMK_MEM_CHECKPOINT
279   p(budPEs, 2);
280 #endif
281   p.syncComment(PUP::sync_last_system,"ArrayElement");
282 #if CMK_GRID_QUEUE_AVAILABLE
283   p|grid_queue_interval;
284   p|grid_queue_threshold;
285   p|msg_count;
286   p|msg_count_grid;
287   p|border_flag;
288   if (p.isUnpacking ()) {
289     msg_count = 0;
290     msg_count_grid = 0;
291     border_flag = 0;
292   }
293 #endif
294 }
295
296 char *ArrayElement::ckDebugChareName(void) {
297         char buf[200];
298         const char *className=_chareTable[ckGetChareType()]->name;
299         const int *d=thisIndexMax.data();
300         switch (thisIndexMax.nInts) {
301         case 0: sprintf(buf,"%s",className); break;
302         case 1: sprintf(buf,"%s[%d]",className,d[0]); break;
303         case 2: sprintf(buf,"%s(%d,%d)",className,d[0],d[1]); break;
304         case 3: sprintf(buf,"%s(%d,%d,%d)",className,d[0],d[1],d[2]); break;
305         default: sprintf(buf,"%s(%d,%d,%d,%d..)",className,d[0],d[1],d[2],d[3]); break;
306         };
307         return strdup(buf);
308 }
309
310 int ArrayElement::ckDebugChareID(char *str, int limit) {
311   if (limit<21) return -1;
312   str[0] = 2;
313   *((int*)&str[1]) = ((CkGroupID)thisArrayID).idx;
314   *((CkArrayIndexMax*)&str[5]) = thisIndexMax;
315   return 21;
316 }
317
318 /// A more verbose form of abort
319 void ArrayElement::CkAbort(const char *str) const
320 {
321         CkError("[%d] Array element at index %s aborting:\n",
322                 CkMyPe(), idx2str(thisIndexMax));
323         CkMigratable::CkAbort(str);
324 }
325
326 /*********************** Spring Cleaning *****************
327 Periodically (every minute or so) remove expired broadcasts
328 from the queue.
329 */
330
331 inline void CkArray::springCleaning(void)
332 {
333   DEBK((AA"Starting spring cleaning\n"AB));
334   broadcaster->springCleaning();
335 }
336
337 void CkArray::staticSpringCleaning(void *forArray,double curWallTime) {
338         ((CkArray *)forArray)->springCleaning();
339 }
340
341 /********************* Little CkArray Utilities ******************/
342
343 CProxy_ArrayBase::CProxy_ArrayBase(const ArrayElement *e)
344         :CProxy(), _aid(e->ckGetArrayID())
345         {}
346 CProxyElement_ArrayBase::CProxyElement_ArrayBase(const ArrayElement *e)
347         :CProxy_ArrayBase(e), _idx(e->ckGetArrayIndex())
348         {}
349
350 CkLocMgr *CProxy_ArrayBase::ckLocMgr(void) const
351         {return ckLocalBranch()->getLocMgr(); }
352
353 CK_REDUCTION_CLIENT_DEF(CProxy_ArrayBase,ckLocalBranch());
354
355 CkArrayOptions::CkArrayOptions(void) //Default: empty array
356         :numInitial(0),map(_RRMapID)
357 {
358         locMgr.setZero();
359 }
360
361 CkArrayOptions::CkArrayOptions(int ni1) //With initial elements (1D)
362         :numInitial(CkArrayIndex1D(ni1)),map(_RRMapID)
363 {
364         locMgr.setZero();
365 }
366
367 CkArrayOptions::CkArrayOptions(int ni1, int ni2) //With initial elements (2D)
368         :numInitial(CkArrayIndex2D(ni1, ni2)),map(_RRMapID)
369 {
370         locMgr.setZero();
371 }
372
373 CkArrayOptions::CkArrayOptions(int ni1, int ni2, int ni3) //With initial elements (3D)
374         :numInitial(CkArrayIndex3D(ni1, ni2, ni3)),map(_RRMapID)
375 {
376         locMgr.setZero();
377 }
378
379 /// Bind our elements to this array
380 CkArrayOptions &CkArrayOptions::bindTo(const CkArrayID &b)
381 {
382         CkArray *arr=CProxy_CkArray(b).ckLocalBranch();
383         //Stupid bug: need a way for arrays to stay the same size *FOREVER*,
384         // not just initially.
385         //setNumInitial(arr->getNumInitial());
386         return setLocationManager(arr->getLocMgr()->getGroupID());
387 }
388 CkArrayOptions &CkArrayOptions::addListener(CkArrayListener *listener)
389 {
390         arrayListeners.push_back(listener);
391         return *this;
392 }
393
394 void CkArrayOptions::pup(PUP::er &p) {
395         p|numInitial;
396         p|locMgr;
397         p|map;
398         p|arrayListeners;
399 }
400
401 CkArrayListener::CkArrayListener(int nInts_) 
402   :nInts(nInts_) 
403 {
404   dataOffset=-1;
405 }
406 CkArrayListener::CkArrayListener(CkMigrateMessage *m) {
407   nInts=-1; dataOffset=-1;
408 }
409 void CkArrayListener::pup(PUP::er &p) {
410   p|nInts;
411   p|dataOffset;
412 }
413
414 void CkArrayListener::ckRegister(CkArray *arrMgr,int dataOffset_)
415 {
416   if (dataOffset!=-1) CkAbort("Cannot register an ArrayListener twice!\n");
417   dataOffset=dataOffset_;
418 }
419
420 CkArrayID CProxy_ArrayBase::ckCreateArray(CkArrayMessage *m,int ctor,
421                                           const CkArrayOptions &opts_)
422 {
423   CkArrayOptions opts(opts_);
424   if (opts.getLocationManager().isZero())
425   { //Create a new location manager
426 #if !CMK_LBDB_ON
427     CkGroupID _lbdb;
428 #endif
429     opts.setLocationManager(CProxy_CkLocMgr::ckNew(
430       opts.getMap(),_lbdb,opts.getNumInitial()
431       ));
432   }
433   //Create the array manager
434   m->array_ep()=ctor;
435   CkMarshalledMessage marsh(m);
436   CProxy_CkArrayReductionMgr nodereductionProxy = CProxy_CkArrayReductionMgr::ckNew();
437   CkGroupID ag=CProxy_CkArray::ckNew(opts,marsh,nodereductionProxy);
438   nodereductionProxy.setAttachedGroup(ag);
439   return (CkArrayID)ag;
440 }
441 CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
442 {
443   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
444 }
445
446 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
447         const CkArrayIndex &idx)
448 {
449   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
450   m->array_ep()=ctor;
451   ckLocalBranch()->prepareCtorMsg(m,onPe,idx);
452   if (ckIsDelegated()) {
453         ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
454         return;
455   }
456   
457   DEBC((AA"Proxy inserting element %s on Pe %d\n"AB,idx2str(idx),onPe));
458   CkArrayManagerInsert(onPe,m,_aid);
459 }
460
461 void CProxyElement_ArrayBase::ckInsert(CkArrayMessage *m,int ctorIndex,int onPe)
462 {
463   ckInsertIdx(m,ctorIndex,onPe,_idx);
464 }
465
466 ArrayElement *CProxyElement_ArrayBase::ckLocal(void) const
467 {
468   return ckLocalBranch()->lookup(_idx);
469 }
470
471 //pack-unpack method for CProxy_ArrayBase
472 void CProxy_ArrayBase::pup(PUP::er &p)
473 {
474   CProxy::pup(p);
475   _aid.pup(p);
476 }
477 void CProxyElement_ArrayBase::pup(PUP::er &p)
478 {
479   CProxy_ArrayBase::pup(p);
480   p|_idx.nInts;
481   p(_idx.data(),_idx.nInts);
482 }
483
484 void CProxySection_ArrayBase::pup(PUP::er &p)
485 {
486   CProxy_ArrayBase::pup(p);
487   _sid.pup(p);
488 }
489
490 /*********************** CkArray Creation *************************/
491 void _ckArrayInit(void)
492 {
493   CkpvInitialize(ArrayElement_initInfo,initInfo);
494   CkDisableTracing(CkIndex_CkArray::insertElement(0));
495   CkDisableTracing(CkIndex_CkArray::recvBroadcast(0));
496     // disable because broadcast listener may deliver broadcast message
497   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
498   // by default anytime migration is allowed
499 }
500
501 CkArray::CkArray(CkArrayOptions &c,CkMarshalledMessage &initMsg,CkNodeGroupID nodereductionID)
502   : CkReductionMgr(),
503   locMgr(CProxy_CkLocMgr::ckLocalBranch(c.getLocationManager())),locMgrID(c.getLocationManager()),
504   thisProxy(thisgroup)
505 {
506   //Registration
507   elements=(ArrayElementList *)locMgr->addManager(thisgroup,this);
508 //  moved to _ckArrayInit()
509 //  CkpvInitialize(ArrayElement_initInfo,initInfo);
510   CcdCallOnConditionKeep(CcdPERIODIC_1minute,staticSpringCleaning,(void *)this);
511
512   //Set class variables
513   numInitial=c.getNumInitial();
514   isInserting=CmiTrue;
515
516   //Find, register, and initialize the arrayListeners
517   int dataOffset=0;
518   broadcaster=new CkArrayBroadcaster();
519   addListener(broadcaster,dataOffset);
520   reducer=new CkArrayReducer(thisgroup);
521   addListener(reducer,dataOffset);
522
523   calistener = new ComlibArrayListener();
524   addListener(calistener,dataOffset);
525
526   int lNo,nL=c.getListeners(); //User-added listeners
527   for (lNo=0;lNo<nL;lNo++) addListener(c.getListener(lNo),dataOffset);
528   if (dataOffset>CK_ARRAYLISTENER_MAXLEN)
529     CkAbort("Too much array listener data!\n"
530 "You'll have to either use fewer array listeners, or increase the compile-time\n"
531 "constant CK_ARRAYLISTENER_MAXLEN!\n");
532
533   for (int l=0;l<listeners.size();l++) listeners[l]->ckBeginInserting();
534
535   ///Set up initial elements (if any)
536   locMgr->populateInitial(numInitial,initMsg.getMessage(),this);
537
538   ///adding code for Reduction using nodegroups
539
540   CProxy_CkArrayReductionMgr  nodetemp(nodereductionID);  
541   nodeProxy = nodetemp;
542   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
543 }
544
545 CkArray::CkArray(CkMigrateMessage *m)
546         :CkReductionMgr(m), thisProxy(thisgroup)
547 {
548   locMgr=NULL;
549   isInserting=CmiTrue;
550 }
551
552 #ifndef CMK_OPTIMIZE
553 inline void testPup(PUP::er &p,int shouldBe) {
554   int a=shouldBe;
555   p|a;
556   if (a!=shouldBe)
557     CkAbort("PUP direction mismatch!");
558 }
559 #else
560 inline void testPup(PUP::er &p,int shouldBe) {}
561 #endif
562
563 void CkArray::pup(PUP::er &p){
564         CkReductionMgr::pup(p);
565         p|numInitial;
566         p|locMgrID;
567         p|listeners;
568         testPup(p,1234);
569         if(p.isUnpacking()){
570                 thisProxy=thisgroup;
571                 locMgr = CProxy_CkLocMgr::ckLocalBranch(locMgrID);
572                 elements = (ArrayElementList *)locMgr->addManager(thisgroup,this);
573                 /// Restore our default listeners:
574                 broadcaster=(CkArrayBroadcaster *)(CkArrayListener *)(listeners[0]);
575                 reducer=(CkArrayReducer *)(CkArrayListener *)(listeners[1]);
576         }
577 }
578
579 #define CK_ARRAYLISTENER_STAMP_LOOP(listenerData) do {\
580   int dataOffset=0; \
581   for (int lNo=0;lNo<listeners.size();lNo++) { \
582     CkArrayListener *l=listeners[lNo]; \
583     l->ckElementStamp(&listenerData[dataOffset]); \
584     dataOffset+=l->ckGetLen(); \
585   } \
586 } while (0)
587
588 //Called on send side to prepare array constructor message
589 void CkArray::prepareCtorMsg(CkMessage *m,int &onPe,const CkArrayIndex &idx)
590 {
591   envelope *env=UsrToEnv((void *)m);
592   env->getsetArrayIndex()=idx;
593   int *listenerData=env->getsetArrayListenerData();
594   CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
595   if (onPe==-1) onPe=procNum(idx);   // onPe may still be -1
596   if (onPe!=CkMyPe()&&onPe!=-1) //Let the local manager know where this el't is
597         getLocMgr()->inform(idx,onPe);
598 }
599
600 CkMigratable *CkArray::allocateMigrated(int elChareType,const CkArrayIndex &idx,
601                         CkElementCreation_t type)
602 {
603         ArrayElement *ret=allocate(elChareType,idx,NULL,CmiTrue);
604         if (type==CkElementCreation_resume) 
605         { // HACK: Re-stamp elements on checkpoint resume--
606           //  this restores, e.g., reduction manager's gcount
607                 int *listenerData=ret->listenerData;
608                 CK_ARRAYLISTENER_STAMP_LOOP(listenerData);
609         }
610         return ret;
611 }
612
613 ArrayElement *CkArray::allocate(int elChareType,const CkArrayIndex &idx,
614                      CkMessage *msg,CmiBool fromMigration) 
615 {
616         //Stash the element's initialization information in the global "initInfo"
617         ArrayElement_initInfo &init=CkpvAccess(initInfo);
618         init.numInitial=numInitial;
619         init.thisArray=this;
620         init.thisArrayID=thisgroup;
621         if (msg) /*Have to *copy* data because msg will be deleted*/
622           memcpy(init.listenerData,UsrToEnv(msg)->getsetArrayListenerData(),
623                  sizeof(init.listenerData));
624         init.fromMigration=fromMigration;
625         
626         //Build the element
627         int elSize=_chareTable[elChareType]->size;
628         ArrayElement *elem = (ArrayElement *)malloc(elSize);
629 #ifndef CMK_OPTIMIZE
630         if (elem!=NULL) setMemoryTypeChare(elem);
631 #endif
632         return elem;
633 }
634
635 /// This method is called by ck.C or the user to add an element.
636 CmiBool CkArray::insertElement(CkMessage *me)
637 {
638   CK_MAGICNUMBER_CHECK
639   CkArrayMessage *m=(CkArrayMessage *)me;
640   const CkArrayIndex &idx=m->array_index();
641   int onPe;
642   if (locMgr->isRemote(idx,&onPe)) 
643   { /* element's sibling lives somewhere else, so insert there */
644         CkArrayManagerInsert(onPe,me,thisgroup);
645         return CmiFalse;
646   }
647   int ctorIdx=m->array_ep();
648   int chareType=_entryTable[ctorIdx]->chareIdx;
649   ArrayElement *elt=allocate(chareType,idx,me,CmiFalse);
650   if (!locMgr->addElement(thisgroup,idx,elt,ctorIdx,(void *)m)) return CmiFalse;
651   CK_ARRAYLISTENER_LOOP(listeners,
652       if (!l->ckElementCreated(elt)) return CmiFalse;);
653   return CmiTrue;
654 }
655
656 void CProxy_ArrayBase::doneInserting(void)
657 {
658   DEBC((AA"Broadcasting a doneInserting request\n"AB));
659   //Broadcast a DoneInserting
660   CProxy_CkArray(_aid).remoteDoneInserting();
661 }
662
663 void CkArray::doneInserting(void)
664 {
665   thisProxy[CkMyPe()].remoteDoneInserting();
666 }
667
668 /// This is called on every processor after the last array insertion.
669 void CkArray::remoteDoneInserting(void)
670 {
671   CK_MAGICNUMBER_CHECK
672   if (isInserting) {
673     isInserting=CmiFalse;
674     DEBC((AA"Done inserting objects\n"AB));
675     for (int l=0;l<listeners.size();l++) listeners[l]->ckEndInserting();
676     locMgr->doneInserting();
677   }
678 }
679
680 CmiBool CkArray::demandCreateElement(const CkArrayIndex &idx,
681         int onPe,int ctor,CkDeliver_t type)
682 {
683         CkArrayMessage *m=(CkArrayMessage *)CkAllocSysMsg();
684         prepareCtorMsg(m,onPe,idx);
685         m->array_ep()=ctor;
686         
687         if ((onPe!=CkMyPe()) || (type==CkDeliver_queue)) {
688                 DEBC((AA"Forwarding demand-creation request for %s to %d\n"AB,idx2str(idx),onPe));
689                 CkArrayManagerInsert(onPe,m,thisgroup);
690         } else /* local message, non-queued */ {
691                 //Call local constructor directly
692                 DEBC((AA"Demand-creating %s\n"AB,idx2str(idx)));
693                 return insertElement(m);
694         }
695         return CmiTrue;
696 }
697
698 void CkArray::insertInitial(const CkArrayIndex &idx,void *ctorMsg, int local)
699 {
700         CkArrayMessage *m=(CkArrayMessage *)ctorMsg;
701         if (local) {
702           int onPe=CkMyPe();
703           prepareCtorMsg(m,onPe,idx);
704 #if CMK_BLUEGENE_CHARM
705           BgEntrySplit("split-array-new");
706 #endif
707           insertElement(m);
708         }
709         else {
710           int onPe=-1;
711           prepareCtorMsg(m,onPe,idx);
712           CkArrayManagerInsert(onPe,m,getGroupID());
713         }
714 }
715
716 /********************* CkArray Messaging ******************/
717 /// Fill out a message's array fields before sending it
718 inline void msg_prepareSend(CkArrayMessage *msg, int ep,CkArrayID aid)
719 {
720         envelope *env=UsrToEnv((void *)msg);
721         env->getsetArrayMgr()=aid;
722         env->getsetArraySrcPe()=CkMyPe();
723         env->setEpIdx(ep);
724         env->getsetArrayHops()=0;
725 }
726 void CProxyElement_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts) const
727 {
728 #ifndef CMK_OPTIMIZE
729         //Check our array index for validity
730         if (_idx.nInts<0) CkAbort("Array index length is negative!\n");
731         if (_idx.nInts>CK_ARRAYINDEX_MAXLEN)
732                 CkAbort("Array index length (nInts) is too long-- did you "
733                         "use bytes instead of integers?\n");
734 #endif
735         msg_prepareSend(msg,ep,ckGetArrayID());
736         msg->array_index()=_idx;//Insert array index
737         if (ckIsDelegated()) //Just call our delegateMgr
738           ckDelegatedTo()->ArraySend(ckDelegatedPtr(),ep,msg,_idx,ckGetArrayID());
739         else 
740         { //Usual case: a direct send
741           if (opts & CK_MSG_INLINE)
742             ckLocalBranch()->deliver(msg, CkDeliver_inline, opts&!CK_MSG_INLINE);
743           else
744             ckLocalBranch()->deliver(msg, CkDeliver_queue, opts);
745         }
746 }
747
748 void *CProxyElement_ArrayBase::ckSendSync(CkArrayMessage *msg, int ep) const
749 {
750         CkFutureID f=CkCreateAttachedFuture(msg);
751         ckSend(msg,ep);
752         return CkWaitReleaseFuture(f);
753 }
754
755 void CProxySection_ArrayBase::ckSend(CkArrayMessage *msg, int ep, int opts)
756 {
757         if (ckIsDelegated()) //Just call our delegateMgr
758           ckDelegatedTo()->ArraySectionSend(ckDelegatedPtr(),ep,msg,ckGetArrayID(),ckGetSectionID(), opts);
759         else {
760           // send through all
761           for (int i=0; i< _sid._nElems-1; i++) {
762             CProxyElement_ArrayBase ap(ckGetArrayID(), _sid._elems[i]);
763             void *newMsg=CkCopyMsg((void **)&msg);
764             ap.ckSend((CkArrayMessage *)newMsg,ep,opts);
765           }
766           if (_sid._nElems > 0) {
767             CProxyElement_ArrayBase ap(ckGetArrayID(), _sid._elems[_sid._nElems-1]);
768             ap.ckSend((CkArrayMessage *)msg,ep,opts);
769           }
770         }
771 }
772
773 void CkSendMsgArray(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
774 {
775   CkArrayMessage *m=(CkArrayMessage *)msg;
776   m->array_index()=idx;
777   msg_prepareSend(m,entryIndex,aID);
778   CkArray *a=(CkArray *)_localBranch(aID);
779   a->deliver(m,CkDeliver_queue,opts);
780 }
781
782 void CkSendMsgArrayInline(int entryIndex, void *msg, CkArrayID aID, const CkArrayIndex &idx, int opts)
783 {
784   CkArrayMessage *m=(CkArrayMessage *)msg;
785   m->array_index()=idx;
786   msg_prepareSend(m,entryIndex,aID);
787   CkArray *a=(CkArray *)_localBranch(aID);
788   int oldStatus = CkDisableTracing(entryIndex);     // avoid nested tracing
789   a->deliver(m,CkDeliver_inline,opts);
790   if (oldStatus) CkEnableTracing(entryIndex);
791 }
792
793
794 /*********************** CkArray Reduction *******************/
795 CkArrayReducer::CkArrayReducer(CkGroupID mgrID_)
796   :CkArrayListener(sizeof(contributorInfo)/sizeof(int)),
797    mgrID(mgrID_)
798 {
799   mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
800 }
801 CkArrayReducer::CkArrayReducer(CkMigrateMessage *m)
802   :CkArrayListener(m)
803 {
804   mgr=NULL;
805 }
806 void CkArrayReducer::pup(PUP::er &p) {
807   CkArrayListener::pup(p);
808   p|mgrID;
809   if (p.isUnpacking())
810     mgr=CProxy_CkReductionMgr(mgrID).ckLocalBranch();
811 }
812 CkArrayReducer::~CkArrayReducer() {}
813
814 /*********************** CkArray Broadcast ******************/
815
816 CkArrayBroadcaster::CkArrayBroadcaster(void)
817   :CkArrayListener(1) //Each array element carries a broadcast number
818 {
819   bcastNo=oldBcastNo=0;
820 }
821 CkArrayBroadcaster::CkArrayBroadcaster(CkMigrateMessage *m)
822         :CkArrayListener(m) { bcastNo=-1; oldBcastNo=-1; }
823 void CkArrayBroadcaster::pup(PUP::er &p) {
824   CkArrayListener::pup(p);
825   /* Assumption: no migrants during checkpoint, so no need to
826      save old broadcasts. */
827   p|bcastNo;
828   if (p.isUnpacking()) {
829     oldBcastNo=bcastNo; /* because we threw away oldBcasts */
830   }
831 }
832 CkArrayBroadcaster::~CkArrayBroadcaster()
833 {
834   CkArrayMessage *msg;
835   while (NULL!=(msg=oldBcasts.deq())) delete msg;
836 }
837
838 void CkArrayBroadcaster::incoming(CkArrayMessage *msg)
839 {
840   bcastNo++;
841   if (isAnytimeMigration) {
842     DEBB((AA"Received broadcast %d\n"AB,bcastNo));
843     CmiMemoryMarkBlock(((char *)UsrToEnv(msg))-sizeof(CmiChunkHeader));
844     oldBcasts.enq((CkArrayMessage *)msg);//Stash the message for later use
845   }
846 }
847
848 /// Deliver a copy of the given broadcast to the given local element
849 CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast,ArrayElement *el)
850 {
851   int &elBcastNo=getData(el);
852   // if this array element already received this message, skip it
853   if (elBcastNo >= bcastNo) return CmiFalse;
854   elBcastNo++;
855   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
856   int epIdx=bcast->array_ep_bcast();
857   return el->ckInvokeEntry(epIdx,bcast,CmiFalse);
858 }
859
860 /// Deliver all needed broadcasts to the given local element
861 CmiBool CkArrayBroadcaster::bringUpToDate(ArrayElement *el)
862 {
863   if (! isAnytimeMigration) return CmiTrue;
864   int &elBcastNo=getData(el);
865   if (elBcastNo<bcastNo)
866   {//This element needs some broadcasts-- it must have
867    //been migrating during the broadcast.
868     int i,nDeliver=bcastNo-elBcastNo;
869     DEBM((AA"Migrator %s missed %d broadcasts--\n"AB,idx2str(el),nDeliver));
870
871     //Skip the old junk at the front of the bcast queue
872     for (i=oldBcasts.length()-1;i>=nDeliver;i--)
873       oldBcasts.enq(oldBcasts.deq());
874
875     //Deliver the newest messages, in old-to-new order
876     for (i=nDeliver-1;i>=0;i--)
877     {
878       CkArrayMessage *msg=oldBcasts.deq();
879       oldBcasts.enq(msg);
880       if (!deliver(msg,el))
881         return CmiFalse; //Element migrated away
882     }
883   }
884   //Otherwise, the element survived
885   return CmiTrue;
886 }
887
888
889 void CkArrayBroadcaster::springCleaning(void)
890 {
891   if (! isAnytimeMigration) return;
892   //Remove old broadcast messages
893   int nDelete=oldBcasts.length()-(bcastNo-oldBcastNo);
894   if (nDelete>0) {
895     DEBK((AA"Cleaning out %d old broadcasts\n"AB,nDelete));
896     for (int i=0;i<nDelete;i++)
897       delete oldBcasts.deq();
898   }
899   oldBcastNo=bcastNo;
900 }
901
902 void CkArrayBroadcaster::flushState() 
903
904   bcastNo = oldBcastNo = 0; 
905   CkArrayMessage *msg;
906   while (NULL!=(msg=oldBcasts.deq())) delete msg;
907 }
908
909 void CkBroadcastMsgArray(int entryIndex, void *msg, CkArrayID aID, int opts)
910 {
911         CProxy_ArrayBase ap(aID);
912         ap.ckBroadcast((CkArrayMessage *)msg,entryIndex,opts);
913 }
914
915 void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
916 {
917         msg->array_ep_bcast()=ep;
918         if (ckIsDelegated()) //Just call our delegateMgr
919           ckDelegatedTo()->ArrayBroadcast(ckDelegatedPtr(),ep,msg,_aid);
920         else 
921         { //Broadcast message via serializer node
922           _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
923           int skipsched = opts & CK_MSG_EXPEDITED; 
924           //int serializer=0;//1623802937%CkNumPes();
925           if (CkMyPe()==CpvAccess(serializer))
926           {
927                 DEBB((AA"Sending array broadcast\n"AB));
928                 if (skipsched)
929                         CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
930                 else
931                         CProxy_CkArray(_aid).recvBroadcast(msg);
932           } else {
933                 DEBB((AA"Forwarding array broadcast to serializer node %d\n"AB,CpvAccess(serializer)));
934                 CProxy_CkArray ap(_aid);
935                 if (skipsched)
936                         ap[CpvAccess(serializer)].sendExpeditedBroadcast(msg);
937                 else
938                         ap[CpvAccess(serializer)].sendBroadcast(msg);
939           }
940         }
941 }
942
943 /// Reflect a broadcast off this Pe:
944 void CkArray::sendBroadcast(CkMessage *msg)
945 {
946         CK_MAGICNUMBER_CHECK
947         if(CkMyPe() == CpvAccess(serializer)){
948                 //Broadcast the message to all processors
949                 thisProxy.recvBroadcast(msg);
950         }else{
951                 thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
952         }
953 }
954 void CkArray::sendExpeditedBroadcast(CkMessage *msg)
955 {
956         CK_MAGICNUMBER_CHECK
957         //Broadcast the message to all processors
958         thisProxy.recvExpeditedBroadcast(msg);
959 }
960
961 /// Increment broadcast count; deliver to all local elements
962 void CkArray::recvBroadcast(CkMessage *m)
963 {
964         CK_MAGICNUMBER_CHECK
965         CkArrayMessage *msg=(CkArrayMessage *)m;
966         broadcaster->incoming(msg);
967         //Run through the list of local elements
968         int idx=0;
969         ArrayElement *el;
970         while (NULL!=(el=elements->next(idx))) {
971 #if CMK_BLUEGENE_CHARM
972                 BgEntrySplit("split-broadcast");
973 #endif
974                 broadcaster->deliver(msg,el);
975         }
976 #if CMK_BLUEGENE_CHARM
977                 BgEntrySplit("end-broadcast");
978 #endif
979         if (! isAnytimeMigration) {
980           delete msg;
981         }
982 }
983
984 #include "CkArray.def.h"
985
986