Updating the migration and strategy cost
[charm.git] / src / ck-core / cklocation.C
1 /** \file cklocation.C
2  *  \addtogroup CkArrayImpl
3  *
4  *  The location manager keeps track of an indexed set of migratable objects.
5  *  It is used by the array manager to locate array elements, interact with the
6  *  load balancer, and perform migrations.
7  *
8  *  Orion Sky Lawlor, olawlor@acm.org 9/29/2001
9  */
10
11 #include "charm++.h"
12 #include "register.h"
13 #include "ck.h"
14 #include "trace.h"
15 #include "TopoManager.h"
16
17 #include<sstream>
18
19 #if CMK_LBDB_ON
20 #include "LBDatabase.h"
21 #endif // CMK_LBDB_ON
22
23 #if CMK_GRID_QUEUE_AVAILABLE
24 CpvExtern(void *, CkGridObject);
25 #endif
26
27 static const char *idx2str(const CkArrayMessage *m) {
28   return idx2str(((CkArrayMessage *)m)->array_index());
29 }
30
31 #define ARRAY_DEBUG_OUTPUT 0
32
33 #if ARRAY_DEBUG_OUTPUT 
34 #   define DEB(x) CkPrintf x  //General debug messages
35 #   define DEBI(x) CkPrintf x  //Index debug messages
36 #   define DEBC(x) CkPrintf x  //Construction debug messages
37 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
38 #   define DEBM(x) CkPrintf x  //Migration debug messages
39 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
40 #   define DEBK(x) //CkPrintf x  //Spring Cleaning debug messages
41 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
42 #   define AA "LocMgr on %d: "
43 #   define AB ,CkMyPe()
44 #   define DEBUG(x) CkPrintf x
45 #else
46 #   define DEB(X) /*CkPrintf x*/
47 #   define DEBI(X) /*CkPrintf x*/
48 #   define DEBC(X) /*CkPrintf x*/
49 #   define DEBS(x) /*CkPrintf x*/
50 #   define DEBM(X) /*CkPrintf x*/
51 #   define DEBL(X) /*CkPrintf x*/
52 #   define DEBK(x) /*CkPrintf x*/
53 #   define DEBB(x) /*CkPrintf x*/
54 #   define str(x) /**/
55 #   define DEBUG(x)   /**/
56 #endif
57
58 #if CMK_LBDB_ON
59 /*LBDB object handles are fixed-sized, and not necc.
60 the same size as ArrayIndices.
61 */
62 LDObjid idx2LDObjid(const CkArrayIndex &idx)
63 {
64   LDObjid r;
65   int i;
66   const int *data=idx.data();
67   if (OBJ_ID_SZ>=idx.nInts) {
68     for (i=0;i<idx.nInts;i++)
69       r.id[i]=data[i];
70     for (i=idx.nInts;i<OBJ_ID_SZ;i++)
71       r.id[i]=0;
72   } else {
73     //Must hash array index into LBObjid
74     int j;
75     for (j=0;j<OBJ_ID_SZ;j++)
76         r.id[j]=data[j];
77     for (i=0;i<idx.nInts;i++)
78       for (j=0;j<OBJ_ID_SZ;j++)
79         r.id[j]+=circleShift(data[i],22+11*i*(j+1))+
80           circleShift(data[i],21-9*i*(j+1));
81   }
82   return r;
83 }
84 #endif
85
86 /*********************** Array Messages ************************/
87 CkArrayIndex &CkArrayMessage::array_index(void)
88 {
89     return UsrToEnv((void *)this)->getsetArrayIndex();
90 }
91 unsigned short &CkArrayMessage::array_ep(void)
92 {
93         return UsrToEnv((void *)this)->getsetArrayEp();
94 }
95 unsigned short &CkArrayMessage::array_ep_bcast(void)
96 {
97         return UsrToEnv((void *)this)->getsetArrayBcastEp();
98 }
99 unsigned char &CkArrayMessage::array_hops(void)
100 {
101         return UsrToEnv((void *)this)->getsetArrayHops();
102 }
103 unsigned int CkArrayMessage::array_getSrcPe(void)
104 {
105         return UsrToEnv((void *)this)->getsetArraySrcPe();
106 }
107 unsigned int CkArrayMessage::array_ifNotThere(void)
108 {
109         return UsrToEnv((void *)this)->getArrayIfNotThere();
110 }
111 void CkArrayMessage::array_setIfNotThere(unsigned int i)
112 {
113         UsrToEnv((void *)this)->setArrayIfNotThere(i);
114 }
115
116 /*********************** Array Map ******************
117 Given an array element index, an array map tells us 
118 the index's "home" Pe.  This is the Pe the element will
119 be created on, and also where messages to this element will
120 be forwarded by default.
121 */
122
123 CkArrayMap::CkArrayMap(void) { }
124 CkArrayMap::~CkArrayMap() { }
125 int CkArrayMap::registerArray(CkArrayIndex& numElements,CkArrayID aid)
126 {return 0;}
127
128 #define CKARRAYMAP_POPULATE_INITIAL(POPULATE_CONDITION) \
129         int i; \
130         for (int i1=0; i1<numElements.data()[0]; i1++) { \
131           if (numElements.dimension == 1) { \
132             /* Make 1D indices */ \
133             i = i1; \
134             CkArrayIndex1D idx(i1); \
135             if (POPULATE_CONDITION) \
136               mgr->insertInitial(idx,CkCopyMsg(&ctorMsg)); \
137           } else { \
138             /* higher dimensionality */ \
139             for (int i2=0; i2<numElements.data()[1]; i2++) { \
140               if (numElements.dimension == 2) { \
141                 /* Make 2D indices */ \
142                 i = i1 * numElements.data()[1] + i2; \
143                 CkArrayIndex2D idx(i1, i2); \
144                 if (POPULATE_CONDITION) \
145                   mgr->insertInitial(idx,CkCopyMsg(&ctorMsg)); \
146               } else { \
147                 /* higher dimensionality */ \
148                 CkAssert(numElements.dimension == 3); \
149                 for (int i3=0; i3<numElements.data()[2]; i3++) { \
150                   /* Make 3D indices */ \
151                   i = (i1 * numElements.data()[1] + i2) * numElements.data()[2] + i3; \
152                   CkArrayIndex3D idx(i1, i2, i3 ); \
153                   if (POPULATE_CONDITION) \
154                     mgr->insertInitial(idx,CkCopyMsg(&ctorMsg)); \
155                 } \
156               } \
157             } \
158           } \
159         }
160
161 void CkArrayMap::populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr)
162 {
163         if (numElements.nInts==0) {
164           CkFreeMsg(ctorMsg);
165           return;
166         }
167         int thisPe=CkMyPe();
168         /* The CkArrayIndex is supposed to have at most 3 dimensions, which
169            means that all the fields are ints, and numElements.nInts represents
170            how many of them are used */
171         CKARRAYMAP_POPULATE_INITIAL(procNum(arrayHdl,idx)==thisPe);
172
173 #if CMK_BIGSIM_CHARM
174         BgEntrySplit("split-array-new-end");
175 #endif
176
177         mgr->doneInserting();
178         CkFreeMsg(ctorMsg);
179 }
180
181 CkGroupID _defaultArrayMapID;
182 CkGroupID _fastArrayMapID;
183
184 class RRMap : public CkArrayMap
185 {
186 public:
187   RRMap(void)
188   {
189           DEBC((AA"Creating RRMap\n"AB));
190   }
191   RRMap(CkMigrateMessage *m):CkArrayMap(m){}
192   int procNum(int /*arrayHdl*/, const CkArrayIndex &i)
193   {
194 #if 1
195     if (i.nInts==1) {
196       //Map 1D integer indices in simple round-robin fashion
197       int ans= (i.data()[0])%CkNumPes();
198       while(!CmiNodeAlive(ans) || (ans == CkMyPe() && CkpvAccess(startedEvac))){
199         ans = (ans +1 )%CkNumPes();
200       }
201       return ans;
202     }
203     else 
204 #endif
205       {
206         //Map other indices based on their hash code, mod a big prime.
207         unsigned int hash=(i.hash()+739)%1280107;
208         int ans = (hash % CkNumPes());
209         while(!CmiNodeAlive(ans)){
210                 ans = (ans +1 )%CkNumPes();
211         }
212         return ans;
213
214       }
215   }
216 };
217
218 /** 
219  * Class used to store the dimensions of the array and precalculate numChares,
220  * binSize and other values for the DefaultArrayMap -- ASB
221  */
222 class arrayMapInfo {
223 public:
224   CkArrayIndex _nelems;
225   int _binSizeFloor;            /* floor of numChares/numPes */
226   int _binSizeCeil;             /* ceiling of numChares/numPes */
227   int _numChares;               /* initial total number of chares */
228   int _remChares;               /* numChares % numPes -- equals the number of
229                                    processors in the first set */
230   int _numFirstSet;             /* _remChares X (_binSize + 1) -- number of
231                                    chares in the first set */
232
233   /** All processors are divided into two sets. Processors in the first set
234    *  have one chare more than the processors in the second set. */
235
236   arrayMapInfo(void) { }
237
238   arrayMapInfo(CkArrayIndex& n) : _nelems(n), _numChares(0) {
239     compute_binsize();
240   }
241
242   ~arrayMapInfo() {}
243   
244   void compute_binsize()
245   {
246     int numPes = CkNumPes();
247
248     if (_nelems.nInts == 1) {
249       _numChares = _nelems.data()[0];
250     } else if (_nelems.nInts == 2) {
251       _numChares = _nelems.data()[0] * _nelems.data()[1];
252     } else if (_nelems.nInts == 3) {
253       _numChares = _nelems.data()[0] * _nelems.data()[1] * _nelems.data()[2];
254     }
255
256     _remChares = _numChares % numPes;
257     _binSizeFloor = (int)floor((double)_numChares/(double)numPes);
258     _binSizeCeil = (int)ceil((double)_numChares/(double)numPes);
259     _numFirstSet = _remChares * (_binSizeFloor + 1);
260   }
261
262   void pup(PUP::er& p){
263     p|_nelems;
264     p|_binSizeFloor;
265     p|_binSizeCeil;
266     p|_numChares;
267     p|_remChares;
268     p|_numFirstSet;
269   }
270 };
271
272
273 /**
274  * The default map object -- This does blocked mapping in the general case and
275  * calls the round-robin procNum for the dynamic insertion case -- ASB
276  */
277 class DefaultArrayMap : public RRMap
278 {
279 public:
280   /** This array stores information about different chare arrays in a Charm
281    *  program (dimensions, binsize, numChares etc ... ) */
282   CkPupPtrVec<arrayMapInfo> amaps;
283
284 public:
285   DefaultArrayMap(void) {
286     DEBC((AA"Creating DefaultArrayMap\n"AB));
287   }
288
289   DefaultArrayMap(CkMigrateMessage *m) : RRMap(m){}
290
291   int registerArray(CkArrayIndex& numElements, CkArrayID aid)
292   {
293     int idx = amaps.size();
294     amaps.resize(idx+1);
295     amaps[idx] = new arrayMapInfo(numElements);
296     return idx;
297   }
298  
299   int procNum(int arrayHdl, const CkArrayIndex &i) {
300     int flati;
301     if (amaps[arrayHdl]->_nelems.nInts == 0) {
302       return RRMap::procNum(arrayHdl, i);
303     }
304
305     if (i.nInts == 1) {
306       flati = i.data()[0];
307     } else if (i.nInts == 2) {
308       flati = i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1];
309     } else if (i.nInts == 3) {
310       flati = (i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1]) * amaps[arrayHdl]->_nelems.data()[2] + i.data()[2];
311     }
312 #if CMK_ERROR_CHECKING
313     else {
314       CkAbort("CkArrayIndex has more than 3 integers!");
315     }
316 #endif
317
318     if(flati < amaps[arrayHdl]->_numFirstSet)
319       return (flati / (amaps[arrayHdl]->_binSizeFloor + 1));
320     else if (flati < amaps[arrayHdl]->_numChares)
321       return (amaps[arrayHdl]->_remChares + (flati - amaps[arrayHdl]->_numFirstSet) / (amaps[arrayHdl]->_binSizeFloor));
322     else
323       return (flati % CkNumPes());
324   }
325
326   void pup(PUP::er& p){
327     RRMap::pup(p);
328     int npes = CkNumPes();
329     p|npes;
330     p|amaps;
331     if (p.isUnpacking() && npes != CkNumPes())  {   // binSize needs update
332       for (int i=0; i<amaps.size(); i++)
333         amaps[i]->compute_binsize();
334     }
335   }
336 };
337
338 /**
339  *  A fast map for chare arrays which do static insertions and promise NOT
340  *  to do late insertions -- ASB
341  */
342 class FastArrayMap : public DefaultArrayMap
343 {
344 public:
345   FastArrayMap(void) {
346     DEBC((AA"Creating FastArrayMap\n"AB));
347   }
348
349   FastArrayMap(CkMigrateMessage *m) : DefaultArrayMap(m){}
350
351   int registerArray(CkArrayIndex& numElements, CkArrayID aid)
352   {
353     int idx;
354     idx = DefaultArrayMap::registerArray(numElements, aid);
355
356     return idx;
357   }
358
359   int procNum(int arrayHdl, const CkArrayIndex &i) {
360     int flati;
361     if (amaps[arrayHdl]->_nelems.nInts == 0) {
362       return RRMap::procNum(arrayHdl, i);
363     }
364
365     if (i.nInts == 1) {
366       flati = i.data()[0];
367     } else if (i.nInts == 2) {
368       flati = i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1];
369     } else if (i.nInts == 3) {
370       flati = (i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1]) * amaps[arrayHdl]->_nelems.data()[2] + i.data()[2];
371     }
372 #if CMK_ERROR_CHECKING
373     else {
374       CkAbort("CkArrayIndex has more than 3 integers!");
375     }
376 #endif
377
378     /** binSize used in DefaultArrayMap is the floor of numChares/numPes
379      *  but for this FastArrayMap, we need the ceiling */
380     return (flati / amaps[arrayHdl]->_binSizeCeil);
381   }
382
383   void pup(PUP::er& p){
384     DefaultArrayMap::pup(p);
385   }
386 };
387
388
389 /**
390  * This map can be used for topology aware mapping when the mapping is provided
391  * through a file -- ASB
392  */
393 class ReadFileMap : public DefaultArrayMap
394 {
395 private:
396   CkVec<int> mapping;
397
398 public:
399   ReadFileMap(void) {
400     DEBC((AA"Creating ReadFileMap\n"AB));
401   }
402
403   ReadFileMap(CkMigrateMessage *m) : DefaultArrayMap(m){}
404
405   int registerArray(CkArrayIndex& numElements, CkArrayID aid)
406   {
407     int idx;
408     idx = DefaultArrayMap::registerArray(numElements, aid);
409
410     if(mapping.size() == 0) {
411       int numChares;
412
413       if (amaps[idx]->_nelems.nInts == 1) {
414         numChares = amaps[idx]->_nelems.data()[0];
415       } else if (amaps[idx]->_nelems.nInts == 2) {
416         numChares = amaps[idx]->_nelems.data()[0] * amaps[idx]->_nelems.data()[1];
417       } else if (amaps[idx]->_nelems.nInts == 3) {
418         numChares = amaps[idx]->_nelems.data()[0] * amaps[idx]->_nelems.data()[1] * amaps[idx]->_nelems.data()[2];
419       } else {
420         CkAbort("CkArrayIndex has more than 3 integers!");
421       }
422
423       mapping.resize(numChares);
424       FILE *mapf = fopen("mapfile", "r");
425       TopoManager tmgr;
426       int x, y, z, t;
427
428       for(int i=0; i<numChares; i++) {
429         (void) fscanf(mapf, "%d %d %d %d", &x, &y, &z, &t);
430         mapping[i] = tmgr.coordinatesToRank(x, y, z, t);
431       }
432       fclose(mapf);
433     }
434
435     return idx;
436   }
437
438   int procNum(int arrayHdl, const CkArrayIndex &i) {
439     int flati;
440
441     if (i.nInts == 1) {
442       flati = i.data()[0];
443     } else if (i.nInts == 2) {
444       flati = i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1];
445     } else if (i.nInts == 3) {
446       flati = (i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1]) * amaps[arrayHdl]->_nelems.data()[2] + i.data()[2];
447     } else {
448       CkAbort("CkArrayIndex has more than 3 integers!");
449     }
450
451     return mapping[flati];
452   }
453
454   void pup(PUP::er& p){
455     DefaultArrayMap::pup(p);
456     p|mapping;
457   }
458 };
459
460 class BlockMap : public RRMap
461 {
462 public:
463   BlockMap(void){
464         DEBC((AA"Creating BlockMap\n"AB));
465   }
466   BlockMap(CkMigrateMessage *m):RRMap(m){ }
467   void populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr){
468         if (numElements.nInts==0) {
469           CkFreeMsg(ctorMsg);
470           return;
471         }
472         int thisPe=CkMyPe();
473         int numPes=CkNumPes();
474         int binSize;
475         if (numElements.nInts == 1) {
476           binSize = (int)ceil((double)numElements.data()[0]/(double)numPes);
477         } else if (numElements.nInts == 2) {
478           binSize = (int)ceil((double)(numElements.data()[0]*numElements.data()[1])/(double)numPes);
479         } else if (numElements.nInts == 3) {
480           binSize = (int)ceil((double)(numElements.data()[0]*numElements.data()[1]*numElements.data()[2])/(double)numPes);
481         } else {
482           CkAbort("CkArrayIndex has more than 3 integers!");
483         }
484         CKARRAYMAP_POPULATE_INITIAL(i/binSize==thisPe);
485
486         /*
487         CkArrayIndex idx;
488         for (idx=numElements.begin(); idx<numElements; idx.getNext(numElements)) {
489           //for (int i=0;i<numElements;i++) {
490                 int binSize = (int)ceil((double)numElements.getCombinedCount()/(double)numPes);
491                 if (i/binSize==thisPe)
492                         mgr->insertInitial(idx,CkCopyMsg(&ctorMsg));
493         }*/
494         mgr->doneInserting();
495         CkFreeMsg(ctorMsg);
496   }
497 };
498
499 /**
500  * map object-- use seed load balancer.  
501  */
502 class CldMap : public CkArrayMap
503 {
504 public:
505   CldMap(void)
506   {
507           DEBC((AA"Creating CldMap\n"AB));
508   }
509   CldMap(CkMigrateMessage *m):CkArrayMap(m){}
510   int homePe(int /*arrayHdl*/, const CkArrayIndex &i)
511   {
512     if (i.nInts==1) {
513       //Map 1D integer indices in simple round-robin fashion
514       return (i.data()[0])%CkNumPes();
515     }
516     else 
517       {
518         //Map other indices based on their hash code, mod a big prime.
519         unsigned int hash=(i.hash()+739)%1280107;
520         return (hash % CkNumPes());
521       }
522   }
523   int procNum(int arrayHdl, const CkArrayIndex &i)
524   {
525      return CLD_ANYWHERE;   // -1
526   }
527   void populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr)  {
528         if (numElements.nInts==0) {
529           CkFreeMsg(ctorMsg);
530           return;
531         }
532         int thisPe=CkMyPe();
533         int numPes=CkNumPes();
534         //CkArrayIndex idx;
535
536         CKARRAYMAP_POPULATE_INITIAL(i%numPes==thisPe);
537         /*for (idx=numElements.begin(); idx<numElements; idx.getNext(numElements)) {
538           //for (int i=0;i<numElements;i++)
539                         if((idx.getRank(numElements))%numPes==thisPe)
540                                 mgr->insertInitial(CkArrayIndex1D(i),CkCopyMsg(&ctorMsg),0);
541         }*/
542         mgr->doneInserting();
543         CkFreeMsg(ctorMsg);
544   }
545
546 };
547
548
549 /// A class responsible for parsing the command line arguments for the PE
550 /// to extract the format string passed in with +ConfigurableRRMap
551 class ConfigurableRRMapLoader {
552 public:
553   
554   int *locations;
555   int objs_per_block;
556   int PE_per_block;
557
558   /// labels for states used when parsing the ConfigurableRRMap from ARGV
559   enum ConfigurableRRMapLoadStatus{
560     not_loaded,
561     loaded_found,
562     loaded_not_found
563   };
564   
565   enum ConfigurableRRMapLoadStatus state;
566   
567   ConfigurableRRMapLoader(){
568     state = not_loaded;
569     locations = NULL;
570     objs_per_block = 0;
571     PE_per_block = 0;
572   }
573   
574   /// load configuration if possible, and return whether a valid configuration exists
575   bool haveConfiguration() {
576     if(state == not_loaded) {
577       DEBUG(("[%d] loading ConfigurableRRMap configuration\n", CkMyPe()));
578       char **argv=CkGetArgv();
579       char *configuration = NULL;
580       bool found = CmiGetArgString(argv, "+ConfigurableRRMap", &configuration);
581       if(!found){
582         DEBUG(("Couldn't find +ConfigurableRRMap command line argument\n"));
583         state = loaded_not_found;
584         return false;
585       } else {
586
587         DEBUG(("Found +ConfigurableRRMap command line argument in %p=\"%s\"\n", configuration, configuration));
588
589         std::istringstream instream(configuration);
590         CkAssert(instream.good());
591          
592         // Example line:
593         // 10 8 0 1 2 3 4 5 6 7 7 7 7
594         // Map 10 objects to 8 PEs, with each object's index among the 8 PEs.
595         
596         // extract first integer
597         instream >> objs_per_block >> PE_per_block;
598         CkAssert(instream.good());
599         CkAssert(objs_per_block > 0);
600         CkAssert(PE_per_block > 0);
601         locations = new int[objs_per_block];
602         for(int i=0;i<objs_per_block;i++){
603           locations[i] = 0;
604           CkAssert(instream.good());
605           instream >> locations[i];
606           CkAssert(locations[i] < PE_per_block);
607         }
608         state = loaded_found;
609         return true;
610       }
611
612     } else {
613       DEBUG(("[%d] ConfigurableRRMap has already been loaded\n", CkMyPe()));
614       return state == loaded_found;
615     }      
616      
617   }
618   
619 };
620
621 CkpvDeclare(ConfigurableRRMapLoader, myConfigRRMapState);
622
623 void _initConfigurableRRMap(){
624   CkpvInitialize(ConfigurableRRMapLoader, myConfigRRMapState);
625 }
626
627
628 /// Try to load the command line arguments for ConfigurableRRMap
629 bool haveConfigurableRRMap(){
630   DEBUG(("haveConfigurableRRMap()\n"));
631   ConfigurableRRMapLoader &loader =  CkpvAccess(myConfigRRMapState);
632   return loader.haveConfiguration();
633 }
634
635 class ConfigurableRRMap : public RRMap
636 {
637 public:
638   ConfigurableRRMap(void){
639         DEBC((AA"Creating ConfigurableRRMap\n"AB));
640   }
641   ConfigurableRRMap(CkMigrateMessage *m):RRMap(m){ }
642
643
644   void populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr){
645     // Try to load the configuration from command line argument
646     CkAssert(haveConfigurableRRMap());
647     ConfigurableRRMapLoader &loader =  CkpvAccess(myConfigRRMapState);
648     if (numElements.nInts==0) {
649       CkFreeMsg(ctorMsg);
650       return;
651     }
652     int thisPe=CkMyPe();
653     int maxIndex = numElements.data()[0];
654     DEBUG(("[%d] ConfigurableRRMap: index=%d,%d,%d\n", CkMyPe(),(int)numElements.data()[0], (int)numElements.data()[1], (int)numElements.data()[2]));
655
656     if (numElements.nInts != 1) {
657       CkAbort("ConfigurableRRMap only supports dimension 1!");
658     }
659         
660     for (int index=0; index<maxIndex; index++) {        
661       CkArrayIndex1D idx(index);                
662       
663       int cyclic_block = index / loader.objs_per_block;
664       int cyclic_local = index % loader.objs_per_block;
665       int l = loader.locations[ cyclic_local ];
666       int PE = (cyclic_block*loader.PE_per_block + l) % CkNumPes();
667
668       DEBUG(("[%d] ConfigurableRRMap: index=%d is located on PE %d l=%d\n", CkMyPe(), (int)index, (int)PE, l));
669
670       if(PE == thisPe)
671         mgr->insertInitial(idx,CkCopyMsg(&ctorMsg));
672
673     }
674     //        CKARRAYMAP_POPULATE_INITIAL(PE == thisPe);
675         
676     mgr->doneInserting();
677     CkFreeMsg(ctorMsg);
678   }
679 };
680
681
682 CkpvStaticDeclare(double*, rem);
683
684 class arrInfo {
685  private:
686    CkArrayIndex _nelems;
687    int *_map;
688    void distrib(int *speeds);
689  public:
690    arrInfo(void):_map(NULL){}
691    arrInfo(CkArrayIndex& n, int *speeds)
692    {
693      _nelems = n;
694      _map = new int[_nelems.getCombinedCount()];
695      distrib(speeds);
696    }
697    ~arrInfo() { delete[] _map; }
698    int getMap(const CkArrayIndex &i);
699    void pup(PUP::er& p){
700      p|_nelems;
701      int totalElements = _nelems.getCombinedCount();
702      if(p.isUnpacking()){
703        _map = new int[totalElements];
704      }
705      p(_map,totalElements);
706    }
707 };
708
709 static int cmp(const void *first, const void *second)
710 {
711   int fi = *((const int *)first);
712   int si = *((const int *)second);
713   return ((CkpvAccess(rem)[fi]==CkpvAccess(rem)[si]) ?
714           0 :
715           ((CkpvAccess(rem)[fi]<CkpvAccess(rem)[si]) ?
716           1 : (-1)));
717 }
718
719 void
720 arrInfo::distrib(int *speeds)
721 {
722   int _nelemsCount = _nelems.getCombinedCount();
723   double total = 0.0;
724   int npes = CkNumPes();
725   int i,j,k;
726   for(i=0;i<npes;i++)
727     total += (double) speeds[i];
728   double *nspeeds = new double[npes];
729   for(i=0;i<npes;i++)
730     nspeeds[i] = (double) speeds[i] / total;
731   int *cp = new int[npes];
732   for(i=0;i<npes;i++)
733     cp[i] = (int) (nspeeds[i]*_nelemsCount);
734   int nr = 0;
735   for(i=0;i<npes;i++)
736     nr += cp[i];
737   nr = _nelemsCount - nr;
738   if(nr != 0)
739   {
740     CkpvAccess(rem) = new double[npes];
741     for(i=0;i<npes;i++)
742       CkpvAccess(rem)[i] = (double)_nelemsCount*nspeeds[i] - cp[i];
743     int *pes = new int[npes];
744     for(i=0;i<npes;i++)
745       pes[i] = i;
746     qsort(pes, npes, sizeof(int), cmp);
747     for(i=0;i<nr;i++)
748       cp[pes[i]]++;
749     delete[] pes;
750     delete[] CkpvAccess(rem);
751   }
752   k = 0;
753   for(i=0;i<npes;i++)
754   {
755     for(j=0;j<cp[i];j++)
756       _map[k++] = i;
757   }
758   delete[] cp;
759   delete[] nspeeds;
760 }
761
762 int
763 arrInfo::getMap(const CkArrayIndex &i)
764 {
765   if(i.nInts==1)
766     return _map[i.data()[0]];
767   else
768     return _map[((i.hash()+739)%1280107)%_nelems.getCombinedCount()];
769 }
770
771 //Speeds maps processor number to "speed" (some sort of iterations per second counter)
772 // It is initialized by processor 0.
773 static int* speeds;
774
775 #if CMK_USE_PROP_MAP
776 typedef struct _speedmsg
777 {
778   char hdr[CmiMsgHeaderSizeBytes];
779   int node;
780   int speed;
781 } speedMsg;
782
783 static void _speedHdlr(void *m)
784 {
785   speedMsg *msg=(speedMsg *)m;
786   if (CmiMyRank()==0)
787     for (int pe=0;pe<CmiNodeSize(msg->node);pe++)
788       speeds[CmiNodeFirst(msg->node)+pe] = msg->speed;  
789   CmiFree(m);
790 }
791
792 // initnode call
793 void _propMapInit(void)
794 {
795   speeds = new int[CkNumPes()];
796   int hdlr = CkRegisterHandler((CmiHandler)_speedHdlr);
797   CmiPrintf("[%d]Measuring processor speed for prop. mapping...\n", CkMyPe());
798   int s = LDProcessorSpeed();
799   speedMsg msg;
800   CmiSetHandler(&msg, hdlr);
801   msg.node = CkMyNode();
802   msg.speed = s;
803   CmiSyncBroadcastAllAndFree(sizeof(msg), &msg);
804   for(int i=0;i<CkNumNodes();i++)
805     CmiDeliverSpecificMsg(hdlr);
806 }
807 #else
808 void _propMapInit(void)
809 {
810   speeds = new int[CkNumPes()];
811   int i;
812   for(i=0;i<CkNumPes();i++)
813     speeds[i] = 1;
814 }
815 #endif
816 /**
817  * A proportional map object-- tries to map more objects to
818  * faster processors and fewer to slower processors.  Also
819  * attempts to ensure good locality by mapping nearby elements
820  * together.
821  */
822 class PropMap : public CkArrayMap
823 {
824 private:
825   CkPupPtrVec<arrInfo> arrs;
826 public:
827   PropMap(void)
828   {
829     CkpvInitialize(double*, rem);
830     DEBC((AA"Creating PropMap\n"AB));
831   }
832   PropMap(CkMigrateMessage *m) {}
833   int registerArray(CkArrayIndex& numElements,CkArrayID aid)
834   {
835     int idx = arrs.size();
836     arrs.resize(idx+1);
837     arrs[idx] = new arrInfo(numElements, speeds);
838     return idx;
839   }
840   int procNum(int arrayHdl, const CkArrayIndex &i)
841   {
842     return arrs[arrayHdl]->getMap(i);
843   }
844   void pup(PUP::er& p){
845     p|arrs;
846   }
847 };
848
849 class CkMapsInit : public Chare
850 {
851 public:
852         CkMapsInit(CkArgMsg *msg) {
853                 _defaultArrayMapID = CProxy_DefaultArrayMap::ckNew();
854                 _fastArrayMapID = CProxy_FastArrayMap::ckNew();
855                 delete msg;
856         }
857
858         CkMapsInit(CkMigrateMessage *m) {}
859 };
860
861 // given an envelope of a Charm msg, find the recipient object pointer
862 CkMigratable * CkArrayMessageObjectPtr(envelope *env) {
863   if (env->getMsgtype()!=ForArrayEltMsg) return NULL;   // not an array msg
864
865   CkArrayID aid = env->getsetArrayMgr();
866   CkArray *mgr=(CkArray *)_localBranch(aid);
867   if (mgr) {
868     CkLocMgr *locMgr = mgr->getLocMgr();
869     if (locMgr) {
870       return locMgr->lookup(env->getsetArrayIndex(),aid);
871     }
872   }
873   return NULL;
874 }
875
876 /****************************** Out-of-Core support ********************/
877
878 #if CMK_OUT_OF_CORE
879 CooPrefetchManager CkArrayElementPrefetcher;
880 CkpvDeclare(int,CkSaveRestorePrefetch);
881
882 /**
883  * Return the out-of-core objid (from CooRegisterObject)
884  * that this Converse message will access.  If the message
885  * will not access an object, return -1.
886  */
887 int CkArrayPrefetch_msg2ObjId(void *msg) {
888   envelope *env=(envelope *)msg;
889   CkMigratable *elt = CkArrayMessageObjectPtr(env);
890   return elt?elt->prefetchObjID:-1;
891 }
892
893 /**
894  * Write this object (registered with RegisterObject)
895  * to this writable file.
896  */
897 void CkArrayPrefetch_writeToSwap(FILE *swapfile,void *objptr) {
898   CkMigratable *elt=(CkMigratable *)objptr;
899
900   //Save the element's data to disk:
901   PUP::toDisk p(swapfile);
902   elt->pup(p);
903
904   //Call the element's destructor in-place (so pointer doesn't change)
905   CkpvAccess(CkSaveRestorePrefetch)=1;
906   elt->~CkMigratable(); //< because destuctor is virtual, destroys user class too.
907   CkpvAccess(CkSaveRestorePrefetch)=0;
908 }
909         
910 /**
911  * Read this object (registered with RegisterObject)
912  * from this readable file.
913  */
914 void CkArrayPrefetch_readFromSwap(FILE *swapfile,void *objptr) {
915   CkMigratable *elt=(CkMigratable *)objptr;
916   //Call the element's migration constructor in-place
917   CkpvAccess(CkSaveRestorePrefetch)=1;
918   int ctorIdx=_chareTable[elt->thisChareType]->migCtor;
919   elt->myRec->invokeEntry(elt,(CkMigrateMessage *)0,ctorIdx,CmiTrue);
920   CkpvAccess(CkSaveRestorePrefetch)=0;
921   
922   //Restore the element's data from disk:
923   PUP::fromDisk p(swapfile);
924   elt->pup(p);
925 }
926
927 static void _CkMigratable_prefetchInit(void) 
928 {
929   CkpvExtern(int,CkSaveRestorePrefetch);
930   CkpvAccess(CkSaveRestorePrefetch)=0;
931   CkArrayElementPrefetcher.msg2ObjId=CkArrayPrefetch_msg2ObjId;
932   CkArrayElementPrefetcher.writeToSwap=CkArrayPrefetch_writeToSwap;
933   CkArrayElementPrefetcher.readFromSwap=CkArrayPrefetch_readFromSwap;
934   CooRegisterManager(&CkArrayElementPrefetcher, _charmHandlerIdx);
935 }
936 #endif
937
938 /****************************** CkMigratable ***************************/
939 /**
940  * This tiny class is used to convey information to the 
941  * newly created CkMigratable object when its constructor is called.
942  */
943 class CkMigratable_initInfo {
944 public:
945         CkLocRec_local *locRec;
946         int chareType;
947         CmiBool forPrefetch; /* If true, this creation is only a prefetch restore-from-disk.*/
948 };
949
950 CkpvStaticDeclare(CkMigratable_initInfo,mig_initInfo);
951
952
953 void _CkMigratable_initInfoInit(void) {
954   CkpvInitialize(CkMigratable_initInfo,mig_initInfo);
955 #if CMK_OUT_OF_CORE
956   _CkMigratable_prefetchInit();
957 #endif
958 }
959
960 void CkMigratable::commonInit(void) {
961         CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
962 #if CMK_OUT_OF_CORE
963         isInCore=CmiTrue;
964         if (CkpvAccess(CkSaveRestorePrefetch))
965                 return; /* Just restoring from disk--don't touch object */
966         prefetchObjID=-1; //Unregistered
967 #endif
968         myRec=i.locRec;
969         thisIndexMax=myRec->getIndex();
970         thisChareType=i.chareType;
971         usesAtSync=CmiFalse;
972         usesAutoMeasure=CmiTrue;
973         barrierRegistered=CmiFalse;
974   atsync_iteration = -1;
975   //CkPrintf("%s in init and off\n", idx2str(thisIndexMax));
976   local_state = OFF;
977   prev_load = 0.0;
978         /*
979         FAULT_EVAC
980         */
981         AsyncEvacuate(CmiTrue);
982 }
983
984 CkMigratable::CkMigratable(void) {
985         DEBC((AA"In CkMigratable constructor\n"AB));
986         commonInit();
987 }
988 CkMigratable::CkMigratable(CkMigrateMessage *m): Chare(m) {
989         commonInit();
990 }
991
992 int CkMigratable::ckGetChareType(void) const {return thisChareType;}
993
994 void CkMigratable::pup(PUP::er &p) {
995         DEBM((AA"In CkMigratable::pup %s\n"AB,idx2str(thisIndexMax)));
996         Chare::pup(p);
997         p|thisIndexMax;
998         p(usesAtSync);
999         p(usesAutoMeasure);
1000 #if CMK_LBDB_ON 
1001         int readyMigrate;
1002         if (p.isPacking()) readyMigrate = myRec->isReadyMigrate();
1003         p|readyMigrate;
1004         if (p.isUnpacking()) myRec->ReadyMigrate(readyMigrate);
1005 #endif
1006         if(p.isUnpacking()) barrierRegistered=CmiFalse;
1007         /*
1008                 FAULT_EVAC
1009         */
1010         p | asyncEvacuate;
1011         if(p.isUnpacking()){myRec->AsyncEvacuate(asyncEvacuate);}
1012         
1013         ckFinishConstruction();
1014 }
1015
1016 void CkMigratable::ckDestroy(void) {
1017         DEBC((AA"In CkMigratable::ckDestroy %s\n"AB,idx2str(thisIndexMax)));
1018         myRec->destroy();
1019 }
1020
1021 void CkMigratable::ckAboutToMigrate(void) { }
1022 void CkMigratable::ckJustMigrated(void) { }
1023 void CkMigratable::ckJustRestored(void) { }
1024
1025 CkMigratable::~CkMigratable() {
1026         DEBC((AA"In CkMigratable::~CkMigratable %s\n"AB,idx2str(thisIndexMax)));
1027 #if CMK_OUT_OF_CORE
1028         isInCore=CmiFalse;
1029         if (CkpvAccess(CkSaveRestorePrefetch)) 
1030                 return; /* Just saving to disk--don't deregister anything. */
1031         /* We're really leaving or dying-- unregister from the ooc system*/
1032         if (prefetchObjID!=-1) {
1033                 CooDeregisterObject(prefetchObjID);
1034                 prefetchObjID=-1;
1035         }
1036 #endif
1037         /*Might want to tell myRec about our doom here--
1038         it's difficult to avoid some kind of circular-delete, though.
1039         */
1040 #if CMK_LBDB_ON 
1041         if (barrierRegistered) {
1042           DEBL((AA"Removing barrier for element %s\n"AB,idx2str(thisIndexMax)));
1043           if (usesAtSync)
1044                 myRec->getLBDB()->RemoveLocalBarrierClient(ldBarrierHandle);
1045           else
1046                 myRec->getLBDB()->RemoveLocalBarrierReceiver(ldBarrierRecvHandle);
1047         }
1048 #endif
1049         //To detect use-after-delete
1050         thisIndexMax.nInts=-12345;
1051         thisIndexMax.dimension=-12345;
1052 }
1053
1054 void CkMigratable::CkAbort(const char *why) const {
1055         CkError("CkMigratable '%s' aborting:\n",_chareTable[thisChareType]->name);
1056         ::CkAbort(why);
1057 }
1058
1059 void CkMigratable::ResumeFromSync(void)
1060 {
1061 //      CkAbort("::ResumeFromSync() not defined for this array element!\n");
1062 }
1063
1064 void CkMigratable::UserSetLBLoad() {
1065         CkAbort("::UserSetLBLoad() not defined for this array element!\n");
1066 }
1067
1068 #if CMK_LBDB_ON  //For load balancing:
1069 // user can call this helper function to set obj load (for model-based lb)
1070 void CkMigratable::setObjTime(double cputime) {
1071         myRec->setObjTime(cputime);
1072 }
1073 double CkMigratable::getObjTime() {
1074         return myRec->getObjTime();
1075 }
1076
1077 void CkMigratable::recvLBPeriod(void *data) {
1078   int lb_period = *((int *) data);
1079   //CkPrintf("--[pe %s] Received the LB Period %d current iter %d state %d\n",
1080    //   idx2str(thisIndexMax), lb_period, atsync_iteration, local_state);
1081   if (local_state == PAUSE) {
1082     if (atsync_iteration < lb_period) {
1083     //  CkPrintf("---[pe %s] pause and decided\n", idx2str(thisIndexMax));
1084       local_state = DECIDED;
1085       ResumeFromSync();
1086       return;
1087     }
1088    // CkPrintf("---[pe %s] load balance\n", idx2str(thisIndexMax));
1089     local_state = LOAD_BALANCE;
1090
1091     local_state = OFF;
1092     atsync_iteration = -1;
1093     prev_load = 0.0;
1094
1095     myRec->getLBDB()->AtLocalBarrier(ldBarrierHandle);
1096     return;
1097   }
1098  // CkPrintf("---[pe %s] decided\n", idx2str(thisIndexMax));
1099   local_state = DECIDED;
1100 }
1101
1102 void CkMigratable::ckFinishConstruction(void)
1103 {
1104 //      if ((!usesAtSync) || barrierRegistered) return;
1105         myRec->setMeasure(usesAutoMeasure);
1106         if (barrierRegistered) return;
1107         DEBL((AA"Registering barrier client for %s\n"AB,idx2str(thisIndexMax)));
1108         if (usesAtSync)
1109           ldBarrierHandle = myRec->getLBDB()->AddLocalBarrierClient(
1110                 (LDBarrierFn)staticResumeFromSync,(void*)(this));
1111         else
1112           ldBarrierRecvHandle = myRec->getLBDB()->AddLocalBarrierReceiver(
1113                 (LDBarrierFn)staticResumeFromSync,(void*)(this));
1114         barrierRegistered=CmiTrue;
1115 }
1116
1117 void CkMigratable::AtSync(int waitForMigration)
1118 {
1119         if (!usesAtSync)
1120                 CkAbort("You must set usesAtSync=CmiTrue in your array element constructor to use AtSync!\n");
1121 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1122         mlogData->toResumeOrNot=1;
1123 #endif
1124         myRec->AsyncMigrate(!waitForMigration);
1125         if (waitForMigration) ReadyMigrate(CmiTrue);
1126         ckFinishConstruction();
1127   DEBL((AA"Element %s going to sync\n"AB,idx2str(thisIndexMax)));
1128   // model-based load balancing, ask user to provide cpu load
1129   if (usesAutoMeasure == CmiFalse) UserSetLBLoad();
1130   //    myRec->getLBDB()->AtLocalBarrier(ldBarrierHandle);
1131
1132   atsync_iteration++;
1133   // CkPrintf("[pe %s] atsync_iter %d && predicted period %d state: %d\n",
1134   //     idx2str(thisIndexMax), atsync_iteration,
1135   //     myRec->getLBDB()->getPredictedLBPeriod(), local_state);
1136   double tmp = prev_load;
1137   prev_load = myRec->getObjTime();
1138   double current_load = prev_load - tmp;
1139
1140   if (atsync_iteration != 0) {
1141     myRec->getLBDB()->AddLoad(atsync_iteration, current_load);
1142   }
1143
1144 //
1145 //  if (atsync_iteration == 3) {
1146 //    myRec->getLBDB()->AtLocalBarrier(ldBarrierHandle);
1147 //    return;
1148 //  } else {
1149 //    ResumeFromSync();
1150 //    return;
1151 //  }
1152
1153   if (atsync_iteration < myRec->getLBDB()->getPredictedLBPeriod()) {
1154     ResumeFromSync();
1155   } else if (local_state == DECIDED) {
1156 //    CkPrintf("[pe %s] Went to load balance\n", idx2str(thisIndexMax));
1157     local_state = LOAD_BALANCE;
1158     local_state = OFF;
1159     atsync_iteration = -1;
1160     prev_load = 0.0;
1161     myRec->getLBDB()->AtLocalBarrier(ldBarrierHandle);
1162   } else {
1163 //    CkPrintf("[pe %s] Went to pause state\n", idx2str(thisIndexMax));
1164     local_state = PAUSE;
1165   }
1166 }
1167
1168 void CkMigratable::ReadyMigrate(CmiBool ready)
1169 {
1170         myRec->ReadyMigrate(ready);
1171 }
1172
1173 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1174     extern int globalResumeCount;
1175 #endif
1176
1177 void CkMigratable::staticResumeFromSync(void* data)
1178 {
1179         CkMigratable *el=(CkMigratable *)data;
1180 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1181     if(el->mlogData->toResumeOrNot ==0 || el->mlogData->resumeCount >= globalResumeCount){
1182         return;
1183     }
1184 #endif
1185         DEBL((AA"Element %s resuming from sync\n"AB,idx2str(el->thisIndexMax)));
1186 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1187     CpvAccess(_currentObj) = el;
1188 #endif
1189         el->ResumeFromSync();
1190 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1191     el->mlogData->resumeCount++;
1192 #endif
1193 }
1194 void CkMigratable::setMigratable(int migratable) 
1195 {
1196         myRec->setMigratable(migratable);
1197 }
1198
1199 struct CkArrayThreadListener {
1200         struct CthThreadListener base;
1201         CkMigratable *mig;
1202 };
1203
1204 extern "C"
1205 void CkArrayThreadListener_suspend(struct CthThreadListener *l)
1206 {
1207         CkArrayThreadListener *a=(CkArrayThreadListener *)l;
1208         a->mig->ckStopTiming();
1209 }
1210
1211 extern "C"
1212 void CkArrayThreadListener_resume(struct CthThreadListener *l)
1213 {
1214         CkArrayThreadListener *a=(CkArrayThreadListener *)l;
1215         a->mig->ckStartTiming();
1216 }
1217
1218 extern "C"
1219 void CkArrayThreadListener_free(struct CthThreadListener *l)
1220 {
1221         CkArrayThreadListener *a=(CkArrayThreadListener *)l;
1222         delete a;
1223 }
1224
1225 void CkMigratable::CkAddThreadListeners(CthThread tid, void *msg)
1226 {
1227         Chare::CkAddThreadListeners(tid, msg);   // for trace
1228         CthSetThreadID(tid, thisIndexMax.data()[0], thisIndexMax.data()[1], 
1229                        thisIndexMax.data()[2]);
1230         CkArrayThreadListener *a=new CkArrayThreadListener;
1231         a->base.suspend=CkArrayThreadListener_suspend;
1232         a->base.resume=CkArrayThreadListener_resume;
1233         a->base.free=CkArrayThreadListener_free;
1234         a->mig=this;
1235         CthAddListener(tid,(struct CthThreadListener *)a);
1236 }
1237 #else
1238 void CkMigratable::setObjTime(double cputime) {}
1239 double CkMigratable::getObjTime() {return 0.0;}
1240
1241 /* no load balancer: need dummy implementations to prevent link error */
1242 void CkMigratable::CkAddThreadListeners(CthThread tid, void *msg)
1243 {
1244 }
1245 #endif
1246
1247
1248 /*CkMigratableList*/
1249 CkMigratableList::CkMigratableList() {}
1250 CkMigratableList::~CkMigratableList() {}
1251
1252 void CkMigratableList::setSize(int s) {
1253         el.resize(s);
1254 }
1255
1256 void CkMigratableList::put(CkMigratable *v,int atIdx) {
1257 #if CMK_ERROR_CHECKING
1258         if (atIdx>=length())
1259                 CkAbort("Internal array manager error (CkMigrableList::put index out of bounds)");
1260 #endif
1261         el[atIdx]=v;
1262 }
1263
1264
1265 /************************** Location Records: *********************************/
1266
1267 //---------------- Base type:
1268 void CkLocRec::weAreObsolete(const CkArrayIndex &idx) {}
1269 CkLocRec::~CkLocRec() { }
1270 void CkLocRec::beenReplaced(void)
1271     {/*Default: ignore replacement*/}
1272
1273 //Return the represented array element; or NULL if there is none
1274 CkMigratable *CkLocRec::lookupElement(CkArrayID aid) {return NULL;}
1275
1276 //Return the last known processor; or -1 if none
1277 int CkLocRec::lookupProcessor(void) {return -1;}
1278
1279
1280 /*----------------- Local:
1281 Matches up the array index with the local index, an
1282 interfaces with the load balancer on behalf of the
1283 represented array elements.
1284 */
1285 CkLocRec_local::CkLocRec_local(CkLocMgr *mgr,CmiBool fromMigration,
1286   CmiBool ignoreArrival, const CkArrayIndex &idx_,int localIdx_)
1287         :CkLocRec(mgr),idx(idx_),localIdx(localIdx_),
1288          running(CmiFalse),deletedMarker(NULL)
1289 {
1290 #if CMK_LBDB_ON
1291         DEBL((AA"Registering element %s with load balancer\n"AB,idx2str(idx)));
1292         //BIGSIM_OOC DEBUGGING
1293         //CkPrintf("LocMgr on %d: Registering element %s with load balancer\n", CkMyPe(), idx2str(idx));
1294         nextPe = -1;
1295         asyncMigrate = CmiFalse;
1296         readyMigrate = CmiTrue;
1297         enable_measure = CmiTrue;
1298         bounced  = CmiFalse;
1299         the_lbdb=mgr->getLBDB();
1300         ldHandle=the_lbdb->RegisterObj(mgr->getOMHandle(),
1301                 idx2LDObjid(idx),(void *)this,1);
1302         if (fromMigration) {
1303                 DEBL((AA"Element %s migrated in\n"AB,idx2str(idx)));
1304                 if (!ignoreArrival)  {
1305                         the_lbdb->Migrated(ldHandle, CmiTrue);
1306                   // load balancer should ignore this objects movement
1307                 //  AsyncMigrate(CmiTrue);
1308                 }
1309         }
1310 #endif
1311         /*
1312                 FAULT_EVAC
1313         */
1314         asyncEvacuate = CmiTrue;
1315 }
1316 CkLocRec_local::~CkLocRec_local()
1317 {
1318         if (deletedMarker!=NULL) *deletedMarker=CmiTrue;
1319         myLocMgr->reclaim(idx,localIdx);
1320 #if CMK_LBDB_ON
1321         stopTiming();
1322         DEBL((AA"Unregistering element %s from load balancer\n"AB,idx2str(idx)));
1323         the_lbdb->UnregisterObj(ldHandle);
1324 #endif
1325 }
1326 void CkLocRec_local::migrateMe(int toPe) //Leaving this processor
1327 {
1328         //This will pack us up, send us off, and delete us
1329 //      printf("[%d] migrating migrateMe to %d \n",CkMyPe(),toPe);
1330         myLocMgr->emigrate(this,toPe);
1331 }
1332
1333 void CkLocRec_local::informIdealLBPeriod(int lb_ideal_period) {
1334   myLocMgr->informLBPeriod(this, lb_ideal_period);
1335 }
1336
1337 #if CMK_LBDB_ON
1338 void CkLocRec_local::startTiming(int ignore_running) {
1339         if (!ignore_running) running=CmiTrue;
1340         DEBL((AA"Start timing for %s at %.3fs {\n"AB,idx2str(idx),CkWallTimer()));
1341         if (enable_measure) the_lbdb->ObjectStart(ldHandle);
1342 }
1343 void CkLocRec_local::stopTiming(int ignore_running) {
1344         DEBL((AA"} Stop timing for %s at %.3fs\n"AB,idx2str(idx),CkWallTimer()));
1345         if ((ignore_running || running) && enable_measure) the_lbdb->ObjectStop(ldHandle);
1346         if (!ignore_running) running=CmiFalse;
1347 }
1348 void CkLocRec_local::setObjTime(double cputime) {
1349         the_lbdb->EstObjLoad(ldHandle, cputime);
1350 }
1351 double CkLocRec_local::getObjTime() {
1352         LBRealType walltime, cputime;
1353         the_lbdb->GetObjLoad(ldHandle, walltime, cputime);
1354         return walltime;
1355 }
1356 #endif
1357
1358 void CkLocRec_local::destroy(void) //User called destructor
1359 {
1360         //Our destructor does all the needed work
1361         delete this;
1362 }
1363 //Return the represented array element; or NULL if there is none
1364 CkMigratable *CkLocRec_local::lookupElement(CkArrayID aid) {
1365         return myLocMgr->lookupLocal(localIdx,aid);
1366 }
1367
1368 //Return the last known processor; or -1 if none
1369 int CkLocRec_local::lookupProcessor(void) {
1370         return CkMyPe();
1371 }
1372
1373 CkLocRec::RecType CkLocRec_local::type(void)
1374 {
1375         return local;
1376 }
1377
1378 void CkLocRec_local::addedElement(void) 
1379 {
1380         //Push everything in the half-created queue into the system--
1381         // anything not ready yet will be put back in.
1382         while (!halfCreated.isEmpty()) 
1383                 CkArrayManagerDeliver(CkMyPe(),halfCreated.deq());
1384 }
1385
1386 CmiBool CkLocRec_local::isObsolete(int nSprings,const CkArrayIndex &idx_)
1387
1388         int len=halfCreated.length();
1389         if (len!=0) {
1390                 /* This is suspicious-- the halfCreated queue should be extremely
1391                  transient.  It's possible we just looked at the wrong time, though;
1392                  so this is only a warning. 
1393                 */
1394                 CkPrintf("CkLoc WARNING> %d messages still around for uncreated element %s!\n",
1395                          len,idx2str(idx));
1396         }
1397         //A local element never expires
1398         return CmiFalse;
1399 }
1400
1401 /**********Added for cosmology (inline function handling without parameter marshalling)***********/
1402
1403 LDObjHandle CkMigratable::timingBeforeCall(int* objstopped){
1404
1405         LDObjHandle objHandle;
1406 #if CMK_LBDB_ON
1407         if (getLBDB()->RunningObject(&objHandle)) {
1408                 *objstopped = 1;
1409                 getLBDB()->ObjectStop(objHandle);
1410         }
1411         myRec->startTiming(1);
1412 #endif
1413
1414   //DEBS((AA"   Invoking entry %d on element %s\n"AB,epIdx,idx2str(idx)));
1415         //CmiBool isDeleted=CmiFalse; //Enables us to detect deletion during processing
1416         //deletedMarker=&isDeleted;
1417 /*#ifndef CMK_OPTIMIZE
1418         if (msg) {  Tracing: 
1419                 envelope *env=UsrToEnv(msg);
1420         //      CkPrintf("ckLocation.C beginExecuteDetailed %d %d \n",env->getEvent(),env->getsetArraySrcPe());
1421                 if (_entryTable[epIdx]->traceEnabled)
1422                         _TRACE_BEGIN_EXECUTE_DETAILED(env->getEvent(),
1423                                  ForChareMsg,epIdx,env->getsetArraySrcPe(), env->getTotalsize(), idx.getProjectionID(((CkGroupID)env->getsetArrayMgr())).idx);
1424         }
1425 #endif*/
1426
1427   return objHandle;
1428 }
1429
1430 void CkMigratable::timingAfterCall(LDObjHandle objHandle,int *objstopped){
1431   
1432 /*#ifndef CMK_OPTIMIZE
1433         if (msg) {  Tracing: 
1434                 if (_entryTable[epIdx]->traceEnabled)
1435                         _TRACE_END_EXECUTE();
1436         }
1437 #endif*/
1438 //#if CMK_LBDB_ON
1439 //        if (!isDeleted) checkBufferedMigration();   // check if should migrate
1440 //#endif
1441 //      if (isDeleted) return CmiFalse;//We were deleted
1442 //      deletedMarker=NULL;
1443 //      return CmiTrue;
1444         myRec->stopTiming(1);
1445 #if CMK_LBDB_ON
1446         if (*objstopped) {
1447                  getLBDB()->ObjectStart(objHandle);
1448         }
1449 #endif
1450
1451  return;
1452 }
1453 /****************************************************************************/
1454
1455
1456 CmiBool CkLocRec_local::invokeEntry(CkMigratable *obj,void *msg,
1457         int epIdx,CmiBool doFree) 
1458 {
1459
1460         DEBS((AA"   Invoking entry %d on element %s\n"AB,epIdx,idx2str(idx)));
1461         CmiBool isDeleted=CmiFalse; //Enables us to detect deletion during processing
1462         deletedMarker=&isDeleted;
1463         startTiming();
1464
1465
1466 #if CMK_TRACE_ENABLED
1467         if (msg) { /* Tracing: */
1468                 envelope *env=UsrToEnv(msg);
1469         //      CkPrintf("ckLocation.C beginExecuteDetailed %d %d \n",env->getEvent(),env->getsetArraySrcPe());
1470                 if (_entryTable[epIdx]->traceEnabled)
1471                         _TRACE_BEGIN_EXECUTE_DETAILED(env->getEvent(),
1472                                  ForChareMsg,epIdx,env->getsetArraySrcPe(), env->getTotalsize(), idx.getProjectionID((((CkGroupID)env->getsetArrayMgr())).idx));
1473         }
1474 #endif
1475
1476         if (doFree) 
1477            CkDeliverMessageFree(epIdx,msg,obj);
1478         else /* !doFree */
1479            CkDeliverMessageReadonly(epIdx,msg,obj);
1480
1481
1482 #if CMK_TRACE_ENABLED
1483         if (msg) { /* Tracing: */
1484                 if (_entryTable[epIdx]->traceEnabled)
1485                         _TRACE_END_EXECUTE();
1486         }
1487 #endif
1488 #if CMK_LBDB_ON
1489         if (!isDeleted) checkBufferedMigration();   // check if should migrate
1490 #endif
1491         if (isDeleted) return CmiFalse;//We were deleted
1492         deletedMarker=NULL;
1493         stopTiming();
1494         return CmiTrue;
1495 }
1496
1497 CmiBool CkLocRec_local::deliver(CkArrayMessage *msg,CkDeliver_t type,int opts)
1498 {
1499
1500         if (type==CkDeliver_queue) { /*Send via the message queue */
1501                 if (opts & CK_MSG_KEEP)
1502                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1503                 CkArrayManagerDeliver(CkMyPe(),msg,opts);
1504                 return CmiTrue;
1505         }
1506         else
1507         {
1508                 CkMigratable *obj=myLocMgr->lookupLocal(localIdx,
1509                         UsrToEnv(msg)->getsetArrayMgr());
1510                 if (obj==NULL) {//That sibling of this object isn't created yet!
1511                         if (opts & CK_MSG_KEEP)
1512                                 msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1513                         if (msg->array_ifNotThere()!=CkArray_IfNotThere_buffer) {
1514                                 return myLocMgr->demandCreateElement(msg,CkMyPe(),type);
1515                         }
1516                         else {
1517                                 DEBS((AA"   BUFFERING message for nonexistent element %s!\n"AB,idx2str(this->idx)));
1518                                 halfCreated.enq(msg);
1519                                 return CmiTrue;
1520                         }
1521                 }
1522                         
1523                 if (msg->array_hops()>1)
1524                         myLocMgr->multiHop(msg);
1525                 CmiBool doFree = (CmiBool)!(opts & CK_MSG_KEEP);
1526 #if CMK_LBDB_ON
1527                 // if there is a running obj being measured, stop it temporarily
1528                 LDObjHandle objHandle;
1529                 int objstopped = 0;
1530                 if (the_lbdb->RunningObject(&objHandle)) {
1531                         objstopped = 1;
1532                         the_lbdb->ObjectStop(objHandle);
1533                 }
1534 #endif
1535 #if CMK_GRID_QUEUE_AVAILABLE
1536                 // retain a pointer to the sending object (needed later)
1537                 CpvAccess(CkGridObject) = obj;
1538 #endif
1539
1540         CmiBool status = invokeEntry(obj,(void *)msg,msg->array_ep(),doFree);
1541         
1542 #if CMK_GRID_QUEUE_AVAILABLE
1543                 CpvAccess(CkGridObject) = NULL;
1544 #endif
1545 #if CMK_LBDB_ON
1546                 if (objstopped) the_lbdb->ObjectStart(objHandle);
1547 #endif
1548                 return status;
1549         }
1550
1551
1552 }
1553
1554 #if CMK_LBDB_ON
1555
1556 void CkLocRec_local::staticAdaptResumeSync(LDObjHandle h, int lb_ideal_period) {
1557         CkLocRec_local *el=(CkLocRec_local *)LDObjUserData(h);
1558         DEBL((AA"Load balancer wants to migrate %s to %d\n"AB,idx2str(el->idx),dest));
1559         el->adaptResumeSync(lb_ideal_period);
1560 }
1561
1562 void CkLocRec_local::adaptResumeSync(int lb_ideal_period) {
1563   informIdealLBPeriod(lb_ideal_period);
1564 }
1565
1566 void CkLocRec_local::staticMigrate(LDObjHandle h, int dest)
1567 {
1568         CkLocRec_local *el=(CkLocRec_local *)LDObjUserData(h);
1569         DEBL((AA"Load balancer wants to migrate %s to %d\n"AB,idx2str(el->idx),dest));
1570         el->recvMigrate(dest);
1571 }
1572
1573 void CkLocRec_local::recvMigrate(int toPe)
1574 {
1575         // we are in the mode of delaying actual migration
1576         // till readyMigrate()
1577         if (readyMigrate) { migrateMe(toPe); }
1578         else nextPe = toPe;
1579 }
1580
1581 void CkLocRec_local::AsyncMigrate(CmiBool use)  
1582 {
1583         asyncMigrate = use; 
1584         the_lbdb->UseAsyncMigrate(ldHandle, use);
1585 }
1586
1587 CmiBool CkLocRec_local::checkBufferedMigration()
1588 {
1589         // we don't migrate in user's code when calling ReadyMigrate(true)
1590         // we postphone the action to here until we exit from the user code.
1591         if (readyMigrate && nextPe != -1) {
1592             int toPe = nextPe;
1593             nextPe = -1;
1594             // don't migrate inside the object call
1595             migrateMe(toPe);
1596             // don't do anything
1597             return CmiTrue;
1598         }
1599         return CmiFalse;
1600 }
1601
1602 int CkLocRec_local::MigrateToPe()
1603 {
1604         int pe = nextPe;
1605         nextPe = -1;
1606         return pe;
1607 }
1608
1609 void CkLocRec_local::setMigratable(int migratable)
1610 {
1611         if (migratable)
1612           the_lbdb->Migratable(ldHandle);
1613         else
1614           the_lbdb->NonMigratable(ldHandle);
1615 }
1616 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1617 void CkLocRec_local::Migrated(){
1618     the_lbdb->Migrated(ldHandle, CmiTrue);
1619 }
1620 #endif
1621 #endif
1622
1623 /**
1624  * Represents a deleted array element (and prevents re-use).
1625  * These are a debugging aid, usable only by uncommenting a line in
1626  * the element destruction code.
1627  */
1628 class CkLocRec_dead:public CkLocRec {
1629 public:
1630         CkLocRec_dead(CkLocMgr *Narr):CkLocRec(Narr) {}
1631   
1632         virtual RecType type(void) {return dead;}
1633   
1634         virtual CmiBool deliver(CkArrayMessage *msg,CkDeliver_t type,int opts=0) {
1635                 CkPrintf("Dead array element is %s.\n",idx2str(msg->array_index()));
1636                 CkAbort("Send to dead array element!\n");
1637                 return CmiFalse;
1638         }
1639         virtual void beenReplaced(void) 
1640                 {CkAbort("Can't re-use dead array element!\n");}
1641   
1642         //Return if this element is now obsolete (it isn't)
1643         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx) {return CmiFalse;}     
1644 };
1645
1646 /**
1647  * This is the abstract superclass of arrayRecs that keep track of their age,
1648  * and eventually expire. Its kids are remote and buffering.
1649  */
1650 class CkLocRec_aging:public CkLocRec {
1651 private:
1652         int lastAccess;//Age when last accessed
1653 protected:
1654         //Update our access time
1655         inline void access(void) {
1656                 lastAccess=myLocMgr->getSpringCount();
1657         }
1658         //Return if we are "stale"-- we were last accessed a while ago
1659         CmiBool isStale(void) {
1660                 if (myLocMgr->getSpringCount()-lastAccess>3) return CmiTrue;
1661                 else return CmiFalse;
1662         }
1663 public:
1664         CkLocRec_aging(CkLocMgr *Narr):CkLocRec(Narr) {
1665                 lastAccess=myLocMgr->getSpringCount();
1666         }
1667         //Return if this element is now obsolete
1668         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx)=0;
1669         //virtual void pup(PUP::er &p) { CkLocRec::pup(p); p(lastAccess); }
1670 };
1671
1672
1673 /**
1674  * Represents a remote array element.  This is just a PE number.
1675  */
1676 class CkLocRec_remote:public CkLocRec_aging {
1677 private:
1678         int onPe;//The last known Pe for this element
1679 public:
1680         CkLocRec_remote(CkLocMgr *Narr,int NonPe)
1681                 :CkLocRec_aging(Narr)
1682                 {
1683                         onPe=NonPe;
1684 #if CMK_ERROR_CHECKING
1685                         if (onPe==CkMyPe())
1686                                 CkAbort("ERROR!  'remote' array element on this Pe!\n");
1687 #endif
1688                 }
1689         //Return the last known processor for this element
1690         int lookupProcessor(void) {
1691                 return onPe;
1692         }  
1693         virtual RecType type(void) {return remote;}
1694   
1695         //Send a message for this element.
1696         virtual CmiBool deliver(CkArrayMessage *msg,CkDeliver_t type,int opts=0) {
1697                 /*FAULT_EVAC*/
1698                 int destPE = onPe;
1699                 if((!CmiNodeAlive(onPe) && onPe != allowMessagesOnly)){
1700 //                      printf("Delivery failed because process %d is invalid\n",onPe);
1701                         /*
1702                                 Send it to its home processor instead
1703                         */
1704                         const CkArrayIndex &idx=msg->array_index();
1705                         destPE = getNextPE(idx);
1706                 }
1707                 access();//Update our modification date
1708                 msg->array_hops()++;
1709                 DEBS((AA"   Forwarding message for element %s to %d (REMOTE)\n"AB,
1710                       idx2str(msg->array_index()),destPE));
1711                 if (opts & CK_MSG_KEEP)
1712                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1713                 CkArrayManagerDeliver(destPE,msg,opts);
1714                 return CmiTrue;
1715         }
1716         //Return if this element is now obsolete
1717         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx) {
1718                 if (myLocMgr->isHome(idx)) 
1719                         //Home elements never become obsolete
1720                         // if they did, we couldn't deliver messages to that element.
1721                         return CmiFalse;
1722                 else if (isStale())
1723                         return CmiTrue;//We haven't been used in a long time
1724                 else
1725                         return CmiFalse;//We're fairly recent
1726         }
1727         //virtual void pup(PUP::er &p) { CkLocRec_aging::pup(p); p(onPe); }
1728 };
1729
1730
1731 /**
1732  * Buffers messages until record is replaced in the hash table, 
1733  * then delivers all messages to the replacing record.  This is 
1734  * used when a message arrives for a local element that has not
1735  * yet been created, buffering messages until the new element finally
1736  * checks in.
1737  *
1738  * It's silly to send a message to an element you won't ever create,
1739  * so this kind of record causes an abort "Stale array manager message!"
1740  * if it's left undelivered too long.
1741  */
1742 class CkLocRec_buffering:public CkLocRec_aging {
1743 private:
1744         CkQ<CkArrayMessage *> buffer;//Buffered messages.
1745 public:
1746         CkLocRec_buffering(CkLocMgr *Narr):CkLocRec_aging(Narr) {}
1747         virtual ~CkLocRec_buffering() {
1748                 if (0!=buffer.length()) {
1749                         CkPrintf("[%d] Warning: Messages abandoned in array manager buffer!\n", CkMyPe());
1750                         CkArrayMessage *m;
1751                         while (NULL!=(m=buffer.deq()))  {
1752                                 delete m;
1753                         }
1754                 }
1755         }
1756   
1757         virtual RecType type(void) {return buffering;}
1758   
1759         //Buffer a message for this element.
1760         virtual CmiBool deliver(CkArrayMessage *msg,CkDeliver_t type,int opts=0) {
1761                 DEBS((AA" Queued message for %s\n"AB,idx2str(msg->array_index())));
1762                 if (opts & CK_MSG_KEEP)
1763                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1764                 buffer.enq(msg);
1765 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1766                 envelope *env = UsrToEnv(msg);
1767                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
1768 #endif
1769                 return CmiTrue;
1770         }
1771  
1772         //This is called when this ArrayRec is about to be replaced.
1773         // We dump all our buffered messages off on the next guy,
1774         // who should know what to do with them.
1775         virtual void beenReplaced(void) {
1776                 DEBS((AA" Delivering queued messages:\n"AB));
1777                 CkArrayMessage *m;
1778                 while (NULL!=(m=buffer.deq())) {
1779 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))         
1780                 DEBUG(CmiPrintf("[%d] buffered message being sent\n",CmiMyPe()));
1781                 envelope *env = UsrToEnv(m);
1782                 Chare *oldObj = CpvAccess(_currentObj);
1783                 CpvAccess(_currentObj) =(Chare *) env->sender.getObject();
1784                 env->sender.type = TypeInvalid;
1785 #endif
1786                 DEBS((AA"Sending buffered message to %s\n"AB,idx2str(m->array_index())));
1787                 myLocMgr->deliverViaQueue(m);
1788 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))         
1789                 CpvAccess(_currentObj) = oldObj;
1790 #endif
1791                 }
1792         }
1793   
1794         //Return if this element is now obsolete
1795         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx) {
1796                 if (isStale() && buffer.length()>0) {
1797                         /*This indicates something is seriously wrong--
1798                           buffers should be short-lived.*/
1799                         CkPrintf("[%d] WARNING: %d stale array message(s) found!\n",CkMyPe(),buffer.length());
1800                         CkArrayMessage *msg=buffer[0];
1801                         CkPrintf("Addressed to: ");
1802                         CkPrintEntryMethod(msg->array_ep());
1803                         CkPrintf(" index %s\n",idx2str(idx));
1804                         if (myLocMgr->isHome(idx)) 
1805                                 CkPrintf("is this an out-of-bounds array index, or was it never created?\n");
1806                         else //Idx is a remote-home index
1807                                 CkPrintf("why weren't they forwarded?\n");
1808                         
1809                         // CkAbort("Stale array manager message(s)!\n");
1810                 }
1811                 return CmiFalse;
1812         }
1813   
1814 /*  virtual void pup(PUP::er &p) {
1815     CkLocRec_aging::pup(p);
1816     CkArray::pupArrayMsgQ(buffer, p);
1817     }*/
1818 };
1819
1820 /*********************** Spring Cleaning *****************/
1821 /**
1822  * Used to periodically flush out unused remote element pointers.
1823  *
1824  * Cleaning often will free up memory quickly, but slow things
1825  * down because the cleaning takes time and some not-recently-referenced
1826  * remote element pointers might be valid and used some time in 
1827  * the future.
1828  *
1829  * Also used to determine if buffered messages have become stale.
1830  */
1831 inline void CkLocMgr::springCleaning(void)
1832 {
1833   nSprings++;
1834
1835   //Poke through the hash table for old ArrayRecs.
1836   void *objp;
1837   void *keyp;
1838   
1839   CkHashtableIterator *it=hash.iterator();
1840   CmiImmediateLock(hashImmLock);
1841   while (NULL!=(objp=it->next(&keyp))) {
1842     CkLocRec *rec=*(CkLocRec **)objp;
1843     CkArrayIndex &idx=*(CkArrayIndex *)keyp;
1844     if (rec->isObsolete(nSprings,idx)) {
1845       //This record is obsolete-- remove it from the table
1846       DEBK((AA"Cleaning out old record %s\n"AB,idx2str(idx)));
1847       hash.remove(*(CkArrayIndex *)&idx);
1848       delete rec;
1849       it->seek(-1);//retry this hash slot
1850     }
1851   }
1852   CmiImmediateUnlock(hashImmLock);
1853   delete it;
1854 }
1855 void CkLocMgr::staticSpringCleaning(void *forWhom,double curWallTime) {
1856         DEBK((AA"Starting spring cleaning at %.2f\n"AB,CkWallTimer()));
1857         ((CkLocMgr *)forWhom)->springCleaning();
1858 }
1859
1860 // clean all buffer'ed messages and also free local objects
1861 void CkLocMgr::flushAllRecs(void)
1862 {
1863   void *objp;
1864   void *keyp;
1865     
1866   CkHashtableIterator *it=hash.iterator();
1867   CmiImmediateLock(hashImmLock);
1868   while (NULL!=(objp=it->next(&keyp))) {
1869     CkLocRec *rec=*(CkLocRec **)objp;
1870     CkArrayIndex &idx=*(CkArrayIndex *)keyp;
1871     if (rec->type() != CkLocRec::local) {
1872       //In the case of taking core out of memory (in BigSim's emulation)
1873       //the meta data in the location manager are not deleted so we need
1874       //this condition
1875       if(_BgOutOfCoreFlag!=1){
1876         hash.remove(*(CkArrayIndex *)&idx);
1877         delete rec;
1878         it->seek(-1);//retry this hash slot
1879       }
1880     }
1881     else {
1882         callMethod((CkLocRec_local*)rec, &CkMigratable::ckDestroy);
1883         it->seek(-1);//retry this hash slot
1884     }
1885   }
1886   delete it;
1887   CmiImmediateUnlock(hashImmLock);
1888 }
1889
1890 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1891 void CkLocMgr::callForAllRecords(CkLocFn fnPointer,CkArray *arr,void *data){
1892     void *objp;
1893     void *keyp;
1894
1895     CkHashtableIterator *it = hash.iterator();
1896   while (NULL!=(objp=it->next(&keyp))) {
1897     CkLocRec *rec=*(CkLocRec **)objp;
1898     CkArrayIndex &idx=*(CkArrayIndex *)keyp;
1899         fnPointer(arr,data,rec,&idx);
1900     }
1901 }
1902 #endif
1903
1904 /*************************** LocMgr: CREATION *****************************/
1905 CkLocMgr::CkLocMgr(CkGroupID mapID_,CkGroupID lbdbID_,CkArrayIndex& numInitial)
1906         :thisProxy(thisgroup),thislocalproxy(thisgroup,CkMyPe()),
1907          hash(17,0.3)
1908 {
1909         DEBC((AA"Creating new location manager %d\n"AB,thisgroup));
1910 // moved to _CkMigratable_initInfoInit()
1911 //      CkpvInitialize(CkMigratable_initInfo,mig_initInfo);
1912
1913         managers.init();
1914         nManagers=0;
1915         firstManager=NULL;
1916         firstFree=localLen=0;
1917         duringMigration=CmiFalse;
1918         nSprings=0;
1919         CcdCallOnConditionKeepOnPE(CcdPERIODIC_1minute,staticSpringCleaning,(void *)this, CkMyPe());
1920
1921 //Register with the map object
1922         mapID=mapID_;
1923         map=(CkArrayMap *)CkLocalBranch(mapID);
1924         if (map==NULL) CkAbort("ERROR!  Local branch of array map is NULL!");
1925         mapHandle=map->registerArray(numInitial,thisgroup);
1926
1927 //Find and register with the load balancer
1928         lbdbID = lbdbID_;
1929         initLB(lbdbID_);
1930         hashImmLock = CmiCreateImmediateLock();
1931 }
1932
1933 CkLocMgr::CkLocMgr(CkMigrateMessage* m)
1934         :IrrGroup(m),thisProxy(thisgroup),thislocalproxy(thisgroup,CkMyPe()),hash(17,0.3)
1935 {
1936         managers.init();
1937         nManagers=0;
1938         firstManager=NULL;
1939         firstFree=localLen=0;
1940         duringMigration=CmiFalse;
1941         nSprings=0;
1942         CcdCallOnConditionKeepOnPE(CcdPERIODIC_1minute,staticSpringCleaning,(void *)this, CkMyPe());
1943         hashImmLock = CmiCreateImmediateLock();
1944 }
1945
1946 void CkLocMgr::pup(PUP::er &p){
1947         IrrGroup::pup(p);
1948         p|mapID;
1949         p|mapHandle;
1950         p|lbdbID;
1951         mapID = _defaultArrayMapID;
1952         if(p.isUnpacking()){
1953                 thisProxy=thisgroup;
1954                 CProxyElement_CkLocMgr newlocalproxy(thisgroup,CkMyPe());
1955                 thislocalproxy=newlocalproxy;
1956                 //Register with the map object
1957                 map=(CkArrayMap *)CkLocalBranch(mapID);
1958                 if (map==NULL) CkAbort("ERROR!  Local branch of array map is NULL!");
1959                 CkArrayIndex emptyIndex;
1960                 map->registerArray(emptyIndex,thisgroup);
1961                 // _lbdb is the fixed global groupID
1962                 initLB(lbdbID);
1963
1964 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1965         int count;
1966         p | count;
1967         DEBUG(CmiPrintf("[%d] Unpacking Locmgr %d has %d home elements\n",CmiMyPe(),thisgroup.idx,count));
1968         homeElementCount = count;
1969
1970         for(int i=0;i<count;i++){
1971             CkArrayIndex idx;
1972             int pe;
1973             idx.pup(p);
1974             p | pe;
1975             DEBUG(CmiPrintf("[%d] idx %s is a home element exisiting on pe %d\n",CmiMyPe(),idx2str(idx),pe));
1976             inform(idx,pe);
1977             CkLocRec *rec = elementNrec(idx);
1978             CmiAssert(rec!=NULL);
1979             CmiAssert(lastKnown(idx) == pe);
1980         }
1981 #endif
1982                 // delay doneInserting when it is unpacking during restart.
1983                 // to prevent load balancing kicking in
1984                 if (!CkInRestarting()) 
1985                         doneInserting();
1986         }else{
1987  /**
1988  * pack the indexes of elements which have their homes on this processor
1989  * but dont exist on it.. needed for broadcast after a restart
1990  * indexes of local elements dont need to be packed
1991  * since they will be recreated later anyway
1992  */
1993 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1994         int count=0,count1=0;
1995         void *objp;
1996         void *keyp;
1997         CkHashtableIterator *it = hash.iterator();
1998       while (NULL!=(objp=it->next(&keyp))) {
1999       CkLocRec *rec=*(CkLocRec **)objp;
2000         CkArrayIndex &idx=*(CkArrayIndex *)keyp;
2001             if(rec->type() != CkLocRec::local){
2002                 if(homePe(idx) == CmiMyPe()){
2003                     count++;
2004                 }
2005             }
2006         }
2007         p | count;
2008         DEBUG(CmiPrintf("[%d] Packing Locmgr %d has %d home elements\n",CmiMyPe(),thisgroup.idx,count));
2009
2010         it = hash.iterator();
2011       while (NULL!=(objp=it->next(&keyp))) {
2012       CkLocRec *rec=*(CkLocRec **)objp;
2013         CkArrayIndex &idx=*(CkArrayIndex *)keyp;
2014             CkArrayIndex max = idx;
2015             if(rec->type() != CkLocRec::local){
2016                 if(homePe(idx) == CmiMyPe()){
2017                     int pe;
2018                     max.pup(p);
2019                     pe = rec->lookupProcessor();
2020                     p | pe;
2021                     count1++;
2022                 }
2023             }
2024         }
2025         CmiAssert(count == count1);
2026
2027 #endif
2028
2029         }
2030 }
2031
2032 void _CkLocMgrInit(void) {
2033   /* Don't trace our deliver method--it does its own tracing */
2034   CkDisableTracing(CkIndex_CkLocMgr::deliverInline(0));
2035 }
2036
2037 /// Add a new local array manager to our list.
2038 /// Returns a new CkMigratableList for the manager to store his
2039 /// elements in.
2040 CkMigratableList *CkLocMgr::addManager(CkArrayID id,CkArrMgr *mgr)
2041 {
2042         CK_MAGICNUMBER_CHECK
2043         DEBC((AA"Adding new array manager\n"AB));
2044         //Link new manager into list
2045         ManagerRec *n=new ManagerRec;
2046         managers.find(id)=n;
2047         n->next=firstManager;
2048         n->mgr=mgr;
2049         n->elts.setSize(localLen);
2050         nManagers++;
2051         firstManager=n;
2052         return &n->elts;
2053 }
2054
2055 /// Return the next unused local element index.
2056 int CkLocMgr::nextFree(void) {
2057         if (firstFree>=localLen)
2058         {//Need more space in the local index arrays-- enlarge them
2059                 int oldLen=localLen;
2060                 localLen=localLen*2+8;
2061                 DEBC((AA"Growing the local list from %d to %d...\n"AB,oldLen,localLen));
2062                 for (ManagerRec *m=firstManager;m!=NULL;m=m->next)
2063                         m->elts.setSize(localLen);
2064                 //Update the free list
2065                 freeList.resize(localLen);
2066                 for (int i=oldLen;i<localLen;i++)
2067                         freeList[i]=i+1;
2068         }
2069         int localIdx=firstFree;
2070         if (localIdx==-1) CkAbort("CkLocMgr free list corrupted!");
2071         firstFree=freeList[localIdx];
2072         freeList[localIdx]=-1; //Mark as used
2073         return localIdx;
2074 }
2075
2076 CkLocRec_remote *CkLocMgr::insertRemote(const CkArrayIndex &idx,int nowOnPe)
2077 {
2078         DEBS((AA"Remote element %s lives on %d\n"AB,idx2str(idx),nowOnPe));
2079         CkLocRec_remote *rem=new CkLocRec_remote(this,nowOnPe);
2080         insertRec(rem,idx);
2081         return rem;
2082 }
2083
2084 //This element now lives on the given Pe
2085 void CkLocMgr::inform(const CkArrayIndex &idx,int nowOnPe)
2086 {
2087         if (nowOnPe==CkMyPe())
2088                 return; //Never insert a "remote" record pointing here
2089         CkLocRec *rec=elementNrec(idx);
2090         if (rec!=NULL && rec->type()==CkLocRec::local){
2091 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2092         CmiPrintf("[%d]WARNING!!! Element %d:%s is local but is being told it exists on %d\n",CkMyPe(),idx.dimension,idx2str(idx), nowOnPe);
2093 #endif
2094                 return; //Never replace a local element's record!
2095         }
2096         insertRemote(idx,nowOnPe);
2097 }
2098
2099 //Tell this element's home processor it now lives "there"
2100 void CkLocMgr::informHome(const CkArrayIndex &idx,int nowOnPe)
2101 {
2102         int home=homePe(idx);
2103         if (home!=CkMyPe() && home!=nowOnPe) {
2104                 //Let this element's home Pe know it lives here now
2105                 DEBC((AA"  Telling %s's home %d that it lives on %d.\n"AB,idx2str(idx),home,nowOnPe));
2106 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2107         informLocationHome(thisgroup,idx,home,CkMyPe());
2108 #else
2109                 thisProxy[home].updateLocation(idx,nowOnPe);
2110 #endif
2111         }
2112 }
2113
2114 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2115 CkLocRec_local *CkLocMgr::createLocal(const CkArrayIndex &idx,
2116         CmiBool forMigration, CmiBool ignoreArrival,
2117         CmiBool notifyHome,int dummy)
2118 {
2119     int localIdx=nextFree();
2120     DEBC((AA"Adding new record for element %s at local index %d\n"AB,idx2str(idx),localIdx));
2121     CkLocRec_local *rec=new CkLocRec_local(this,forMigration,ignoreArrival,idx,localIdx);
2122     if(!dummy){
2123         insertRec(rec,idx); //Add to global hashtable
2124     }   
2125     if (notifyHome) informHome(idx,CkMyPe());
2126     return rec; 
2127 }
2128 #else
2129 CkLocRec_local *CkLocMgr::createLocal(const CkArrayIndex &idx, 
2130                 CmiBool forMigration, CmiBool ignoreArrival,
2131                 CmiBool notifyHome)
2132 {
2133         int localIdx=nextFree();
2134         DEBC((AA"Adding new record for element %s at local index %d\n"AB,idx2str(idx),localIdx));
2135         CkLocRec_local *rec=new CkLocRec_local(this,forMigration,ignoreArrival,idx,localIdx);
2136         insertRec(rec,idx); //Add to global hashtable
2137
2138
2139         if (notifyHome) informHome(idx,CkMyPe());
2140         return rec;
2141 }
2142 #endif
2143
2144 //Add a new local array element, calling element's constructor
2145 CmiBool CkLocMgr::addElement(CkArrayID id,const CkArrayIndex &idx,
2146                 CkMigratable *elt,int ctorIdx,void *ctorMsg)
2147 {
2148         CK_MAGICNUMBER_CHECK
2149         CkLocRec *oldRec=elementNrec(idx);
2150         CkLocRec_local *rec;
2151         if (oldRec==NULL||oldRec->type()!=CkLocRec::local) 
2152         { //This is the first we've heard of that element-- add new local record
2153                 rec=createLocal(idx,CmiFalse,CmiFalse,CmiTrue);
2154         } else 
2155         { //rec is *already* local-- must not be the first insertion    
2156                 rec=((CkLocRec_local *)oldRec);
2157                 rec->addedElement();
2158         }
2159         if (!addElementToRec(rec,managers.find(id),elt,ctorIdx,ctorMsg)) return CmiFalse;
2160         elt->ckFinishConstruction();
2161         return CmiTrue;
2162 }
2163
2164 //As above, but shared with the migration code
2165 CmiBool CkLocMgr::addElementToRec(CkLocRec_local *rec,ManagerRec *m,
2166                 CkMigratable *elt,int ctorIdx,void *ctorMsg)
2167 {//Insert the new element into its manager's local list
2168         int localIdx=rec->getLocalIndex();
2169         if (m->elts.get(localIdx)!=NULL) CkAbort("Cannot insert array element twice!");
2170         m->elts.put(elt,localIdx); //Local element table
2171
2172 //Call the element's constructor
2173         DEBC((AA"Constructing element %s of array\n"AB,idx2str(rec->getIndex())));
2174         CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
2175         i.locRec=rec;
2176         i.chareType=_entryTable[ctorIdx]->chareIdx;
2177         if (!rec->invokeEntry(elt,ctorMsg,ctorIdx,CmiTrue)) return CmiFalse;
2178
2179 #if CMK_OUT_OF_CORE
2180         /* Register new element with out-of-core */
2181         PUP::sizer p_getSize; elt->pup(p_getSize);
2182         elt->prefetchObjID=CooRegisterObject(&CkArrayElementPrefetcher,p_getSize.size(),elt);
2183 #endif
2184         
2185         return CmiTrue;
2186 }
2187 void CkLocMgr::updateLocation(const CkArrayIndex &idx,int nowOnPe) {
2188         inform(idx,nowOnPe);
2189 }
2190
2191 /*************************** LocMgr: DELETION *****************************/
2192 /// This index will no longer be used-- delete the associated elements
2193 void CkLocMgr::reclaim(const CkArrayIndex &idx,int localIdx) {
2194         CK_MAGICNUMBER_CHECK
2195         DEBC((AA"Destroying element %s (local %d)\n"AB,idx2str(idx),localIdx));
2196         //Delete, and mark as empty, each array element
2197         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2198                 delete m->elts.get(localIdx);
2199                 m->elts.empty(localIdx);
2200         }
2201         
2202         removeFromTable(idx);
2203         
2204         //Link local index into free list
2205         freeList[localIdx]=firstFree;
2206         firstFree=localIdx;
2207         
2208                 
2209         if (!duringMigration) 
2210         { //This is a local element dying a natural death
2211             #if CMK_BIGSIM_CHARM
2212                 //After migration, reclaimRemote will be called through 
2213                 //the CkRemoveArrayElement in the pupping routines for those 
2214                 //objects that are not on the home processors. However,
2215                 //those remote records should not be deleted since the corresponding
2216                 //objects are not actually deleted but on disk. If deleted, msgs
2217                 //that seeking where is the object will be accumulated (a circular
2218                 //msg chain) and causes the program no progress
2219                 if(_BgOutOfCoreFlag==1) return; 
2220             #endif
2221                 int home=homePe(idx);
2222                 if (home!=CkMyPe())
2223 #if CMK_MEM_CHECKPOINT
2224                 if (!CkInRestarting()) // all array elements are removed anyway
2225 #endif
2226                         thisProxy[home].reclaimRemote(idx,CkMyPe());
2227         /*      //Install a zombie to keep the living from re-using this index.
2228                 insertRecN(new CkLocRec_dead(this),idx); */
2229         }
2230 }
2231
2232 void CkLocMgr::reclaimRemote(const CkArrayIndex &idx,int deletedOnPe) {
2233         DEBC((AA"Our element %s died on PE %d\n"AB,idx2str(idx),deletedOnPe));
2234         CkLocRec *rec=elementNrec(idx);
2235         if (rec==NULL) return; //We never knew him
2236         if (rec->type()==CkLocRec::local) return; //He's already been reborn
2237         removeFromTable(idx);
2238         delete rec;
2239 }
2240 void CkLocMgr::removeFromTable(const CkArrayIndex &idx) {
2241 #if CMK_ERROR_CHECKING
2242         //Make sure it's actually in the table before we delete it
2243         if (NULL==elementNrec(idx))
2244                 CkAbort("CkLocMgr::removeFromTable called on invalid index!");
2245 #endif
2246         CmiImmediateLock(hashImmLock);
2247         hash.remove(*(CkArrayIndex *)&idx);
2248         CmiImmediateUnlock(hashImmLock);
2249 #if CMK_ERROR_CHECKING
2250         //Make sure it's really gone
2251         if (NULL!=elementNrec(idx))
2252                 CkAbort("CkLocMgr::removeFromTable called, but element still there!");
2253 #endif
2254 }
2255
2256 /************************** LocMgr: MESSAGING *************************/
2257 /// Deliver message to this element, going via the scheduler if local
2258 /// @return 0 if object local, 1 if not
2259 int CkLocMgr::deliver(CkMessage *m,CkDeliver_t type,int opts) {
2260         DEBS((AA"deliver \n"AB));
2261         CK_MAGICNUMBER_CHECK
2262         CkArrayMessage *msg=(CkArrayMessage *)m;
2263
2264
2265         const CkArrayIndex &idx=msg->array_index();
2266         DEBS((AA"deliver %s\n"AB,idx2str(idx)));
2267         if (type==CkDeliver_queue)
2268                 _TRACE_CREATION_DETAILED(UsrToEnv(m),msg->array_ep());
2269         CkLocRec *rec=elementNrec(idx);
2270         if(rec != NULL){
2271                 DEBS((AA"deliver %s of type %d \n"AB,idx2str(idx),rec->type()));
2272         }else{
2273                 DEBS((AA"deliver %s rec is null\n"AB,idx2str(idx)));
2274         }
2275 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2276 #if CMK_LBDB_ON
2277         if (type==CkDeliver_queue) {
2278                 if (!(opts & CK_MSG_LB_NOTRACE) && the_lbdb->CollectingCommStats()) {
2279                 if(rec!=NULL) the_lbdb->Send(myLBHandle,idx2LDObjid(idx),UsrToEnv(msg)->getTotalsize(), rec->lookupProcessor(), 1);
2280                 else /*rec==NULL*/ the_lbdb->Send(myLBHandle,idx2LDObjid(idx),UsrToEnv(msg)->getTotalsize(),homePe(msg->array_index()), 1);
2281                 }
2282         }
2283 #endif
2284 #endif
2285 #if CMK_GRID_QUEUE_AVAILABLE
2286         int gridSrcPE;
2287         int gridSrcCluster;
2288         int gridDestPE;
2289         int gridDestCluster;
2290         CkMigratable *obj;
2291         ArrayElement *obj2;
2292         CkGroupID gid;
2293         int *data;
2294
2295         obj = (CkMigratable *) CpvAccess(CkGridObject);   // CkGridObject is a pointer to the sending object (retained earlier)
2296         if (obj != NULL) {
2297           obj2 = dynamic_cast<ArrayElement *> (obj);
2298           if (obj2 > 0) {
2299             // Get the sending object's array gid and indexes.
2300             // These are guaranteed to exist due to the succeeding dynamic cast above.
2301             gid = obj2->ckGetArrayID ();
2302             data = obj2->thisIndexMax.data ();
2303
2304             // Get the source PE and destination PE.
2305             gridSrcPE = CkMyPe ();
2306             if (rec != NULL) {
2307               gridDestPE = rec->lookupProcessor ();
2308             } else {
2309               gridDestPE = homePe (msg->array_index ());
2310             }
2311
2312             // Get the source cluster and destination cluster.
2313             gridSrcCluster = CmiGetCluster (gridSrcPE);
2314             gridDestCluster = CmiGetCluster (gridDestPE);
2315
2316             // If the Grid queue interval is greater than zero, it means that the more complicated
2317             // technique for registering border objects that exceed a specified threshold of
2318             // cross-cluster messages within a specified interval (and deregistering border objects
2319             // that do not meet this threshold) is used.  Otherwise a much simpler technique is used
2320             // where a border object is registered immediately upon sending a single cross-cluster
2321             // message (and deregistered when load balancing takes place).
2322             if (obj2->grid_queue_interval > 0) {
2323               // Increment the sending object's count of all messages.
2324               obj2->msg_count += 1;
2325
2326               // If the source cluster and destination cluster differ, this is a Grid message.
2327               // (Increment the count of all Grid messages.)
2328               if (gridSrcCluster != gridDestCluster) {
2329                 obj2->msg_count_grid += 1;
2330               }
2331
2332               // If the number of messages exceeds the interval, check to see if the object has
2333               // sent enough cross-cluster messages to qualify as a border object.
2334               if (obj2->msg_count >= obj2->grid_queue_interval) {
2335                 if (obj2->msg_count_grid >= obj2->grid_queue_threshold) {
2336                   // The object is a border object; if it is not already registered, register it.
2337                   if (!obj2->border_flag) {
2338                     CmiGridQueueRegister (gid.idx, obj2->thisIndexMax.nInts, data[0], data[1], data[2]);
2339                   }
2340                   obj2->border_flag = 1;
2341                 } else {
2342                   // The object is not a border object; if it is registered, deregister it.
2343                   if (obj2->border_flag) {
2344                     CmiGridQueueDeregister (gid.idx, obj2->thisIndexMax.nInts, data[0], data[1], data[2]);
2345                   }
2346                   obj2->border_flag = 0;
2347                 }
2348                 // Reset the counts.
2349                 obj2->msg_count = 0;
2350                 obj2->msg_count_grid = 0;
2351               }
2352             } else {
2353               if (gridSrcCluster != gridDestCluster) {
2354                 CmiGridQueueRegister (gid.idx, obj2->thisIndexMax.nInts, data[0], data[1], data[2]);
2355               }
2356             }
2357           }
2358
2359           // Reset the CkGridObject pointer.
2360           CpvAccess(CkGridObject) = NULL;
2361         }
2362 #endif
2363         /**FAULT_EVAC*/
2364         if (rec!=NULL){
2365                 CmiBool result = rec->deliver(msg,type,opts);
2366                 // if result is CmiFalse, than rec is not valid anymore, as the object
2367                 // the message was just delivered to has died or migrated out.
2368                 // Therefore rec->type() cannot be invoked!
2369                 if (result==CmiTrue && rec->type()==CkLocRec::local) return 0;
2370                 else return 1;
2371                 /*if(!result){
2372                         //DEBS((AA"deliver %s failed type %d \n"AB,idx2str(idx),rec->type()));
2373                         DEBS((AA"deliver %s failed \n"AB,idx2str(idx)));
2374                         if(rec->type() == CkLocRec::remote){
2375                                 if (opts & CK_MSG_KEEP)
2376                                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
2377                                 deliverUnknown(msg,type);
2378                         }
2379                 }*/
2380         }else /* rec==NULL*/ {
2381                 if (opts & CK_MSG_KEEP)
2382                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
2383                 deliverUnknown(msg,type,opts);
2384                 return 1;
2385         }
2386
2387 }
2388
2389 /// This index is not hashed-- somehow figure out what to do.
2390 CmiBool CkLocMgr::deliverUnknown(CkArrayMessage *msg,CkDeliver_t type,int opts)
2391 {
2392         CK_MAGICNUMBER_CHECK
2393         const CkArrayIndex &idx=msg->array_index();
2394         int onPe=homePe(idx);
2395         if (onPe!=CkMyPe()) 
2396         {// Forward the message to its home processor
2397                 DEBM((AA"Forwarding message for unknown %s to home %d \n"AB,idx2str(idx),onPe));
2398                 msg->array_hops()++;
2399                 CkArrayManagerDeliver(onPe,msg,opts);
2400                 return CmiTrue;
2401         }
2402         else
2403         { // We *are* the home processor:
2404         //Check if the element's array manager has been registered yet:
2405           CkArrMgr *mgr=managers.find(UsrToEnv((void *)msg)->getsetArrayMgr())->mgr;
2406           if (!mgr) { //No manager yet-- postpone the message (stupidly)
2407             if (CkInRestarting()) {
2408               // during restarting, this message should be ignored
2409               delete msg;
2410             }
2411             else {
2412               CkArrayManagerDeliver(CkMyPe(),msg); 
2413             }
2414           }
2415           else { // Has a manager-- must buffer the message
2416             DEBC((AA"Adding buffer for unknown element %s\n"AB,idx2str(idx)));
2417             CkLocRec *rec=new CkLocRec_buffering(this);
2418             insertRecN(rec,idx);
2419             rec->deliver(msg,type);
2420           
2421             if (msg->array_ifNotThere()!=CkArray_IfNotThere_buffer) 
2422             { //Demand-create the element:
2423               return demandCreateElement(msg,-1,type);
2424             }
2425           }
2426           return CmiTrue;
2427         }
2428 }
2429
2430 CmiBool CkLocMgr::demandCreateElement(CkArrayMessage *msg,int onPe,CkDeliver_t type)
2431 {
2432         CK_MAGICNUMBER_CHECK
2433         const CkArrayIndex &idx=msg->array_index();
2434         int chareType=_entryTable[msg->array_ep()]->chareIdx;
2435         int ctor=_chareTable[chareType]->getDefaultCtor();
2436         if (ctor==-1) CkAbort("Can't create array element to handle message--\n"
2437                               "The element has no default constructor in the .ci file!\n");
2438         if (onPe==-1) 
2439         { //Decide where element needs to live
2440                 if (msg->array_ifNotThere()==CkArray_IfNotThere_createhere) 
2441                         onPe=UsrToEnv(msg)->getsetArraySrcPe();
2442                 else //Createhome
2443                         onPe=homePe(idx);
2444         }
2445         
2446         //Find the manager and build the element
2447         DEBC((AA"Demand-creating element %s on pe %d\n"AB,idx2str(idx),onPe));
2448         CkArrMgr *mgr=managers.find(UsrToEnv((void *)msg)->getsetArrayMgr())->mgr;
2449         if (!mgr) CkAbort("Tried to demand-create for nonexistent arrMgr");
2450         return mgr->demandCreateElement(idx,onPe,ctor,type);
2451 }
2452
2453 //This message took several hops to reach us-- fix it
2454 void CkLocMgr::multiHop(CkArrayMessage *msg)
2455 {
2456         CK_MAGICNUMBER_CHECK
2457         int srcPe=msg->array_getSrcPe();
2458         if (srcPe==CkMyPe())
2459                 DEB((AA"Odd routing: local element %s is %d hops away!\n"AB,idx2str(msg),msg->array_hops()));
2460         else
2461         {//Send a routing message letting original sender know new element location
2462                 DEBS((AA"Sending update back to %d for element\n"AB,srcPe,idx2str(msg)));
2463                 thisProxy[srcPe].updateLocation(msg->array_index(),CkMyPe());
2464         }
2465 }
2466
2467 /************************** LocMgr: ITERATOR *************************/
2468 CkLocation::CkLocation(CkLocMgr *mgr_, CkLocRec_local *rec_)
2469         :mgr(mgr_), rec(rec_) {}
2470         
2471 const CkArrayIndex &CkLocation::getIndex(void) const {
2472         return rec->getIndex();
2473 }
2474
2475 void CkLocation::destroyAll() {
2476         mgr->callMethod(rec, &CkMigratable::ckDestroy);
2477 }
2478
2479 void CkLocation::pup(PUP::er &p) {
2480         mgr->pupElementsFor(p,rec,CkElementCreation_migrate);
2481 }
2482
2483 CkLocIterator::~CkLocIterator() {}
2484
2485 /// Iterate over our local elements:
2486 void CkLocMgr::iterate(CkLocIterator &dest) {
2487   //Poke through the hash table for local ArrayRecs.
2488   void *objp;
2489   CkHashtableIterator *it=hash.iterator();
2490   CmiImmediateLock(hashImmLock);
2491
2492   while (NULL!=(objp=it->next())) {
2493     CkLocRec *rec=*(CkLocRec **)objp;
2494     if (rec->type()==CkLocRec::local) {
2495       CkLocation loc(this,(CkLocRec_local *)rec);
2496       dest.addLocation(loc);
2497     }
2498   }
2499   CmiImmediateUnlock(hashImmLock);
2500   delete it;
2501 }
2502
2503
2504
2505
2506 /************************** LocMgr: MIGRATION *************************/
2507 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2508 void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
2509         CkElementCreation_t type, CmiBool create, int dummy)
2510 {
2511     p.comment("-------- Array Location --------");
2512     register ManagerRec *m;
2513     int localIdx=rec->getLocalIndex();
2514     CkVec<CkMigratable *> dummyElts;
2515
2516     for (m=firstManager;m!=NULL;m=m->next) {
2517         int elCType;
2518         if (!p.isUnpacking())
2519         { //Need to find the element's existing type
2520             CkMigratable *elt=m->element(localIdx);
2521             if (elt) elCType=elt->ckGetChareType();
2522             else elCType=-1; //Element hasn't been created
2523         }
2524         p(elCType);
2525         if (p.isUnpacking() && elCType!=-1) {
2526             CkMigratable *elt=m->mgr->allocateMigrated(elCType,rec->getIndex(),type);
2527             int migCtorIdx=_chareTable[elCType]->getMigCtor();
2528                 if(!dummy){
2529                         if(create)
2530                                 if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
2531                                 }else{
2532                     CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
2533                     i.locRec=rec;
2534                     i.chareType=_entryTable[migCtorIdx]->chareIdx;
2535                     dummyElts.push_back(elt);
2536                     if (!rec->invokeEntry(elt,NULL,migCtorIdx,CmiTrue)) return ;
2537                 }
2538         }
2539     }
2540     if(!dummy){
2541         for (m=firstManager;m!=NULL;m=m->next) {
2542             CkMigratable *elt=m->element(localIdx);
2543             if (elt!=NULL)
2544                 {
2545                        elt->pup(p);
2546                 }
2547         }
2548     }else{
2549             for(int i=0;i<dummyElts.size();i++){
2550                 CkMigratable *elt = dummyElts[i];
2551                 if (elt!=NULL){
2552             elt->pup(p);
2553                         }
2554                 delete elt;
2555             }
2556                         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2557                 m->elts.empty(localIdx);
2558             }
2559         freeList[localIdx]=firstFree;
2560         firstFree=localIdx;
2561     }
2562 }
2563 #else
2564 void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
2565                 CkElementCreation_t type)
2566 {
2567         p.comment("-------- Array Location --------");
2568         register ManagerRec *m;
2569         int localIdx=rec->getLocalIndex();
2570
2571         //First pup the element types
2572         // (A separate loop so ckLocal works even in element pup routines)
2573         for (m=firstManager;m!=NULL;m=m->next) {
2574                 int elCType;
2575                 if (!p.isUnpacking())
2576                 { //Need to find the element's existing type
2577                         CkMigratable *elt=m->element(localIdx);
2578                         if (elt) elCType=elt->ckGetChareType();
2579                         else elCType=-1; //Element hasn't been created
2580                 }
2581                 p(elCType);
2582                 if (p.isUnpacking() && elCType!=-1) {
2583                         //Create the element
2584                         CkMigratable *elt=m->mgr->allocateMigrated(elCType,rec->getIndex(),type);
2585                         int migCtorIdx=_chareTable[elCType]->getMigCtor();
2586                         //Insert into our tables and call migration constructor
2587                         if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
2588                 }
2589         }
2590         //Next pup the element data
2591         for (m=firstManager;m!=NULL;m=m->next) {
2592                 CkMigratable *elt=m->element(localIdx);
2593                 if (elt!=NULL)
2594                 {
2595                         elt->pup(p);
2596 #if CMK_ERROR_CHECKING
2597                         if (p.isUnpacking()) elt->sanitycheck();
2598 #endif
2599                 }
2600         }
2601 }
2602 #endif
2603
2604 /// Call this member function on each element of this location:
2605 void CkLocMgr::callMethod(CkLocRec_local *rec,CkMigratable_voidfn_t fn)
2606 {
2607         int localIdx=rec->getLocalIndex();
2608         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2609                 CkMigratable *el=m->element(localIdx);
2610                 if (el) (el->* fn)();
2611         }
2612 }
2613
2614 /// Call this member function on each element of this location:
2615 void CkLocMgr::callMethod(CkLocRec_local *rec,CkMigratable_voidfn_arg_t fn,     void * data)
2616 {
2617         int localIdx=rec->getLocalIndex();
2618         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2619                 CkMigratable *el=m->element(localIdx);
2620                 if (el) (el->* fn)(data);
2621         }
2622 }
2623
2624 /// return a list of migratables in this local record
2625 void CkLocMgr::migratableList(CkLocRec_local *rec, CkVec<CkMigratable *> &list)
2626 {
2627         register ManagerRec *m;
2628         int localIdx=rec->getLocalIndex();
2629
2630         for (m=firstManager;m!=NULL;m=m->next) {
2631                 CkMigratable *elt=m->element(localIdx);
2632                 if (elt) list.push_back(elt);
2633         }
2634 }
2635
2636 /// Migrate this local element away to another processor.
2637 void CkLocMgr::emigrate(CkLocRec_local *rec,int toPe)
2638 {
2639         CK_MAGICNUMBER_CHECK
2640         if (toPe==CkMyPe()) return; //You're already there!
2641         /*
2642                 FAULT_EVAC
2643                 if the toProcessor is already marked as invalid, dont emigrate
2644                 Shouldn't happen but might
2645         */
2646         if(!CmiNodeAlive(toPe)){
2647                 return;
2648         }
2649         CkArrayIndex idx=rec->getIndex();
2650
2651 #if CMK_OUT_OF_CORE
2652         int localIdx=rec->getLocalIndex();
2653         /* Load in any elements that are out-of-core */
2654         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2655                 CkMigratable *el=m->element(localIdx);
2656                 if (el) if (!el->isInCore) CooBringIn(el->prefetchObjID);
2657         }
2658 #endif
2659
2660         //Let all the elements know we're leaving
2661         callMethod(rec,&CkMigratable::ckAboutToMigrate);
2662         /*EVAC*/
2663
2664 //First pass: find size of migration message
2665         int bufSize;
2666         {
2667                 PUP::sizer p;
2668                 p(nManagers);
2669                 pupElementsFor(p,rec,CkElementCreation_migrate);
2670                 bufSize=p.size(); 
2671         }
2672
2673 //Allocate and pack into message
2674         int doubleSize=bufSize/sizeof(double)+1;
2675         CkArrayElementMigrateMessage *msg = 
2676                 new (doubleSize, 0) CkArrayElementMigrateMessage;
2677         msg->idx=idx;
2678         msg->length=bufSize;
2679 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
2680     msg->gid = ckGetGroupID();
2681 #endif
2682 #if CMK_LBDB_ON
2683         msg->ignoreArrival = rec->isAsyncMigrate()?1:0;
2684 #endif
2685         /*
2686                 FAULT_EVAC
2687         */
2688         msg->bounced = rec->isBounced();
2689         {
2690                 PUP::toMem p(msg->packData); 
2691                 p.becomeDeleting(); 
2692                 p(nManagers);
2693                 pupElementsFor(p,rec,CkElementCreation_migrate);
2694                 if (p.size()!=bufSize) {
2695                         CkError("ERROR! Array element claimed it was %d bytes to a "
2696                                 "sizing PUP::er, but copied %d bytes into the packing PUP::er!\n",
2697                                 bufSize,p.size());
2698                         CkAbort("Array element's pup routine has a direction mismatch.\n");
2699                 }
2700         }
2701
2702         DEBM((AA"Migrated index size %s to %d \n"AB,idx2str(idx),toPe));        
2703
2704 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2705     sendMlogLocation(toPe,UsrToEnv(msg));
2706 #else
2707         //Send off message and delete old copy
2708         thisProxy[toPe].immigrate(msg);
2709 #endif
2710
2711         duringMigration=CmiTrue;
2712         delete rec; //Removes elements, hashtable entries, local index
2713         
2714         
2715         duringMigration=CmiFalse;
2716         //The element now lives on another processor-- tell ourselves and its home
2717         inform(idx,toPe);
2718 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
2719         informHome(idx,toPe);
2720 #endif
2721         CK_MAGICNUMBER_CHECK
2722 }
2723
2724 void CkLocMgr::informLBPeriod(CkLocRec_local *rec, int lb_ideal_period) {
2725         callMethod(rec,&CkMigratable::recvLBPeriod, (void *)&lb_ideal_period);
2726 }
2727
2728 /**
2729   Migrating array element is arriving on this processor.
2730 */
2731 void CkLocMgr::immigrate(CkArrayElementMigrateMessage *msg)
2732 {
2733         const CkArrayIndex &idx=msg->idx;
2734                 
2735         PUP::fromMem p(msg->packData); 
2736         
2737         int nMsgMan;
2738         p(nMsgMan);
2739         if (nMsgMan<nManagers)
2740                 CkAbort("Array element arrived from location with fewer managers!\n");
2741         if (nMsgMan>nManagers) {
2742                 //Some array managers haven't registered yet-- throw it back
2743                 DEBM((AA"Busy-waiting for array registration on migrating %s\n"AB,idx2str(idx)));
2744                 thisProxy[CkMyPe()].immigrate(msg);
2745                 return;
2746         }
2747
2748         //Create a record for this element
2749 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
2750         CkLocRec_local *rec=createLocal(idx,CmiTrue,msg->ignoreArrival,CmiFalse /* home told on departure */ );
2751 #else
2752     CkLocRec_local *rec=createLocal(idx,CmiTrue,CmiTrue,CmiFalse /* home told on departure */ );
2753 #endif
2754         
2755         //Create the new elements as we unpack the message
2756         pupElementsFor(p,rec,CkElementCreation_migrate);
2757         if (p.size()!=msg->length) {
2758                 CkError("ERROR! Array element claimed it was %d bytes to a"
2759                         "packing PUP::er, but %d bytes in the unpacking PUP::er!\n",
2760                         msg->length,p.size());
2761                 CkError("(I have %d managers; he claims %d managers)\n",
2762                         nManagers,nMsgMan);
2763                 
2764                 CkAbort("Array element's pup routine has a direction mismatch.\n");
2765         }
2766         /*
2767                 FAULT_EVAC
2768                         if this element came in as a result of being bounced off some other process,
2769                         then it needs to be resumed. It is assumed that it was bounced because load 
2770                         balancing caused it to move into a processor which later crashed
2771         */
2772         if(msg->bounced){
2773                 callMethod(rec,&CkMigratable::ResumeFromSync);
2774         }
2775         
2776         //Let all the elements know we've arrived
2777         callMethod(rec,&CkMigratable::ckJustMigrated);
2778         /*FAULT_EVAC
2779                 If this processor has started evacuating array elements on it 
2780                 dont let new immigrants in. If they arrive send them to what
2781                 would be their correct homePE.
2782                 Leave a record here mentioning the processor where it got sent
2783         */
2784         
2785         if(CkpvAccess(startedEvac)){
2786                 int newhomePE = getNextPE(idx);
2787                 DEBM((AA"Migrated into failed processor index size %s resent to %d \n"AB,idx2str(idx),newhomePE));      
2788                 CkLocMgr *mgr = rec->getLocMgr();
2789                 int targetPE=getNextPE(idx);
2790                 //set this flag so that load balancer is not informed when
2791                 //this element migrates
2792                 rec->AsyncMigrate(CmiTrue);
2793                 rec->Bounced(CmiTrue);
2794                 mgr->emigrate(rec,targetPE);
2795                 
2796         }
2797
2798         delete msg;
2799 }
2800
2801 void CkLocMgr::restore(const CkArrayIndex &idx, PUP::er &p)
2802 {
2803         //This is in broughtIntoMem during out-of-core emulation in BigSim,
2804         //informHome should not be called since such information is already
2805         //immediately updated real migration
2806 #if CMK_ERROR_CHECKING
2807         if(_BgOutOfCoreFlag!=2)
2808             CmiAbort("CkLocMgr::restore should only be used in out-of-core emulation for BigSim and be called when object is brought into memory!\n");
2809 #endif
2810         CkLocRec_local *rec=createLocal(idx,CmiFalse,CmiFalse,CmiFalse);
2811         
2812         //BIGSIM_OOC DEBUGGING
2813         //CkPrintf("Proc[%d]: Registering element %s with LDB\n", CkMyPe(), idx2str(idx));
2814
2815         //Create the new elements as we unpack the message
2816         pupElementsFor(p,rec,CkElementCreation_restore);
2817
2818         callMethod(rec,&CkMigratable::ckJustRestored);
2819 }
2820
2821
2822 /// Insert and unpack this array element from this checkpoint (e.g., from CkLocation::pup)
2823 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2824 void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p, CmiBool create, int dummy)
2825 {
2826         CkLocRec_local *rec;
2827         CkLocRec *recGlobal;    
2828
2829         if(create){
2830                 rec = createLocal(idx,CmiFalse,CmiFalse,CmiTrue && !dummy /* home doesn't know yet */,dummy );
2831         }else{
2832                 recGlobal = elementNrec(idx);
2833                 if(recGlobal == NULL) 
2834                         CmiAbort("Local object not found");
2835                 if(recGlobal->type() != CkLocRec::local)
2836                         CmiAbort("Local object not local, :P");
2837                 rec = (CkLocRec_local *)recGlobal;
2838         }
2839         
2840     pupElementsFor(p,rec,CkElementCreation_resume,create,dummy);
2841
2842     if(!dummy){
2843         callMethod(rec,&CkMigratable::ckJustMigrated);
2844     }
2845 }
2846 #else
2847 void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p, CmiBool notify)
2848 {
2849         CkLocRec_local *rec=createLocal(idx,CmiFalse,CmiFalse,notify /* home doesn't know yet */ );
2850
2851         //Create the new elements as we unpack the message
2852         pupElementsFor(p,rec,CkElementCreation_resume);
2853
2854         callMethod(rec,&CkMigratable::ckJustMigrated);
2855 }
2856 #endif
2857
2858 /********************* LocMgr: UTILITY ****************/
2859 void CkMagicNumber_impl::badMagicNumber(
2860         int expected,const char *file,int line,void *obj) const
2861 {
2862         CkError("FAILURE on pe %d, %s:%d> Expected %p's magic number "
2863                 "to be 0x%08x; but found 0x%08x!\n", CkMyPe(),file,line,obj,
2864                 expected, magic);
2865         CkAbort("Bad magic number detected!  This implies either\n"
2866                 "the heap or a message was corrupted!\n");
2867 }
2868 CkMagicNumber_impl::CkMagicNumber_impl(int m) :magic(m) { }
2869
2870 //Look up the object with this array index, or return NULL
2871 CkMigratable *CkLocMgr::lookup(const CkArrayIndex &idx,CkArrayID aid) {
2872         CkLocRec *rec=elementNrec(idx);
2873         if (rec==NULL) return NULL;
2874         else return rec->lookupElement(aid);
2875 }
2876 //"Last-known" location (returns a processor number)
2877 int CkLocMgr::lastKnown(const CkArrayIndex &idx) {
2878         CkLocMgr *vthis=(CkLocMgr *)this;//Cast away "const"
2879         CkLocRec *rec=vthis->elementNrec(idx);
2880         int pe=-1;
2881         if (rec!=NULL) pe=rec->lookupProcessor();
2882         if (pe==-1) return homePe(idx);
2883         else{
2884                 /*
2885                         FAULT_EVAC
2886                         if the lastKnownPE is invalid return homePE and delete this record
2887                 */
2888                 if(!CmiNodeAlive(pe)){
2889                         removeFromTable(idx);
2890                         return homePe(idx);
2891                 }
2892                 return pe;
2893         }       
2894 }
2895 /// Return true if this array element lives on another processor
2896 bool CkLocMgr::isRemote(const CkArrayIndex &idx,int *onPe) const
2897 {
2898         CkLocMgr *vthis=(CkLocMgr *)this;//Cast away "const"
2899         CkLocRec *rec=vthis->elementNrec(idx);
2900         if (rec==NULL || rec->type()!=CkLocRec::remote) 
2901                 return false; /* not definitely a remote element */
2902         else /* element is indeed remote */
2903         {
2904                 *onPe=rec->lookupProcessor();
2905                 return true;
2906         }
2907 }
2908
2909 static const char *rec2str[]={
2910     "base (INVALID)",//Base class (invalid type)
2911     "local",//Array element that lives on this Pe
2912     "remote",//Array element that lives on some other Pe
2913     "buffering",//Array element that was just created
2914     "dead"//Deleted element (for debugging)
2915 };
2916
2917 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2918 void CkLocMgr::setDuringMigration(CmiBool _duringMigration){
2919     duringMigration = _duringMigration;
2920 }
2921 #endif
2922
2923
2924 //Add given element array record at idx, replacing the existing record
2925 void CkLocMgr::insertRec(CkLocRec *rec,const CkArrayIndex &idx) {
2926         CkLocRec *old=elementNrec(idx);
2927         insertRecN(rec,idx);
2928         if (old!=NULL) {
2929                 DEBC((AA"  replaces old rec(%s) for %s\n"AB,rec2str[old->type()],idx2str(idx)));
2930                 //There was an old element at this location
2931                 if (old->type()==CkLocRec::local && rec->type()==CkLocRec::local) {
2932                     if (!CkInRestarting()) {    // ok if it is restarting
2933                         CkPrintf("ERROR! Duplicate array index: %s\n",idx2str(idx));
2934                         CkAbort("Duplicate array index used");
2935                     }
2936                 }
2937                 old->beenReplaced();
2938                 delete old;
2939         }
2940 }
2941
2942 //Add given record, when there is guarenteed to be no prior record
2943 void CkLocMgr::insertRecN(CkLocRec *rec,const CkArrayIndex &idx) {
2944         DEBC((AA"  adding new rec(%s) for %s\n"AB,rec2str[rec->type()],idx2str(idx)));
2945         CmiImmediateLock(hashImmLock);
2946         hash.put(*(CkArrayIndex *)&idx)=rec;
2947         CmiImmediateUnlock(hashImmLock);
2948 }
2949
2950 //Call this on an unrecognized array index
2951 static void abort_out_of_bounds(const CkArrayIndex &idx)
2952 {
2953   CkPrintf("ERROR! Unknown array index: %s\n",idx2str(idx));
2954   CkAbort("Array index out of bounds\n");
2955 }
2956
2957 //Look up array element in hash table.  Index out-of-bounds if not found.
2958 CkLocRec *CkLocMgr::elementRec(const CkArrayIndex &idx) {
2959 #if ! CMK_ERROR_CHECKING
2960 //Assume the element will be found
2961         return hash.getRef(*(CkArrayIndex *)&idx);
2962 #else
2963 //Include an out-of-bounds check if the element isn't found
2964         CkLocRec *rec=elementNrec(idx);
2965         if (rec==NULL) abort_out_of_bounds(idx);
2966         return rec;
2967 #endif
2968 }
2969
2970 //Look up array element in hash table.  Return NULL if not there.
2971 CkLocRec *CkLocMgr::elementNrec(const CkArrayIndex &idx) {
2972         return hash.get(*(CkArrayIndex *)&idx);
2973 }
2974
2975 struct LocalElementCounter :  public CkLocIterator
2976 {
2977     unsigned int count;
2978     LocalElementCounter() : count(0) {}
2979     void addLocation(CkLocation &loc)
2980         { ++count; }
2981 };
2982
2983 unsigned int CkLocMgr::numLocalElements()
2984 {
2985     LocalElementCounter c;
2986     iterate(c);
2987     return c.count;
2988 }
2989
2990
2991 /********************* LocMgr: LOAD BALANCE ****************/
2992
2993 #if !CMK_LBDB_ON
2994 //Empty versions of all load balancer calls
2995 void CkLocMgr::initLB(CkGroupID lbdbID_) {}
2996 void CkLocMgr::startInserting(void) {}
2997 void CkLocMgr::doneInserting(void) {}
2998 void CkLocMgr::dummyAtSync(void) {}
2999 #endif
3000
3001
3002 #if CMK_LBDB_ON
3003 void CkLocMgr::initLB(CkGroupID lbdbID_)
3004 { //Find and register with the load balancer
3005         the_lbdb = (LBDatabase *)CkLocalBranch(lbdbID_);
3006         if (the_lbdb == 0)
3007                 CkAbort("LBDatabase not yet created?\n");
3008         DEBL((AA"Connected to load balancer %p\n"AB,the_lbdb));
3009
3010         // Register myself as an object manager
3011         LDOMid myId;
3012         myId.id = thisgroup;
3013         LDCallbacks myCallbacks;
3014         myCallbacks.migrate = (LDMigrateFn)CkLocRec_local::staticMigrate;
3015         myCallbacks.setStats = NULL;
3016         myCallbacks.queryEstLoad = NULL;
3017   myCallbacks.adaptResumeSync =
3018       (LDAdaptResumeSyncFn)CkLocRec_local::staticAdaptResumeSync;
3019         myLBHandle = the_lbdb->RegisterOM(myId,this,myCallbacks);
3020
3021         // Tell the lbdb that I'm registering objects
3022         the_lbdb->RegisteringObjects(myLBHandle);
3023
3024         /*Set up the dummy barrier-- the load balancer needs
3025           us to call Registering/DoneRegistering during each AtSync,
3026           and this is the only way to do so.
3027         */
3028         the_lbdb->AddLocalBarrierReceiver(
3029                 (LDBarrierFn)staticRecvAtSync,(void*)(this));
3030         dummyBarrierHandle = the_lbdb->AddLocalBarrierClient(
3031                 (LDResumeFn)staticDummyResumeFromSync,(void*)(this));
3032         dummyAtSync();
3033 }
3034 void CkLocMgr::dummyAtSync(void)
3035 {
3036         DEBL((AA"dummyAtSync called\n"AB));
3037         the_lbdb->AtLocalBarrier(dummyBarrierHandle);
3038 }
3039
3040 void CkLocMgr::staticDummyResumeFromSync(void* data)
3041 {      ((CkLocMgr*)data)->dummyResumeFromSync(); }
3042 void CkLocMgr::dummyResumeFromSync()
3043 {
3044         DEBL((AA"DummyResumeFromSync called\n"AB));
3045         the_lbdb->DoneRegisteringObjects(myLBHandle);
3046         dummyAtSync();
3047 }
3048 void CkLocMgr::staticRecvAtSync(void* data)
3049 {      ((CkLocMgr*)data)->recvAtSync(); }
3050 void CkLocMgr::recvAtSync()
3051 {
3052         DEBL((AA"recvAtSync called\n"AB));
3053         the_lbdb->RegisteringObjects(myLBHandle);
3054 }
3055
3056 void CkLocMgr::startInserting(void)
3057 {
3058         the_lbdb->RegisteringObjects(myLBHandle);
3059 }
3060 void CkLocMgr::doneInserting(void)
3061 {
3062         the_lbdb->DoneRegisteringObjects(myLBHandle);
3063 }
3064 #endif
3065
3066 #include "CkLocation.def.h"
3067
3068