scheme for checkpoint group data
[charm.git] / src / ck-core / cklocation.C
1 /** \file cklocation.C
2  *  \addtogroup CkArrayImpl
3  *
4  *  The location manager keeps track of an indexed set of migratable objects.
5  *  It is used by the array manager to locate array elements, interact with the
6  *  load balancer, and perform migrations.
7  *
8  *  Orion Sky Lawlor, olawlor@acm.org 9/29/2001
9  */
10
11 #include "charm++.h"
12 #include "register.h"
13 #include "ck.h"
14 #include "trace.h"
15 #include "TopoManager.h"
16 #include <vector>
17 #include<sstream>
18
19 #if CMK_LBDB_ON
20 #include "LBDatabase.h"
21 #if CMK_GLOBAL_LOCATION_UPDATE
22 #include "BaseLB.h"
23 #include "init.h"
24 #endif
25 #endif // CMK_LBDB_ON
26
27 #if CMK_GRID_QUEUE_AVAILABLE
28 CpvExtern(void *, CkGridObject);
29 #endif
30
31 static const char *idx2str(const CkArrayMessage *m) {
32   return idx2str(((CkArrayMessage *)m)->array_index());
33 }
34
35 #define ARRAY_DEBUG_OUTPUT 0
36
37 #if ARRAY_DEBUG_OUTPUT 
38 #   define DEB(x) CkPrintf x  //General debug messages
39 #   define DEBI(x) CkPrintf x  //Index debug messages
40 #   define DEBC(x) CkPrintf x  //Construction debug messages
41 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
42 #   define DEBM(x) CkPrintf x  //Migration debug messages
43 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
44 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
45 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
46 #   define AA "LocMgr on %d: "
47 #   define AB ,CkMyPe()
48 #   define DEBUG(x) CkPrintf x
49 #else
50 #   define DEB(X) /*CkPrintf x*/
51 #   define DEBI(X) /*CkPrintf x*/
52 #   define DEBC(X) /*CkPrintf x*/
53 #   define DEBS(x) /*CkPrintf x*/
54 #   define DEBM(X) /*CkPrintf x*/
55 #   define DEBL(X) /*CkPrintf x*/
56 #   define DEBK(x) /*CkPrintf x*/
57 #   define DEBB(x) /*CkPrintf x*/
58 #   define str(x) /**/
59 #   define DEBUG(x)   /**/
60 #endif
61
62 //whether to use block mapping in the SMP node level
63 bool useNodeBlkMapping;
64
65 #if CMK_LBDB_ON
66 /*LBDB object handles are fixed-sized, and not necc.
67 the same size as ArrayIndices.
68 */
69 LDObjid idx2LDObjid(const CkArrayIndex &idx)
70 {
71   LDObjid r;
72   int i;
73   const int *data=idx.data();
74   if (OBJ_ID_SZ>=idx.nInts) {
75     for (i=0;i<idx.nInts;i++)
76       r.id[i]=data[i];
77     for (i=idx.nInts;i<OBJ_ID_SZ;i++)
78       r.id[i]=0;
79   } else {
80     //Must hash array index into LBObjid
81     int j;
82     for (j=0;j<OBJ_ID_SZ;j++)
83         r.id[j]=data[j];
84     for (i=0;i<idx.nInts;i++)
85       for (j=0;j<OBJ_ID_SZ;j++)
86         r.id[j]+=circleShift(data[i],22+11*i*(j+1))+
87           circleShift(data[i],21-9*i*(j+1));
88   }
89
90 #if CMK_GLOBAL_LOCATION_UPDATE
91   r.dimension = idx.dimension;
92   r.nInts = idx.nInts; 
93   r.isArrayElement = 1; 
94 #endif
95
96   return r;
97 }
98
99 #if CMK_GLOBAL_LOCATION_UPDATE
100 void UpdateLocation(MigrateInfo& migData) {
101
102   if (migData.obj.id.isArrayElement == 0) {
103     return;
104   }
105
106   CkArrayIndex idx; 
107   idx.dimension = migData.obj.id.dimension; 
108   idx.nInts = migData.obj.id.nInts; 
109
110   for (int i = 0; i < idx.nInts; i++) {
111     idx.data()[i] = migData.obj.id.id[i];    
112   }
113
114   CkGroupID locMgrGid;
115   locMgrGid.idx = migData.obj.id.locMgrGid;
116   CkLocMgr *localLocMgr = (CkLocMgr *) CkLocalBranch(locMgrGid);
117   localLocMgr->updateLocation(idx, migData.to_pe); 
118 }
119 #endif
120
121 #endif
122
123 /*********************** Array Messages ************************/
124 CkArrayIndex &CkArrayMessage::array_index(void)
125 {
126     return UsrToEnv((void *)this)->getsetArrayIndex();
127 }
128 unsigned short &CkArrayMessage::array_ep(void)
129 {
130         return UsrToEnv((void *)this)->getsetArrayEp();
131 }
132 unsigned short &CkArrayMessage::array_ep_bcast(void)
133 {
134         return UsrToEnv((void *)this)->getsetArrayBcastEp();
135 }
136 unsigned char &CkArrayMessage::array_hops(void)
137 {
138         return UsrToEnv((void *)this)->getsetArrayHops();
139 }
140 unsigned int CkArrayMessage::array_getSrcPe(void)
141 {
142         return UsrToEnv((void *)this)->getsetArraySrcPe();
143 }
144 unsigned int CkArrayMessage::array_ifNotThere(void)
145 {
146         return UsrToEnv((void *)this)->getArrayIfNotThere();
147 }
148 void CkArrayMessage::array_setIfNotThere(unsigned int i)
149 {
150         UsrToEnv((void *)this)->setArrayIfNotThere(i);
151 }
152
153 /*********************** Array Map ******************
154 Given an array element index, an array map tells us 
155 the index's "home" Pe.  This is the Pe the element will
156 be created on, and also where messages to this element will
157 be forwarded by default.
158 */
159
160 CkArrayMap::CkArrayMap(void) { }
161 CkArrayMap::~CkArrayMap() { }
162 int CkArrayMap::registerArray(CkArrayIndex& numElements,CkArrayID aid)
163 {return 0;}
164
165 #define CKARRAYMAP_POPULATE_INITIAL(POPULATE_CONDITION) \
166         int i; \
167         for (int i1=0; i1<numElements.data()[0]; i1++) { \
168           if (numElements.dimension == 1) { \
169             /* Make 1D indices */ \
170             i = i1; \
171             CkArrayIndex1D idx(i1); \
172             if (POPULATE_CONDITION) \
173               mgr->insertInitial(idx,CkCopyMsg(&ctorMsg)); \
174           } else { \
175             /* higher dimensionality */ \
176             for (int i2=0; i2<numElements.data()[1]; i2++) { \
177               if (numElements.dimension == 2) { \
178                 /* Make 2D indices */ \
179                 i = i1 * numElements.data()[1] + i2; \
180                 CkArrayIndex2D idx(i1, i2); \
181                 if (POPULATE_CONDITION) \
182                   mgr->insertInitial(idx,CkCopyMsg(&ctorMsg)); \
183               } else { \
184                 /* higher dimensionality */ \
185                 CkAssert(numElements.dimension == 3); \
186                 for (int i3=0; i3<numElements.data()[2]; i3++) { \
187                   /* Make 3D indices */ \
188                   i = (i1 * numElements.data()[1] + i2) * numElements.data()[2] + i3; \
189                   CkArrayIndex3D idx(i1, i2, i3 ); \
190                   if (POPULATE_CONDITION) \
191                     mgr->insertInitial(idx,CkCopyMsg(&ctorMsg)); \
192                 } \
193               } \
194             } \
195           } \
196         }
197
198 void CkArrayMap::populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr)
199 {
200         if (numElements.nInts==0) {
201           CkFreeMsg(ctorMsg);
202           return;
203         }
204         int thisPe=CkMyPe();
205         /* The CkArrayIndex is supposed to have at most 3 dimensions, which
206            means that all the fields are ints, and numElements.nInts represents
207            how many of them are used */
208         CKARRAYMAP_POPULATE_INITIAL(procNum(arrayHdl,idx)==thisPe);
209
210 #if CMK_BIGSIM_CHARM
211         BgEntrySplit("split-array-new-end");
212 #endif
213
214         mgr->doneInserting();
215         CkFreeMsg(ctorMsg);
216 }
217
218 CkGroupID _defaultArrayMapID;
219 CkGroupID _fastArrayMapID;
220
221 class RRMap : public CkArrayMap
222 {
223 public:
224   RRMap(void)
225   {
226           DEBC((AA"Creating RRMap\n"AB));
227   }
228   RRMap(CkMigrateMessage *m):CkArrayMap(m){}
229   int procNum(int /*arrayHdl*/, const CkArrayIndex &i)
230   {
231 #if 1
232     if (i.nInts==1) {
233       //Map 1D integer indices in simple round-robin fashion
234       int ans= (i.data()[0])%CkNumPes();
235       while(!CmiNodeAlive(ans) || (ans == CkMyPe() && CkpvAccess(startedEvac))){
236         ans = (ans +1 )%CkNumPes();
237       }
238       return ans;
239     }
240     else 
241 #endif
242       {
243         //Map other indices based on their hash code, mod a big prime.
244         unsigned int hash=(i.hash()+739)%1280107;
245         int ans = (hash % CkNumPes());
246         while(!CmiNodeAlive(ans)){
247                 ans = (ans +1 )%CkNumPes();
248         }
249         return ans;
250
251       }
252   }
253 };
254
255 /** 
256  * Class used to store the dimensions of the array and precalculate numChares,
257  * binSize and other values for the DefaultArrayMap -- ASB
258  */
259 class arrayMapInfo {
260 public:
261   CkArrayIndex _nelems;
262   int _binSizeFloor;            /* floor of numChares/numPes */
263   int _binSizeCeil;             /* ceiling of numChares/numPes */
264   int _numChares;               /* initial total number of chares */
265   int _remChares;               /* numChares % numPes -- equals the number of
266                                    processors in the first set */
267   int _numFirstSet;             /* _remChares X (_binSize + 1) -- number of
268                                    chares in the first set */
269
270   int _nBinSizeFloor;           /* floor of numChares/numNodes */
271   int _nRemChares;              /* numChares % numNodes -- equals the number of
272                                    nodes in the first set */
273   int _nNumFirstSet;            /* _remChares X (_binSize + 1) -- number of
274                                    chares in the first set of nodes */
275
276   /** All processors are divided into two sets. Processors in the first set
277    *  have one chare more than the processors in the second set. */
278
279   arrayMapInfo(void) { }
280
281   arrayMapInfo(CkArrayIndex& n) : _nelems(n), _numChares(0) {
282     compute_binsize();
283   }
284
285   ~arrayMapInfo() {}
286   
287   void compute_binsize()
288   {
289     int numPes = CkNumPes();
290     //Now assuming homogenous nodes where each node has the same number of PEs
291     int numNodes = CkNumNodes();
292
293     if (_nelems.nInts == 1) {
294       _numChares = _nelems.data()[0];
295     } else if (_nelems.nInts == 2) {
296       _numChares = _nelems.data()[0] * _nelems.data()[1];
297     } else if (_nelems.nInts == 3) {
298       _numChares = _nelems.data()[0] * _nelems.data()[1] * _nelems.data()[2];
299     }
300
301     _remChares = _numChares % numPes;
302     _binSizeFloor = (int)floor((double)_numChares/(double)numPes);
303     _binSizeCeil = (int)ceil((double)_numChares/(double)numPes);
304     _numFirstSet = _remChares * (_binSizeFloor + 1);
305
306     _nRemChares = _numChares % numNodes;
307     _nBinSizeFloor = _numChares/numNodes;
308     _nNumFirstSet = _nRemChares * (_nBinSizeFloor +1);
309   }
310
311   void pup(PUP::er& p){
312     p|_nelems;
313     p|_binSizeFloor;
314     p|_binSizeCeil;
315     p|_numChares;
316     p|_remChares;
317     p|_numFirstSet;
318     p|_nRemChares;
319     p|_nBinSizeFloor;
320     p|_nNumFirstSet;
321   }
322 }c;
323
324
325 /**
326  * The default map object -- This does blocked mapping in the general case and
327  * calls the round-robin procNum for the dynamic insertion case -- ASB
328  */
329 class DefaultArrayMap : public RRMap
330 {
331 public:
332   /** This array stores information about different chare arrays in a Charm
333    *  program (dimensions, binsize, numChares etc ... ) */
334   CkPupPtrVec<arrayMapInfo> amaps;
335
336 public:
337   DefaultArrayMap(void) {
338     DEBC((AA"Creating DefaultArrayMap\n"AB));
339   }
340
341   DefaultArrayMap(CkMigrateMessage *m) : RRMap(m){}
342
343   int registerArray(CkArrayIndex& numElements, CkArrayID aid)
344   {
345     int idx = amaps.size();
346     amaps.resize(idx+1);
347     amaps[idx] = new arrayMapInfo(numElements);
348     return idx;
349   }
350  
351   int procNum(int arrayHdl, const CkArrayIndex &i) {
352     int flati;
353     if (amaps[arrayHdl]->_nelems.nInts == 0) {
354       return RRMap::procNum(arrayHdl, i);
355     }
356
357     if (i.nInts == 1) {
358       flati = i.data()[0];
359     } else if (i.nInts == 2) {
360       flati = i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1];
361     } else if (i.nInts == 3) {
362       flati = (i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1]) * amaps[arrayHdl]->_nelems.data()[2] + i.data()[2];
363     }
364 #if CMK_ERROR_CHECKING
365     else {
366       CkAbort("CkArrayIndex has more than 3 integers!");
367     }
368 #endif
369
370     if(useNodeBlkMapping){
371       if(flati < amaps[arrayHdl]->_numChares){
372         int numCharesOnNode = amaps[arrayHdl]->_nBinSizeFloor;
373         int startNodeID, offsetInNode;
374         if(flati < amaps[arrayHdl]->_nNumFirstSet){
375           numCharesOnNode++;
376           startNodeID = flati/numCharesOnNode;
377           offsetInNode = flati%numCharesOnNode;
378         }else{
379           startNodeID = amaps[arrayHdl]->_nRemChares+(flati-amaps[arrayHdl]->_nNumFirstSet)/numCharesOnNode;
380           offsetInNode = (flati-amaps[arrayHdl]->_nNumFirstSet)%numCharesOnNode;
381         }
382         int nodeSize = CkMyNodeSize(); //assuming every node has same number of PEs
383         int elemsPerPE = numCharesOnNode/nodeSize;
384         int remElems = numCharesOnNode%nodeSize;
385         int firstSetPEs = remElems*(elemsPerPE+1);
386         if(offsetInNode<firstSetPEs){
387           return CkNodeFirst(startNodeID)+offsetInNode/(elemsPerPE+1);
388         }else{
389           return CkNodeFirst(startNodeID)+remElems+(offsetInNode-firstSetPEs)/elemsPerPE;
390         }
391       } else
392           return (flati % CkNumPes());
393     }
394     //regular PE-based block mapping
395     if(flati < amaps[arrayHdl]->_numFirstSet)
396       return (flati / (amaps[arrayHdl]->_binSizeFloor + 1));
397     else if (flati < amaps[arrayHdl]->_numChares)
398       return (amaps[arrayHdl]->_remChares + (flati - amaps[arrayHdl]->_numFirstSet) / (amaps[arrayHdl]->_binSizeFloor));
399     else
400       return (flati % CkNumPes());
401   }
402
403   void pup(PUP::er& p){
404     RRMap::pup(p);
405     int npes = CkNumPes();
406     p|npes;
407     p|amaps;
408     if (p.isUnpacking() && npes != CkNumPes())  {   // binSize needs update
409       for (int i=0; i<amaps.size(); i++)
410         amaps[i]->compute_binsize();
411     }
412   }
413 };
414
415 /**
416  *  A fast map for chare arrays which do static insertions and promise NOT
417  *  to do late insertions -- ASB
418  */
419 class FastArrayMap : public DefaultArrayMap
420 {
421 public:
422   FastArrayMap(void) {
423     DEBC((AA"Creating FastArrayMap\n"AB));
424   }
425
426   FastArrayMap(CkMigrateMessage *m) : DefaultArrayMap(m){}
427
428   int registerArray(CkArrayIndex& numElements, CkArrayID aid)
429   {
430     int idx;
431     idx = DefaultArrayMap::registerArray(numElements, aid);
432
433     return idx;
434   }
435
436   int procNum(int arrayHdl, const CkArrayIndex &i) {
437     int flati;
438     if (amaps[arrayHdl]->_nelems.nInts == 0) {
439       return RRMap::procNum(arrayHdl, i);
440     }
441
442     if (i.nInts == 1) {
443       flati = i.data()[0];
444     } else if (i.nInts == 2) {
445       flati = i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1];
446     } else if (i.nInts == 3) {
447       flati = (i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1]) * amaps[arrayHdl]->_nelems.data()[2] + i.data()[2];
448     }
449 #if CMK_ERROR_CHECKING
450     else {
451       CkAbort("CkArrayIndex has more than 3 integers!");
452     }
453 #endif
454
455     /** binSize used in DefaultArrayMap is the floor of numChares/numPes
456      *  but for this FastArrayMap, we need the ceiling */
457     return (flati / amaps[arrayHdl]->_binSizeCeil);
458   }
459
460   void pup(PUP::er& p){
461     DefaultArrayMap::pup(p);
462   }
463 };
464
465
466 /**
467  * This map can be used for topology aware mapping when the mapping is provided
468  * through a file -- ASB
469  */
470 class ReadFileMap : public DefaultArrayMap
471 {
472 private:
473   CkVec<int> mapping;
474
475 public:
476   ReadFileMap(void) {
477     DEBC((AA"Creating ReadFileMap\n"AB));
478   }
479
480   ReadFileMap(CkMigrateMessage *m) : DefaultArrayMap(m){}
481
482   int registerArray(CkArrayIndex& numElements, CkArrayID aid)
483   {
484     int idx;
485     idx = DefaultArrayMap::registerArray(numElements, aid);
486
487     if(mapping.size() == 0) {
488       int numChares;
489
490       if (amaps[idx]->_nelems.nInts == 1) {
491         numChares = amaps[idx]->_nelems.data()[0];
492       } else if (amaps[idx]->_nelems.nInts == 2) {
493         numChares = amaps[idx]->_nelems.data()[0] * amaps[idx]->_nelems.data()[1];
494       } else if (amaps[idx]->_nelems.nInts == 3) {
495         numChares = amaps[idx]->_nelems.data()[0] * amaps[idx]->_nelems.data()[1] * amaps[idx]->_nelems.data()[2];
496       } else {
497         CkAbort("CkArrayIndex has more than 3 integers!");
498       }
499
500       mapping.resize(numChares);
501       FILE *mapf = fopen("mapfile", "r");
502       TopoManager tmgr;
503       int x, y, z, t;
504
505       for(int i=0; i<numChares; i++) {
506         (void) fscanf(mapf, "%d %d %d %d", &x, &y, &z, &t);
507         mapping[i] = tmgr.coordinatesToRank(x, y, z, t);
508       }
509       fclose(mapf);
510     }
511
512     return idx;
513   }
514
515   int procNum(int arrayHdl, const CkArrayIndex &i) {
516     int flati;
517
518     if (i.nInts == 1) {
519       flati = i.data()[0];
520     } else if (i.nInts == 2) {
521       flati = i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1];
522     } else if (i.nInts == 3) {
523       flati = (i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1]) * amaps[arrayHdl]->_nelems.data()[2] + i.data()[2];
524     } else {
525       CkAbort("CkArrayIndex has more than 3 integers!");
526     }
527
528     return mapping[flati];
529   }
530
531   void pup(PUP::er& p){
532     DefaultArrayMap::pup(p);
533     p|mapping;
534   }
535 };
536
537 class BlockMap : public RRMap
538 {
539 public:
540   BlockMap(void){
541         DEBC((AA"Creating BlockMap\n"AB));
542   }
543   BlockMap(CkMigrateMessage *m):RRMap(m){ }
544   void populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr){
545         if (numElements.nInts==0) {
546           CkFreeMsg(ctorMsg);
547           return;
548         }
549         int thisPe=CkMyPe();
550         int numPes=CkNumPes();
551         int binSize;
552         if (numElements.nInts == 1) {
553           binSize = (int)ceil((double)numElements.data()[0]/(double)numPes);
554         } else if (numElements.nInts == 2) {
555           binSize = (int)ceil((double)(numElements.data()[0]*numElements.data()[1])/(double)numPes);
556         } else if (numElements.nInts == 3) {
557           binSize = (int)ceil((double)(numElements.data()[0]*numElements.data()[1]*numElements.data()[2])/(double)numPes);
558         } else {
559           CkAbort("CkArrayIndex has more than 3 integers!");
560         }
561         CKARRAYMAP_POPULATE_INITIAL(i/binSize==thisPe);
562
563         /*
564         CkArrayIndex idx;
565         for (idx=numElements.begin(); idx<numElements; idx.getNext(numElements)) {
566           //for (int i=0;i<numElements;i++) {
567                 int binSize = (int)ceil((double)numElements.getCombinedCount()/(double)numPes);
568                 if (i/binSize==thisPe)
569                         mgr->insertInitial(idx,CkCopyMsg(&ctorMsg));
570         }*/
571         mgr->doneInserting();
572         CkFreeMsg(ctorMsg);
573   }
574 };
575
576 /**
577  * map object-- use seed load balancer.  
578  */
579 class CldMap : public CkArrayMap
580 {
581 public:
582   CldMap(void)
583   {
584           DEBC((AA"Creating CldMap\n"AB));
585   }
586   CldMap(CkMigrateMessage *m):CkArrayMap(m){}
587   int homePe(int /*arrayHdl*/, const CkArrayIndex &i)
588   {
589     if (i.nInts==1) {
590       //Map 1D integer indices in simple round-robin fashion
591       return (i.data()[0])%CkNumPes();
592     }
593     else 
594       {
595         //Map other indices based on their hash code, mod a big prime.
596         unsigned int hash=(i.hash()+739)%1280107;
597         return (hash % CkNumPes());
598       }
599   }
600   int procNum(int arrayHdl, const CkArrayIndex &i)
601   {
602      return CLD_ANYWHERE;   // -1
603   }
604   void populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr)  {
605         if (numElements.nInts==0) {
606           CkFreeMsg(ctorMsg);
607           return;
608         }
609         int thisPe=CkMyPe();
610         int numPes=CkNumPes();
611         //CkArrayIndex idx;
612
613         CKARRAYMAP_POPULATE_INITIAL(i%numPes==thisPe);
614         /*for (idx=numElements.begin(); idx<numElements; idx.getNext(numElements)) {
615           //for (int i=0;i<numElements;i++)
616                         if((idx.getRank(numElements))%numPes==thisPe)
617                                 mgr->insertInitial(CkArrayIndex1D(i),CkCopyMsg(&ctorMsg),0);
618         }*/
619         mgr->doneInserting();
620         CkFreeMsg(ctorMsg);
621   }
622
623 };
624
625
626 /// A class responsible for parsing the command line arguments for the PE
627 /// to extract the format string passed in with +ConfigurableRRMap
628 class ConfigurableRRMapLoader {
629 public:
630   
631   int *locations;
632   int objs_per_block;
633   int PE_per_block;
634
635   /// labels for states used when parsing the ConfigurableRRMap from ARGV
636   enum ConfigurableRRMapLoadStatus{
637     not_loaded,
638     loaded_found,
639     loaded_not_found
640   };
641   
642   enum ConfigurableRRMapLoadStatus state;
643   
644   ConfigurableRRMapLoader(){
645     state = not_loaded;
646     locations = NULL;
647     objs_per_block = 0;
648     PE_per_block = 0;
649   }
650   
651   /// load configuration if possible, and return whether a valid configuration exists
652   bool haveConfiguration() {
653     if(state == not_loaded) {
654       DEBUG(("[%d] loading ConfigurableRRMap configuration\n", CkMyPe()));
655       char **argv=CkGetArgv();
656       char *configuration = NULL;
657       bool found = CmiGetArgString(argv, "+ConfigurableRRMap", &configuration);
658       if(!found){
659         DEBUG(("Couldn't find +ConfigurableRRMap command line argument\n"));
660         state = loaded_not_found;
661         return false;
662       } else {
663
664         DEBUG(("Found +ConfigurableRRMap command line argument in %p=\"%s\"\n", configuration, configuration));
665
666         std::istringstream instream(configuration);
667         CkAssert(instream.good());
668          
669         // Example line:
670         // 10 8 0 1 2 3 4 5 6 7 7 7 7
671         // Map 10 objects to 8 PEs, with each object's index among the 8 PEs.
672         
673         // extract first integer
674         instream >> objs_per_block >> PE_per_block;
675         CkAssert(instream.good());
676         CkAssert(objs_per_block > 0);
677         CkAssert(PE_per_block > 0);
678         locations = new int[objs_per_block];
679         for(int i=0;i<objs_per_block;i++){
680           locations[i] = 0;
681           CkAssert(instream.good());
682           instream >> locations[i];
683           CkAssert(locations[i] < PE_per_block);
684         }
685         state = loaded_found;
686         return true;
687       }
688
689     } else {
690       DEBUG(("[%d] ConfigurableRRMap has already been loaded\n", CkMyPe()));
691       return state == loaded_found;
692     }      
693      
694   }
695   
696 };
697
698 CkpvDeclare(ConfigurableRRMapLoader, myConfigRRMapState);
699
700 void _initConfigurableRRMap(){
701   CkpvInitialize(ConfigurableRRMapLoader, myConfigRRMapState);
702 }
703
704
705 /// Try to load the command line arguments for ConfigurableRRMap
706 bool haveConfigurableRRMap(){
707   DEBUG(("haveConfigurableRRMap()\n"));
708   ConfigurableRRMapLoader &loader =  CkpvAccess(myConfigRRMapState);
709   return loader.haveConfiguration();
710 }
711
712 class ConfigurableRRMap : public RRMap
713 {
714 public:
715   ConfigurableRRMap(void){
716         DEBC((AA"Creating ConfigurableRRMap\n"AB));
717   }
718   ConfigurableRRMap(CkMigrateMessage *m):RRMap(m){ }
719
720
721   void populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr){
722     // Try to load the configuration from command line argument
723     CkAssert(haveConfigurableRRMap());
724     ConfigurableRRMapLoader &loader =  CkpvAccess(myConfigRRMapState);
725     if (numElements.nInts==0) {
726       CkFreeMsg(ctorMsg);
727       return;
728     }
729     int thisPe=CkMyPe();
730     int maxIndex = numElements.data()[0];
731     DEBUG(("[%d] ConfigurableRRMap: index=%d,%d,%d\n", CkMyPe(),(int)numElements.data()[0], (int)numElements.data()[1], (int)numElements.data()[2]));
732
733     if (numElements.nInts != 1) {
734       CkAbort("ConfigurableRRMap only supports dimension 1!");
735     }
736         
737     for (int index=0; index<maxIndex; index++) {        
738       CkArrayIndex1D idx(index);                
739       
740       int cyclic_block = index / loader.objs_per_block;
741       int cyclic_local = index % loader.objs_per_block;
742       int l = loader.locations[ cyclic_local ];
743       int PE = (cyclic_block*loader.PE_per_block + l) % CkNumPes();
744
745       DEBUG(("[%d] ConfigurableRRMap: index=%d is located on PE %d l=%d\n", CkMyPe(), (int)index, (int)PE, l));
746
747       if(PE == thisPe)
748         mgr->insertInitial(idx,CkCopyMsg(&ctorMsg));
749
750     }
751     //        CKARRAYMAP_POPULATE_INITIAL(PE == thisPe);
752         
753     mgr->doneInserting();
754     CkFreeMsg(ctorMsg);
755   }
756 };
757
758
759 CkpvStaticDeclare(double*, rem);
760
761 class arrInfo {
762  private:
763    CkArrayIndex _nelems;
764    int *_map;
765    void distrib(int *speeds);
766  public:
767    arrInfo(void):_map(NULL){}
768    arrInfo(CkArrayIndex& n, int *speeds)
769    {
770      _nelems = n;
771      _map = new int[_nelems.getCombinedCount()];
772      distrib(speeds);
773    }
774    ~arrInfo() { delete[] _map; }
775    int getMap(const CkArrayIndex &i);
776    void pup(PUP::er& p){
777      p|_nelems;
778      int totalElements = _nelems.getCombinedCount();
779      if(p.isUnpacking()){
780        _map = new int[totalElements];
781      }
782      p(_map,totalElements);
783    }
784 };
785
786 static int cmp(const void *first, const void *second)
787 {
788   int fi = *((const int *)first);
789   int si = *((const int *)second);
790   return ((CkpvAccess(rem)[fi]==CkpvAccess(rem)[si]) ?
791           0 :
792           ((CkpvAccess(rem)[fi]<CkpvAccess(rem)[si]) ?
793           1 : (-1)));
794 }
795
796 void
797 arrInfo::distrib(int *speeds)
798 {
799   int _nelemsCount = _nelems.getCombinedCount();
800   double total = 0.0;
801   int npes = CkNumPes();
802   int i,j,k;
803   for(i=0;i<npes;i++)
804     total += (double) speeds[i];
805   double *nspeeds = new double[npes];
806   for(i=0;i<npes;i++)
807     nspeeds[i] = (double) speeds[i] / total;
808   int *cp = new int[npes];
809   for(i=0;i<npes;i++)
810     cp[i] = (int) (nspeeds[i]*_nelemsCount);
811   int nr = 0;
812   for(i=0;i<npes;i++)
813     nr += cp[i];
814   nr = _nelemsCount - nr;
815   if(nr != 0)
816   {
817     CkpvAccess(rem) = new double[npes];
818     for(i=0;i<npes;i++)
819       CkpvAccess(rem)[i] = (double)_nelemsCount*nspeeds[i] - cp[i];
820     int *pes = new int[npes];
821     for(i=0;i<npes;i++)
822       pes[i] = i;
823     qsort(pes, npes, sizeof(int), cmp);
824     for(i=0;i<nr;i++)
825       cp[pes[i]]++;
826     delete[] pes;
827     delete[] CkpvAccess(rem);
828   }
829   k = 0;
830   for(i=0;i<npes;i++)
831   {
832     for(j=0;j<cp[i];j++)
833       _map[k++] = i;
834   }
835   delete[] cp;
836   delete[] nspeeds;
837 }
838
839 int
840 arrInfo::getMap(const CkArrayIndex &i)
841 {
842   if(i.nInts==1)
843     return _map[i.data()[0]];
844   else
845     return _map[((i.hash()+739)%1280107)%_nelems.getCombinedCount()];
846 }
847
848 //Speeds maps processor number to "speed" (some sort of iterations per second counter)
849 // It is initialized by processor 0.
850 static int* speeds;
851
852 #if CMK_USE_PROP_MAP
853 typedef struct _speedmsg
854 {
855   char hdr[CmiMsgHeaderSizeBytes];
856   int node;
857   int speed;
858 } speedMsg;
859
860 static void _speedHdlr(void *m)
861 {
862   speedMsg *msg=(speedMsg *)m;
863   if (CmiMyRank()==0)
864     for (int pe=0;pe<CmiNodeSize(msg->node);pe++)
865       speeds[CmiNodeFirst(msg->node)+pe] = msg->speed;  
866   CmiFree(m);
867 }
868
869 // initnode call
870 void _propMapInit(void)
871 {
872   speeds = new int[CkNumPes()];
873   int hdlr = CkRegisterHandler((CmiHandler)_speedHdlr);
874   CmiPrintf("[%d]Measuring processor speed for prop. mapping...\n", CkMyPe());
875   int s = LDProcessorSpeed();
876   speedMsg msg;
877   CmiSetHandler(&msg, hdlr);
878   msg.node = CkMyNode();
879   msg.speed = s;
880   CmiSyncBroadcastAllAndFree(sizeof(msg), &msg);
881   for(int i=0;i<CkNumNodes();i++)
882     CmiDeliverSpecificMsg(hdlr);
883 }
884 #else
885 void _propMapInit(void)
886 {
887   speeds = new int[CkNumPes()];
888   int i;
889   for(i=0;i<CkNumPes();i++)
890     speeds[i] = 1;
891 }
892 #endif
893 /**
894  * A proportional map object-- tries to map more objects to
895  * faster processors and fewer to slower processors.  Also
896  * attempts to ensure good locality by mapping nearby elements
897  * together.
898  */
899 class PropMap : public CkArrayMap
900 {
901 private:
902   CkPupPtrVec<arrInfo> arrs;
903 public:
904   PropMap(void)
905   {
906     CkpvInitialize(double*, rem);
907     DEBC((AA"Creating PropMap\n"AB));
908   }
909   PropMap(CkMigrateMessage *m) {}
910   int registerArray(CkArrayIndex& numElements,CkArrayID aid)
911   {
912     int idx = arrs.size();
913     arrs.resize(idx+1);
914     arrs[idx] = new arrInfo(numElements, speeds);
915     return idx;
916   }
917   int procNum(int arrayHdl, const CkArrayIndex &i)
918   {
919     return arrs[arrayHdl]->getMap(i);
920   }
921   void pup(PUP::er& p){
922     p|arrs;
923   }
924 };
925
926 class CkMapsInit : public Chare
927 {
928 public:
929         CkMapsInit(CkArgMsg *msg) {
930                 _defaultArrayMapID = CProxy_DefaultArrayMap::ckNew();
931                 _fastArrayMapID = CProxy_FastArrayMap::ckNew();
932                 delete msg;
933         }
934
935         CkMapsInit(CkMigrateMessage *m) {}
936 };
937
938 // given an envelope of a Charm msg, find the recipient object pointer
939 CkMigratable * CkArrayMessageObjectPtr(envelope *env) {
940   if (env->getMsgtype()!=ForArrayEltMsg) return NULL;   // not an array msg
941
942   CkArrayID aid = env->getsetArrayMgr();
943   CkArray *mgr=(CkArray *)_localBranch(aid);
944   if (mgr) {
945     CkLocMgr *locMgr = mgr->getLocMgr();
946     if (locMgr) {
947       return locMgr->lookup(env->getsetArrayIndex(),aid);
948     }
949   }
950   return NULL;
951 }
952
953 /****************************** Out-of-Core support ********************/
954
955 #if CMK_OUT_OF_CORE
956 CooPrefetchManager CkArrayElementPrefetcher;
957 CkpvDeclare(int,CkSaveRestorePrefetch);
958
959 /**
960  * Return the out-of-core objid (from CooRegisterObject)
961  * that this Converse message will access.  If the message
962  * will not access an object, return -1.
963  */
964 int CkArrayPrefetch_msg2ObjId(void *msg) {
965   envelope *env=(envelope *)msg;
966   CkMigratable *elt = CkArrayMessageObjectPtr(env);
967   return elt?elt->prefetchObjID:-1;
968 }
969
970 /**
971  * Write this object (registered with RegisterObject)
972  * to this writable file.
973  */
974 void CkArrayPrefetch_writeToSwap(FILE *swapfile,void *objptr) {
975   CkMigratable *elt=(CkMigratable *)objptr;
976
977   //Save the element's data to disk:
978   PUP::toDisk p(swapfile);
979   elt->pup(p);
980
981   //Call the element's destructor in-place (so pointer doesn't change)
982   CkpvAccess(CkSaveRestorePrefetch)=1;
983   elt->~CkMigratable(); //< because destuctor is virtual, destroys user class too.
984   CkpvAccess(CkSaveRestorePrefetch)=0;
985 }
986         
987 /**
988  * Read this object (registered with RegisterObject)
989  * from this readable file.
990  */
991 void CkArrayPrefetch_readFromSwap(FILE *swapfile,void *objptr) {
992   CkMigratable *elt=(CkMigratable *)objptr;
993   //Call the element's migration constructor in-place
994   CkpvAccess(CkSaveRestorePrefetch)=1;
995   int ctorIdx=_chareTable[elt->thisChareType]->migCtor;
996   elt->myRec->invokeEntry(elt,(CkMigrateMessage *)0,ctorIdx,CmiTrue);
997   CkpvAccess(CkSaveRestorePrefetch)=0;
998   
999   //Restore the element's data from disk:
1000   PUP::fromDisk p(swapfile);
1001   elt->pup(p);
1002 }
1003
1004 static void _CkMigratable_prefetchInit(void) 
1005 {
1006   CkpvExtern(int,CkSaveRestorePrefetch);
1007   CkpvAccess(CkSaveRestorePrefetch)=0;
1008   CkArrayElementPrefetcher.msg2ObjId=CkArrayPrefetch_msg2ObjId;
1009   CkArrayElementPrefetcher.writeToSwap=CkArrayPrefetch_writeToSwap;
1010   CkArrayElementPrefetcher.readFromSwap=CkArrayPrefetch_readFromSwap;
1011   CooRegisterManager(&CkArrayElementPrefetcher, _charmHandlerIdx);
1012 }
1013 #endif
1014
1015 /****************************** CkMigratable ***************************/
1016 /**
1017  * This tiny class is used to convey information to the 
1018  * newly created CkMigratable object when its constructor is called.
1019  */
1020 class CkMigratable_initInfo {
1021 public:
1022         CkLocRec_local *locRec;
1023         int chareType;
1024         CmiBool forPrefetch; /* If true, this creation is only a prefetch restore-from-disk.*/
1025 };
1026
1027 CkpvStaticDeclare(CkMigratable_initInfo,mig_initInfo);
1028
1029
1030 void _CkMigratable_initInfoInit(void) {
1031   CkpvInitialize(CkMigratable_initInfo,mig_initInfo);
1032 #if CMK_OUT_OF_CORE
1033   _CkMigratable_prefetchInit();
1034 #endif
1035 }
1036
1037 void CkMigratable::commonInit(void) {
1038         CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
1039 #if CMK_OUT_OF_CORE
1040         isInCore=CmiTrue;
1041         if (CkpvAccess(CkSaveRestorePrefetch))
1042                 return; /* Just restoring from disk--don't touch object */
1043         prefetchObjID=-1; //Unregistered
1044 #endif
1045         myRec=i.locRec;
1046         thisIndexMax=myRec->getIndex();
1047         thisChareType=i.chareType;
1048         usesAtSync=CmiFalse;
1049         usesAutoMeasure=CmiTrue;
1050         barrierRegistered=CmiFalse;
1051         /*
1052         FAULT_EVAC
1053         */
1054         AsyncEvacuate(CmiTrue);
1055 }
1056
1057 CkMigratable::CkMigratable(void) {
1058         DEBC((AA"In CkMigratable constructor\n"AB));
1059         commonInit();
1060 }
1061 CkMigratable::CkMigratable(CkMigrateMessage *m): Chare(m) {
1062         commonInit();
1063 }
1064
1065 int CkMigratable::ckGetChareType(void) const {return thisChareType;}
1066
1067 void CkMigratable::pup(PUP::er &p) {
1068         DEBM((AA"In CkMigratable::pup %s\n"AB,idx2str(thisIndexMax)));
1069         Chare::pup(p);
1070         p|thisIndexMax;
1071         p(usesAtSync);
1072         p(usesAutoMeasure);
1073 #if CMK_LBDB_ON 
1074         int readyMigrate;
1075         if (p.isPacking()) readyMigrate = myRec->isReadyMigrate();
1076         p|readyMigrate;
1077         if (p.isUnpacking()) myRec->ReadyMigrate(readyMigrate);
1078 #endif
1079         if(p.isUnpacking()) barrierRegistered=CmiFalse;
1080         /*
1081                 FAULT_EVAC
1082         */
1083         p | asyncEvacuate;
1084         if(p.isUnpacking()){myRec->AsyncEvacuate(asyncEvacuate);}
1085         
1086         ckFinishConstruction();
1087 }
1088
1089 void CkMigratable::ckDestroy(void) {
1090         DEBC((AA"In CkMigratable::ckDestroy %s\n"AB,idx2str(thisIndexMax)));
1091         myRec->destroy();
1092 }
1093
1094 void CkMigratable::ckAboutToMigrate(void) { }
1095 void CkMigratable::ckJustMigrated(void) { }
1096 void CkMigratable::ckJustRestored(void) { }
1097
1098 CkMigratable::~CkMigratable() {
1099         DEBC((AA"In CkMigratable::~CkMigratable %s\n"AB,idx2str(thisIndexMax)));
1100 #if CMK_OUT_OF_CORE
1101         isInCore=CmiFalse;
1102         if (CkpvAccess(CkSaveRestorePrefetch)) 
1103                 return; /* Just saving to disk--don't deregister anything. */
1104         /* We're really leaving or dying-- unregister from the ooc system*/
1105         if (prefetchObjID!=-1) {
1106                 CooDeregisterObject(prefetchObjID);
1107                 prefetchObjID=-1;
1108         }
1109 #endif
1110         /*Might want to tell myRec about our doom here--
1111         it's difficult to avoid some kind of circular-delete, though.
1112         */
1113 #if CMK_LBDB_ON 
1114         if (barrierRegistered) {
1115           DEBL((AA"Removing barrier for element %s\n"AB,idx2str(thisIndexMax)));
1116           if (usesAtSync)
1117                 myRec->getLBDB()->RemoveLocalBarrierClient(ldBarrierHandle);
1118           else
1119                 myRec->getLBDB()->RemoveLocalBarrierReceiver(ldBarrierRecvHandle);
1120         }
1121 #endif
1122         //To detect use-after-delete
1123         thisIndexMax.nInts=-12345;
1124         thisIndexMax.dimension=-12345;
1125 }
1126
1127 void CkMigratable::CkAbort(const char *why) const {
1128         CkError("CkMigratable '%s' aborting:\n",_chareTable[thisChareType]->name);
1129         ::CkAbort(why);
1130 }
1131
1132 void CkMigratable::ResumeFromSync(void)
1133 {
1134 //      CkAbort("::ResumeFromSync() not defined for this array element!\n");
1135 }
1136
1137 void CkMigratable::UserSetLBLoad() {
1138         CkAbort("::UserSetLBLoad() not defined for this array element!\n");
1139 }
1140
1141 #if CMK_LBDB_ON  //For load balancing:
1142 // user can call this helper function to set obj load (for model-based lb)
1143 void CkMigratable::setObjTime(double cputime) {
1144         myRec->setObjTime(cputime);
1145 }
1146 double CkMigratable::getObjTime() {
1147         return myRec->getObjTime();
1148 }
1149
1150 void CkMigratable::ckFinishConstruction(void)
1151 {
1152 //      if ((!usesAtSync) || barrierRegistered) return;
1153         myRec->setMeasure(usesAutoMeasure);
1154         if (barrierRegistered) return;
1155         DEBL((AA"Registering barrier client for %s\n"AB,idx2str(thisIndexMax)));
1156         if (usesAtSync)
1157           ldBarrierHandle = myRec->getLBDB()->AddLocalBarrierClient(
1158                 (LDBarrierFn)staticResumeFromSync,(void*)(this));
1159         else
1160           ldBarrierRecvHandle = myRec->getLBDB()->AddLocalBarrierReceiver(
1161                 (LDBarrierFn)staticResumeFromSync,(void*)(this));
1162         barrierRegistered=CmiTrue;
1163 }
1164 void CkMigratable::AtSync(int waitForMigration)
1165 {
1166         if (!usesAtSync)
1167                 CkAbort("You must set usesAtSync=CmiTrue in your array element constructor to use AtSync!\n");
1168 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1169         mlogData->toResumeOrNot=1;
1170 #endif
1171         myRec->AsyncMigrate(!waitForMigration);
1172         if (waitForMigration) ReadyMigrate(CmiTrue);
1173         ckFinishConstruction();
1174         DEBL((AA"Element %s going to sync\n"AB,idx2str(thisIndexMax)));
1175           // model-based load balancing, ask user to provide cpu load
1176         if (usesAutoMeasure == CmiFalse) UserSetLBLoad();
1177         myRec->getLBDB()->AtLocalBarrier(ldBarrierHandle);
1178 }
1179 void CkMigratable::ReadyMigrate(CmiBool ready)
1180 {
1181         myRec->ReadyMigrate(ready);
1182 }
1183
1184 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1185     extern int globalResumeCount;
1186 #endif
1187
1188 void CkMigratable::staticResumeFromSync(void* data)
1189 {
1190         CkMigratable *el=(CkMigratable *)data;
1191 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1192     if(el->mlogData->toResumeOrNot ==0 || el->mlogData->resumeCount >= globalResumeCount){
1193         return;
1194     }
1195 #endif
1196         DEBL((AA"Element %s resuming from sync\n"AB,idx2str(el->thisIndexMax)));
1197 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1198     CpvAccess(_currentObj) = el;
1199 #endif
1200         el->ResumeFromSync();
1201 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1202     el->mlogData->resumeCount++;
1203 #endif
1204 }
1205 void CkMigratable::setMigratable(int migratable) 
1206 {
1207         myRec->setMigratable(migratable);
1208 }
1209
1210 struct CkArrayThreadListener {
1211         struct CthThreadListener base;
1212         CkMigratable *mig;
1213 };
1214
1215 extern "C"
1216 void CkArrayThreadListener_suspend(struct CthThreadListener *l)
1217 {
1218         CkArrayThreadListener *a=(CkArrayThreadListener *)l;
1219         a->mig->ckStopTiming();
1220 }
1221
1222 extern "C"
1223 void CkArrayThreadListener_resume(struct CthThreadListener *l)
1224 {
1225         CkArrayThreadListener *a=(CkArrayThreadListener *)l;
1226         a->mig->ckStartTiming();
1227 }
1228
1229 extern "C"
1230 void CkArrayThreadListener_free(struct CthThreadListener *l)
1231 {
1232         CkArrayThreadListener *a=(CkArrayThreadListener *)l;
1233         delete a;
1234 }
1235
1236 void CkMigratable::CkAddThreadListeners(CthThread tid, void *msg)
1237 {
1238         Chare::CkAddThreadListeners(tid, msg);   // for trace
1239         CthSetThreadID(tid, thisIndexMax.data()[0], thisIndexMax.data()[1], 
1240                        thisIndexMax.data()[2]);
1241         CkArrayThreadListener *a=new CkArrayThreadListener;
1242         a->base.suspend=CkArrayThreadListener_suspend;
1243         a->base.resume=CkArrayThreadListener_resume;
1244         a->base.free=CkArrayThreadListener_free;
1245         a->mig=this;
1246         CthAddListener(tid,(struct CthThreadListener *)a);
1247 }
1248 #else
1249 void CkMigratable::setObjTime(double cputime) {}
1250 double CkMigratable::getObjTime() {return 0.0;}
1251
1252 /* no load balancer: need dummy implementations to prevent link error */
1253 void CkMigratable::CkAddThreadListeners(CthThread tid, void *msg)
1254 {
1255 }
1256 #endif
1257
1258
1259 /*CkMigratableList*/
1260 CkMigratableList::CkMigratableList() {}
1261 CkMigratableList::~CkMigratableList() {}
1262
1263 void CkMigratableList::setSize(int s) {
1264         el.resize(s);
1265 }
1266
1267 void CkMigratableList::put(CkMigratable *v,int atIdx) {
1268 #if CMK_ERROR_CHECKING
1269         if (atIdx>=length())
1270                 CkAbort("Internal array manager error (CkMigrableList::put index out of bounds)");
1271 #endif
1272         el[atIdx]=v;
1273 }
1274
1275
1276 /************************** Location Records: *********************************/
1277
1278 //---------------- Base type:
1279 void CkLocRec::weAreObsolete(const CkArrayIndex &idx) {}
1280 CkLocRec::~CkLocRec() { }
1281 void CkLocRec::beenReplaced(void)
1282     {/*Default: ignore replacement*/}
1283
1284 //Return the represented array element; or NULL if there is none
1285 CkMigratable *CkLocRec::lookupElement(CkArrayID aid) {return NULL;}
1286
1287 //Return the last known processor; or -1 if none
1288 int CkLocRec::lookupProcessor(void) {return -1;}
1289
1290
1291 /*----------------- Local:
1292 Matches up the array index with the local index, an
1293 interfaces with the load balancer on behalf of the
1294 represented array elements.
1295 */
1296 CkLocRec_local::CkLocRec_local(CkLocMgr *mgr,CmiBool fromMigration,
1297   CmiBool ignoreArrival, const CkArrayIndex &idx_,int localIdx_)
1298         :CkLocRec(mgr),idx(idx_),localIdx(localIdx_),
1299          running(CmiFalse),deletedMarker(NULL)
1300 {
1301 #if CMK_LBDB_ON
1302         DEBL((AA"Registering element %s with load balancer\n"AB,idx2str(idx)));
1303         //BIGSIM_OOC DEBUGGING
1304         //CkPrintf("LocMgr on %d: Registering element %s with load balancer\n", CkMyPe(), idx2str(idx));
1305         nextPe = -1;
1306         asyncMigrate = CmiFalse;
1307         readyMigrate = CmiTrue;
1308         enable_measure = CmiTrue;
1309         bounced  = CmiFalse;
1310         the_lbdb=mgr->getLBDB();
1311         LDObjid ldid = idx2LDObjid(idx);
1312 #if CMK_GLOBAL_LOCATION_UPDATE
1313         ldid.locMgrGid = mgr->getGroupID().idx;
1314 #endif        
1315         ldHandle=the_lbdb->RegisterObj(mgr->getOMHandle(),
1316                 ldid,(void *)this,1);
1317         if (fromMigration) {
1318                 DEBL((AA"Element %s migrated in\n"AB,idx2str(idx)));
1319                 if (!ignoreArrival)  {
1320                         the_lbdb->Migrated(ldHandle, CmiTrue);
1321                   // load balancer should ignore this objects movement
1322                 //  AsyncMigrate(CmiTrue);
1323                 }
1324         }
1325 #endif
1326         /*
1327                 FAULT_EVAC
1328         */
1329         asyncEvacuate = CmiTrue;
1330 }
1331 CkLocRec_local::~CkLocRec_local()
1332 {
1333         if (deletedMarker!=NULL) *deletedMarker=CmiTrue;
1334         myLocMgr->reclaim(idx,localIdx);
1335 #if CMK_LBDB_ON
1336         stopTiming();
1337         DEBL((AA"Unregistering element %s from load balancer\n"AB,idx2str(idx)));
1338         the_lbdb->UnregisterObj(ldHandle);
1339 #endif
1340 }
1341 void CkLocRec_local::migrateMe(int toPe) //Leaving this processor
1342 {
1343         //This will pack us up, send us off, and delete us
1344 //      printf("[%d] migrating migrateMe to %d \n",CkMyPe(),toPe);
1345         myLocMgr->emigrate(this,toPe);
1346 }
1347
1348 #if CMK_LBDB_ON
1349 void CkLocRec_local::startTiming(int ignore_running) {
1350         if (!ignore_running) running=CmiTrue;
1351         DEBL((AA"Start timing for %s at %.3fs {\n"AB,idx2str(idx),CkWallTimer()));
1352         if (enable_measure) the_lbdb->ObjectStart(ldHandle);
1353 }
1354 void CkLocRec_local::stopTiming(int ignore_running) {
1355         DEBL((AA"} Stop timing for %s at %.3fs\n"AB,idx2str(idx),CkWallTimer()));
1356         if ((ignore_running || running) && enable_measure) the_lbdb->ObjectStop(ldHandle);
1357         if (!ignore_running) running=CmiFalse;
1358 }
1359 void CkLocRec_local::setObjTime(double cputime) {
1360         the_lbdb->EstObjLoad(ldHandle, cputime);
1361 }
1362 double CkLocRec_local::getObjTime() {
1363         LBRealType walltime, cputime;
1364         the_lbdb->GetObjLoad(ldHandle, walltime, cputime);
1365         return walltime;
1366 }
1367 #endif
1368
1369 void CkLocRec_local::destroy(void) //User called destructor
1370 {
1371         //Our destructor does all the needed work
1372         delete this;
1373 }
1374 //Return the represented array element; or NULL if there is none
1375 CkMigratable *CkLocRec_local::lookupElement(CkArrayID aid) {
1376         return myLocMgr->lookupLocal(localIdx,aid);
1377 }
1378
1379 //Return the last known processor; or -1 if none
1380 int CkLocRec_local::lookupProcessor(void) {
1381         return CkMyPe();
1382 }
1383
1384 CkLocRec::RecType CkLocRec_local::type(void)
1385 {
1386         return local;
1387 }
1388
1389 void CkLocRec_local::addedElement(void) 
1390 {
1391         //Push everything in the half-created queue into the system--
1392         // anything not ready yet will be put back in.
1393         while (!halfCreated.isEmpty()) 
1394                 CkArrayManagerDeliver(CkMyPe(),halfCreated.deq());
1395 }
1396
1397 CmiBool CkLocRec_local::isObsolete(int nSprings,const CkArrayIndex &idx_)
1398
1399         int len=halfCreated.length();
1400         if (len!=0) {
1401                 /* This is suspicious-- the halfCreated queue should be extremely
1402                  transient.  It's possible we just looked at the wrong time, though;
1403                  so this is only a warning. 
1404                 */
1405                 CkPrintf("CkLoc WARNING> %d messages still around for uncreated element %s!\n",
1406                          len,idx2str(idx));
1407         }
1408         //A local element never expires
1409         return CmiFalse;
1410 }
1411
1412 /**********Added for cosmology (inline function handling without parameter marshalling)***********/
1413
1414 LDObjHandle CkMigratable::timingBeforeCall(int* objstopped){
1415
1416         LDObjHandle objHandle;
1417 #if CMK_LBDB_ON
1418         if (getLBDB()->RunningObject(&objHandle)) {
1419                 *objstopped = 1;
1420                 getLBDB()->ObjectStop(objHandle);
1421         }
1422         myRec->startTiming(1);
1423 #endif
1424
1425   //DEBS((AA"   Invoking entry %d on element %s\n"AB,epIdx,idx2str(idx)));
1426         //CmiBool isDeleted=CmiFalse; //Enables us to detect deletion during processing
1427         //deletedMarker=&isDeleted;
1428 /*#ifndef CMK_OPTIMIZE
1429         if (msg) {  Tracing: 
1430                 envelope *env=UsrToEnv(msg);
1431         //      CkPrintf("ckLocation.C beginExecuteDetailed %d %d \n",env->getEvent(),env->getsetArraySrcPe());
1432                 if (_entryTable[epIdx]->traceEnabled)
1433                         _TRACE_BEGIN_EXECUTE_DETAILED(env->getEvent(),
1434                                  ForChareMsg,epIdx,env->getsetArraySrcPe(), env->getTotalsize(), idx.getProjectionID(((CkGroupID)env->getsetArrayMgr())).idx);
1435         }
1436 #endif*/
1437
1438   return objHandle;
1439 }
1440
1441 void CkMigratable::timingAfterCall(LDObjHandle objHandle,int *objstopped){
1442   
1443 /*#ifndef CMK_OPTIMIZE
1444         if (msg) {  Tracing: 
1445                 if (_entryTable[epIdx]->traceEnabled)
1446                         _TRACE_END_EXECUTE();
1447         }
1448 #endif*/
1449 //#if CMK_LBDB_ON
1450 //        if (!isDeleted) checkBufferedMigration();   // check if should migrate
1451 //#endif
1452 //      if (isDeleted) return CmiFalse;//We were deleted
1453 //      deletedMarker=NULL;
1454 //      return CmiTrue;
1455         myRec->stopTiming(1);
1456 #if CMK_LBDB_ON
1457         if (*objstopped) {
1458                  getLBDB()->ObjectStart(objHandle);
1459         }
1460 #endif
1461
1462  return;
1463 }
1464 /****************************************************************************/
1465
1466
1467 CmiBool CkLocRec_local::invokeEntry(CkMigratable *obj,void *msg,
1468         int epIdx,CmiBool doFree) 
1469 {
1470
1471         DEBS((AA"   Invoking entry %d on element %s\n"AB,epIdx,idx2str(idx)));
1472         CmiBool isDeleted=CmiFalse; //Enables us to detect deletion during processing
1473         deletedMarker=&isDeleted;
1474         startTiming();
1475
1476
1477 #if CMK_TRACE_ENABLED
1478         if (msg) { /* Tracing: */
1479                 envelope *env=UsrToEnv(msg);
1480         //      CkPrintf("ckLocation.C beginExecuteDetailed %d %d \n",env->getEvent(),env->getsetArraySrcPe());
1481                 if (_entryTable[epIdx]->traceEnabled)
1482                         _TRACE_BEGIN_EXECUTE_DETAILED(env->getEvent(),
1483                                  ForChareMsg,epIdx,env->getsetArraySrcPe(), env->getTotalsize(), idx.getProjectionID((((CkGroupID)env->getsetArrayMgr())).idx));
1484         }
1485 #endif
1486
1487         if (doFree) 
1488            CkDeliverMessageFree(epIdx,msg,obj);
1489         else /* !doFree */
1490            CkDeliverMessageReadonly(epIdx,msg,obj);
1491
1492
1493 #if CMK_TRACE_ENABLED
1494         if (msg) { /* Tracing: */
1495                 if (_entryTable[epIdx]->traceEnabled)
1496                         _TRACE_END_EXECUTE();
1497         }
1498 #endif
1499 #if CMK_LBDB_ON
1500         if (!isDeleted) checkBufferedMigration();   // check if should migrate
1501 #endif
1502         if (isDeleted) return CmiFalse;//We were deleted
1503         deletedMarker=NULL;
1504         stopTiming();
1505         return CmiTrue;
1506 }
1507
1508 CmiBool CkLocRec_local::deliver(CkArrayMessage *msg,CkDeliver_t type,int opts)
1509 {
1510
1511         if (type==CkDeliver_queue) { /*Send via the message queue */
1512                 if (opts & CK_MSG_KEEP)
1513                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1514                 CkArrayManagerDeliver(CkMyPe(),msg,opts);
1515                 return CmiTrue;
1516         }
1517         else
1518         {
1519                 CkMigratable *obj=myLocMgr->lookupLocal(localIdx,
1520                         UsrToEnv(msg)->getsetArrayMgr());
1521                 if (obj==NULL) {//That sibling of this object isn't created yet!
1522                         if (opts & CK_MSG_KEEP)
1523                                 msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1524                         if (msg->array_ifNotThere()!=CkArray_IfNotThere_buffer) {
1525                                 return myLocMgr->demandCreateElement(msg,CkMyPe(),type);
1526                         }
1527                         else {
1528                                 DEBS((AA"   BUFFERING message for nonexistent element %s!\n"AB,idx2str(this->idx)));
1529                                 halfCreated.enq(msg);
1530                                 return CmiTrue;
1531                         }
1532                 }
1533                         
1534                 if (msg->array_hops()>1)
1535                         myLocMgr->multiHop(msg);
1536                 CmiBool doFree = (CmiBool)!(opts & CK_MSG_KEEP);
1537 #if CMK_LBDB_ON
1538                 // if there is a running obj being measured, stop it temporarily
1539                 LDObjHandle objHandle;
1540                 int objstopped = 0;
1541                 if (the_lbdb->RunningObject(&objHandle)) {
1542                         objstopped = 1;
1543                         the_lbdb->ObjectStop(objHandle);
1544                 }
1545 #endif
1546 #if CMK_GRID_QUEUE_AVAILABLE
1547                 // retain a pointer to the sending object (needed later)
1548                 CpvAccess(CkGridObject) = obj;
1549 #endif
1550
1551         CmiBool status = invokeEntry(obj,(void *)msg,msg->array_ep(),doFree);
1552         
1553 #if CMK_GRID_QUEUE_AVAILABLE
1554                 CpvAccess(CkGridObject) = NULL;
1555 #endif
1556 #if CMK_LBDB_ON
1557                 if (objstopped) the_lbdb->ObjectStart(objHandle);
1558 #endif
1559                 return status;
1560         }
1561
1562
1563 }
1564
1565 #if CMK_LBDB_ON
1566 void CkLocRec_local::staticMigrate(LDObjHandle h, int dest)
1567 {
1568         CkLocRec_local *el=(CkLocRec_local *)LDObjUserData(h);
1569         DEBL((AA"Load balancer wants to migrate %s to %d\n"AB,idx2str(el->idx),dest));
1570         el->recvMigrate(dest);
1571 }
1572
1573 void CkLocRec_local::recvMigrate(int toPe)
1574 {
1575         // we are in the mode of delaying actual migration
1576         // till readyMigrate()
1577         if (readyMigrate) { migrateMe(toPe); }
1578         else nextPe = toPe;
1579 }
1580
1581 void CkLocRec_local::AsyncMigrate(CmiBool use)  
1582 {
1583         asyncMigrate = use; 
1584         the_lbdb->UseAsyncMigrate(ldHandle, use);
1585 }
1586
1587 CmiBool CkLocRec_local::checkBufferedMigration()
1588 {
1589         // we don't migrate in user's code when calling ReadyMigrate(true)
1590         // we postphone the action to here until we exit from the user code.
1591         if (readyMigrate && nextPe != -1) {
1592             int toPe = nextPe;
1593             nextPe = -1;
1594             // don't migrate inside the object call
1595             migrateMe(toPe);
1596             // don't do anything
1597             return CmiTrue;
1598         }
1599         return CmiFalse;
1600 }
1601
1602 int CkLocRec_local::MigrateToPe()
1603 {
1604         int pe = nextPe;
1605         nextPe = -1;
1606         return pe;
1607 }
1608
1609 void CkLocRec_local::setMigratable(int migratable)
1610 {
1611         if (migratable)
1612           the_lbdb->Migratable(ldHandle);
1613         else
1614           the_lbdb->NonMigratable(ldHandle);
1615 }
1616 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1617 void CkLocRec_local::Migrated(){
1618     the_lbdb->Migrated(ldHandle, CmiTrue);
1619 }
1620 #endif
1621 #endif
1622
1623 /**
1624  * Represents a deleted array element (and prevents re-use).
1625  * These are a debugging aid, usable only by uncommenting a line in
1626  * the element destruction code.
1627  */
1628 class CkLocRec_dead:public CkLocRec {
1629 public:
1630         CkLocRec_dead(CkLocMgr *Narr):CkLocRec(Narr) {}
1631   
1632         virtual RecType type(void) {return dead;}
1633   
1634         virtual CmiBool deliver(CkArrayMessage *msg,CkDeliver_t type,int opts=0) {
1635                 CkPrintf("Dead array element is %s.\n",idx2str(msg->array_index()));
1636                 CkAbort("Send to dead array element!\n");
1637                 return CmiFalse;
1638         }
1639         virtual void beenReplaced(void) 
1640                 {CkAbort("Can't re-use dead array element!\n");}
1641   
1642         //Return if this element is now obsolete (it isn't)
1643         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx) {return CmiFalse;}     
1644 };
1645
1646 /**
1647  * This is the abstract superclass of arrayRecs that keep track of their age,
1648  * and eventually expire. Its kids are remote and buffering.
1649  */
1650 class CkLocRec_aging:public CkLocRec {
1651 private:
1652         int lastAccess;//Age when last accessed
1653 protected:
1654         //Update our access time
1655         inline void access(void) {
1656                 lastAccess=myLocMgr->getSpringCount();
1657         }
1658         //Return if we are "stale"-- we were last accessed a while ago
1659         CmiBool isStale(void) {
1660                 if (myLocMgr->getSpringCount()-lastAccess>3) return CmiTrue;
1661                 else return CmiFalse;
1662         }
1663 public:
1664         CkLocRec_aging(CkLocMgr *Narr):CkLocRec(Narr) {
1665                 lastAccess=myLocMgr->getSpringCount();
1666         }
1667         //Return if this element is now obsolete
1668         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx)=0;
1669         //virtual void pup(PUP::er &p) { CkLocRec::pup(p); p(lastAccess); }
1670 };
1671
1672
1673 /**
1674  * Represents a remote array element.  This is just a PE number.
1675  */
1676 class CkLocRec_remote:public CkLocRec_aging {
1677 private:
1678         int onPe;//The last known Pe for this element
1679 public:
1680         CkLocRec_remote(CkLocMgr *Narr,int NonPe)
1681                 :CkLocRec_aging(Narr)
1682                 {
1683                         onPe=NonPe;
1684 #if CMK_ERROR_CHECKING
1685                         if (onPe==CkMyPe())
1686                                 CkAbort("ERROR!  'remote' array element on this Pe!\n");
1687 #endif
1688                 }
1689         //Return the last known processor for this element
1690         int lookupProcessor(void) {
1691                 return onPe;
1692         }  
1693         virtual RecType type(void) {return remote;}
1694   
1695         //Send a message for this element.
1696         virtual CmiBool deliver(CkArrayMessage *msg,CkDeliver_t type,int opts=0) {
1697                 /*FAULT_EVAC*/
1698                 int destPE = onPe;
1699                 if((!CmiNodeAlive(onPe) && onPe != allowMessagesOnly)){
1700 //                      printf("Delivery failed because process %d is invalid\n",onPe);
1701                         /*
1702                                 Send it to its home processor instead
1703                         */
1704                         const CkArrayIndex &idx=msg->array_index();
1705                         destPE = getNextPE(idx);
1706                 }
1707                 access();//Update our modification date
1708                 msg->array_hops()++;
1709                 DEBS((AA"   Forwarding message for element %s to %d (REMOTE)\n"AB,
1710                       idx2str(msg->array_index()),destPE));
1711                 if (opts & CK_MSG_KEEP)
1712                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1713                 CkArrayManagerDeliver(destPE,msg,opts);
1714                 return CmiTrue;
1715         }
1716         //Return if this element is now obsolete
1717         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx) {
1718                 if (myLocMgr->isHome(idx)) 
1719                         //Home elements never become obsolete
1720                         // if they did, we couldn't deliver messages to that element.
1721                         return CmiFalse;
1722                 else if (isStale())
1723                         return CmiTrue;//We haven't been used in a long time
1724                 else
1725                         return CmiFalse;//We're fairly recent
1726         }
1727         //virtual void pup(PUP::er &p) { CkLocRec_aging::pup(p); p(onPe); }
1728 };
1729
1730
1731 /**
1732  * Buffers messages until record is replaced in the hash table, 
1733  * then delivers all messages to the replacing record.  This is 
1734  * used when a message arrives for a local element that has not
1735  * yet been created, buffering messages until the new element finally
1736  * checks in.
1737  *
1738  * It's silly to send a message to an element you won't ever create,
1739  * so this kind of record causes an abort "Stale array manager message!"
1740  * if it's left undelivered too long.
1741  */
1742 class CkLocRec_buffering:public CkLocRec_aging {
1743 private:
1744         CkQ<CkArrayMessage *> buffer;//Buffered messages.
1745 public:
1746         CkLocRec_buffering(CkLocMgr *Narr):CkLocRec_aging(Narr) {}
1747         virtual ~CkLocRec_buffering() {
1748                 if (0!=buffer.length()) {
1749                         CkPrintf("[%d] Warning: Messages abandoned in array manager buffer!\n", CkMyPe());
1750                         CkArrayMessage *m;
1751                         while (NULL!=(m=buffer.deq()))  {
1752                                 delete m;
1753                         }
1754                 }
1755         }
1756   
1757         virtual RecType type(void) {return buffering;}
1758   
1759         //Buffer a message for this element.
1760         virtual CmiBool deliver(CkArrayMessage *msg,CkDeliver_t type,int opts=0) {
1761                 DEBS((AA" Queued message for %s\n"AB,idx2str(msg->array_index())));
1762                 if (opts & CK_MSG_KEEP)
1763                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1764                 buffer.enq(msg);
1765 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1766                 envelope *env = UsrToEnv(msg);
1767                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
1768 #endif
1769                 return CmiTrue;
1770         }
1771  
1772         //This is called when this ArrayRec is about to be replaced.
1773         // We dump all our buffered messages off on the next guy,
1774         // who should know what to do with them.
1775         virtual void beenReplaced(void) {
1776                 DEBS((AA" Delivering queued messages:\n"AB));
1777                 CkArrayMessage *m;
1778                 while (NULL!=(m=buffer.deq())) {
1779 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))         
1780                 DEBUG(CmiPrintf("[%d] buffered message being sent\n",CmiMyPe()));
1781                 envelope *env = UsrToEnv(m);
1782                 Chare *oldObj = CpvAccess(_currentObj);
1783                 CpvAccess(_currentObj) =(Chare *) env->sender.getObject();
1784                 env->sender.type = TypeInvalid;
1785 #endif
1786                 DEBS((AA"Sending buffered message to %s\n"AB,idx2str(m->array_index())));
1787                 myLocMgr->deliverViaQueue(m);
1788 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))         
1789                 CpvAccess(_currentObj) = oldObj;
1790 #endif
1791                 }
1792         }
1793   
1794         //Return if this element is now obsolete
1795         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx) {
1796                 if (isStale() && buffer.length()>0) {
1797                         /*This indicates something is seriously wrong--
1798                           buffers should be short-lived.*/
1799                         CkPrintf("[%d] WARNING: %d stale array message(s) found!\n",CkMyPe(),buffer.length());
1800                         CkArrayMessage *msg=buffer[0];
1801                         CkPrintf("Addressed to: ");
1802                         CkPrintEntryMethod(msg->array_ep());
1803                         CkPrintf(" index %s\n",idx2str(idx));
1804                         if (myLocMgr->isHome(idx)) 
1805                                 CkPrintf("is this an out-of-bounds array index, or was it never created?\n");
1806                         else //Idx is a remote-home index
1807                                 CkPrintf("why weren't they forwarded?\n");
1808                         
1809                         // CkAbort("Stale array manager message(s)!\n");
1810                 }
1811                 return CmiFalse;
1812         }
1813   
1814 /*  virtual void pup(PUP::er &p) {
1815     CkLocRec_aging::pup(p);
1816     CkArray::pupArrayMsgQ(buffer, p);
1817     }*/
1818 };
1819
1820 /*********************** Spring Cleaning *****************/
1821 /**
1822  * Used to periodically flush out unused remote element pointers.
1823  *
1824  * Cleaning often will free up memory quickly, but slow things
1825  * down because the cleaning takes time and some not-recently-referenced
1826  * remote element pointers might be valid and used some time in 
1827  * the future.
1828  *
1829  * Also used to determine if buffered messages have become stale.
1830  */
1831 inline void CkLocMgr::springCleaning(void)
1832 {
1833   nSprings++;
1834
1835   //Poke through the hash table for old ArrayRecs.
1836   void *objp;
1837   void *keyp;
1838   
1839   CkHashtableIterator *it=hash.iterator();
1840   CmiImmediateLock(hashImmLock);
1841   while (NULL!=(objp=it->next(&keyp))) {
1842     CkLocRec *rec=*(CkLocRec **)objp;
1843     CkArrayIndex &idx=*(CkArrayIndex *)keyp;
1844     if (rec->isObsolete(nSprings,idx)) {
1845       //This record is obsolete-- remove it from the table
1846       DEBK((AA"Cleaning out old record %s\n"AB,idx2str(idx)));
1847       hash.remove(*(CkArrayIndex *)&idx);
1848       delete rec;
1849       it->seek(-1);//retry this hash slot
1850     }
1851   }
1852   CmiImmediateUnlock(hashImmLock);
1853   delete it;
1854 }
1855 void CkLocMgr::staticSpringCleaning(void *forWhom,double curWallTime) {
1856         DEBK((AA"Starting spring cleaning at %.2f\n"AB,CkWallTimer()));
1857         ((CkLocMgr *)forWhom)->springCleaning();
1858 }
1859
1860 // clean all buffer'ed messages and also free local objects
1861 void CkLocMgr::flushAllRecs(void)
1862 {
1863   void *objp;
1864   void *keyp;
1865   int flag_local = 0;  
1866   int flag_remote = 0;  
1867   CkHashtableIterator *it=hash.iterator();
1868   CmiImmediateLock(hashImmLock);
1869   while (NULL!=(objp=it->next(&keyp))) {
1870     CkLocRec *rec=*(CkLocRec **)objp;
1871     CkArrayIndex &idx=*(CkArrayIndex *)keyp;
1872     if (rec->type() != CkLocRec::local) {
1873       //In the case of taking core out of memory (in BigSim's emulation)
1874       //the meta data in the location manager are not deleted so we need
1875       //this condition
1876       
1877       if(_BgOutOfCoreFlag!=1){
1878         //hash.remove(*(CkArrayIndex *)&idx);
1879         //delete rec;
1880         //it->seek(-1);//retry this hash slot
1881         flag_remote++;
1882       }
1883     }
1884     else {
1885         callMethod((CkLocRec_local*)rec, &CkMigratable::ckDestroy);
1886         it->seek(-1);//retry this hash slot
1887         flag_local++;
1888     }
1889   }
1890   if(CkMyPe()==33)
1891     CkPrintf("[%d] local:%d remote:%d\n",CkMyPe(),flag_local,flag_remote);
1892   delete it;
1893   CmiImmediateUnlock(hashImmLock);
1894 }
1895
1896
1897 void CkLocMgr::checkpointRemoteIdx(PUP::er &p)
1898 {
1899   void *objp;
1900   void *keyp;
1901   int flag = 0;
1902   CkHashtableIterator *it=hash.iterator();
1903   CmiImmediateLock(hashImmLock);
1904   while (NULL!=(objp=it->next(&keyp))) {
1905     CkLocRec *rec=*(CkLocRec **)objp;
1906     if (rec->type() != CkLocRec::local) {
1907       CkArrayIndexMax &idx=*(CkArrayIndex *)keyp;
1908       p|idx;
1909       int onPe = ((CkLocRec_remote *)rec)->lookupProcessor();
1910       p|onPe;
1911       flag++;
1912     }
1913   }
1914  // CkPrintf("[%d] has %d remote elements\n",CkMyPe(),flag);
1915 }
1916
1917 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1918 void CkLocMgr::callForAllRecords(CkLocFn fnPointer,CkArray *arr,void *data){
1919         void *objp;
1920         void *keyp;
1921
1922         CkHashtableIterator *it = hash.iterator();
1923         while (NULL!=(objp=it->next(&keyp))) {
1924                 CkLocRec *rec=*(CkLocRec **)objp;
1925                 CkArrayIndex &idx=*(CkArrayIndex *)keyp;
1926                 fnPointer(arr,data,rec,&idx);
1927         }
1928
1929         // releasing iterator memory
1930         delete it;
1931 }
1932 #endif
1933
1934 /*************************** LocMgr: CREATION *****************************/
1935 CkLocMgr::CkLocMgr(CkGroupID mapID_,CkGroupID lbdbID_,CkArrayIndex& numInitial)
1936         :thisProxy(thisgroup),thislocalproxy(thisgroup,CkMyPe()),
1937          hash(17,0.3)
1938 {
1939         DEBC((AA"Creating new location manager %d\n"AB,thisgroup));
1940 // moved to _CkMigratable_initInfoInit()
1941 //      CkpvInitialize(CkMigratable_initInfo,mig_initInfo);
1942
1943         managers.init();
1944         nManagers=0;
1945         firstManager=NULL;
1946         firstFree=localLen=0;
1947         duringMigration=CmiFalse;
1948         nSprings=0;
1949 #if !CMK_GLOBAL_LOCATION_UPDATE
1950         CcdCallOnConditionKeepOnPE(CcdPERIODIC_1minute,staticSpringCleaning,(void *)this, CkMyPe());
1951 #endif
1952
1953 //Register with the map object
1954         mapID=mapID_;
1955         map=(CkArrayMap *)CkLocalBranch(mapID);
1956         if (map==NULL) CkAbort("ERROR!  Local branch of array map is NULL!");
1957         mapHandle=map->registerArray(numInitial,thisgroup);
1958
1959 //Find and register with the load balancer
1960         lbdbID = lbdbID_;
1961         initLB(lbdbID_);
1962         hashImmLock = CmiCreateImmediateLock();
1963 }
1964
1965 CkLocMgr::CkLocMgr(CkMigrateMessage* m)
1966         :IrrGroup(m),thisProxy(thisgroup),thislocalproxy(thisgroup,CkMyPe()),hash(17,0.3)
1967 {
1968         managers.init();
1969         nManagers=0;
1970         firstManager=NULL;
1971         firstFree=localLen=0;
1972         duringMigration=CmiFalse;
1973         nSprings=0;
1974 #if !CMK_GLOBAL_LOCATION_UPDATE
1975         CcdCallOnConditionKeepOnPE(CcdPERIODIC_1minute,staticSpringCleaning,(void *)this, CkMyPe());
1976 #endif
1977         hashImmLock = CmiCreateImmediateLock();
1978 }
1979
1980 void CkLocMgr::pup(PUP::er &p){
1981         IrrGroup::pup(p);
1982         p|mapID;
1983         p|mapHandle;
1984         p|lbdbID;
1985         mapID = _defaultArrayMapID;
1986         if(p.isUnpacking()){
1987                 thisProxy=thisgroup;
1988                 CProxyElement_CkLocMgr newlocalproxy(thisgroup,CkMyPe());
1989                 thislocalproxy=newlocalproxy;
1990                 //Register with the map object
1991                 map=(CkArrayMap *)CkLocalBranch(mapID);
1992                 if (map==NULL) CkAbort("ERROR!  Local branch of array map is NULL!");
1993                 CkArrayIndex emptyIndex;
1994                 map->registerArray(emptyIndex,thisgroup);
1995                 // _lbdb is the fixed global groupID
1996                 initLB(lbdbID);
1997   //  CkPrintf("unpacking loca manager\n");
1998 //#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) ||CMK_MEM_CHECKPOINT    
1999 #if 1
2000         int count;
2001         p | count;
2002         DEBUG(CmiPrintf("[%d] Unpacking Locmgr %d has %d home elements\n",CmiMyPe(),thisgroup.idx,count));
2003     //    CmiPrintf("[%d] Unpacking Locmgr %d has %d home elements\n",CmiMyPe(),thisgroup.idx,count);
2004         //homeElementCount = count;
2005
2006         for(int i=0;i<count;i++){
2007             CkArrayIndex idx;
2008             int pe;
2009             idx.pup(p);
2010             p | pe;
2011   //          CmiPrintf("[%d] idx %s is a home element exisiting on pe %d\n",CmiMyPe(),idx2str(idx),pe);
2012             inform(idx,pe);
2013             CkLocRec *rec = elementNrec(idx);
2014             CmiAssert(rec!=NULL);
2015             CmiAssert(lastKnown(idx) == pe);
2016         }
2017 #endif
2018                 // delay doneInserting when it is unpacking during restart.
2019                 // to prevent load balancing kicking in
2020                 if (!CkInRestarting()) 
2021                         doneInserting();
2022         }else{
2023  /**
2024  * pack the indexes of elements which have their homes on this processor
2025  * but dont exist on it.. needed for broadcast after a restart
2026  * indexes of local elements dont need to be packed
2027  * since they will be recreated later anyway
2028  */
2029  //   CkPrintf("packing loca manager\n");
2030 //#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))  || CMK_MEM_CHECKPOINT   
2031 #if 1
2032         int count=0,count1=0;
2033         void *objp;
2034         void *keyp;
2035         CkVec<int> pe_list;
2036         //std::vector<CkArrayIndex> idx_list;
2037         CkVec<CkArrayIndex> idx_list;
2038         CkHashtableIterator *it = hash.iterator();
2039      // if(CkMyPe()==0)
2040        // CmiPrintf("before first %lf\n",CmiWallTimer());
2041       while (NULL!=(objp=it->next(&keyp))) {
2042           CkLocRec *rec=*(CkLocRec **)objp;
2043           CkArrayIndex &idx=*(CkArrayIndex *)keyp;
2044             if(rec->type() != CkLocRec::local){
2045                 if(homePe(idx) == CmiMyPe()){
2046                   int pe;
2047                   CkArrayIndex max = idx;
2048                   pe = rec->lookupProcessor();
2049                   idx_list.push_back(max);
2050                   pe_list.push_back(pe);
2051                     count++;
2052                 }
2053             }
2054         }
2055       //if(CkMyPe()==0)
2056        // CmiPrintf("after first %lf %d\n",CmiWallTimer(),count);
2057         p | count;
2058     //    CmiPrintf("[%d] Packing Locmgr %d has %d home elements\n",CmiMyPe(),thisgroup.idx,count);
2059
2060                 // releasing iterator memory
2061                 delete it;
2062
2063     //  if(CkMyPe()==0)
2064    //     CmiPrintf("before second %lf\n",CmiWallTimer());
2065       for(int i=0;i<pe_list.length();i++){
2066         CkArrayIndex max = idx_list[i];
2067         max.pup(p);
2068         p|pe_list[i];
2069       }
2070     /*    it = hash.iterator();
2071       while (NULL!=(objp=it->next(&keyp))) {
2072       CkLocRec *rec=*(CkLocRec **)objp;
2073         CkArrayIndex &idx=*(CkArrayIndex *)keyp;
2074             CkArrayIndex max = idx;
2075             if(rec->type() != CkLocRec::local){
2076                 if(homePe(idx) == CmiMyPe()){
2077                     int pe;
2078                     max.pup(p);
2079                     pe = rec->lookupProcessor();
2080                     p | pe;
2081                     count1++;
2082                 }
2083             }
2084         }
2085       if(CkMyPe()==0)
2086         CmiPrintf("after second %lf\n",CmiWallTimer());
2087       //  CmiAssert(count == count1);
2088
2089                 // releasing iterator memory
2090                 delete it;*/
2091
2092 #endif
2093
2094         }
2095 }
2096
2097 void _CkLocMgrInit(void) {
2098   /* Don't trace our deliver method--it does its own tracing */
2099   CkDisableTracing(CkIndex_CkLocMgr::deliverInline(0));
2100 }
2101
2102 /// Add a new local array manager to our list.
2103 /// Returns a new CkMigratableList for the manager to store his
2104 /// elements in.
2105 CkMigratableList *CkLocMgr::addManager(CkArrayID id,CkArrMgr *mgr)
2106 {
2107         CK_MAGICNUMBER_CHECK
2108         DEBC((AA"Adding new array manager\n"AB));
2109         //Link new manager into list
2110         ManagerRec *n=new ManagerRec;
2111         managers.find(id)=n;
2112         n->next=firstManager;
2113         n->mgr=mgr;
2114         n->elts.setSize(localLen);
2115         nManagers++;
2116         firstManager=n;
2117         return &n->elts;
2118 }
2119
2120 /// Return the next unused local element index.
2121 int CkLocMgr::nextFree(void) {
2122         if (firstFree>=localLen)
2123         {//Need more space in the local index arrays-- enlarge them
2124                 int oldLen=localLen;
2125                 localLen=localLen*2+8;
2126                 DEBC((AA"Growing the local list from %d to %d...\n"AB,oldLen,localLen));
2127                 for (ManagerRec *m=firstManager;m!=NULL;m=m->next)
2128                         m->elts.setSize(localLen);
2129                 //Update the free list
2130                 freeList.resize(localLen);
2131                 for (int i=oldLen;i<localLen;i++)
2132                         freeList[i]=i+1;
2133         }
2134         int localIdx=firstFree;
2135         if (localIdx==-1) CkAbort("CkLocMgr free list corrupted!");
2136         firstFree=freeList[localIdx];
2137         freeList[localIdx]=-1; //Mark as used
2138         return localIdx;
2139 }
2140
2141 CkLocRec_remote *CkLocMgr::insertRemote(const CkArrayIndex &idx,int nowOnPe)
2142 {
2143         DEBS((AA"Remote element %s lives on %d\n"AB,idx2str(idx),nowOnPe));
2144         CkLocRec_remote *rem=new CkLocRec_remote(this,nowOnPe);
2145         insertRec(rem,idx);
2146         return rem;
2147 }
2148
2149 //This element now lives on the given Pe
2150 void CkLocMgr::inform(const CkArrayIndex &idx,int nowOnPe)
2151 {
2152         if (nowOnPe==CkMyPe())
2153                 return; //Never insert a "remote" record pointing here
2154         CkLocRec *rec=elementNrec(idx);
2155         if (rec!=NULL && rec->type()==CkLocRec::local){
2156 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2157         CmiPrintf("[%d]WARNING!!! Element %d:%s is local but is being told it exists on %d\n",CkMyPe(),idx.dimension,idx2str(idx), nowOnPe);
2158 #endif
2159                 return; //Never replace a local element's record!
2160         }
2161         insertRemote(idx,nowOnPe);
2162 }
2163
2164 //Tell this element's home processor it now lives "there"
2165 void CkLocMgr::informHome(const CkArrayIndex &idx,int nowOnPe)
2166 {
2167         int home=homePe(idx);
2168         if (home!=CkMyPe() && home!=nowOnPe) {
2169                 //Let this element's home Pe know it lives here now
2170                 DEBC((AA"  Telling %s's home %d that it lives on %d.\n"AB,idx2str(idx),home,nowOnPe));
2171 //#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2172 //#if defined(_FAULT_MLOG_)
2173 //        informLocationHome(thisgroup,idx,home,CkMyPe());
2174 //#else
2175                 thisProxy[home].updateLocation(idx,nowOnPe);
2176 //#endif
2177         }
2178 }
2179
2180 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2181 CkLocRec_local *CkLocMgr::createLocal(const CkArrayIndex &idx,
2182         CmiBool forMigration, CmiBool ignoreArrival,
2183         CmiBool notifyHome,int dummy)
2184 {
2185     int localIdx=nextFree();
2186     DEBC((AA"Adding new record for element %s at local index %d\n"AB,idx2str(idx),localIdx));
2187     CkLocRec_local *rec=new CkLocRec_local(this,forMigration,ignoreArrival,idx,localIdx);
2188     if(!dummy){
2189         insertRec(rec,idx); //Add to global hashtable
2190     }   
2191     if (notifyHome) informHome(idx,CkMyPe());
2192     return rec; 
2193 }
2194 #else
2195 CkLocRec_local *CkLocMgr::createLocal(const CkArrayIndex &idx, 
2196                 CmiBool forMigration, CmiBool ignoreArrival,
2197                 CmiBool notifyHome)
2198 {
2199         int localIdx=nextFree();
2200         DEBC((AA"Adding new record for element %s at local index %d\n"AB,idx2str(idx),localIdx));
2201         CkLocRec_local *rec=new CkLocRec_local(this,forMigration,ignoreArrival,idx,localIdx);
2202         insertRec(rec,idx); //Add to global hashtable
2203
2204
2205         if (notifyHome) informHome(idx,CkMyPe());
2206         return rec;
2207 }
2208 #endif
2209
2210 //Add a new local array element, calling element's constructor
2211 CmiBool CkLocMgr::addElement(CkArrayID id,const CkArrayIndex &idx,
2212                 CkMigratable *elt,int ctorIdx,void *ctorMsg)
2213 {
2214         CK_MAGICNUMBER_CHECK
2215         CkLocRec *oldRec=elementNrec(idx);
2216         CkLocRec_local *rec;
2217         if (oldRec==NULL||oldRec->type()!=CkLocRec::local) 
2218         { //This is the first we've heard of that element-- add new local record
2219                 rec=createLocal(idx,CmiFalse,CmiFalse,CmiTrue);
2220 #if CMK_GLOBAL_LOCATION_UPDATE
2221                 if (homePe(idx) != CkMyPe()) {
2222                   DEBC((AA"Global location broadcast for new element idx %s "
2223                         "assigned to %d \n"AB, idx2str(idx), CkMyPe()));
2224                   thisProxy.updateLocation(idx, CkMyPe());  
2225                 }
2226 #endif
2227                 
2228         } else 
2229         { //rec is *already* local-- must not be the first insertion    
2230                 rec=((CkLocRec_local *)oldRec);
2231                 rec->addedElement();
2232         }
2233         if (!addElementToRec(rec,managers.find(id),elt,ctorIdx,ctorMsg)) return CmiFalse;
2234         elt->ckFinishConstruction();
2235         return CmiTrue;
2236 }
2237
2238 //As above, but shared with the migration code
2239 CmiBool CkLocMgr::addElementToRec(CkLocRec_local *rec,ManagerRec *m,
2240                 CkMigratable *elt,int ctorIdx,void *ctorMsg)
2241 {//Insert the new element into its manager's local list
2242         int localIdx=rec->getLocalIndex();
2243         if (m->elts.get(localIdx)!=NULL) CkAbort("Cannot insert array element twice!");
2244         m->elts.put(elt,localIdx); //Local element table
2245
2246 //Call the element's constructor
2247         DEBC((AA"Constructing element %s of array\n"AB,idx2str(rec->getIndex())));
2248         CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
2249         i.locRec=rec;
2250         i.chareType=_entryTable[ctorIdx]->chareIdx;
2251         if (!rec->invokeEntry(elt,ctorMsg,ctorIdx,CmiTrue)) return CmiFalse;
2252
2253 #if CMK_OUT_OF_CORE
2254         /* Register new element with out-of-core */
2255         PUP::sizer p_getSize; elt->pup(p_getSize);
2256         elt->prefetchObjID=CooRegisterObject(&CkArrayElementPrefetcher,p_getSize.size(),elt);
2257 #endif
2258         
2259         return CmiTrue;
2260 }
2261 void CkLocMgr::updateLocation(const CkArrayIndex &idx,int nowOnPe) {
2262   inform(idx,nowOnPe);
2263         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
2264 }
2265
2266 /*************************** LocMgr: DELETION *****************************/
2267 /// This index will no longer be used-- delete the associated elements
2268 void CkLocMgr::reclaim(const CkArrayIndex &idx,int localIdx) {
2269         CK_MAGICNUMBER_CHECK
2270         DEBC((AA"Destroying element %s (local %d)\n"AB,idx2str(idx),localIdx));
2271         //Delete, and mark as empty, each array element
2272         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2273                 delete m->elts.get(localIdx);
2274                 m->elts.empty(localIdx);
2275         }
2276         
2277         removeFromTable(idx);
2278         
2279         //Link local index into free list
2280         freeList[localIdx]=firstFree;
2281         firstFree=localIdx;
2282         
2283                 
2284         if (!duringMigration) 
2285         { //This is a local element dying a natural death
2286             #if CMK_BIGSIM_CHARM
2287                 //After migration, reclaimRemote will be called through 
2288                 //the CkRemoveArrayElement in the pupping routines for those 
2289                 //objects that are not on the home processors. However,
2290                 //those remote records should not be deleted since the corresponding
2291                 //objects are not actually deleted but on disk. If deleted, msgs
2292                 //that seeking where is the object will be accumulated (a circular
2293                 //msg chain) and causes the program no progress
2294                 if(_BgOutOfCoreFlag==1) return; 
2295             #endif
2296                 int home=homePe(idx);
2297                 if (home!=CkMyPe())
2298 #if CMK_MEM_CHECKPOINT
2299                 if (!CkInRestarting()) // all array elements are removed anyway
2300 #endif
2301                         thisProxy[home].reclaimRemote(idx,CkMyPe());
2302         /*      //Install a zombie to keep the living from re-using this index.
2303                 insertRecN(new CkLocRec_dead(this),idx); */
2304         }
2305 }
2306
2307 void CkLocMgr::reclaimRemote(const CkArrayIndex &idx,int deletedOnPe) {
2308         DEBC((AA"Our element %s died on PE %d\n"AB,idx2str(idx),deletedOnPe));
2309         CkLocRec *rec=elementNrec(idx);
2310         if (rec==NULL) return; //We never knew him
2311         if (rec->type()==CkLocRec::local) return; //He's already been reborn
2312         removeFromTable(idx);
2313         delete rec;
2314 }
2315 void CkLocMgr::removeFromTable(const CkArrayIndex &idx) {
2316 #if CMK_ERROR_CHECKING
2317         //Make sure it's actually in the table before we delete it
2318         if (NULL==elementNrec(idx))
2319                 CkAbort("CkLocMgr::removeFromTable called on invalid index!");
2320 #endif
2321         CmiImmediateLock(hashImmLock);
2322         hash.remove(*(CkArrayIndex *)&idx);
2323         CmiImmediateUnlock(hashImmLock);
2324 #if CMK_ERROR_CHECKING
2325         //Make sure it's really gone
2326         if (NULL!=elementNrec(idx))
2327                 CkAbort("CkLocMgr::removeFromTable called, but element still there!");
2328 #endif
2329 }
2330
2331 /************************** LocMgr: MESSAGING *************************/
2332 /// Deliver message to this element, going via the scheduler if local
2333 /// @return 0 if object local, 1 if not
2334 int CkLocMgr::deliver(CkMessage *m,CkDeliver_t type,int opts) {
2335         DEBS((AA"deliver \n"AB));
2336         CK_MAGICNUMBER_CHECK
2337         CkArrayMessage *msg=(CkArrayMessage *)m;
2338
2339
2340         const CkArrayIndex &idx=msg->array_index();
2341         DEBS((AA"deliver %s\n"AB,idx2str(idx)));
2342         if (type==CkDeliver_queue)
2343                 _TRACE_CREATION_DETAILED(UsrToEnv(m),msg->array_ep());
2344         CkLocRec *rec=elementNrec(idx);
2345         if(rec != NULL){
2346                 DEBS((AA"deliver %s of type %d \n"AB,idx2str(idx),rec->type()));
2347         }else{
2348                 DEBS((AA"deliver %s rec is null\n"AB,idx2str(idx)));
2349         }
2350 //#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2351 //#if !defined(_FAULT_MLOG_)
2352 #if CMK_LBDB_ON
2353
2354         LDObjid ldid = idx2LDObjid(idx);
2355 #if CMK_GLOBAL_LOCATION_UPDATE
2356         ldid.locMgrGid = thisgroup.idx;
2357 #endif        
2358         if (type==CkDeliver_queue) {
2359                 if (!(opts & CK_MSG_LB_NOTRACE) && the_lbdb->CollectingCommStats()) {
2360                 if(rec!=NULL) the_lbdb->Send(myLBHandle,ldid,UsrToEnv(msg)->getTotalsize(), rec->lookupProcessor(), 1);
2361                 else /*rec==NULL*/ the_lbdb->Send(myLBHandle,ldid,UsrToEnv(msg)->getTotalsize(),homePe(msg->array_index()), 1);
2362                 }
2363         }
2364 #endif
2365 //#endif
2366 #if CMK_GRID_QUEUE_AVAILABLE
2367         int gridSrcPE;
2368         int gridSrcCluster;
2369         int gridDestPE;
2370         int gridDestCluster;
2371         CkMigratable *obj;
2372         ArrayElement *obj2;
2373         CkGroupID gid;
2374         int *data;
2375
2376         obj = (CkMigratable *) CpvAccess(CkGridObject);   // CkGridObject is a pointer to the sending object (retained earlier)
2377         if (obj != NULL) {
2378           obj2 = dynamic_cast<ArrayElement *> (obj);
2379           if (obj2 > 0) {
2380             // Get the sending object's array gid and indexes.
2381             // These are guaranteed to exist due to the succeeding dynamic cast above.
2382             gid = obj2->ckGetArrayID ();
2383             data = obj2->thisIndexMax.data ();
2384
2385             // Get the source PE and destination PE.
2386             gridSrcPE = CkMyPe ();
2387             if (rec != NULL) {
2388               gridDestPE = rec->lookupProcessor ();
2389             } else {
2390               gridDestPE = homePe (msg->array_index ());
2391             }
2392
2393             // Get the source cluster and destination cluster.
2394             gridSrcCluster = CmiGetCluster (gridSrcPE);
2395             gridDestCluster = CmiGetCluster (gridDestPE);
2396
2397             // If the Grid queue interval is greater than zero, it means that the more complicated
2398             // technique for registering border objects that exceed a specified threshold of
2399             // cross-cluster messages within a specified interval (and deregistering border objects
2400             // that do not meet this threshold) is used.  Otherwise a much simpler technique is used
2401             // where a border object is registered immediately upon sending a single cross-cluster
2402             // message (and deregistered when load balancing takes place).
2403             if (obj2->grid_queue_interval > 0) {
2404               // Increment the sending object's count of all messages.
2405               obj2->msg_count += 1;
2406
2407               // If the source cluster and destination cluster differ, this is a Grid message.
2408               // (Increment the count of all Grid messages.)
2409               if (gridSrcCluster != gridDestCluster) {
2410                 obj2->msg_count_grid += 1;
2411               }
2412
2413               // If the number of messages exceeds the interval, check to see if the object has
2414               // sent enough cross-cluster messages to qualify as a border object.
2415               if (obj2->msg_count >= obj2->grid_queue_interval) {
2416                 if (obj2->msg_count_grid >= obj2->grid_queue_threshold) {
2417                   // The object is a border object; if it is not already registered, register it.
2418                   if (!obj2->border_flag) {
2419                     CmiGridQueueRegister (gid.idx, obj2->thisIndexMax.nInts, data[0], data[1], data[2]);
2420                   }
2421                   obj2->border_flag = 1;
2422                 } else {
2423                   // The object is not a border object; if it is registered, deregister it.
2424                   if (obj2->border_flag) {
2425                     CmiGridQueueDeregister (gid.idx, obj2->thisIndexMax.nInts, data[0], data[1], data[2]);
2426                   }
2427                   obj2->border_flag = 0;
2428                 }
2429                 // Reset the counts.
2430                 obj2->msg_count = 0;
2431                 obj2->msg_count_grid = 0;
2432               }
2433             } else {
2434               if (gridSrcCluster != gridDestCluster) {
2435                 CmiGridQueueRegister (gid.idx, obj2->thisIndexMax.nInts, data[0], data[1], data[2]);
2436               }
2437             }
2438           }
2439
2440           // Reset the CkGridObject pointer.
2441           CpvAccess(CkGridObject) = NULL;
2442         }
2443 #endif
2444         /**FAULT_EVAC*/
2445         if (rec!=NULL){
2446                 CmiBool result = rec->deliver(msg,type,opts);
2447                 // if result is CmiFalse, than rec is not valid anymore, as the object
2448                 // the message was just delivered to has died or migrated out.
2449                 // Therefore rec->type() cannot be invoked!
2450                 if (result==CmiTrue && rec->type()==CkLocRec::local) return 0;
2451                 else return 1;
2452                 /*if(!result){
2453                         //DEBS((AA"deliver %s failed type %d \n"AB,idx2str(idx),rec->type()));
2454                         DEBS((AA"deliver %s failed \n"AB,idx2str(idx)));
2455                         if(rec->type() == CkLocRec::remote){
2456                                 if (opts & CK_MSG_KEEP)
2457                                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
2458                                 deliverUnknown(msg,type);
2459                         }
2460                 }*/
2461         }else /* rec==NULL*/ {
2462                 if (opts & CK_MSG_KEEP)
2463                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
2464                 deliverUnknown(msg,type,opts);
2465                 return 1;
2466         }
2467
2468 }
2469
2470 /// This index is not hashed-- somehow figure out what to do.
2471 CmiBool CkLocMgr::deliverUnknown(CkArrayMessage *msg,CkDeliver_t type,int opts)
2472 {
2473         CK_MAGICNUMBER_CHECK
2474         const CkArrayIndex &idx=msg->array_index();
2475         int onPe=homePe(idx);
2476         if (onPe!=CkMyPe()) 
2477         {// Forward the message to its home processor
2478                 DEBM((AA"Forwarding message for unknown %s to home %d \n"AB,idx2str(idx),onPe));
2479                 msg->array_hops()++;
2480                 CkArrayManagerDeliver(onPe,msg,opts);
2481                 return CmiTrue;
2482         }
2483         else
2484         { // We *are* the home processor:
2485         //Check if the element's array manager has been registered yet:
2486           CkArrMgr *mgr=managers.find(UsrToEnv((void *)msg)->getsetArrayMgr())->mgr;
2487           if (!mgr) { //No manager yet-- postpone the message (stupidly)
2488             if (CkInRestarting()) {
2489               // during restarting, this message should be ignored
2490               delete msg;
2491             }
2492             else {
2493               CkArrayManagerDeliver(CkMyPe(),msg); 
2494             }
2495           }
2496           else { // Has a manager-- must buffer the message
2497             DEBC((AA"Adding buffer for unknown element %s\n"AB,idx2str(idx)));
2498             CkLocRec *rec=new CkLocRec_buffering(this);
2499             insertRecN(rec,idx);
2500             rec->deliver(msg,type);
2501           
2502             if (msg->array_ifNotThere()!=CkArray_IfNotThere_buffer) 
2503             { //Demand-create the element:
2504               return demandCreateElement(msg,-1,type);
2505             }
2506           }
2507           return CmiTrue;
2508         }
2509 }
2510
2511 CmiBool CkLocMgr::demandCreateElement(CkArrayMessage *msg,int onPe,CkDeliver_t type)
2512 {
2513         CK_MAGICNUMBER_CHECK
2514         const CkArrayIndex &idx=msg->array_index();
2515         int chareType=_entryTable[msg->array_ep()]->chareIdx;
2516         int ctor=_chareTable[chareType]->getDefaultCtor();
2517         if (ctor==-1) CkAbort("Can't create array element to handle message--\n"
2518                               "The element has no default constructor in the .ci file!\n");
2519         if (onPe==-1) 
2520         { //Decide where element needs to live
2521                 if (msg->array_ifNotThere()==CkArray_IfNotThere_createhere) 
2522                         onPe=UsrToEnv(msg)->getsetArraySrcPe();
2523                 else //Createhome
2524                         onPe=homePe(idx);
2525         }
2526         
2527         //Find the manager and build the element
2528         DEBC((AA"Demand-creating element %s on pe %d\n"AB,idx2str(idx),onPe));
2529         CkArrMgr *mgr=managers.find(UsrToEnv((void *)msg)->getsetArrayMgr())->mgr;
2530         if (!mgr) CkAbort("Tried to demand-create for nonexistent arrMgr");
2531         return mgr->demandCreateElement(idx,onPe,ctor,type);
2532 }
2533
2534 //This message took several hops to reach us-- fix it
2535 void CkLocMgr::multiHop(CkArrayMessage *msg)
2536 {
2537
2538         CK_MAGICNUMBER_CHECK
2539         int srcPe=msg->array_getSrcPe();
2540         if (srcPe==CkMyPe())
2541                 DEB((AA"Odd routing: local element %s is %d hops away!\n"AB,idx2str(msg),msg->array_hops()));
2542         else
2543         {//Send a routing message letting original sender know new element location
2544                 DEBS((AA"Sending update back to %d for element\n"AB,srcPe,idx2str(msg)));
2545                 thisProxy[srcPe].updateLocation(msg->array_index(),CkMyPe());
2546         }
2547 }
2548
2549 /************************** LocMgr: ITERATOR *************************/
2550 CkLocation::CkLocation(CkLocMgr *mgr_, CkLocRec_local *rec_)
2551         :mgr(mgr_), rec(rec_) {}
2552         
2553 const CkArrayIndex &CkLocation::getIndex(void) const {
2554         return rec->getIndex();
2555 }
2556
2557 void CkLocation::destroyAll() {
2558         mgr->callMethod(rec, &CkMigratable::ckDestroy);
2559 }
2560
2561 void CkLocation::pup(PUP::er &p) {
2562         mgr->pupElementsFor(p,rec,CkElementCreation_migrate);
2563 }
2564
2565 CkLocIterator::~CkLocIterator() {}
2566
2567 /// Iterate over our local elements:
2568 void CkLocMgr::iterate(CkLocIterator &dest) {
2569   //Poke through the hash table for local ArrayRecs.
2570   void *objp;
2571   CkHashtableIterator *it=hash.iterator();
2572   CmiImmediateLock(hashImmLock);
2573
2574   while (NULL!=(objp=it->next())) {
2575     CkLocRec *rec=*(CkLocRec **)objp;
2576     if (rec->type()==CkLocRec::local) {
2577       CkLocation loc(this,(CkLocRec_local *)rec);
2578       dest.addLocation(loc);
2579     }
2580   }
2581   CmiImmediateUnlock(hashImmLock);
2582   delete it;
2583 }
2584
2585
2586
2587
2588 /************************** LocMgr: MIGRATION *************************/
2589 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2590 void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
2591         CkElementCreation_t type, CmiBool create, int dummy)
2592 {
2593     p.comment("-------- Array Location --------");
2594     register ManagerRec *m;
2595     int localIdx=rec->getLocalIndex();
2596     CkVec<CkMigratable *> dummyElts;
2597
2598     for (m=firstManager;m!=NULL;m=m->next) {
2599         int elCType;
2600         if (!p.isUnpacking())
2601         { //Need to find the element's existing type
2602             CkMigratable *elt=m->element(localIdx);
2603             if (elt) elCType=elt->ckGetChareType();
2604             else elCType=-1; //Element hasn't been created
2605         }
2606         p(elCType);
2607         if (p.isUnpacking() && elCType!=-1) {
2608             CkMigratable *elt=m->mgr->allocateMigrated(elCType,rec->getIndex(),type);
2609             int migCtorIdx=_chareTable[elCType]->getMigCtor();
2610                 if(!dummy){
2611                         if(create)
2612                                 if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
2613                                 }else{
2614                     CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
2615                     i.locRec=rec;
2616                     i.chareType=_entryTable[migCtorIdx]->chareIdx;
2617                     dummyElts.push_back(elt);
2618                     if (!rec->invokeEntry(elt,NULL,migCtorIdx,CmiTrue)) return ;
2619                 }
2620         }
2621     }
2622     if(!dummy){
2623         for (m=firstManager;m!=NULL;m=m->next) {
2624             CkMigratable *elt=m->element(localIdx);
2625             if (elt!=NULL)
2626                 {
2627                        elt->pup(p);
2628                 }
2629         }
2630     }else{
2631             for(int i=0;i<dummyElts.size();i++){
2632                 CkMigratable *elt = dummyElts[i];
2633                 if (elt!=NULL){
2634             elt->pup(p);
2635                         }
2636                 delete elt;
2637             }
2638                         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2639                 m->elts.empty(localIdx);
2640             }
2641         freeList[localIdx]=firstFree;
2642         firstFree=localIdx;
2643     }
2644 }
2645 #else
2646 void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
2647                 CkElementCreation_t type)
2648 {
2649         p.comment("-------- Array Location --------");
2650         register ManagerRec *m;
2651         int localIdx=rec->getLocalIndex();
2652
2653         //First pup the element types
2654         // (A separate loop so ckLocal works even in element pup routines)
2655         for (m=firstManager;m!=NULL;m=m->next) {
2656                 int elCType;
2657                 if (!p.isUnpacking())
2658                 { //Need to find the element's existing type
2659                         CkMigratable *elt=m->element(localIdx);
2660                         if (elt) elCType=elt->ckGetChareType();
2661                         else elCType=-1; //Element hasn't been created
2662                 }
2663                 p(elCType);
2664                 if (p.isUnpacking() && elCType!=-1) {
2665                         //Create the element
2666                         CkMigratable *elt=m->mgr->allocateMigrated(elCType,rec->getIndex(),type);
2667                         int migCtorIdx=_chareTable[elCType]->getMigCtor();
2668                         //Insert into our tables and call migration constructor
2669                         if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
2670                 }
2671         }
2672         //Next pup the element data
2673         for (m=firstManager;m!=NULL;m=m->next) {
2674                 CkMigratable *elt=m->element(localIdx);
2675                 if (elt!=NULL)
2676                 {
2677                         elt->pup(p);
2678 #if CMK_ERROR_CHECKING
2679                         if (p.isUnpacking()) elt->sanitycheck();
2680 #endif
2681                 }
2682         }
2683 #if CMK_MEM_CHECKPOINT
2684         if(CkInRestarting()){
2685           ArrayElement *elt;
2686           CkVec<CkMigratable *> list;
2687           migratableList(rec, list);
2688           CmiAssert(list.length() > 0);
2689           for (int l=0; l<list.length(); l++) {
2690                 //    reset, may not needed now
2691                 // for now.
2692                 for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
2693                         ArrayElement * elt = (ArrayElement *)list[l];
2694                   contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
2695                   if (c) c->redNo = 0;
2696                 }
2697           }
2698                 
2699         }
2700 #endif
2701 }
2702 #endif
2703
2704 /// Call this member function on each element of this location:
2705 void CkLocMgr::callMethod(CkLocRec_local *rec,CkMigratable_voidfn_t fn)
2706 {
2707         int localIdx=rec->getLocalIndex();
2708         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2709                 CkMigratable *el=m->element(localIdx);
2710                 if (el) (el->* fn)();
2711         }
2712 }
2713
2714 /// return a list of migratables in this local record
2715 void CkLocMgr::migratableList(CkLocRec_local *rec, CkVec<CkMigratable *> &list)
2716 {
2717         register ManagerRec *m;
2718         int localIdx=rec->getLocalIndex();
2719
2720         for (m=firstManager;m!=NULL;m=m->next) {
2721                 CkMigratable *elt=m->element(localIdx);
2722                 if (elt) list.push_back(elt);
2723         }
2724 }
2725
2726 /// Migrate this local element away to another processor.
2727 void CkLocMgr::emigrate(CkLocRec_local *rec,int toPe)
2728 {
2729         CK_MAGICNUMBER_CHECK
2730         if (toPe==CkMyPe()) return; //You're already there!
2731         /*
2732                 FAULT_EVAC
2733                 if the toProcessor is already marked as invalid, dont emigrate
2734                 Shouldn't happen but might
2735         */
2736         if(!CmiNodeAlive(toPe)){
2737                 return;
2738         }
2739         CkArrayIndex idx=rec->getIndex();
2740
2741 #if CMK_OUT_OF_CORE
2742         int localIdx=rec->getLocalIndex();
2743         /* Load in any elements that are out-of-core */
2744         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2745                 CkMigratable *el=m->element(localIdx);
2746                 if (el) if (!el->isInCore) CooBringIn(el->prefetchObjID);
2747         }
2748 #endif
2749
2750         //Let all the elements know we're leaving
2751         callMethod(rec,&CkMigratable::ckAboutToMigrate);
2752         /*EVAC*/
2753
2754 //First pass: find size of migration message
2755         int bufSize;
2756         {
2757                 PUP::sizer p;
2758                 p(nManagers);
2759                 pupElementsFor(p,rec,CkElementCreation_migrate);
2760                 bufSize=p.size(); 
2761         }
2762
2763 //Allocate and pack into message
2764         int doubleSize=bufSize/sizeof(double)+1;
2765         CkArrayElementMigrateMessage *msg = 
2766                 new (doubleSize, 0) CkArrayElementMigrateMessage;
2767         msg->idx=idx;
2768         msg->length=bufSize;
2769 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
2770     msg->gid = ckGetGroupID();
2771 #endif
2772 #if CMK_LBDB_ON
2773         msg->ignoreArrival = rec->isAsyncMigrate()?1:0;
2774 #endif
2775         /*
2776                 FAULT_EVAC
2777         */
2778         msg->bounced = rec->isBounced();
2779         {
2780                 PUP::toMem p(msg->packData); 
2781                 p.becomeDeleting(); 
2782                 p(nManagers);
2783                 pupElementsFor(p,rec,CkElementCreation_migrate);
2784                 if (p.size()!=bufSize) {
2785                         CkError("ERROR! Array element claimed it was %d bytes to a "
2786                                 "sizing PUP::er, but copied %d bytes into the packing PUP::er!\n",
2787                                 bufSize,p.size());
2788                         CkAbort("Array element's pup routine has a direction mismatch.\n");
2789                 }
2790         }
2791
2792         DEBM((AA"Migrated index size %s to %d \n"AB,idx2str(idx),toPe));        
2793
2794 //#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2795 //#if defined(_FAULT_MLOG_)
2796 //    sendMlogLocation(toPe,UsrToEnv(msg));
2797 //#else
2798         //Send off message and delete old copy
2799         thisProxy[toPe].immigrate(msg);
2800 //#endif
2801
2802         duringMigration=CmiTrue;
2803         delete rec; //Removes elements, hashtable entries, local index
2804         
2805         
2806         duringMigration=CmiFalse;
2807         //The element now lives on another processor-- tell ourselves and its home
2808         inform(idx,toPe);
2809 //#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
2810 //#if !defined(_FAULT_MLOG_)    
2811         informHome(idx,toPe);
2812 //#endif
2813
2814 #if !CMK_LBDB_ON && CMK_GLOBAL_LOCATION_UPDATE
2815         DEBM((AA"Global location update. idx %s " 
2816               "assigned to %d \n"AB,idx2str(idx),toPe));
2817         thisProxy.updateLocation(idx, toPe);                        
2818 #endif
2819
2820         CK_MAGICNUMBER_CHECK
2821 }
2822
2823 /**
2824   Migrating array element is arriving on this processor.
2825 */
2826 void CkLocMgr::immigrate(CkArrayElementMigrateMessage *msg)
2827 {
2828         const CkArrayIndex &idx=msg->idx;
2829                 
2830         PUP::fromMem p(msg->packData); 
2831         
2832         int nMsgMan;
2833         p(nMsgMan);
2834         if (nMsgMan<nManagers)
2835                 CkAbort("Array element arrived from location with fewer managers!\n");
2836         if (nMsgMan>nManagers) {
2837                 //Some array managers haven't registered yet-- throw it back
2838                 DEBM((AA"Busy-waiting for array registration on migrating %s\n"AB,idx2str(idx)));
2839                 thisProxy[CkMyPe()].immigrate(msg);
2840                 return;
2841         }
2842
2843         //Create a record for this element
2844 //#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
2845 //#if !defined(_FAULT_MLOG_)     
2846         CkLocRec_local *rec=createLocal(idx,CmiTrue,msg->ignoreArrival,CmiFalse /* home told on departure */ );
2847 //#else
2848 //    CkLocRec_local *rec=createLocal(idx,CmiTrue,CmiTrue,CmiFalse /* home told on departure */ );
2849 //#endif
2850         
2851         //Create the new elements as we unpack the message
2852         pupElementsFor(p,rec,CkElementCreation_migrate);
2853         if (p.size()!=msg->length) {
2854                 CkError("ERROR! Array element claimed it was %d bytes to a"
2855                         "packing PUP::er, but %d bytes in the unpacking PUP::er!\n",
2856                         msg->length,p.size());
2857                 CkError("(I have %d managers; he claims %d managers)\n",
2858                         nManagers,nMsgMan);
2859                 
2860                 CkAbort("Array element's pup routine has a direction mismatch.\n");
2861         }
2862         /*
2863                 FAULT_EVAC
2864                         if this element came in as a result of being bounced off some other process,
2865                         then it needs to be resumed. It is assumed that it was bounced because load 
2866                         balancing caused it to move into a processor which later crashed
2867         */
2868         if(msg->bounced){
2869                 callMethod(rec,&CkMigratable::ResumeFromSync);
2870         }
2871         
2872         //Let all the elements know we've arrived
2873         callMethod(rec,&CkMigratable::ckJustMigrated);
2874         /*FAULT_EVAC
2875                 If this processor has started evacuating array elements on it 
2876                 dont let new immigrants in. If they arrive send them to what
2877                 would be their correct homePE.
2878                 Leave a record here mentioning the processor where it got sent
2879         */
2880         
2881         if(CkpvAccess(startedEvac)){
2882                 int newhomePE = getNextPE(idx);
2883                 DEBM((AA"Migrated into failed processor index size %s resent to %d \n"AB,idx2str(idx),newhomePE));      
2884                 CkLocMgr *mgr = rec->getLocMgr();
2885                 int targetPE=getNextPE(idx);
2886                 //set this flag so that load balancer is not informed when
2887                 //this element migrates
2888                 rec->AsyncMigrate(CmiTrue);
2889                 rec->Bounced(CmiTrue);
2890                 mgr->emigrate(rec,targetPE);
2891                 
2892         }
2893
2894         delete msg;
2895 }
2896
2897 void CkLocMgr::restore(const CkArrayIndex &idx, PUP::er &p)
2898 {
2899         //This is in broughtIntoMem during out-of-core emulation in BigSim,
2900         //informHome should not be called since such information is already
2901         //immediately updated real migration
2902 #if CMK_ERROR_CHECKING
2903         if(_BgOutOfCoreFlag!=2)
2904             CmiAbort("CkLocMgr::restore should only be used in out-of-core emulation for BigSim and be called when object is brought into memory!\n");
2905 #endif
2906         CkLocRec_local *rec=createLocal(idx,CmiFalse,CmiFalse,CmiFalse);
2907         
2908         //BIGSIM_OOC DEBUGGING
2909         //CkPrintf("Proc[%d]: Registering element %s with LDB\n", CkMyPe(), idx2str(idx));
2910
2911         //Create the new elements as we unpack the message
2912         pupElementsFor(p,rec,CkElementCreation_restore);
2913
2914         callMethod(rec,&CkMigratable::ckJustRestored);
2915 }
2916
2917
2918 /// Insert and unpack this array element from this checkpoint (e.g., from CkLocation::pup)
2919 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2920 void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p, CmiBool create, int dummy)
2921 {
2922         CkLocRec_local *rec;
2923         CkLocRec *recGlobal;    
2924
2925         if(create){
2926                 rec = createLocal(idx,CmiFalse,CmiFalse,CmiTrue && !dummy /* home doesn't know yet */,dummy );
2927         }else{
2928                 recGlobal = elementNrec(idx);
2929                 if(recGlobal == NULL) 
2930                         CmiAbort("Local object not found");
2931                 if(recGlobal->type() != CkLocRec::local)
2932                         CmiAbort("Local object not local, :P");
2933                 rec = (CkLocRec_local *)recGlobal;
2934         }
2935         
2936     pupElementsFor(p,rec,CkElementCreation_resume,create,dummy);
2937
2938     if(!dummy){
2939         callMethod(rec,&CkMigratable::ckJustMigrated);
2940     }
2941 }
2942 #else
2943 void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p, CmiBool notify)
2944 {
2945         CkLocRec_local *rec=createLocal(idx,CmiFalse,CmiFalse,notify /* home doesn't know yet */ );
2946
2947         //Create the new elements as we unpack the message
2948         pupElementsFor(p,rec,CkElementCreation_resume);
2949
2950         callMethod(rec,&CkMigratable::ckJustMigrated);
2951 }
2952 #endif
2953
2954 /********************* LocMgr: UTILITY ****************/
2955 void CkMagicNumber_impl::badMagicNumber(
2956         int expected,const char *file,int line,void *obj) const
2957 {
2958         CkError("FAILURE on pe %d, %s:%d> Expected %p's magic number "
2959                 "to be 0x%08x; but found 0x%08x!\n", CkMyPe(),file,line,obj,
2960                 expected, magic);
2961         CkAbort("Bad magic number detected!  This implies either\n"
2962                 "the heap or a message was corrupted!\n");
2963 }
2964 CkMagicNumber_impl::CkMagicNumber_impl(int m) :magic(m) { }
2965
2966 //Look up the object with this array index, or return NULL
2967 CkMigratable *CkLocMgr::lookup(const CkArrayIndex &idx,CkArrayID aid) {
2968         CkLocRec *rec=elementNrec(idx);
2969         if (rec==NULL) return NULL;
2970         else return rec->lookupElement(aid);
2971 }
2972 //"Last-known" location (returns a processor number)
2973 int CkLocMgr::lastKnown(const CkArrayIndex &idx) {
2974         CkLocMgr *vthis=(CkLocMgr *)this;//Cast away "const"
2975         CkLocRec *rec=vthis->elementNrec(idx);
2976         int pe=-1;
2977         if (rec!=NULL) pe=rec->lookupProcessor();
2978         if (pe==-1) return homePe(idx);
2979         else{
2980                 /*
2981                         FAULT_EVAC
2982                         if the lastKnownPE is invalid return homePE and delete this record
2983                 */
2984                 if(!CmiNodeAlive(pe)){
2985                         removeFromTable(idx);
2986                         return homePe(idx);
2987                 }
2988                 return pe;
2989         }       
2990 }
2991 /// Return true if this array element lives on another processor
2992 bool CkLocMgr::isRemote(const CkArrayIndex &idx,int *onPe) const
2993 {
2994         CkLocMgr *vthis=(CkLocMgr *)this;//Cast away "const"
2995         CkLocRec *rec=vthis->elementNrec(idx);
2996         if (rec==NULL || rec->type()!=CkLocRec::remote) 
2997                 return false; /* not definitely a remote element */
2998         else /* element is indeed remote */
2999         {
3000                 *onPe=rec->lookupProcessor();
3001                 return true;
3002         }
3003 }
3004
3005 static const char *rec2str[]={
3006     "base (INVALID)",//Base class (invalid type)
3007     "local",//Array element that lives on this Pe
3008     "remote",//Array element that lives on some other Pe
3009     "buffering",//Array element that was just created
3010     "dead"//Deleted element (for debugging)
3011 };
3012
3013 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3014 void CkLocMgr::setDuringMigration(CmiBool _duringMigration){
3015     duringMigration = _duringMigration;
3016 }
3017 #endif
3018
3019
3020 //Add given element array record at idx, replacing the existing record
3021 void CkLocMgr::insertRec(CkLocRec *rec,const CkArrayIndex &idx) {
3022         CkLocRec *old=elementNrec(idx);
3023         insertRecN(rec,idx);
3024         if (old!=NULL) {
3025                 DEBC((AA"  replaces old rec(%s) for %s\n"AB,rec2str[old->type()],idx2str(idx)));
3026                 //There was an old element at this location
3027                 if (old->type()==CkLocRec::local && rec->type()==CkLocRec::local) {
3028                     if (!CkInRestarting()) {    // ok if it is restarting
3029                         CkPrintf("ERROR! Duplicate array index: %s\n",idx2str(idx));
3030                         CkAbort("Duplicate array index used");
3031                     }
3032                 }
3033                 old->beenReplaced();
3034                 delete old;
3035         }
3036 }
3037
3038 //Add given record, when there is guarenteed to be no prior record
3039 void CkLocMgr::insertRecN(CkLocRec *rec,const CkArrayIndex &idx) {
3040         DEBC((AA"  adding new rec(%s) for %s\n"AB,rec2str[rec->type()],idx2str(idx)));
3041         CmiImmediateLock(hashImmLock);
3042         hash.put(*(CkArrayIndex *)&idx)=rec;
3043         CmiImmediateUnlock(hashImmLock);
3044 }
3045
3046 //Call this on an unrecognized array index
3047 static void abort_out_of_bounds(const CkArrayIndex &idx)
3048 {
3049   CkPrintf("ERROR! Unknown array index: %s\n",idx2str(idx));
3050   CkAbort("Array index out of bounds\n");
3051 }
3052
3053 //Look up array element in hash table.  Index out-of-bounds if not found.
3054 CkLocRec *CkLocMgr::elementRec(const CkArrayIndex &idx) {
3055 #if ! CMK_ERROR_CHECKING
3056 //Assume the element will be found
3057         return hash.getRef(*(CkArrayIndex *)&idx);
3058 #else
3059 //Include an out-of-bounds check if the element isn't found
3060         CkLocRec *rec=elementNrec(idx);
3061         if (rec==NULL) abort_out_of_bounds(idx);
3062         return rec;
3063 #endif
3064 }
3065
3066 //Look up array element in hash table.  Return NULL if not there.
3067 CkLocRec *CkLocMgr::elementNrec(const CkArrayIndex &idx) {
3068         return hash.get(*(CkArrayIndex *)&idx);
3069 }
3070
3071 struct LocalElementCounter :  public CkLocIterator
3072 {
3073     unsigned int count;
3074     LocalElementCounter() : count(0) {}
3075     void addLocation(CkLocation &loc)
3076         { ++count; }
3077 };
3078
3079 unsigned int CkLocMgr::numLocalElements()
3080 {
3081     LocalElementCounter c;
3082     iterate(c);
3083     return c.count;
3084 }
3085
3086
3087 /********************* LocMgr: LOAD BALANCE ****************/
3088
3089 #if !CMK_LBDB_ON
3090 //Empty versions of all load balancer calls
3091 void CkLocMgr::initLB(CkGroupID lbdbID_) {}
3092 void CkLocMgr::startInserting(void) {}
3093 void CkLocMgr::doneInserting(void) {}
3094 void CkLocMgr::dummyAtSync(void) {}
3095 #endif
3096
3097
3098 #if CMK_LBDB_ON
3099 void CkLocMgr::initLB(CkGroupID lbdbID_)
3100 { //Find and register with the load balancer
3101         the_lbdb = (LBDatabase *)CkLocalBranch(lbdbID_);
3102         if (the_lbdb == 0)
3103                 CkAbort("LBDatabase not yet created?\n");
3104         DEBL((AA"Connected to load balancer %p\n"AB,the_lbdb));
3105
3106         // Register myself as an object manager
3107         LDOMid myId;
3108         myId.id = thisgroup;
3109         LDCallbacks myCallbacks;
3110         myCallbacks.migrate = (LDMigrateFn)CkLocRec_local::staticMigrate;
3111         myCallbacks.setStats = NULL;
3112         myCallbacks.queryEstLoad = NULL;
3113         myLBHandle = the_lbdb->RegisterOM(myId,this,myCallbacks);
3114
3115         // Tell the lbdb that I'm registering objects
3116         the_lbdb->RegisteringObjects(myLBHandle);
3117
3118         /*Set up the dummy barrier-- the load balancer needs
3119           us to call Registering/DoneRegistering during each AtSync,
3120           and this is the only way to do so.
3121         */
3122         the_lbdb->AddLocalBarrierReceiver(
3123                 (LDBarrierFn)staticRecvAtSync,(void*)(this));
3124         dummyBarrierHandle = the_lbdb->AddLocalBarrierClient(
3125                 (LDResumeFn)staticDummyResumeFromSync,(void*)(this));
3126         dummyAtSync();
3127 }
3128 void CkLocMgr::dummyAtSync(void)
3129 {
3130         DEBL((AA"dummyAtSync called\n"AB));
3131         the_lbdb->AtLocalBarrier(dummyBarrierHandle);
3132 }
3133
3134 void CkLocMgr::staticDummyResumeFromSync(void* data)
3135 {      ((CkLocMgr*)data)->dummyResumeFromSync(); }
3136 void CkLocMgr::dummyResumeFromSync()
3137 {
3138         DEBL((AA"DummyResumeFromSync called\n"AB));
3139         the_lbdb->DoneRegisteringObjects(myLBHandle);
3140         dummyAtSync();
3141 }
3142 void CkLocMgr::staticRecvAtSync(void* data)
3143 {      ((CkLocMgr*)data)->recvAtSync(); }
3144 void CkLocMgr::recvAtSync()
3145 {
3146         DEBL((AA"recvAtSync called\n"AB));
3147         the_lbdb->RegisteringObjects(myLBHandle);
3148 }
3149
3150 void CkLocMgr::startInserting(void)
3151 {
3152         the_lbdb->RegisteringObjects(myLBHandle);
3153 }
3154 void CkLocMgr::doneInserting(void)
3155 {
3156         the_lbdb->DoneRegisteringObjects(myLBHandle);
3157 }
3158 #endif
3159
3160 #include "CkLocation.def.h"
3161
3162