minor
[charm.git] / src / ck-core / cklocation.C
1 /** \file cklocation.C
2  *  \addtogroup CkArrayImpl
3  *
4  *  The location manager keeps track of an indexed set of migratable objects.
5  *  It is used by the array manager to locate array elements, interact with the
6  *  load balancer, and perform migrations.
7  *
8  *  Orion Sky Lawlor, olawlor@acm.org 9/29/2001
9  */
10
11 #include "charm++.h"
12 #include "register.h"
13 #include "ck.h"
14 #include "trace.h"
15 #include "TopoManager.h"
16
17 #include<sstream>
18
19 #if CMK_LBDB_ON
20 #include "LBDatabase.h"
21 #if CMK_GLOBAL_LOCATION_UPDATE
22 #include "BaseLB.h"
23 #include "init.h"
24 #endif
25 #endif // CMK_LBDB_ON
26
27 #if CMK_GRID_QUEUE_AVAILABLE
28 CpvExtern(void *, CkGridObject);
29 #endif
30
31 static const char *idx2str(const CkArrayMessage *m) {
32   return idx2str(((CkArrayMessage *)m)->array_index());
33 }
34
35 #define ARRAY_DEBUG_OUTPUT 0
36
37 #if ARRAY_DEBUG_OUTPUT 
38 #   define DEB(x) CkPrintf x  //General debug messages
39 #   define DEBI(x) CkPrintf x  //Index debug messages
40 #   define DEBC(x) CkPrintf x  //Construction debug messages
41 #   define DEBS(x) CkPrintf x  //Send/recv/broadcast debug messages
42 #   define DEBM(x) CkPrintf x  //Migration debug messages
43 #   define DEBL(x) CkPrintf x  //Load balancing debug messages
44 #   define DEBK(x) CkPrintf x  //Spring Cleaning debug messages
45 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
46 #   define AA "LocMgr on %d: "
47 #   define AB ,CkMyPe()
48 #   define DEBUG(x) CkPrintf x
49 #else
50 #   define DEB(X) /*CkPrintf x*/
51 #   define DEBI(X) /*CkPrintf x*/
52 #   define DEBC(X) /*CkPrintf x*/
53 #   define DEBS(x) /*CkPrintf x*/
54 #   define DEBM(X) /*CkPrintf x*/
55 #   define DEBL(X) /*CkPrintf x*/
56 #   define DEBK(x) /*CkPrintf x*/
57 #   define DEBB(x) /*CkPrintf x*/
58 #   define str(x) /**/
59 #   define DEBUG(x)   /**/
60 #endif
61
62 //whether to use block mapping in the SMP node level
63 bool useNodeBlkMapping;
64
65 #if CMK_LBDB_ON
66 /*LBDB object handles are fixed-sized, and not necc.
67 the same size as ArrayIndices.
68 */
69 LDObjid idx2LDObjid(const CkArrayIndex &idx)
70 {
71   LDObjid r;
72   int i;
73   const int *data=idx.data();
74   if (OBJ_ID_SZ>=idx.nInts) {
75     for (i=0;i<idx.nInts;i++)
76       r.id[i]=data[i];
77     for (i=idx.nInts;i<OBJ_ID_SZ;i++)
78       r.id[i]=0;
79   } else {
80     //Must hash array index into LBObjid
81     int j;
82     for (j=0;j<OBJ_ID_SZ;j++)
83         r.id[j]=data[j];
84     for (i=0;i<idx.nInts;i++)
85       for (j=0;j<OBJ_ID_SZ;j++)
86         r.id[j]+=circleShift(data[i],22+11*i*(j+1))+
87           circleShift(data[i],21-9*i*(j+1));
88   }
89
90 #if CMK_GLOBAL_LOCATION_UPDATE
91   r.dimension = idx.dimension;
92   r.nInts = idx.nInts; 
93   r.isArrayElement = 1; 
94 #endif
95
96   return r;
97 }
98
99 #if CMK_GLOBAL_LOCATION_UPDATE
100 void UpdateLocation(MigrateInfo& migData) {
101
102   if (migData.obj.id.isArrayElement == 0) {
103     return;
104   }
105
106   CkArrayIndex idx; 
107   idx.dimension = migData.obj.id.dimension; 
108   idx.nInts = migData.obj.id.nInts; 
109
110   for (int i = 0; i < idx.nInts; i++) {
111     idx.data()[i] = migData.obj.id.id[i];    
112   }
113
114   CkGroupID locMgrGid;
115   locMgrGid.idx = migData.obj.id.locMgrGid;
116   CkLocMgr *localLocMgr = (CkLocMgr *) CkLocalBranch(locMgrGid);
117   localLocMgr->updateLocation(idx, migData.to_pe); 
118 }
119 #endif
120
121 #endif
122
123 /*********************** Array Messages ************************/
124 CkArrayIndex &CkArrayMessage::array_index(void)
125 {
126     return UsrToEnv((void *)this)->getsetArrayIndex();
127 }
128 unsigned short &CkArrayMessage::array_ep(void)
129 {
130         return UsrToEnv((void *)this)->getsetArrayEp();
131 }
132 unsigned short &CkArrayMessage::array_ep_bcast(void)
133 {
134         return UsrToEnv((void *)this)->getsetArrayBcastEp();
135 }
136 unsigned char &CkArrayMessage::array_hops(void)
137 {
138         return UsrToEnv((void *)this)->getsetArrayHops();
139 }
140 unsigned int CkArrayMessage::array_getSrcPe(void)
141 {
142         return UsrToEnv((void *)this)->getsetArraySrcPe();
143 }
144 unsigned int CkArrayMessage::array_ifNotThere(void)
145 {
146         return UsrToEnv((void *)this)->getArrayIfNotThere();
147 }
148 void CkArrayMessage::array_setIfNotThere(unsigned int i)
149 {
150         UsrToEnv((void *)this)->setArrayIfNotThere(i);
151 }
152
153 /*********************** Array Map ******************
154 Given an array element index, an array map tells us 
155 the index's "home" Pe.  This is the Pe the element will
156 be created on, and also where messages to this element will
157 be forwarded by default.
158 */
159
160 CkArrayMap::CkArrayMap(void) { }
161 CkArrayMap::~CkArrayMap() { }
162 int CkArrayMap::registerArray(CkArrayIndex& numElements,CkArrayID aid)
163 {return 0;}
164
165 #define CKARRAYMAP_POPULATE_INITIAL(POPULATE_CONDITION) \
166         int i; \
167         for (int i1=0; i1<numElements.data()[0]; i1++) { \
168           if (numElements.dimension == 1) { \
169             /* Make 1D indices */ \
170             i = i1; \
171             CkArrayIndex1D idx(i1); \
172             if (POPULATE_CONDITION) \
173               mgr->insertInitial(idx,CkCopyMsg(&ctorMsg)); \
174           } else { \
175             /* higher dimensionality */ \
176             for (int i2=0; i2<numElements.data()[1]; i2++) { \
177               if (numElements.dimension == 2) { \
178                 /* Make 2D indices */ \
179                 i = i1 * numElements.data()[1] + i2; \
180                 CkArrayIndex2D idx(i1, i2); \
181                 if (POPULATE_CONDITION) \
182                   mgr->insertInitial(idx,CkCopyMsg(&ctorMsg)); \
183               } else { \
184                 /* higher dimensionality */ \
185                 CkAssert(numElements.dimension == 3); \
186                 for (int i3=0; i3<numElements.data()[2]; i3++) { \
187                   /* Make 3D indices */ \
188                   i = (i1 * numElements.data()[1] + i2) * numElements.data()[2] + i3; \
189                   CkArrayIndex3D idx(i1, i2, i3 ); \
190                   if (POPULATE_CONDITION) \
191                     mgr->insertInitial(idx,CkCopyMsg(&ctorMsg)); \
192                 } \
193               } \
194             } \
195           } \
196         }
197
198 void CkArrayMap::populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr)
199 {
200         if (numElements.nInts==0) {
201           CkFreeMsg(ctorMsg);
202           return;
203         }
204         int thisPe=CkMyPe();
205         /* The CkArrayIndex is supposed to have at most 3 dimensions, which
206            means that all the fields are ints, and numElements.nInts represents
207            how many of them are used */
208         CKARRAYMAP_POPULATE_INITIAL(procNum(arrayHdl,idx)==thisPe);
209
210 #if CMK_BIGSIM_CHARM
211         BgEntrySplit("split-array-new-end");
212 #endif
213
214         mgr->doneInserting();
215         CkFreeMsg(ctorMsg);
216 }
217
218 CkGroupID _defaultArrayMapID;
219 CkGroupID _fastArrayMapID;
220
221 class RRMap : public CkArrayMap
222 {
223 public:
224   RRMap(void)
225   {
226           DEBC((AA"Creating RRMap\n"AB));
227   }
228   RRMap(CkMigrateMessage *m):CkArrayMap(m){}
229   int procNum(int /*arrayHdl*/, const CkArrayIndex &i)
230   {
231 #if 1
232     if (i.nInts==1) {
233       //Map 1D integer indices in simple round-robin fashion
234       int ans= (i.data()[0])%CkNumPes();
235       while(!CmiNodeAlive(ans) || (ans == CkMyPe() && CkpvAccess(startedEvac))){
236         ans = (ans +1 )%CkNumPes();
237       }
238       return ans;
239     }
240     else 
241 #endif
242       {
243         //Map other indices based on their hash code, mod a big prime.
244         unsigned int hash=(i.hash()+739)%1280107;
245         int ans = (hash % CkNumPes());
246         while(!CmiNodeAlive(ans)){
247                 ans = (ans +1 )%CkNumPes();
248         }
249         return ans;
250
251       }
252   }
253 };
254
255 /** 
256  * Class used to store the dimensions of the array and precalculate numChares,
257  * binSize and other values for the DefaultArrayMap -- ASB
258  */
259 class arrayMapInfo {
260 public:
261   CkArrayIndex _nelems;
262   int _binSizeFloor;            /* floor of numChares/numPes */
263   int _binSizeCeil;             /* ceiling of numChares/numPes */
264   int _numChares;               /* initial total number of chares */
265   int _remChares;               /* numChares % numPes -- equals the number of
266                                    processors in the first set */
267   int _numFirstSet;             /* _remChares X (_binSize + 1) -- number of
268                                    chares in the first set */
269
270   int _nBinSizeFloor;           /* floor of numChares/numNodes */
271   int _nRemChares;              /* numChares % numNodes -- equals the number of
272                                    nodes in the first set */
273   int _nNumFirstSet;            /* _remChares X (_binSize + 1) -- number of
274                                    chares in the first set of nodes */
275
276   /** All processors are divided into two sets. Processors in the first set
277    *  have one chare more than the processors in the second set. */
278
279   arrayMapInfo(void) { }
280
281   arrayMapInfo(CkArrayIndex& n) : _nelems(n), _numChares(0) {
282     compute_binsize();
283   }
284
285   ~arrayMapInfo() {}
286   
287   void compute_binsize()
288   {
289     int numPes = CkNumPes();
290     //Now assuming homogenous nodes where each node has the same number of PEs
291     int numNodes = CkNumNodes();
292
293     if (_nelems.nInts == 1) {
294       _numChares = _nelems.data()[0];
295     } else if (_nelems.nInts == 2) {
296       _numChares = _nelems.data()[0] * _nelems.data()[1];
297     } else if (_nelems.nInts == 3) {
298       _numChares = _nelems.data()[0] * _nelems.data()[1] * _nelems.data()[2];
299     }
300
301     _remChares = _numChares % numPes;
302     _binSizeFloor = (int)floor((double)_numChares/(double)numPes);
303     _binSizeCeil = (int)ceil((double)_numChares/(double)numPes);
304     _numFirstSet = _remChares * (_binSizeFloor + 1);
305
306     _nRemChares = _numChares % numNodes;
307     _nBinSizeFloor = _numChares/numNodes;
308     _nNumFirstSet = _nRemChares * (_nBinSizeFloor +1);
309   }
310
311   void pup(PUP::er& p){
312     p|_nelems;
313     p|_binSizeFloor;
314     p|_binSizeCeil;
315     p|_numChares;
316     p|_remChares;
317     p|_numFirstSet;
318     p|_nRemChares;
319     p|_nBinSizeFloor;
320     p|_nNumFirstSet;
321   }
322 }c;
323
324
325 /**
326  * The default map object -- This does blocked mapping in the general case and
327  * calls the round-robin procNum for the dynamic insertion case -- ASB
328  */
329 class DefaultArrayMap : public RRMap
330 {
331 public:
332   /** This array stores information about different chare arrays in a Charm
333    *  program (dimensions, binsize, numChares etc ... ) */
334   CkPupPtrVec<arrayMapInfo> amaps;
335
336 public:
337   DefaultArrayMap(void) {
338     DEBC((AA"Creating DefaultArrayMap\n"AB));
339   }
340
341   DefaultArrayMap(CkMigrateMessage *m) : RRMap(m){}
342
343   int registerArray(CkArrayIndex& numElements, CkArrayID aid)
344   {
345     int idx = amaps.size();
346     amaps.resize(idx+1);
347     amaps[idx] = new arrayMapInfo(numElements);
348     return idx;
349   }
350  
351   int procNum(int arrayHdl, const CkArrayIndex &i) {
352     int flati;
353     if (amaps[arrayHdl]->_nelems.nInts == 0) {
354       return RRMap::procNum(arrayHdl, i);
355     }
356
357     if (i.nInts == 1) {
358       flati = i.data()[0];
359     } else if (i.nInts == 2) {
360       flati = i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1];
361     } else if (i.nInts == 3) {
362       flati = (i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1]) * amaps[arrayHdl]->_nelems.data()[2] + i.data()[2];
363     }
364 #if CMK_ERROR_CHECKING
365     else {
366       CkAbort("CkArrayIndex has more than 3 integers!");
367     }
368 #endif
369
370     if(useNodeBlkMapping){
371       if(flati < amaps[arrayHdl]->_numChares){
372         int numCharesOnNode = amaps[arrayHdl]->_nBinSizeFloor;
373         int startNodeID, offsetInNode;
374         if(flati < amaps[arrayHdl]->_nNumFirstSet){
375           numCharesOnNode++;
376           startNodeID = flati/numCharesOnNode;
377           offsetInNode = flati%numCharesOnNode;
378         }else{
379           startNodeID = amaps[arrayHdl]->_nRemChares+(flati-amaps[arrayHdl]->_nNumFirstSet)/numCharesOnNode;
380           offsetInNode = (flati-amaps[arrayHdl]->_nNumFirstSet)%numCharesOnNode;
381         }
382         int nodeSize = CkMyNodeSize(); //assuming every node has same number of PEs
383         int elemsPerPE = numCharesOnNode/nodeSize;
384         int remElems = numCharesOnNode%nodeSize;
385         int firstSetPEs = remElems*(elemsPerPE+1);
386         if(offsetInNode<firstSetPEs){
387           return CkNodeFirst(startNodeID)+offsetInNode/(elemsPerPE+1);
388         }else{
389           return CkNodeFirst(startNodeID)+remElems+(offsetInNode-firstSetPEs)/elemsPerPE;
390         }
391       } else
392           return (flati % CkNumPes());
393     }
394     //regular PE-based block mapping
395     if(flati < amaps[arrayHdl]->_numFirstSet)
396       return (flati / (amaps[arrayHdl]->_binSizeFloor + 1));
397     else if (flati < amaps[arrayHdl]->_numChares)
398       return (amaps[arrayHdl]->_remChares + (flati - amaps[arrayHdl]->_numFirstSet) / (amaps[arrayHdl]->_binSizeFloor));
399     else
400       return (flati % CkNumPes());
401   }
402
403   void pup(PUP::er& p){
404     RRMap::pup(p);
405     int npes = CkNumPes();
406     p|npes;
407     p|amaps;
408     if (p.isUnpacking() && npes != CkNumPes())  {   // binSize needs update
409       for (int i=0; i<amaps.size(); i++)
410         amaps[i]->compute_binsize();
411     }
412   }
413 };
414
415 /**
416  *  A fast map for chare arrays which do static insertions and promise NOT
417  *  to do late insertions -- ASB
418  */
419 class FastArrayMap : public DefaultArrayMap
420 {
421 public:
422   FastArrayMap(void) {
423     DEBC((AA"Creating FastArrayMap\n"AB));
424   }
425
426   FastArrayMap(CkMigrateMessage *m) : DefaultArrayMap(m){}
427
428   int registerArray(CkArrayIndex& numElements, CkArrayID aid)
429   {
430     int idx;
431     idx = DefaultArrayMap::registerArray(numElements, aid);
432
433     return idx;
434   }
435
436   int procNum(int arrayHdl, const CkArrayIndex &i) {
437     int flati;
438     if (amaps[arrayHdl]->_nelems.nInts == 0) {
439       return RRMap::procNum(arrayHdl, i);
440     }
441
442     if (i.nInts == 1) {
443       flati = i.data()[0];
444     } else if (i.nInts == 2) {
445       flati = i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1];
446     } else if (i.nInts == 3) {
447       flati = (i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1]) * amaps[arrayHdl]->_nelems.data()[2] + i.data()[2];
448     }
449 #if CMK_ERROR_CHECKING
450     else {
451       CkAbort("CkArrayIndex has more than 3 integers!");
452     }
453 #endif
454
455     /** binSize used in DefaultArrayMap is the floor of numChares/numPes
456      *  but for this FastArrayMap, we need the ceiling */
457     return (flati / amaps[arrayHdl]->_binSizeCeil);
458   }
459
460   void pup(PUP::er& p){
461     DefaultArrayMap::pup(p);
462   }
463 };
464
465
466 /**
467  * This map can be used for topology aware mapping when the mapping is provided
468  * through a file -- ASB
469  */
470 class ReadFileMap : public DefaultArrayMap
471 {
472 private:
473   CkVec<int> mapping;
474
475 public:
476   ReadFileMap(void) {
477     DEBC((AA"Creating ReadFileMap\n"AB));
478   }
479
480   ReadFileMap(CkMigrateMessage *m) : DefaultArrayMap(m){}
481
482   int registerArray(CkArrayIndex& numElements, CkArrayID aid)
483   {
484     int idx;
485     idx = DefaultArrayMap::registerArray(numElements, aid);
486
487     if(mapping.size() == 0) {
488       int numChares;
489
490       if (amaps[idx]->_nelems.nInts == 1) {
491         numChares = amaps[idx]->_nelems.data()[0];
492       } else if (amaps[idx]->_nelems.nInts == 2) {
493         numChares = amaps[idx]->_nelems.data()[0] * amaps[idx]->_nelems.data()[1];
494       } else if (amaps[idx]->_nelems.nInts == 3) {
495         numChares = amaps[idx]->_nelems.data()[0] * amaps[idx]->_nelems.data()[1] * amaps[idx]->_nelems.data()[2];
496       } else {
497         CkAbort("CkArrayIndex has more than 3 integers!");
498       }
499
500       mapping.resize(numChares);
501       FILE *mapf = fopen("mapfile", "r");
502       TopoManager tmgr;
503       int x, y, z, t;
504
505       for(int i=0; i<numChares; i++) {
506         (void) fscanf(mapf, "%d %d %d %d", &x, &y, &z, &t);
507         mapping[i] = tmgr.coordinatesToRank(x, y, z, t);
508       }
509       fclose(mapf);
510     }
511
512     return idx;
513   }
514
515   int procNum(int arrayHdl, const CkArrayIndex &i) {
516     int flati;
517
518     if (i.nInts == 1) {
519       flati = i.data()[0];
520     } else if (i.nInts == 2) {
521       flati = i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1];
522     } else if (i.nInts == 3) {
523       flati = (i.data()[0] * amaps[arrayHdl]->_nelems.data()[1] + i.data()[1]) * amaps[arrayHdl]->_nelems.data()[2] + i.data()[2];
524     } else {
525       CkAbort("CkArrayIndex has more than 3 integers!");
526     }
527
528     return mapping[flati];
529   }
530
531   void pup(PUP::er& p){
532     DefaultArrayMap::pup(p);
533     p|mapping;
534   }
535 };
536
537 class BlockMap : public RRMap
538 {
539 public:
540   BlockMap(void){
541         DEBC((AA"Creating BlockMap\n"AB));
542   }
543   BlockMap(CkMigrateMessage *m):RRMap(m){ }
544   void populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr){
545         if (numElements.nInts==0) {
546           CkFreeMsg(ctorMsg);
547           return;
548         }
549         int thisPe=CkMyPe();
550         int numPes=CkNumPes();
551         int binSize;
552         if (numElements.nInts == 1) {
553           binSize = (int)ceil((double)numElements.data()[0]/(double)numPes);
554         } else if (numElements.nInts == 2) {
555           binSize = (int)ceil((double)(numElements.data()[0]*numElements.data()[1])/(double)numPes);
556         } else if (numElements.nInts == 3) {
557           binSize = (int)ceil((double)(numElements.data()[0]*numElements.data()[1]*numElements.data()[2])/(double)numPes);
558         } else {
559           CkAbort("CkArrayIndex has more than 3 integers!");
560         }
561         CKARRAYMAP_POPULATE_INITIAL(i/binSize==thisPe);
562
563         /*
564         CkArrayIndex idx;
565         for (idx=numElements.begin(); idx<numElements; idx.getNext(numElements)) {
566           //for (int i=0;i<numElements;i++) {
567                 int binSize = (int)ceil((double)numElements.getCombinedCount()/(double)numPes);
568                 if (i/binSize==thisPe)
569                         mgr->insertInitial(idx,CkCopyMsg(&ctorMsg));
570         }*/
571         mgr->doneInserting();
572         CkFreeMsg(ctorMsg);
573   }
574 };
575
576 /**
577  * map object-- use seed load balancer.  
578  */
579 class CldMap : public CkArrayMap
580 {
581 public:
582   CldMap(void)
583   {
584           DEBC((AA"Creating CldMap\n"AB));
585   }
586   CldMap(CkMigrateMessage *m):CkArrayMap(m){}
587   int homePe(int /*arrayHdl*/, const CkArrayIndex &i)
588   {
589     if (i.nInts==1) {
590       //Map 1D integer indices in simple round-robin fashion
591       return (i.data()[0])%CkNumPes();
592     }
593     else 
594       {
595         //Map other indices based on their hash code, mod a big prime.
596         unsigned int hash=(i.hash()+739)%1280107;
597         return (hash % CkNumPes());
598       }
599   }
600   int procNum(int arrayHdl, const CkArrayIndex &i)
601   {
602      return CLD_ANYWHERE;   // -1
603   }
604   void populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr)  {
605         if (numElements.nInts==0) {
606           CkFreeMsg(ctorMsg);
607           return;
608         }
609         int thisPe=CkMyPe();
610         int numPes=CkNumPes();
611         //CkArrayIndex idx;
612
613         CKARRAYMAP_POPULATE_INITIAL(i%numPes==thisPe);
614         /*for (idx=numElements.begin(); idx<numElements; idx.getNext(numElements)) {
615           //for (int i=0;i<numElements;i++)
616                         if((idx.getRank(numElements))%numPes==thisPe)
617                                 mgr->insertInitial(CkArrayIndex1D(i),CkCopyMsg(&ctorMsg),0);
618         }*/
619         mgr->doneInserting();
620         CkFreeMsg(ctorMsg);
621   }
622
623 };
624
625
626 /// A class responsible for parsing the command line arguments for the PE
627 /// to extract the format string passed in with +ConfigurableRRMap
628 class ConfigurableRRMapLoader {
629 public:
630   
631   int *locations;
632   int objs_per_block;
633   int PE_per_block;
634
635   /// labels for states used when parsing the ConfigurableRRMap from ARGV
636   enum ConfigurableRRMapLoadStatus{
637     not_loaded,
638     loaded_found,
639     loaded_not_found
640   };
641   
642   enum ConfigurableRRMapLoadStatus state;
643   
644   ConfigurableRRMapLoader(){
645     state = not_loaded;
646     locations = NULL;
647     objs_per_block = 0;
648     PE_per_block = 0;
649   }
650   
651   /// load configuration if possible, and return whether a valid configuration exists
652   bool haveConfiguration() {
653     if(state == not_loaded) {
654       DEBUG(("[%d] loading ConfigurableRRMap configuration\n", CkMyPe()));
655       char **argv=CkGetArgv();
656       char *configuration = NULL;
657       bool found = CmiGetArgString(argv, "+ConfigurableRRMap", &configuration);
658       if(!found){
659         DEBUG(("Couldn't find +ConfigurableRRMap command line argument\n"));
660         state = loaded_not_found;
661         return false;
662       } else {
663
664         DEBUG(("Found +ConfigurableRRMap command line argument in %p=\"%s\"\n", configuration, configuration));
665
666         std::istringstream instream(configuration);
667         CkAssert(instream.good());
668          
669         // Example line:
670         // 10 8 0 1 2 3 4 5 6 7 7 7 7
671         // Map 10 objects to 8 PEs, with each object's index among the 8 PEs.
672         
673         // extract first integer
674         instream >> objs_per_block >> PE_per_block;
675         CkAssert(instream.good());
676         CkAssert(objs_per_block > 0);
677         CkAssert(PE_per_block > 0);
678         locations = new int[objs_per_block];
679         for(int i=0;i<objs_per_block;i++){
680           locations[i] = 0;
681           CkAssert(instream.good());
682           instream >> locations[i];
683           CkAssert(locations[i] < PE_per_block);
684         }
685         state = loaded_found;
686         return true;
687       }
688
689     } else {
690       DEBUG(("[%d] ConfigurableRRMap has already been loaded\n", CkMyPe()));
691       return state == loaded_found;
692     }      
693      
694   }
695   
696 };
697
698 CkpvDeclare(ConfigurableRRMapLoader, myConfigRRMapState);
699
700 void _initConfigurableRRMap(){
701   CkpvInitialize(ConfigurableRRMapLoader, myConfigRRMapState);
702 }
703
704
705 /// Try to load the command line arguments for ConfigurableRRMap
706 bool haveConfigurableRRMap(){
707   DEBUG(("haveConfigurableRRMap()\n"));
708   ConfigurableRRMapLoader &loader =  CkpvAccess(myConfigRRMapState);
709   return loader.haveConfiguration();
710 }
711
712 class ConfigurableRRMap : public RRMap
713 {
714 public:
715   ConfigurableRRMap(void){
716         DEBC((AA"Creating ConfigurableRRMap\n"AB));
717   }
718   ConfigurableRRMap(CkMigrateMessage *m):RRMap(m){ }
719
720
721   void populateInitial(int arrayHdl,CkArrayIndex& numElements,void *ctorMsg,CkArrMgr *mgr){
722     // Try to load the configuration from command line argument
723     CkAssert(haveConfigurableRRMap());
724     ConfigurableRRMapLoader &loader =  CkpvAccess(myConfigRRMapState);
725     if (numElements.nInts==0) {
726       CkFreeMsg(ctorMsg);
727       return;
728     }
729     int thisPe=CkMyPe();
730     int maxIndex = numElements.data()[0];
731     DEBUG(("[%d] ConfigurableRRMap: index=%d,%d,%d\n", CkMyPe(),(int)numElements.data()[0], (int)numElements.data()[1], (int)numElements.data()[2]));
732
733     if (numElements.nInts != 1) {
734       CkAbort("ConfigurableRRMap only supports dimension 1!");
735     }
736         
737     for (int index=0; index<maxIndex; index++) {        
738       CkArrayIndex1D idx(index);                
739       
740       int cyclic_block = index / loader.objs_per_block;
741       int cyclic_local = index % loader.objs_per_block;
742       int l = loader.locations[ cyclic_local ];
743       int PE = (cyclic_block*loader.PE_per_block + l) % CkNumPes();
744
745       DEBUG(("[%d] ConfigurableRRMap: index=%d is located on PE %d l=%d\n", CkMyPe(), (int)index, (int)PE, l));
746
747       if(PE == thisPe)
748         mgr->insertInitial(idx,CkCopyMsg(&ctorMsg));
749
750     }
751     //        CKARRAYMAP_POPULATE_INITIAL(PE == thisPe);
752         
753     mgr->doneInserting();
754     CkFreeMsg(ctorMsg);
755   }
756 };
757
758
759 CkpvStaticDeclare(double*, rem);
760
761 class arrInfo {
762  private:
763    CkArrayIndex _nelems;
764    int *_map;
765    void distrib(int *speeds);
766  public:
767    arrInfo(void):_map(NULL){}
768    arrInfo(CkArrayIndex& n, int *speeds)
769    {
770      _nelems = n;
771      _map = new int[_nelems.getCombinedCount()];
772      distrib(speeds);
773    }
774    ~arrInfo() { delete[] _map; }
775    int getMap(const CkArrayIndex &i);
776    void pup(PUP::er& p){
777      p|_nelems;
778      int totalElements = _nelems.getCombinedCount();
779      if(p.isUnpacking()){
780        _map = new int[totalElements];
781      }
782      p(_map,totalElements);
783    }
784 };
785
786 static int cmp(const void *first, const void *second)
787 {
788   int fi = *((const int *)first);
789   int si = *((const int *)second);
790   return ((CkpvAccess(rem)[fi]==CkpvAccess(rem)[si]) ?
791           0 :
792           ((CkpvAccess(rem)[fi]<CkpvAccess(rem)[si]) ?
793           1 : (-1)));
794 }
795
796 void
797 arrInfo::distrib(int *speeds)
798 {
799   int _nelemsCount = _nelems.getCombinedCount();
800   double total = 0.0;
801   int npes = CkNumPes();
802   int i,j,k;
803   for(i=0;i<npes;i++)
804     total += (double) speeds[i];
805   double *nspeeds = new double[npes];
806   for(i=0;i<npes;i++)
807     nspeeds[i] = (double) speeds[i] / total;
808   int *cp = new int[npes];
809   for(i=0;i<npes;i++)
810     cp[i] = (int) (nspeeds[i]*_nelemsCount);
811   int nr = 0;
812   for(i=0;i<npes;i++)
813     nr += cp[i];
814   nr = _nelemsCount - nr;
815   if(nr != 0)
816   {
817     CkpvAccess(rem) = new double[npes];
818     for(i=0;i<npes;i++)
819       CkpvAccess(rem)[i] = (double)_nelemsCount*nspeeds[i] - cp[i];
820     int *pes = new int[npes];
821     for(i=0;i<npes;i++)
822       pes[i] = i;
823     qsort(pes, npes, sizeof(int), cmp);
824     for(i=0;i<nr;i++)
825       cp[pes[i]]++;
826     delete[] pes;
827     delete[] CkpvAccess(rem);
828   }
829   k = 0;
830   for(i=0;i<npes;i++)
831   {
832     for(j=0;j<cp[i];j++)
833       _map[k++] = i;
834   }
835   delete[] cp;
836   delete[] nspeeds;
837 }
838
839 int
840 arrInfo::getMap(const CkArrayIndex &i)
841 {
842   if(i.nInts==1)
843     return _map[i.data()[0]];
844   else
845     return _map[((i.hash()+739)%1280107)%_nelems.getCombinedCount()];
846 }
847
848 //Speeds maps processor number to "speed" (some sort of iterations per second counter)
849 // It is initialized by processor 0.
850 static int* speeds;
851
852 #if CMK_USE_PROP_MAP
853 typedef struct _speedmsg
854 {
855   char hdr[CmiMsgHeaderSizeBytes];
856   int node;
857   int speed;
858 } speedMsg;
859
860 static void _speedHdlr(void *m)
861 {
862   speedMsg *msg=(speedMsg *)m;
863   if (CmiMyRank()==0)
864     for (int pe=0;pe<CmiNodeSize(msg->node);pe++)
865       speeds[CmiNodeFirst(msg->node)+pe] = msg->speed;  
866   CmiFree(m);
867 }
868
869 // initnode call
870 void _propMapInit(void)
871 {
872   speeds = new int[CkNumPes()];
873   int hdlr = CkRegisterHandler((CmiHandler)_speedHdlr);
874   CmiPrintf("[%d]Measuring processor speed for prop. mapping...\n", CkMyPe());
875   int s = LDProcessorSpeed();
876   speedMsg msg;
877   CmiSetHandler(&msg, hdlr);
878   msg.node = CkMyNode();
879   msg.speed = s;
880   CmiSyncBroadcastAllAndFree(sizeof(msg), &msg);
881   for(int i=0;i<CkNumNodes();i++)
882     CmiDeliverSpecificMsg(hdlr);
883 }
884 #else
885 void _propMapInit(void)
886 {
887   speeds = new int[CkNumPes()];
888   int i;
889   for(i=0;i<CkNumPes();i++)
890     speeds[i] = 1;
891 }
892 #endif
893 /**
894  * A proportional map object-- tries to map more objects to
895  * faster processors and fewer to slower processors.  Also
896  * attempts to ensure good locality by mapping nearby elements
897  * together.
898  */
899 class PropMap : public CkArrayMap
900 {
901 private:
902   CkPupPtrVec<arrInfo> arrs;
903 public:
904   PropMap(void)
905   {
906     CkpvInitialize(double*, rem);
907     DEBC((AA"Creating PropMap\n"AB));
908   }
909   PropMap(CkMigrateMessage *m) {}
910   int registerArray(CkArrayIndex& numElements,CkArrayID aid)
911   {
912     int idx = arrs.size();
913     arrs.resize(idx+1);
914     arrs[idx] = new arrInfo(numElements, speeds);
915     return idx;
916   }
917   int procNum(int arrayHdl, const CkArrayIndex &i)
918   {
919     return arrs[arrayHdl]->getMap(i);
920   }
921   void pup(PUP::er& p){
922     p|arrs;
923   }
924 };
925
926 class CkMapsInit : public Chare
927 {
928 public:
929         CkMapsInit(CkArgMsg *msg) {
930                 _defaultArrayMapID = CProxy_DefaultArrayMap::ckNew();
931                 _fastArrayMapID = CProxy_FastArrayMap::ckNew();
932                 delete msg;
933         }
934
935         CkMapsInit(CkMigrateMessage *m) {}
936 };
937
938 // given an envelope of a Charm msg, find the recipient object pointer
939 CkMigratable * CkArrayMessageObjectPtr(envelope *env) {
940   if (env->getMsgtype()!=ForArrayEltMsg) return NULL;   // not an array msg
941
942   CkArrayID aid = env->getsetArrayMgr();
943   CkArray *mgr=(CkArray *)_localBranch(aid);
944   if (mgr) {
945     CkLocMgr *locMgr = mgr->getLocMgr();
946     if (locMgr) {
947       return locMgr->lookup(env->getsetArrayIndex(),aid);
948     }
949   }
950   return NULL;
951 }
952
953 /****************************** Out-of-Core support ********************/
954
955 #if CMK_OUT_OF_CORE
956 CooPrefetchManager CkArrayElementPrefetcher;
957 CkpvDeclare(int,CkSaveRestorePrefetch);
958
959 /**
960  * Return the out-of-core objid (from CooRegisterObject)
961  * that this Converse message will access.  If the message
962  * will not access an object, return -1.
963  */
964 int CkArrayPrefetch_msg2ObjId(void *msg) {
965   envelope *env=(envelope *)msg;
966   CkMigratable *elt = CkArrayMessageObjectPtr(env);
967   return elt?elt->prefetchObjID:-1;
968 }
969
970 /**
971  * Write this object (registered with RegisterObject)
972  * to this writable file.
973  */
974 void CkArrayPrefetch_writeToSwap(FILE *swapfile,void *objptr) {
975   CkMigratable *elt=(CkMigratable *)objptr;
976
977   //Save the element's data to disk:
978   PUP::toDisk p(swapfile);
979   elt->pup(p);
980
981   //Call the element's destructor in-place (so pointer doesn't change)
982   CkpvAccess(CkSaveRestorePrefetch)=1;
983   elt->~CkMigratable(); //< because destuctor is virtual, destroys user class too.
984   CkpvAccess(CkSaveRestorePrefetch)=0;
985 }
986         
987 /**
988  * Read this object (registered with RegisterObject)
989  * from this readable file.
990  */
991 void CkArrayPrefetch_readFromSwap(FILE *swapfile,void *objptr) {
992   CkMigratable *elt=(CkMigratable *)objptr;
993   //Call the element's migration constructor in-place
994   CkpvAccess(CkSaveRestorePrefetch)=1;
995   int ctorIdx=_chareTable[elt->thisChareType]->migCtor;
996   elt->myRec->invokeEntry(elt,(CkMigrateMessage *)0,ctorIdx,CmiTrue);
997   CkpvAccess(CkSaveRestorePrefetch)=0;
998   
999   //Restore the element's data from disk:
1000   PUP::fromDisk p(swapfile);
1001   elt->pup(p);
1002 }
1003
1004 static void _CkMigratable_prefetchInit(void) 
1005 {
1006   CkpvExtern(int,CkSaveRestorePrefetch);
1007   CkpvAccess(CkSaveRestorePrefetch)=0;
1008   CkArrayElementPrefetcher.msg2ObjId=CkArrayPrefetch_msg2ObjId;
1009   CkArrayElementPrefetcher.writeToSwap=CkArrayPrefetch_writeToSwap;
1010   CkArrayElementPrefetcher.readFromSwap=CkArrayPrefetch_readFromSwap;
1011   CooRegisterManager(&CkArrayElementPrefetcher, _charmHandlerIdx);
1012 }
1013 #endif
1014
1015 /****************************** CkMigratable ***************************/
1016 /**
1017  * This tiny class is used to convey information to the 
1018  * newly created CkMigratable object when its constructor is called.
1019  */
1020 class CkMigratable_initInfo {
1021 public:
1022         CkLocRec_local *locRec;
1023         int chareType;
1024         CmiBool forPrefetch; /* If true, this creation is only a prefetch restore-from-disk.*/
1025 };
1026
1027 CkpvStaticDeclare(CkMigratable_initInfo,mig_initInfo);
1028
1029
1030 void _CkMigratable_initInfoInit(void) {
1031   CkpvInitialize(CkMigratable_initInfo,mig_initInfo);
1032 #if CMK_OUT_OF_CORE
1033   _CkMigratable_prefetchInit();
1034 #endif
1035 }
1036
1037 void CkMigratable::commonInit(void) {
1038         CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
1039 #if CMK_OUT_OF_CORE
1040         isInCore=CmiTrue;
1041         if (CkpvAccess(CkSaveRestorePrefetch))
1042                 return; /* Just restoring from disk--don't touch object */
1043         prefetchObjID=-1; //Unregistered
1044 #endif
1045         myRec=i.locRec;
1046         thisIndexMax=myRec->getIndex();
1047         thisChareType=i.chareType;
1048         usesAtSync=CmiFalse;
1049         usesAutoMeasure=CmiTrue;
1050         barrierRegistered=CmiFalse;
1051         /*
1052         FAULT_EVAC
1053         */
1054         AsyncEvacuate(CmiTrue);
1055 }
1056
1057 CkMigratable::CkMigratable(void) {
1058         DEBC((AA"In CkMigratable constructor\n"AB));
1059         commonInit();
1060 }
1061 CkMigratable::CkMigratable(CkMigrateMessage *m): Chare(m) {
1062         commonInit();
1063 }
1064
1065 int CkMigratable::ckGetChareType(void) const {return thisChareType;}
1066
1067 void CkMigratable::pup(PUP::er &p) {
1068         DEBM((AA"In CkMigratable::pup %s\n"AB,idx2str(thisIndexMax)));
1069         Chare::pup(p);
1070         p|thisIndexMax;
1071         p(usesAtSync);
1072         p(usesAutoMeasure);
1073 #if CMK_LBDB_ON 
1074         int readyMigrate;
1075         if (p.isPacking()) readyMigrate = myRec->isReadyMigrate();
1076         p|readyMigrate;
1077         if (p.isUnpacking()) myRec->ReadyMigrate(readyMigrate);
1078 #endif
1079         if(p.isUnpacking()) barrierRegistered=CmiFalse;
1080         /*
1081                 FAULT_EVAC
1082         */
1083         p | asyncEvacuate;
1084         if(p.isUnpacking()){myRec->AsyncEvacuate(asyncEvacuate);}
1085         
1086         ckFinishConstruction();
1087 }
1088
1089 void CkMigratable::ckDestroy(void) {
1090         DEBC((AA"In CkMigratable::ckDestroy %s\n"AB,idx2str(thisIndexMax)));
1091         myRec->destroy();
1092 }
1093
1094 void CkMigratable::ckAboutToMigrate(void) { }
1095 void CkMigratable::ckJustMigrated(void) { }
1096 void CkMigratable::ckJustRestored(void) { }
1097
1098 CkMigratable::~CkMigratable() {
1099         DEBC((AA"In CkMigratable::~CkMigratable %s\n"AB,idx2str(thisIndexMax)));
1100 #if CMK_OUT_OF_CORE
1101         isInCore=CmiFalse;
1102         if (CkpvAccess(CkSaveRestorePrefetch)) 
1103                 return; /* Just saving to disk--don't deregister anything. */
1104         /* We're really leaving or dying-- unregister from the ooc system*/
1105         if (prefetchObjID!=-1) {
1106                 CooDeregisterObject(prefetchObjID);
1107                 prefetchObjID=-1;
1108         }
1109 #endif
1110         /*Might want to tell myRec about our doom here--
1111         it's difficult to avoid some kind of circular-delete, though.
1112         */
1113 #if CMK_LBDB_ON 
1114         if (barrierRegistered) {
1115           DEBL((AA"Removing barrier for element %s\n"AB,idx2str(thisIndexMax)));
1116           if (usesAtSync)
1117                 myRec->getLBDB()->RemoveLocalBarrierClient(ldBarrierHandle);
1118           else
1119                 myRec->getLBDB()->RemoveLocalBarrierReceiver(ldBarrierRecvHandle);
1120         }
1121 #endif
1122         //To detect use-after-delete
1123         thisIndexMax.nInts=-12345;
1124         thisIndexMax.dimension=-12345;
1125 }
1126
1127 void CkMigratable::CkAbort(const char *why) const {
1128         CkError("CkMigratable '%s' aborting:\n",_chareTable[thisChareType]->name);
1129         ::CkAbort(why);
1130 }
1131
1132 void CkMigratable::ResumeFromSync(void)
1133 {
1134 //      CkAbort("::ResumeFromSync() not defined for this array element!\n");
1135 }
1136
1137 void CkMigratable::UserSetLBLoad() {
1138         CkAbort("::UserSetLBLoad() not defined for this array element!\n");
1139 }
1140
1141 #if CMK_LBDB_ON  //For load balancing:
1142 // user can call this helper function to set obj load (for model-based lb)
1143 void CkMigratable::setObjTime(double cputime) {
1144         myRec->setObjTime(cputime);
1145 }
1146 double CkMigratable::getObjTime() {
1147         return myRec->getObjTime();
1148 }
1149
1150 void CkMigratable::ckFinishConstruction(void)
1151 {
1152 //      if ((!usesAtSync) || barrierRegistered) return;
1153         myRec->setMeasure(usesAutoMeasure);
1154         if (barrierRegistered) return;
1155         DEBL((AA"Registering barrier client for %s\n"AB,idx2str(thisIndexMax)));
1156         if (usesAtSync)
1157           ldBarrierHandle = myRec->getLBDB()->AddLocalBarrierClient(
1158                 (LDBarrierFn)staticResumeFromSync,(void*)(this));
1159         else
1160           ldBarrierRecvHandle = myRec->getLBDB()->AddLocalBarrierReceiver(
1161                 (LDBarrierFn)staticResumeFromSync,(void*)(this));
1162         barrierRegistered=CmiTrue;
1163 }
1164 void CkMigratable::AtSync(int waitForMigration)
1165 {
1166         if (!usesAtSync)
1167                 CkAbort("You must set usesAtSync=CmiTrue in your array element constructor to use AtSync!\n");
1168 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1169         mlogData->toResumeOrNot=1;
1170 #endif
1171         myRec->AsyncMigrate(!waitForMigration);
1172         if (waitForMigration) ReadyMigrate(CmiTrue);
1173         ckFinishConstruction();
1174         DEBL((AA"Element %s going to sync\n"AB,idx2str(thisIndexMax)));
1175           // model-based load balancing, ask user to provide cpu load
1176         if (usesAutoMeasure == CmiFalse) UserSetLBLoad();
1177         myRec->getLBDB()->AtLocalBarrier(ldBarrierHandle);
1178 }
1179 void CkMigratable::ReadyMigrate(CmiBool ready)
1180 {
1181         myRec->ReadyMigrate(ready);
1182 }
1183
1184 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1185     extern int globalResumeCount;
1186 #endif
1187
1188 void CkMigratable::staticResumeFromSync(void* data)
1189 {
1190         CkMigratable *el=(CkMigratable *)data;
1191 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1192     if(el->mlogData->toResumeOrNot ==0 || el->mlogData->resumeCount >= globalResumeCount){
1193         return;
1194     }
1195 #endif
1196         DEBL((AA"Element %s resuming from sync\n"AB,idx2str(el->thisIndexMax)));
1197 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1198     CpvAccess(_currentObj) = el;
1199 #endif
1200         el->ResumeFromSync();
1201 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1202     el->mlogData->resumeCount++;
1203 #endif
1204 }
1205 void CkMigratable::setMigratable(int migratable) 
1206 {
1207         myRec->setMigratable(migratable);
1208 }
1209
1210 struct CkArrayThreadListener {
1211         struct CthThreadListener base;
1212         CkMigratable *mig;
1213 };
1214
1215 extern "C"
1216 void CkArrayThreadListener_suspend(struct CthThreadListener *l)
1217 {
1218         CkArrayThreadListener *a=(CkArrayThreadListener *)l;
1219         a->mig->ckStopTiming();
1220 }
1221
1222 extern "C"
1223 void CkArrayThreadListener_resume(struct CthThreadListener *l)
1224 {
1225         CkArrayThreadListener *a=(CkArrayThreadListener *)l;
1226         a->mig->ckStartTiming();
1227 }
1228
1229 extern "C"
1230 void CkArrayThreadListener_free(struct CthThreadListener *l)
1231 {
1232         CkArrayThreadListener *a=(CkArrayThreadListener *)l;
1233         delete a;
1234 }
1235
1236 void CkMigratable::CkAddThreadListeners(CthThread tid, void *msg)
1237 {
1238         Chare::CkAddThreadListeners(tid, msg);   // for trace
1239         CthSetThreadID(tid, thisIndexMax.data()[0], thisIndexMax.data()[1], 
1240                        thisIndexMax.data()[2]);
1241         CkArrayThreadListener *a=new CkArrayThreadListener;
1242         a->base.suspend=CkArrayThreadListener_suspend;
1243         a->base.resume=CkArrayThreadListener_resume;
1244         a->base.free=CkArrayThreadListener_free;
1245         a->mig=this;
1246         CthAddListener(tid,(struct CthThreadListener *)a);
1247 }
1248 #else
1249 void CkMigratable::setObjTime(double cputime) {}
1250 double CkMigratable::getObjTime() {return 0.0;}
1251
1252 /* no load balancer: need dummy implementations to prevent link error */
1253 void CkMigratable::CkAddThreadListeners(CthThread tid, void *msg)
1254 {
1255 }
1256 #endif
1257
1258
1259 /*CkMigratableList*/
1260 CkMigratableList::CkMigratableList() {}
1261 CkMigratableList::~CkMigratableList() {}
1262
1263 void CkMigratableList::setSize(int s) {
1264         el.resize(s);
1265 }
1266
1267 void CkMigratableList::put(CkMigratable *v,int atIdx) {
1268 #if CMK_ERROR_CHECKING
1269         if (atIdx>=length())
1270                 CkAbort("Internal array manager error (CkMigrableList::put index out of bounds)");
1271 #endif
1272         el[atIdx]=v;
1273 }
1274
1275
1276 /************************** Location Records: *********************************/
1277
1278 //---------------- Base type:
1279 void CkLocRec::weAreObsolete(const CkArrayIndex &idx) {}
1280 CkLocRec::~CkLocRec() { }
1281 void CkLocRec::beenReplaced(void)
1282     {/*Default: ignore replacement*/}
1283
1284 //Return the represented array element; or NULL if there is none
1285 CkMigratable *CkLocRec::lookupElement(CkArrayID aid) {return NULL;}
1286
1287 //Return the last known processor; or -1 if none
1288 int CkLocRec::lookupProcessor(void) {return -1;}
1289
1290
1291 /*----------------- Local:
1292 Matches up the array index with the local index, an
1293 interfaces with the load balancer on behalf of the
1294 represented array elements.
1295 */
1296 CkLocRec_local::CkLocRec_local(CkLocMgr *mgr,CmiBool fromMigration,
1297   CmiBool ignoreArrival, const CkArrayIndex &idx_,int localIdx_)
1298         :CkLocRec(mgr),idx(idx_),localIdx(localIdx_),
1299          running(CmiFalse),deletedMarker(NULL)
1300 {
1301 #if CMK_LBDB_ON
1302         DEBL((AA"Registering element %s with load balancer\n"AB,idx2str(idx)));
1303         //BIGSIM_OOC DEBUGGING
1304         //CkPrintf("LocMgr on %d: Registering element %s with load balancer\n", CkMyPe(), idx2str(idx));
1305         nextPe = -1;
1306         asyncMigrate = CmiFalse;
1307         readyMigrate = CmiTrue;
1308         enable_measure = CmiTrue;
1309         bounced  = CmiFalse;
1310         the_lbdb=mgr->getLBDB();
1311         LDObjid ldid = idx2LDObjid(idx);
1312 #if CMK_GLOBAL_LOCATION_UPDATE
1313         ldid.locMgrGid = mgr->getGroupID().idx;
1314 #endif        
1315         ldHandle=the_lbdb->RegisterObj(mgr->getOMHandle(),
1316                 ldid,(void *)this,1);
1317         if (fromMigration) {
1318                 DEBL((AA"Element %s migrated in\n"AB,idx2str(idx)));
1319                 if (!ignoreArrival)  {
1320                         the_lbdb->Migrated(ldHandle, CmiTrue);
1321                   // load balancer should ignore this objects movement
1322                 //  AsyncMigrate(CmiTrue);
1323                 }
1324         }
1325 #endif
1326         /*
1327                 FAULT_EVAC
1328         */
1329         asyncEvacuate = CmiTrue;
1330 }
1331 CkLocRec_local::~CkLocRec_local()
1332 {
1333         if (deletedMarker!=NULL) *deletedMarker=CmiTrue;
1334         myLocMgr->reclaim(idx,localIdx);
1335 #if CMK_LBDB_ON
1336         stopTiming();
1337         DEBL((AA"Unregistering element %s from load balancer\n"AB,idx2str(idx)));
1338         the_lbdb->UnregisterObj(ldHandle);
1339 #endif
1340 }
1341 void CkLocRec_local::migrateMe(int toPe) //Leaving this processor
1342 {
1343         //This will pack us up, send us off, and delete us
1344 //      printf("[%d] migrating migrateMe to %d \n",CkMyPe(),toPe);
1345         myLocMgr->emigrate(this,toPe);
1346 }
1347
1348 #if CMK_LBDB_ON
1349 void CkLocRec_local::startTiming(int ignore_running) {
1350         if (!ignore_running) running=CmiTrue;
1351         DEBL((AA"Start timing for %s at %.3fs {\n"AB,idx2str(idx),CkWallTimer()));
1352         if (enable_measure) the_lbdb->ObjectStart(ldHandle);
1353 }
1354 void CkLocRec_local::stopTiming(int ignore_running) {
1355         DEBL((AA"} Stop timing for %s at %.3fs\n"AB,idx2str(idx),CkWallTimer()));
1356         if ((ignore_running || running) && enable_measure) the_lbdb->ObjectStop(ldHandle);
1357         if (!ignore_running) running=CmiFalse;
1358 }
1359 void CkLocRec_local::setObjTime(double cputime) {
1360         the_lbdb->EstObjLoad(ldHandle, cputime);
1361 }
1362 double CkLocRec_local::getObjTime() {
1363         LBRealType walltime, cputime;
1364         the_lbdb->GetObjLoad(ldHandle, walltime, cputime);
1365         return walltime;
1366 }
1367 #endif
1368
1369 void CkLocRec_local::destroy(void) //User called destructor
1370 {
1371         //Our destructor does all the needed work
1372         delete this;
1373 }
1374 //Return the represented array element; or NULL if there is none
1375 CkMigratable *CkLocRec_local::lookupElement(CkArrayID aid) {
1376         return myLocMgr->lookupLocal(localIdx,aid);
1377 }
1378
1379 //Return the last known processor; or -1 if none
1380 int CkLocRec_local::lookupProcessor(void) {
1381         return CkMyPe();
1382 }
1383
1384 CkLocRec::RecType CkLocRec_local::type(void)
1385 {
1386         return local;
1387 }
1388
1389 void CkLocRec_local::addedElement(void) 
1390 {
1391         //Push everything in the half-created queue into the system--
1392         // anything not ready yet will be put back in.
1393         while (!halfCreated.isEmpty()) 
1394                 CkArrayManagerDeliver(CkMyPe(),halfCreated.deq());
1395 }
1396
1397 CmiBool CkLocRec_local::isObsolete(int nSprings,const CkArrayIndex &idx_)
1398
1399         int len=halfCreated.length();
1400         if (len!=0) {
1401                 /* This is suspicious-- the halfCreated queue should be extremely
1402                  transient.  It's possible we just looked at the wrong time, though;
1403                  so this is only a warning. 
1404                 */
1405                 CkPrintf("CkLoc WARNING> %d messages still around for uncreated element %s!\n",
1406                          len,idx2str(idx));
1407         }
1408         //A local element never expires
1409         return CmiFalse;
1410 }
1411
1412 /**********Added for cosmology (inline function handling without parameter marshalling)***********/
1413
1414 LDObjHandle CkMigratable::timingBeforeCall(int* objstopped){
1415
1416         LDObjHandle objHandle;
1417 #if CMK_LBDB_ON
1418         if (getLBDB()->RunningObject(&objHandle)) {
1419                 *objstopped = 1;
1420                 getLBDB()->ObjectStop(objHandle);
1421         }
1422         myRec->startTiming(1);
1423 #endif
1424
1425   //DEBS((AA"   Invoking entry %d on element %s\n"AB,epIdx,idx2str(idx)));
1426         //CmiBool isDeleted=CmiFalse; //Enables us to detect deletion during processing
1427         //deletedMarker=&isDeleted;
1428 /*#ifndef CMK_OPTIMIZE
1429         if (msg) {  Tracing: 
1430                 envelope *env=UsrToEnv(msg);
1431         //      CkPrintf("ckLocation.C beginExecuteDetailed %d %d \n",env->getEvent(),env->getsetArraySrcPe());
1432                 if (_entryTable[epIdx]->traceEnabled)
1433                         _TRACE_BEGIN_EXECUTE_DETAILED(env->getEvent(),
1434                                  ForChareMsg,epIdx,env->getsetArraySrcPe(), env->getTotalsize(), idx.getProjectionID(((CkGroupID)env->getsetArrayMgr())).idx);
1435         }
1436 #endif*/
1437
1438   return objHandle;
1439 }
1440
1441 void CkMigratable::timingAfterCall(LDObjHandle objHandle,int *objstopped){
1442   
1443 /*#ifndef CMK_OPTIMIZE
1444         if (msg) {  Tracing: 
1445                 if (_entryTable[epIdx]->traceEnabled)
1446                         _TRACE_END_EXECUTE();
1447         }
1448 #endif*/
1449 //#if CMK_LBDB_ON
1450 //        if (!isDeleted) checkBufferedMigration();   // check if should migrate
1451 //#endif
1452 //      if (isDeleted) return CmiFalse;//We were deleted
1453 //      deletedMarker=NULL;
1454 //      return CmiTrue;
1455         myRec->stopTiming(1);
1456 #if CMK_LBDB_ON
1457         if (*objstopped) {
1458                  getLBDB()->ObjectStart(objHandle);
1459         }
1460 #endif
1461
1462  return;
1463 }
1464 /****************************************************************************/
1465
1466
1467 CmiBool CkLocRec_local::invokeEntry(CkMigratable *obj,void *msg,
1468         int epIdx,CmiBool doFree) 
1469 {
1470
1471         DEBS((AA"   Invoking entry %d on element %s\n"AB,epIdx,idx2str(idx)));
1472         CmiBool isDeleted=CmiFalse; //Enables us to detect deletion during processing
1473         deletedMarker=&isDeleted;
1474         startTiming();
1475
1476
1477 #if CMK_TRACE_ENABLED
1478         if (msg) { /* Tracing: */
1479                 envelope *env=UsrToEnv(msg);
1480         //      CkPrintf("ckLocation.C beginExecuteDetailed %d %d \n",env->getEvent(),env->getsetArraySrcPe());
1481                 if (_entryTable[epIdx]->traceEnabled)
1482                         _TRACE_BEGIN_EXECUTE_DETAILED(env->getEvent(),
1483                                  ForChareMsg,epIdx,env->getsetArraySrcPe(), env->getTotalsize(), idx.getProjectionID((((CkGroupID)env->getsetArrayMgr())).idx));
1484         }
1485 #endif
1486
1487         if (doFree) 
1488            CkDeliverMessageFree(epIdx,msg,obj);
1489         else /* !doFree */
1490            CkDeliverMessageReadonly(epIdx,msg,obj);
1491
1492
1493 #if CMK_TRACE_ENABLED
1494         if (msg) { /* Tracing: */
1495                 if (_entryTable[epIdx]->traceEnabled)
1496                         _TRACE_END_EXECUTE();
1497         }
1498 #endif
1499 #if CMK_LBDB_ON
1500         if (!isDeleted) checkBufferedMigration();   // check if should migrate
1501 #endif
1502         if (isDeleted) return CmiFalse;//We were deleted
1503         deletedMarker=NULL;
1504         stopTiming();
1505         return CmiTrue;
1506 }
1507
1508 CmiBool CkLocRec_local::deliver(CkArrayMessage *msg,CkDeliver_t type,int opts)
1509 {
1510
1511         if (type==CkDeliver_queue) { /*Send via the message queue */
1512                 if (opts & CK_MSG_KEEP)
1513                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1514                 CkArrayManagerDeliver(CkMyPe(),msg,opts);
1515                 return CmiTrue;
1516         }
1517         else
1518         {
1519                 CkMigratable *obj=myLocMgr->lookupLocal(localIdx,
1520                         UsrToEnv(msg)->getsetArrayMgr());
1521                 if (obj==NULL) {//That sibling of this object isn't created yet!
1522                         if (opts & CK_MSG_KEEP)
1523                                 msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1524                         if (msg->array_ifNotThere()!=CkArray_IfNotThere_buffer) {
1525                                 return myLocMgr->demandCreateElement(msg,CkMyPe(),type);
1526                         }
1527                         else {
1528                                 DEBS((AA"   BUFFERING message for nonexistent element %s!\n"AB,idx2str(this->idx)));
1529                                 halfCreated.enq(msg);
1530                                 return CmiTrue;
1531                         }
1532                 }
1533                         
1534                 if (msg->array_hops()>1)
1535                         myLocMgr->multiHop(msg);
1536                 CmiBool doFree = (CmiBool)!(opts & CK_MSG_KEEP);
1537 #if CMK_LBDB_ON
1538                 // if there is a running obj being measured, stop it temporarily
1539                 LDObjHandle objHandle;
1540                 int objstopped = 0;
1541                 if (the_lbdb->RunningObject(&objHandle)) {
1542                         objstopped = 1;
1543                         the_lbdb->ObjectStop(objHandle);
1544                 }
1545 #endif
1546 #if CMK_GRID_QUEUE_AVAILABLE
1547                 // retain a pointer to the sending object (needed later)
1548                 CpvAccess(CkGridObject) = obj;
1549 #endif
1550
1551         CmiBool status = invokeEntry(obj,(void *)msg,msg->array_ep(),doFree);
1552         
1553 #if CMK_GRID_QUEUE_AVAILABLE
1554                 CpvAccess(CkGridObject) = NULL;
1555 #endif
1556 #if CMK_LBDB_ON
1557                 if (objstopped) the_lbdb->ObjectStart(objHandle);
1558 #endif
1559                 return status;
1560         }
1561
1562
1563 }
1564
1565 #if CMK_LBDB_ON
1566 void CkLocRec_local::staticMigrate(LDObjHandle h, int dest)
1567 {
1568         CkLocRec_local *el=(CkLocRec_local *)LDObjUserData(h);
1569         DEBL((AA"Load balancer wants to migrate %s to %d\n"AB,idx2str(el->idx),dest));
1570         el->recvMigrate(dest);
1571 }
1572
1573 void CkLocRec_local::recvMigrate(int toPe)
1574 {
1575         // we are in the mode of delaying actual migration
1576         // till readyMigrate()
1577         if (readyMigrate) { migrateMe(toPe); }
1578         else nextPe = toPe;
1579 }
1580
1581 void CkLocRec_local::AsyncMigrate(CmiBool use)  
1582 {
1583         asyncMigrate = use; 
1584         the_lbdb->UseAsyncMigrate(ldHandle, use);
1585 }
1586
1587 CmiBool CkLocRec_local::checkBufferedMigration()
1588 {
1589         // we don't migrate in user's code when calling ReadyMigrate(true)
1590         // we postphone the action to here until we exit from the user code.
1591         if (readyMigrate && nextPe != -1) {
1592             int toPe = nextPe;
1593             nextPe = -1;
1594             // don't migrate inside the object call
1595             migrateMe(toPe);
1596             // don't do anything
1597             return CmiTrue;
1598         }
1599         return CmiFalse;
1600 }
1601
1602 int CkLocRec_local::MigrateToPe()
1603 {
1604         int pe = nextPe;
1605         nextPe = -1;
1606         return pe;
1607 }
1608
1609 void CkLocRec_local::setMigratable(int migratable)
1610 {
1611         if (migratable)
1612           the_lbdb->Migratable(ldHandle);
1613         else
1614           the_lbdb->NonMigratable(ldHandle);
1615 }
1616 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1617 void CkLocRec_local::Migrated(){
1618     the_lbdb->Migrated(ldHandle, CmiTrue);
1619 }
1620 #endif
1621 #endif
1622
1623 /**
1624  * Represents a deleted array element (and prevents re-use).
1625  * These are a debugging aid, usable only by uncommenting a line in
1626  * the element destruction code.
1627  */
1628 class CkLocRec_dead:public CkLocRec {
1629 public:
1630         CkLocRec_dead(CkLocMgr *Narr):CkLocRec(Narr) {}
1631   
1632         virtual RecType type(void) {return dead;}
1633   
1634         virtual CmiBool deliver(CkArrayMessage *msg,CkDeliver_t type,int opts=0) {
1635                 CkPrintf("Dead array element is %s.\n",idx2str(msg->array_index()));
1636                 CkAbort("Send to dead array element!\n");
1637                 return CmiFalse;
1638         }
1639         virtual void beenReplaced(void) 
1640                 {CkAbort("Can't re-use dead array element!\n");}
1641   
1642         //Return if this element is now obsolete (it isn't)
1643         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx) {return CmiFalse;}     
1644 };
1645
1646 /**
1647  * This is the abstract superclass of arrayRecs that keep track of their age,
1648  * and eventually expire. Its kids are remote and buffering.
1649  */
1650 class CkLocRec_aging:public CkLocRec {
1651 private:
1652         int lastAccess;//Age when last accessed
1653 protected:
1654         //Update our access time
1655         inline void access(void) {
1656                 lastAccess=myLocMgr->getSpringCount();
1657         }
1658         //Return if we are "stale"-- we were last accessed a while ago
1659         CmiBool isStale(void) {
1660                 if (myLocMgr->getSpringCount()-lastAccess>3) return CmiTrue;
1661                 else return CmiFalse;
1662         }
1663 public:
1664         CkLocRec_aging(CkLocMgr *Narr):CkLocRec(Narr) {
1665                 lastAccess=myLocMgr->getSpringCount();
1666         }
1667         //Return if this element is now obsolete
1668         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx)=0;
1669         //virtual void pup(PUP::er &p) { CkLocRec::pup(p); p(lastAccess); }
1670 };
1671
1672
1673 /**
1674  * Represents a remote array element.  This is just a PE number.
1675  */
1676 class CkLocRec_remote:public CkLocRec_aging {
1677 private:
1678         int onPe;//The last known Pe for this element
1679 public:
1680         CkLocRec_remote(CkLocMgr *Narr,int NonPe)
1681                 :CkLocRec_aging(Narr)
1682                 {
1683                         onPe=NonPe;
1684 #if CMK_ERROR_CHECKING
1685                         if (onPe==CkMyPe())
1686                                 CkAbort("ERROR!  'remote' array element on this Pe!\n");
1687 #endif
1688                 }
1689         //Return the last known processor for this element
1690         int lookupProcessor(void) {
1691                 return onPe;
1692         }  
1693         virtual RecType type(void) {return remote;}
1694   
1695         //Send a message for this element.
1696         virtual CmiBool deliver(CkArrayMessage *msg,CkDeliver_t type,int opts=0) {
1697                 /*FAULT_EVAC*/
1698                 int destPE = onPe;
1699                 if((!CmiNodeAlive(onPe) && onPe != allowMessagesOnly)){
1700 //                      printf("Delivery failed because process %d is invalid\n",onPe);
1701                         /*
1702                                 Send it to its home processor instead
1703                         */
1704                         const CkArrayIndex &idx=msg->array_index();
1705                         destPE = getNextPE(idx);
1706                 }
1707                 access();//Update our modification date
1708                 msg->array_hops()++;
1709                 DEBS((AA"   Forwarding message for element %s to %d (REMOTE)\n"AB,
1710                       idx2str(msg->array_index()),destPE));
1711                 if (opts & CK_MSG_KEEP)
1712                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1713                 CkArrayManagerDeliver(destPE,msg,opts);
1714                 return CmiTrue;
1715         }
1716         //Return if this element is now obsolete
1717         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx) {
1718                 if (myLocMgr->isHome(idx)) 
1719                         //Home elements never become obsolete
1720                         // if they did, we couldn't deliver messages to that element.
1721                         return CmiFalse;
1722                 else if (isStale())
1723                         return CmiTrue;//We haven't been used in a long time
1724                 else
1725                         return CmiFalse;//We're fairly recent
1726         }
1727         //virtual void pup(PUP::er &p) { CkLocRec_aging::pup(p); p(onPe); }
1728 };
1729
1730
1731 /**
1732  * Buffers messages until record is replaced in the hash table, 
1733  * then delivers all messages to the replacing record.  This is 
1734  * used when a message arrives for a local element that has not
1735  * yet been created, buffering messages until the new element finally
1736  * checks in.
1737  *
1738  * It's silly to send a message to an element you won't ever create,
1739  * so this kind of record causes an abort "Stale array manager message!"
1740  * if it's left undelivered too long.
1741  */
1742 class CkLocRec_buffering:public CkLocRec_aging {
1743 private:
1744         CkQ<CkArrayMessage *> buffer;//Buffered messages.
1745 public:
1746         CkLocRec_buffering(CkLocMgr *Narr):CkLocRec_aging(Narr) {}
1747         virtual ~CkLocRec_buffering() {
1748                 if (0!=buffer.length()) {
1749                         CkPrintf("[%d] Warning: Messages abandoned in array manager buffer!\n", CkMyPe());
1750                         CkArrayMessage *m;
1751                         while (NULL!=(m=buffer.deq()))  {
1752                                 delete m;
1753                         }
1754                 }
1755         }
1756   
1757         virtual RecType type(void) {return buffering;}
1758   
1759         //Buffer a message for this element.
1760         virtual CmiBool deliver(CkArrayMessage *msg,CkDeliver_t type,int opts=0) {
1761                 DEBS((AA" Queued message for %s\n"AB,idx2str(msg->array_index())));
1762                 if (opts & CK_MSG_KEEP)
1763                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
1764                 buffer.enq(msg);
1765 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1766                 envelope *env = UsrToEnv(msg);
1767                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
1768 #endif
1769                 return CmiTrue;
1770         }
1771  
1772         //This is called when this ArrayRec is about to be replaced.
1773         // We dump all our buffered messages off on the next guy,
1774         // who should know what to do with them.
1775         virtual void beenReplaced(void) {
1776                 DEBS((AA" Delivering queued messages:\n"AB));
1777                 CkArrayMessage *m;
1778                 while (NULL!=(m=buffer.deq())) {
1779 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))         
1780                 DEBUG(CmiPrintf("[%d] buffered message being sent\n",CmiMyPe()));
1781                 envelope *env = UsrToEnv(m);
1782                 Chare *oldObj = CpvAccess(_currentObj);
1783                 CpvAccess(_currentObj) =(Chare *) env->sender.getObject();
1784                 env->sender.type = TypeInvalid;
1785 #endif
1786                 DEBS((AA"Sending buffered message to %s\n"AB,idx2str(m->array_index())));
1787                 myLocMgr->deliverViaQueue(m);
1788 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))         
1789                 CpvAccess(_currentObj) = oldObj;
1790 #endif
1791                 }
1792         }
1793   
1794         //Return if this element is now obsolete
1795         virtual CmiBool isObsolete(int nSprings,const CkArrayIndex &idx) {
1796                 if (isStale() && buffer.length()>0) {
1797                         /*This indicates something is seriously wrong--
1798                           buffers should be short-lived.*/
1799                         CkPrintf("[%d] WARNING: %d stale array message(s) found!\n",CkMyPe(),buffer.length());
1800                         CkArrayMessage *msg=buffer[0];
1801                         CkPrintf("Addressed to: ");
1802                         CkPrintEntryMethod(msg->array_ep());
1803                         CkPrintf(" index %s\n",idx2str(idx));
1804                         if (myLocMgr->isHome(idx)) 
1805                                 CkPrintf("is this an out-of-bounds array index, or was it never created?\n");
1806                         else //Idx is a remote-home index
1807                                 CkPrintf("why weren't they forwarded?\n");
1808                         
1809                         // CkAbort("Stale array manager message(s)!\n");
1810                 }
1811                 return CmiFalse;
1812         }
1813   
1814 /*  virtual void pup(PUP::er &p) {
1815     CkLocRec_aging::pup(p);
1816     CkArray::pupArrayMsgQ(buffer, p);
1817     }*/
1818 };
1819
1820 /*********************** Spring Cleaning *****************/
1821 /**
1822  * Used to periodically flush out unused remote element pointers.
1823  *
1824  * Cleaning often will free up memory quickly, but slow things
1825  * down because the cleaning takes time and some not-recently-referenced
1826  * remote element pointers might be valid and used some time in 
1827  * the future.
1828  *
1829  * Also used to determine if buffered messages have become stale.
1830  */
1831 inline void CkLocMgr::springCleaning(void)
1832 {
1833   nSprings++;
1834
1835   //Poke through the hash table for old ArrayRecs.
1836   void *objp;
1837   void *keyp;
1838   
1839   CkHashtableIterator *it=hash.iterator();
1840   CmiImmediateLock(hashImmLock);
1841   while (NULL!=(objp=it->next(&keyp))) {
1842     CkLocRec *rec=*(CkLocRec **)objp;
1843     CkArrayIndex &idx=*(CkArrayIndex *)keyp;
1844     if (rec->isObsolete(nSprings,idx)) {
1845       //This record is obsolete-- remove it from the table
1846       DEBK((AA"Cleaning out old record %s\n"AB,idx2str(idx)));
1847       hash.remove(*(CkArrayIndex *)&idx);
1848       delete rec;
1849       it->seek(-1);//retry this hash slot
1850     }
1851   }
1852   CmiImmediateUnlock(hashImmLock);
1853   delete it;
1854 }
1855 void CkLocMgr::staticSpringCleaning(void *forWhom,double curWallTime) {
1856         DEBK((AA"Starting spring cleaning at %.2f\n"AB,CkWallTimer()));
1857         ((CkLocMgr *)forWhom)->springCleaning();
1858 }
1859
1860 // clean all buffer'ed messages and also free local objects
1861 void CkLocMgr::flushAllRecs(void)
1862 {
1863   void *objp;
1864   void *keyp;
1865     
1866   CkHashtableIterator *it=hash.iterator();
1867   CmiImmediateLock(hashImmLock);
1868   while (NULL!=(objp=it->next(&keyp))) {
1869     CkLocRec *rec=*(CkLocRec **)objp;
1870     CkArrayIndex &idx=*(CkArrayIndex *)keyp;
1871     if (rec->type() != CkLocRec::local) {
1872       //In the case of taking core out of memory (in BigSim's emulation)
1873       //the meta data in the location manager are not deleted so we need
1874       //this condition
1875       if(_BgOutOfCoreFlag!=1){
1876       //  hash.remove(*(CkArrayIndex *)&idx);
1877        // delete rec;
1878        // it->seek(-1);//retry this hash slot
1879       }
1880     }
1881     else {
1882         callMethod((CkLocRec_local*)rec, &CkMigratable::ckDestroy);
1883         it->seek(-1);//retry this hash slot
1884     }
1885   }
1886   delete it;
1887   CmiImmediateUnlock(hashImmLock);
1888 }
1889
1890
1891 void CkLocMgr::checkpointRemoteIdx(PUP::er &p)
1892 {
1893   void *objp;
1894   void *keyp;
1895   int flag = 0;
1896   CkHashtableIterator *it=hash.iterator();
1897   CmiImmediateLock(hashImmLock);
1898   while (NULL!=(objp=it->next(&keyp))) {
1899     CkLocRec *rec=*(CkLocRec **)objp;
1900     if (rec->type() != CkLocRec::local) {
1901       CkArrayIndexMax &idx=*(CkArrayIndex *)keyp;
1902       p|idx;
1903       int onPe = ((CkLocRec_remote *)rec)->lookupProcessor();
1904       p|onPe;
1905       flag++;
1906     }
1907   }
1908  // CkPrintf("[%d] has %d remote elements\n",CkMyPe(),flag);
1909 }
1910
1911 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1912 void CkLocMgr::callForAllRecords(CkLocFn fnPointer,CkArray *arr,void *data){
1913         void *objp;
1914         void *keyp;
1915
1916         CkHashtableIterator *it = hash.iterator();
1917         while (NULL!=(objp=it->next(&keyp))) {
1918                 CkLocRec *rec=*(CkLocRec **)objp;
1919                 CkArrayIndex &idx=*(CkArrayIndex *)keyp;
1920                 fnPointer(arr,data,rec,&idx);
1921         }
1922
1923         // releasing iterator memory
1924         delete it;
1925 }
1926 #endif
1927
1928 /*************************** LocMgr: CREATION *****************************/
1929 CkLocMgr::CkLocMgr(CkGroupID mapID_,CkGroupID lbdbID_,CkArrayIndex& numInitial)
1930         :thisProxy(thisgroup),thislocalproxy(thisgroup,CkMyPe()),
1931          hash(17,0.3)
1932 {
1933         DEBC((AA"Creating new location manager %d\n"AB,thisgroup));
1934 // moved to _CkMigratable_initInfoInit()
1935 //      CkpvInitialize(CkMigratable_initInfo,mig_initInfo);
1936
1937         managers.init();
1938         nManagers=0;
1939         firstManager=NULL;
1940         firstFree=localLen=0;
1941         duringMigration=CmiFalse;
1942         nSprings=0;
1943 #if !CMK_GLOBAL_LOCATION_UPDATE
1944         CcdCallOnConditionKeepOnPE(CcdPERIODIC_1minute,staticSpringCleaning,(void *)this, CkMyPe());
1945 #endif
1946
1947 //Register with the map object
1948         mapID=mapID_;
1949         map=(CkArrayMap *)CkLocalBranch(mapID);
1950         if (map==NULL) CkAbort("ERROR!  Local branch of array map is NULL!");
1951         mapHandle=map->registerArray(numInitial,thisgroup);
1952
1953 //Find and register with the load balancer
1954         lbdbID = lbdbID_;
1955         initLB(lbdbID_);
1956         hashImmLock = CmiCreateImmediateLock();
1957 }
1958
1959 CkLocMgr::CkLocMgr(CkMigrateMessage* m)
1960         :IrrGroup(m),thisProxy(thisgroup),thislocalproxy(thisgroup,CkMyPe()),hash(17,0.3)
1961 {
1962         managers.init();
1963         nManagers=0;
1964         firstManager=NULL;
1965         firstFree=localLen=0;
1966         duringMigration=CmiFalse;
1967         nSprings=0;
1968 #if !CMK_GLOBAL_LOCATION_UPDATE
1969         CcdCallOnConditionKeepOnPE(CcdPERIODIC_1minute,staticSpringCleaning,(void *)this, CkMyPe());
1970 #endif
1971         hashImmLock = CmiCreateImmediateLock();
1972 }
1973
1974 void CkLocMgr::pup(PUP::er &p){
1975         IrrGroup::pup(p);
1976         p|mapID;
1977         p|mapHandle;
1978         p|lbdbID;
1979         mapID = _defaultArrayMapID;
1980         if(p.isUnpacking()){
1981                 thisProxy=thisgroup;
1982                 CProxyElement_CkLocMgr newlocalproxy(thisgroup,CkMyPe());
1983                 thislocalproxy=newlocalproxy;
1984                 //Register with the map object
1985                 map=(CkArrayMap *)CkLocalBranch(mapID);
1986                 if (map==NULL) CkAbort("ERROR!  Local branch of array map is NULL!");
1987                 CkArrayIndex emptyIndex;
1988                 map->registerArray(emptyIndex,thisgroup);
1989                 // _lbdb is the fixed global groupID
1990                 initLB(lbdbID);
1991
1992 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
1993         int count;
1994         p | count;
1995         DEBUG(CmiPrintf("[%d] Unpacking Locmgr %d has %d home elements\n",CmiMyPe(),thisgroup.idx,count));
1996         homeElementCount = count;
1997
1998         for(int i=0;i<count;i++){
1999             CkArrayIndex idx;
2000             int pe;
2001             idx.pup(p);
2002             p | pe;
2003             DEBUG(CmiPrintf("[%d] idx %s is a home element exisiting on pe %d\n",CmiMyPe(),idx2str(idx),pe));
2004             inform(idx,pe);
2005             CkLocRec *rec = elementNrec(idx);
2006             CmiAssert(rec!=NULL);
2007             CmiAssert(lastKnown(idx) == pe);
2008         }
2009 #endif
2010                 // delay doneInserting when it is unpacking during restart.
2011                 // to prevent load balancing kicking in
2012                 if (!CkInRestarting()) 
2013                         doneInserting();
2014         }else{
2015  /**
2016  * pack the indexes of elements which have their homes on this processor
2017  * but dont exist on it.. needed for broadcast after a restart
2018  * indexes of local elements dont need to be packed
2019  * since they will be recreated later anyway
2020  */
2021 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
2022         int count=0,count1=0;
2023         void *objp;
2024         void *keyp;
2025         CkHashtableIterator *it = hash.iterator();
2026       while (NULL!=(objp=it->next(&keyp))) {
2027       CkLocRec *rec=*(CkLocRec **)objp;
2028         CkArrayIndex &idx=*(CkArrayIndex *)keyp;
2029             if(rec->type() != CkLocRec::local){
2030                 if(homePe(idx) == CmiMyPe()){
2031                     count++;
2032                 }
2033             }
2034         }
2035         p | count;
2036         DEBUG(CmiPrintf("[%d] Packing Locmgr %d has %d home elements\n",CmiMyPe(),thisgroup.idx,count));
2037
2038                 // releasing iterator memory
2039                 delete it;
2040
2041         it = hash.iterator();
2042       while (NULL!=(objp=it->next(&keyp))) {
2043       CkLocRec *rec=*(CkLocRec **)objp;
2044         CkArrayIndex &idx=*(CkArrayIndex *)keyp;
2045             CkArrayIndex max = idx;
2046             if(rec->type() != CkLocRec::local){
2047                 if(homePe(idx) == CmiMyPe()){
2048                     int pe;
2049                     max.pup(p);
2050                     pe = rec->lookupProcessor();
2051                     p | pe;
2052                     count1++;
2053                 }
2054             }
2055         }
2056         CmiAssert(count == count1);
2057
2058                 // releasing iterator memory
2059                 delete it;
2060
2061 #endif
2062
2063         }
2064 }
2065
2066 void _CkLocMgrInit(void) {
2067   /* Don't trace our deliver method--it does its own tracing */
2068   CkDisableTracing(CkIndex_CkLocMgr::deliverInline(0));
2069 }
2070
2071 /// Add a new local array manager to our list.
2072 /// Returns a new CkMigratableList for the manager to store his
2073 /// elements in.
2074 CkMigratableList *CkLocMgr::addManager(CkArrayID id,CkArrMgr *mgr)
2075 {
2076         CK_MAGICNUMBER_CHECK
2077         DEBC((AA"Adding new array manager\n"AB));
2078         //Link new manager into list
2079         ManagerRec *n=new ManagerRec;
2080         managers.find(id)=n;
2081         n->next=firstManager;
2082         n->mgr=mgr;
2083         n->elts.setSize(localLen);
2084         nManagers++;
2085         firstManager=n;
2086         return &n->elts;
2087 }
2088
2089 /// Return the next unused local element index.
2090 int CkLocMgr::nextFree(void) {
2091         if (firstFree>=localLen)
2092         {//Need more space in the local index arrays-- enlarge them
2093                 int oldLen=localLen;
2094                 localLen=localLen*2+8;
2095                 DEBC((AA"Growing the local list from %d to %d...\n"AB,oldLen,localLen));
2096                 for (ManagerRec *m=firstManager;m!=NULL;m=m->next)
2097                         m->elts.setSize(localLen);
2098                 //Update the free list
2099                 freeList.resize(localLen);
2100                 for (int i=oldLen;i<localLen;i++)
2101                         freeList[i]=i+1;
2102         }
2103         int localIdx=firstFree;
2104         if (localIdx==-1) CkAbort("CkLocMgr free list corrupted!");
2105         firstFree=freeList[localIdx];
2106         freeList[localIdx]=-1; //Mark as used
2107         return localIdx;
2108 }
2109
2110 CkLocRec_remote *CkLocMgr::insertRemote(const CkArrayIndex &idx,int nowOnPe)
2111 {
2112         DEBS((AA"Remote element %s lives on %d\n"AB,idx2str(idx),nowOnPe));
2113         CkLocRec_remote *rem=new CkLocRec_remote(this,nowOnPe);
2114         insertRec(rem,idx);
2115         return rem;
2116 }
2117
2118 //This element now lives on the given Pe
2119 void CkLocMgr::inform(const CkArrayIndex &idx,int nowOnPe)
2120 {
2121         if (nowOnPe==CkMyPe())
2122                 return; //Never insert a "remote" record pointing here
2123         CkLocRec *rec=elementNrec(idx);
2124         if (rec!=NULL && rec->type()==CkLocRec::local){
2125 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2126         CmiPrintf("[%d]WARNING!!! Element %d:%s is local but is being told it exists on %d\n",CkMyPe(),idx.dimension,idx2str(idx), nowOnPe);
2127 #endif
2128                 return; //Never replace a local element's record!
2129         }
2130         insertRemote(idx,nowOnPe);
2131 }
2132
2133 //Tell this element's home processor it now lives "there"
2134 void CkLocMgr::informHome(const CkArrayIndex &idx,int nowOnPe)
2135 {
2136         int home=homePe(idx);
2137         if (home!=CkMyPe() && home!=nowOnPe) {
2138                 //Let this element's home Pe know it lives here now
2139                 DEBC((AA"  Telling %s's home %d that it lives on %d.\n"AB,idx2str(idx),home,nowOnPe));
2140 //#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2141 //#if defined(_FAULT_MLOG_)
2142 //        informLocationHome(thisgroup,idx,home,CkMyPe());
2143 //#else
2144                 thisProxy[home].updateLocation(idx,nowOnPe);
2145 //#endif
2146         }
2147 }
2148
2149 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2150 CkLocRec_local *CkLocMgr::createLocal(const CkArrayIndex &idx,
2151         CmiBool forMigration, CmiBool ignoreArrival,
2152         CmiBool notifyHome,int dummy)
2153 {
2154     int localIdx=nextFree();
2155     DEBC((AA"Adding new record for element %s at local index %d\n"AB,idx2str(idx),localIdx));
2156     CkLocRec_local *rec=new CkLocRec_local(this,forMigration,ignoreArrival,idx,localIdx);
2157     if(!dummy){
2158         insertRec(rec,idx); //Add to global hashtable
2159     }   
2160     if (notifyHome) informHome(idx,CkMyPe());
2161     return rec; 
2162 }
2163 #else
2164 CkLocRec_local *CkLocMgr::createLocal(const CkArrayIndex &idx, 
2165                 CmiBool forMigration, CmiBool ignoreArrival,
2166                 CmiBool notifyHome)
2167 {
2168         int localIdx=nextFree();
2169         DEBC((AA"Adding new record for element %s at local index %d\n"AB,idx2str(idx),localIdx));
2170         CkLocRec_local *rec=new CkLocRec_local(this,forMigration,ignoreArrival,idx,localIdx);
2171         insertRec(rec,idx); //Add to global hashtable
2172
2173
2174         if (notifyHome) informHome(idx,CkMyPe());
2175         return rec;
2176 }
2177 #endif
2178
2179 //Add a new local array element, calling element's constructor
2180 CmiBool CkLocMgr::addElement(CkArrayID id,const CkArrayIndex &idx,
2181                 CkMigratable *elt,int ctorIdx,void *ctorMsg)
2182 {
2183         CK_MAGICNUMBER_CHECK
2184         CkLocRec *oldRec=elementNrec(idx);
2185         CkLocRec_local *rec;
2186         if (oldRec==NULL||oldRec->type()!=CkLocRec::local) 
2187         { //This is the first we've heard of that element-- add new local record
2188                 rec=createLocal(idx,CmiFalse,CmiFalse,CmiTrue);
2189 #if CMK_GLOBAL_LOCATION_UPDATE
2190                 if (homePe(idx) != CkMyPe()) {
2191                   DEBC((AA"Global location broadcast for new element idx %s "
2192                         "assigned to %d \n"AB, idx2str(idx), CkMyPe()));
2193                   thisProxy.updateLocation(idx, CkMyPe());  
2194                 }
2195 #endif
2196                 
2197         } else 
2198         { //rec is *already* local-- must not be the first insertion    
2199                 rec=((CkLocRec_local *)oldRec);
2200                 rec->addedElement();
2201         }
2202         if (!addElementToRec(rec,managers.find(id),elt,ctorIdx,ctorMsg)) return CmiFalse;
2203         elt->ckFinishConstruction();
2204         return CmiTrue;
2205 }
2206
2207 //As above, but shared with the migration code
2208 CmiBool CkLocMgr::addElementToRec(CkLocRec_local *rec,ManagerRec *m,
2209                 CkMigratable *elt,int ctorIdx,void *ctorMsg)
2210 {//Insert the new element into its manager's local list
2211         int localIdx=rec->getLocalIndex();
2212         if (m->elts.get(localIdx)!=NULL) CkAbort("Cannot insert array element twice!");
2213         m->elts.put(elt,localIdx); //Local element table
2214
2215 //Call the element's constructor
2216         DEBC((AA"Constructing element %s of array\n"AB,idx2str(rec->getIndex())));
2217         CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
2218         i.locRec=rec;
2219         i.chareType=_entryTable[ctorIdx]->chareIdx;
2220         if (!rec->invokeEntry(elt,ctorMsg,ctorIdx,CmiTrue)) return CmiFalse;
2221
2222 #if CMK_OUT_OF_CORE
2223         /* Register new element with out-of-core */
2224         PUP::sizer p_getSize; elt->pup(p_getSize);
2225         elt->prefetchObjID=CooRegisterObject(&CkArrayElementPrefetcher,p_getSize.size(),elt);
2226 #endif
2227         
2228         return CmiTrue;
2229 }
2230 void CkLocMgr::updateLocation(const CkArrayIndex &idx,int nowOnPe) {
2231         inform(idx,nowOnPe);
2232         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
2233         if(CkInRestarting()){
2234                 //CkPrintf("[%d] reply to %d at %lf\n",CkMyPe(),nowOnPe,CmiWallTimer());
2235                 //checkptMgr[nowOnPe].gotReply();
2236         }
2237 }
2238
2239 /*************************** LocMgr: DELETION *****************************/
2240 /// This index will no longer be used-- delete the associated elements
2241 void CkLocMgr::reclaim(const CkArrayIndex &idx,int localIdx) {
2242         CK_MAGICNUMBER_CHECK
2243         DEBC((AA"Destroying element %s (local %d)\n"AB,idx2str(idx),localIdx));
2244         //Delete, and mark as empty, each array element
2245         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2246                 delete m->elts.get(localIdx);
2247                 m->elts.empty(localIdx);
2248         }
2249         
2250         removeFromTable(idx);
2251         
2252         //Link local index into free list
2253         freeList[localIdx]=firstFree;
2254         firstFree=localIdx;
2255         
2256                 
2257         if (!duringMigration) 
2258         { //This is a local element dying a natural death
2259             #if CMK_BIGSIM_CHARM
2260                 //After migration, reclaimRemote will be called through 
2261                 //the CkRemoveArrayElement in the pupping routines for those 
2262                 //objects that are not on the home processors. However,
2263                 //those remote records should not be deleted since the corresponding
2264                 //objects are not actually deleted but on disk. If deleted, msgs
2265                 //that seeking where is the object will be accumulated (a circular
2266                 //msg chain) and causes the program no progress
2267                 if(_BgOutOfCoreFlag==1) return; 
2268             #endif
2269                 int home=homePe(idx);
2270                 if (home!=CkMyPe())
2271 #if CMK_MEM_CHECKPOINT
2272                 if (!CkInRestarting()) // all array elements are removed anyway
2273 #endif
2274                         thisProxy[home].reclaimRemote(idx,CkMyPe());
2275         /*      //Install a zombie to keep the living from re-using this index.
2276                 insertRecN(new CkLocRec_dead(this),idx); */
2277         }
2278 }
2279
2280 void CkLocMgr::reclaimRemote(const CkArrayIndex &idx,int deletedOnPe) {
2281         DEBC((AA"Our element %s died on PE %d\n"AB,idx2str(idx),deletedOnPe));
2282         CkLocRec *rec=elementNrec(idx);
2283         if (rec==NULL) return; //We never knew him
2284         if (rec->type()==CkLocRec::local) return; //He's already been reborn
2285         removeFromTable(idx);
2286         delete rec;
2287 }
2288 void CkLocMgr::removeFromTable(const CkArrayIndex &idx) {
2289 #if CMK_ERROR_CHECKING
2290         //Make sure it's actually in the table before we delete it
2291         if (NULL==elementNrec(idx))
2292                 CkAbort("CkLocMgr::removeFromTable called on invalid index!");
2293 #endif
2294         CmiImmediateLock(hashImmLock);
2295         hash.remove(*(CkArrayIndex *)&idx);
2296         CmiImmediateUnlock(hashImmLock);
2297 #if CMK_ERROR_CHECKING
2298         //Make sure it's really gone
2299         if (NULL!=elementNrec(idx))
2300                 CkAbort("CkLocMgr::removeFromTable called, but element still there!");
2301 #endif
2302 }
2303
2304 /************************** LocMgr: MESSAGING *************************/
2305 /// Deliver message to this element, going via the scheduler if local
2306 /// @return 0 if object local, 1 if not
2307 int CkLocMgr::deliver(CkMessage *m,CkDeliver_t type,int opts) {
2308         DEBS((AA"deliver \n"AB));
2309         CK_MAGICNUMBER_CHECK
2310         CkArrayMessage *msg=(CkArrayMessage *)m;
2311
2312
2313         const CkArrayIndex &idx=msg->array_index();
2314         DEBS((AA"deliver %s\n"AB,idx2str(idx)));
2315         if (type==CkDeliver_queue)
2316                 _TRACE_CREATION_DETAILED(UsrToEnv(m),msg->array_ep());
2317         CkLocRec *rec=elementNrec(idx);
2318         if(rec != NULL){
2319                 DEBS((AA"deliver %s of type %d \n"AB,idx2str(idx),rec->type()));
2320         }else{
2321                 DEBS((AA"deliver %s rec is null\n"AB,idx2str(idx)));
2322         }
2323 //#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2324 //#if !defined(_FAULT_MLOG_)
2325 #if CMK_LBDB_ON
2326
2327         LDObjid ldid = idx2LDObjid(idx);
2328 #if CMK_GLOBAL_LOCATION_UPDATE
2329         ldid.locMgrGid = thisgroup.idx;
2330 #endif        
2331         if (type==CkDeliver_queue) {
2332                 if (!(opts & CK_MSG_LB_NOTRACE) && the_lbdb->CollectingCommStats()) {
2333                 if(rec!=NULL) the_lbdb->Send(myLBHandle,ldid,UsrToEnv(msg)->getTotalsize(), rec->lookupProcessor(), 1);
2334                 else /*rec==NULL*/ the_lbdb->Send(myLBHandle,ldid,UsrToEnv(msg)->getTotalsize(),homePe(msg->array_index()), 1);
2335                 }
2336         }
2337 #endif
2338 //#endif
2339 #if CMK_GRID_QUEUE_AVAILABLE
2340         int gridSrcPE;
2341         int gridSrcCluster;
2342         int gridDestPE;
2343         int gridDestCluster;
2344         CkMigratable *obj;
2345         ArrayElement *obj2;
2346         CkGroupID gid;
2347         int *data;
2348
2349         obj = (CkMigratable *) CpvAccess(CkGridObject);   // CkGridObject is a pointer to the sending object (retained earlier)
2350         if (obj != NULL) {
2351           obj2 = dynamic_cast<ArrayElement *> (obj);
2352           if (obj2 > 0) {
2353             // Get the sending object's array gid and indexes.
2354             // These are guaranteed to exist due to the succeeding dynamic cast above.
2355             gid = obj2->ckGetArrayID ();
2356             data = obj2->thisIndexMax.data ();
2357
2358             // Get the source PE and destination PE.
2359             gridSrcPE = CkMyPe ();
2360             if (rec != NULL) {
2361               gridDestPE = rec->lookupProcessor ();
2362             } else {
2363               gridDestPE = homePe (msg->array_index ());
2364             }
2365
2366             // Get the source cluster and destination cluster.
2367             gridSrcCluster = CmiGetCluster (gridSrcPE);
2368             gridDestCluster = CmiGetCluster (gridDestPE);
2369
2370             // If the Grid queue interval is greater than zero, it means that the more complicated
2371             // technique for registering border objects that exceed a specified threshold of
2372             // cross-cluster messages within a specified interval (and deregistering border objects
2373             // that do not meet this threshold) is used.  Otherwise a much simpler technique is used
2374             // where a border object is registered immediately upon sending a single cross-cluster
2375             // message (and deregistered when load balancing takes place).
2376             if (obj2->grid_queue_interval > 0) {
2377               // Increment the sending object's count of all messages.
2378               obj2->msg_count += 1;
2379
2380               // If the source cluster and destination cluster differ, this is a Grid message.
2381               // (Increment the count of all Grid messages.)
2382               if (gridSrcCluster != gridDestCluster) {
2383                 obj2->msg_count_grid += 1;
2384               }
2385
2386               // If the number of messages exceeds the interval, check to see if the object has
2387               // sent enough cross-cluster messages to qualify as a border object.
2388               if (obj2->msg_count >= obj2->grid_queue_interval) {
2389                 if (obj2->msg_count_grid >= obj2->grid_queue_threshold) {
2390                   // The object is a border object; if it is not already registered, register it.
2391                   if (!obj2->border_flag) {
2392                     CmiGridQueueRegister (gid.idx, obj2->thisIndexMax.nInts, data[0], data[1], data[2]);
2393                   }
2394                   obj2->border_flag = 1;
2395                 } else {
2396                   // The object is not a border object; if it is registered, deregister it.
2397                   if (obj2->border_flag) {
2398                     CmiGridQueueDeregister (gid.idx, obj2->thisIndexMax.nInts, data[0], data[1], data[2]);
2399                   }
2400                   obj2->border_flag = 0;
2401                 }
2402                 // Reset the counts.
2403                 obj2->msg_count = 0;
2404                 obj2->msg_count_grid = 0;
2405               }
2406             } else {
2407               if (gridSrcCluster != gridDestCluster) {
2408                 CmiGridQueueRegister (gid.idx, obj2->thisIndexMax.nInts, data[0], data[1], data[2]);
2409               }
2410             }
2411           }
2412
2413           // Reset the CkGridObject pointer.
2414           CpvAccess(CkGridObject) = NULL;
2415         }
2416 #endif
2417         /**FAULT_EVAC*/
2418         if (rec!=NULL){
2419                 CmiBool result = rec->deliver(msg,type,opts);
2420                 // if result is CmiFalse, than rec is not valid anymore, as the object
2421                 // the message was just delivered to has died or migrated out.
2422                 // Therefore rec->type() cannot be invoked!
2423                 if (result==CmiTrue && rec->type()==CkLocRec::local) return 0;
2424                 else return 1;
2425                 /*if(!result){
2426                         //DEBS((AA"deliver %s failed type %d \n"AB,idx2str(idx),rec->type()));
2427                         DEBS((AA"deliver %s failed \n"AB,idx2str(idx)));
2428                         if(rec->type() == CkLocRec::remote){
2429                                 if (opts & CK_MSG_KEEP)
2430                                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
2431                                 deliverUnknown(msg,type);
2432                         }
2433                 }*/
2434         }else /* rec==NULL*/ {
2435                 if (opts & CK_MSG_KEEP)
2436                         msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
2437                 deliverUnknown(msg,type,opts);
2438                 return 1;
2439         }
2440
2441 }
2442
2443 /// This index is not hashed-- somehow figure out what to do.
2444 CmiBool CkLocMgr::deliverUnknown(CkArrayMessage *msg,CkDeliver_t type,int opts)
2445 {
2446         CK_MAGICNUMBER_CHECK
2447         const CkArrayIndex &idx=msg->array_index();
2448         int onPe=homePe(idx);
2449         if (onPe!=CkMyPe()) 
2450         {// Forward the message to its home processor
2451                 DEBM((AA"Forwarding message for unknown %s to home %d \n"AB,idx2str(idx),onPe));
2452                 msg->array_hops()++;
2453                 CkArrayManagerDeliver(onPe,msg,opts);
2454                 return CmiTrue;
2455         }
2456         else
2457         { // We *are* the home processor:
2458         //Check if the element's array manager has been registered yet:
2459           CkArrMgr *mgr=managers.find(UsrToEnv((void *)msg)->getsetArrayMgr())->mgr;
2460           if (!mgr) { //No manager yet-- postpone the message (stupidly)
2461             if (CkInRestarting()) {
2462               // during restarting, this message should be ignored
2463               delete msg;
2464             }
2465             else {
2466               CkArrayManagerDeliver(CkMyPe(),msg); 
2467             }
2468           }
2469           else { // Has a manager-- must buffer the message
2470             DEBC((AA"Adding buffer for unknown element %s\n"AB,idx2str(idx)));
2471             CkLocRec *rec=new CkLocRec_buffering(this);
2472             insertRecN(rec,idx);
2473             rec->deliver(msg,type);
2474           
2475             if (msg->array_ifNotThere()!=CkArray_IfNotThere_buffer) 
2476             { //Demand-create the element:
2477               return demandCreateElement(msg,-1,type);
2478             }
2479           }
2480           return CmiTrue;
2481         }
2482 }
2483
2484 CmiBool CkLocMgr::demandCreateElement(CkArrayMessage *msg,int onPe,CkDeliver_t type)
2485 {
2486         CK_MAGICNUMBER_CHECK
2487         const CkArrayIndex &idx=msg->array_index();
2488         int chareType=_entryTable[msg->array_ep()]->chareIdx;
2489         int ctor=_chareTable[chareType]->getDefaultCtor();
2490         if (ctor==-1) CkAbort("Can't create array element to handle message--\n"
2491                               "The element has no default constructor in the .ci file!\n");
2492         if (onPe==-1) 
2493         { //Decide where element needs to live
2494                 if (msg->array_ifNotThere()==CkArray_IfNotThere_createhere) 
2495                         onPe=UsrToEnv(msg)->getsetArraySrcPe();
2496                 else //Createhome
2497                         onPe=homePe(idx);
2498         }
2499         
2500         //Find the manager and build the element
2501         DEBC((AA"Demand-creating element %s on pe %d\n"AB,idx2str(idx),onPe));
2502         CkArrMgr *mgr=managers.find(UsrToEnv((void *)msg)->getsetArrayMgr())->mgr;
2503         if (!mgr) CkAbort("Tried to demand-create for nonexistent arrMgr");
2504         return mgr->demandCreateElement(idx,onPe,ctor,type);
2505 }
2506
2507 //This message took several hops to reach us-- fix it
2508 void CkLocMgr::multiHop(CkArrayMessage *msg)
2509 {
2510
2511         CK_MAGICNUMBER_CHECK
2512         int srcPe=msg->array_getSrcPe();
2513         if (srcPe==CkMyPe())
2514                 DEB((AA"Odd routing: local element %s is %d hops away!\n"AB,idx2str(msg),msg->array_hops()));
2515         else
2516         {//Send a routing message letting original sender know new element location
2517                 DEBS((AA"Sending update back to %d for element\n"AB,srcPe,idx2str(msg)));
2518                 thisProxy[srcPe].updateLocation(msg->array_index(),CkMyPe());
2519         }
2520 }
2521
2522 /************************** LocMgr: ITERATOR *************************/
2523 CkLocation::CkLocation(CkLocMgr *mgr_, CkLocRec_local *rec_)
2524         :mgr(mgr_), rec(rec_) {}
2525         
2526 const CkArrayIndex &CkLocation::getIndex(void) const {
2527         return rec->getIndex();
2528 }
2529
2530 void CkLocation::destroyAll() {
2531         mgr->callMethod(rec, &CkMigratable::ckDestroy);
2532 }
2533
2534 void CkLocation::pup(PUP::er &p) {
2535         mgr->pupElementsFor(p,rec,CkElementCreation_migrate);
2536 }
2537
2538 CkLocIterator::~CkLocIterator() {}
2539
2540 /// Iterate over our local elements:
2541 void CkLocMgr::iterate(CkLocIterator &dest) {
2542   //Poke through the hash table for local ArrayRecs.
2543   void *objp;
2544   CkHashtableIterator *it=hash.iterator();
2545   CmiImmediateLock(hashImmLock);
2546
2547   while (NULL!=(objp=it->next())) {
2548     CkLocRec *rec=*(CkLocRec **)objp;
2549     if (rec->type()==CkLocRec::local) {
2550       CkLocation loc(this,(CkLocRec_local *)rec);
2551       dest.addLocation(loc);
2552     }
2553   }
2554   CmiImmediateUnlock(hashImmLock);
2555   delete it;
2556 }
2557
2558
2559
2560
2561 /************************** LocMgr: MIGRATION *************************/
2562 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2563 void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
2564         CkElementCreation_t type, CmiBool create, int dummy)
2565 {
2566     p.comment("-------- Array Location --------");
2567     register ManagerRec *m;
2568     int localIdx=rec->getLocalIndex();
2569     CkVec<CkMigratable *> dummyElts;
2570
2571     for (m=firstManager;m!=NULL;m=m->next) {
2572         int elCType;
2573         if (!p.isUnpacking())
2574         { //Need to find the element's existing type
2575             CkMigratable *elt=m->element(localIdx);
2576             if (elt) elCType=elt->ckGetChareType();
2577             else elCType=-1; //Element hasn't been created
2578         }
2579         p(elCType);
2580         if (p.isUnpacking() && elCType!=-1) {
2581             CkMigratable *elt=m->mgr->allocateMigrated(elCType,rec->getIndex(),type);
2582             int migCtorIdx=_chareTable[elCType]->getMigCtor();
2583                 if(!dummy){
2584                         if(create)
2585                                 if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
2586                                 }else{
2587                     CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
2588                     i.locRec=rec;
2589                     i.chareType=_entryTable[migCtorIdx]->chareIdx;
2590                     dummyElts.push_back(elt);
2591                     if (!rec->invokeEntry(elt,NULL,migCtorIdx,CmiTrue)) return ;
2592                 }
2593         }
2594     }
2595     if(!dummy){
2596         for (m=firstManager;m!=NULL;m=m->next) {
2597             CkMigratable *elt=m->element(localIdx);
2598             if (elt!=NULL)
2599                 {
2600                        elt->pup(p);
2601                 }
2602         }
2603     }else{
2604             for(int i=0;i<dummyElts.size();i++){
2605                 CkMigratable *elt = dummyElts[i];
2606                 if (elt!=NULL){
2607             elt->pup(p);
2608                         }
2609                 delete elt;
2610             }
2611                         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2612                 m->elts.empty(localIdx);
2613             }
2614         freeList[localIdx]=firstFree;
2615         firstFree=localIdx;
2616     }
2617 }
2618 #else
2619 void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
2620                 CkElementCreation_t type)
2621 {
2622         p.comment("-------- Array Location --------");
2623         register ManagerRec *m;
2624         int localIdx=rec->getLocalIndex();
2625
2626         //First pup the element types
2627         // (A separate loop so ckLocal works even in element pup routines)
2628         for (m=firstManager;m!=NULL;m=m->next) {
2629                 int elCType;
2630                 if (!p.isUnpacking())
2631                 { //Need to find the element's existing type
2632                         CkMigratable *elt=m->element(localIdx);
2633                         if (elt) elCType=elt->ckGetChareType();
2634                         else elCType=-1; //Element hasn't been created
2635                 }
2636                 p(elCType);
2637                 if (p.isUnpacking() && elCType!=-1) {
2638                         //Create the element
2639                         CkMigratable *elt=m->mgr->allocateMigrated(elCType,rec->getIndex(),type);
2640                         int migCtorIdx=_chareTable[elCType]->getMigCtor();
2641                         //Insert into our tables and call migration constructor
2642                         if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
2643                 }
2644         }
2645         //Next pup the element data
2646         for (m=firstManager;m!=NULL;m=m->next) {
2647                 CkMigratable *elt=m->element(localIdx);
2648                 if (elt!=NULL)
2649                 {
2650                         elt->pup(p);
2651 #if CMK_ERROR_CHECKING
2652                         if (p.isUnpacking()) elt->sanitycheck();
2653 #endif
2654                 }
2655         }
2656 #if CMK_MEM_CHECKPOINT
2657         if(CkInRestarting()){
2658           ArrayElement *elt;
2659           CkVec<CkMigratable *> list;
2660           migratableList(rec, list);
2661           CmiAssert(list.length() > 0);
2662           for (int l=0; l<list.length(); l++) {
2663                 //    reset, may not needed now
2664                 // for now.
2665                 for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
2666                         ArrayElement * elt = (ArrayElement *)list[l];
2667                   contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
2668                   if (c) c->redNo = 0;
2669                 }
2670           }
2671                 
2672         }
2673 #endif
2674 }
2675 #endif
2676
2677 /// Call this member function on each element of this location:
2678 void CkLocMgr::callMethod(CkLocRec_local *rec,CkMigratable_voidfn_t fn)
2679 {
2680         int localIdx=rec->getLocalIndex();
2681         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2682                 CkMigratable *el=m->element(localIdx);
2683                 if (el) (el->* fn)();
2684         }
2685 }
2686
2687 /// return a list of migratables in this local record
2688 void CkLocMgr::migratableList(CkLocRec_local *rec, CkVec<CkMigratable *> &list)
2689 {
2690         register ManagerRec *m;
2691         int localIdx=rec->getLocalIndex();
2692
2693         for (m=firstManager;m!=NULL;m=m->next) {
2694                 CkMigratable *elt=m->element(localIdx);
2695                 if (elt) list.push_back(elt);
2696         }
2697 }
2698
2699 /// Migrate this local element away to another processor.
2700 void CkLocMgr::emigrate(CkLocRec_local *rec,int toPe)
2701 {
2702         CK_MAGICNUMBER_CHECK
2703         if (toPe==CkMyPe()) return; //You're already there!
2704         /*
2705                 FAULT_EVAC
2706                 if the toProcessor is already marked as invalid, dont emigrate
2707                 Shouldn't happen but might
2708         */
2709         if(!CmiNodeAlive(toPe)){
2710                 return;
2711         }
2712         CkArrayIndex idx=rec->getIndex();
2713
2714 #if CMK_OUT_OF_CORE
2715         int localIdx=rec->getLocalIndex();
2716         /* Load in any elements that are out-of-core */
2717         for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
2718                 CkMigratable *el=m->element(localIdx);
2719                 if (el) if (!el->isInCore) CooBringIn(el->prefetchObjID);
2720         }
2721 #endif
2722
2723         //Let all the elements know we're leaving
2724         callMethod(rec,&CkMigratable::ckAboutToMigrate);
2725         /*EVAC*/
2726
2727 //First pass: find size of migration message
2728         int bufSize;
2729         {
2730                 PUP::sizer p;
2731                 p(nManagers);
2732                 pupElementsFor(p,rec,CkElementCreation_migrate);
2733                 bufSize=p.size(); 
2734         }
2735
2736 //Allocate and pack into message
2737         int doubleSize=bufSize/sizeof(double)+1;
2738         CkArrayElementMigrateMessage *msg = 
2739                 new (doubleSize, 0) CkArrayElementMigrateMessage;
2740         msg->idx=idx;
2741         msg->length=bufSize;
2742 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
2743     msg->gid = ckGetGroupID();
2744 #endif
2745 #if CMK_LBDB_ON
2746         msg->ignoreArrival = rec->isAsyncMigrate()?1:0;
2747 #endif
2748         /*
2749                 FAULT_EVAC
2750         */
2751         msg->bounced = rec->isBounced();
2752         {
2753                 PUP::toMem p(msg->packData); 
2754                 p.becomeDeleting(); 
2755                 p(nManagers);
2756                 pupElementsFor(p,rec,CkElementCreation_migrate);
2757                 if (p.size()!=bufSize) {
2758                         CkError("ERROR! Array element claimed it was %d bytes to a "
2759                                 "sizing PUP::er, but copied %d bytes into the packing PUP::er!\n",
2760                                 bufSize,p.size());
2761                         CkAbort("Array element's pup routine has a direction mismatch.\n");
2762                 }
2763         }
2764
2765         DEBM((AA"Migrated index size %s to %d \n"AB,idx2str(idx),toPe));        
2766
2767 //#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2768 //#if defined(_FAULT_MLOG_)
2769 //    sendMlogLocation(toPe,UsrToEnv(msg));
2770 //#else
2771         //Send off message and delete old copy
2772         thisProxy[toPe].immigrate(msg);
2773 //#endif
2774
2775         duringMigration=CmiTrue;
2776         delete rec; //Removes elements, hashtable entries, local index
2777         
2778         
2779         duringMigration=CmiFalse;
2780         //The element now lives on another processor-- tell ourselves and its home
2781         inform(idx,toPe);
2782 //#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
2783 //#if !defined(_FAULT_MLOG_)    
2784         informHome(idx,toPe);
2785 //#endif
2786
2787 #if !CMK_LBDB_ON && CMK_GLOBAL_LOCATION_UPDATE
2788         DEBM((AA"Global location update. idx %s " 
2789               "assigned to %d \n"AB,idx2str(idx),toPe));
2790         thisProxy.updateLocation(idx, toPe);                        
2791 #endif
2792
2793         CK_MAGICNUMBER_CHECK
2794 }
2795
2796 /**
2797   Migrating array element is arriving on this processor.
2798 */
2799 void CkLocMgr::immigrate(CkArrayElementMigrateMessage *msg)
2800 {
2801         const CkArrayIndex &idx=msg->idx;
2802                 
2803         PUP::fromMem p(msg->packData); 
2804         
2805         int nMsgMan;
2806         p(nMsgMan);
2807         if (nMsgMan<nManagers)
2808                 CkAbort("Array element arrived from location with fewer managers!\n");
2809         if (nMsgMan>nManagers) {
2810                 //Some array managers haven't registered yet-- throw it back
2811                 DEBM((AA"Busy-waiting for array registration on migrating %s\n"AB,idx2str(idx)));
2812                 thisProxy[CkMyPe()].immigrate(msg);
2813                 return;
2814         }
2815
2816         //Create a record for this element
2817 //#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
2818 //#if !defined(_FAULT_MLOG_)     
2819         CkLocRec_local *rec=createLocal(idx,CmiTrue,msg->ignoreArrival,CmiFalse /* home told on departure */ );
2820 //#else
2821 //    CkLocRec_local *rec=createLocal(idx,CmiTrue,CmiTrue,CmiFalse /* home told on departure */ );
2822 //#endif
2823         
2824         //Create the new elements as we unpack the message
2825         pupElementsFor(p,rec,CkElementCreation_migrate);
2826         if (p.size()!=msg->length) {
2827                 CkError("ERROR! Array element claimed it was %d bytes to a"
2828                         "packing PUP::er, but %d bytes in the unpacking PUP::er!\n",
2829                         msg->length,p.size());
2830                 CkError("(I have %d managers; he claims %d managers)\n",
2831                         nManagers,nMsgMan);
2832                 
2833                 CkAbort("Array element's pup routine has a direction mismatch.\n");
2834         }
2835         /*
2836                 FAULT_EVAC
2837                         if this element came in as a result of being bounced off some other process,
2838                         then it needs to be resumed. It is assumed that it was bounced because load 
2839                         balancing caused it to move into a processor which later crashed
2840         */
2841         if(msg->bounced){
2842                 callMethod(rec,&CkMigratable::ResumeFromSync);
2843         }
2844         
2845         //Let all the elements know we've arrived
2846         callMethod(rec,&CkMigratable::ckJustMigrated);
2847         /*FAULT_EVAC
2848                 If this processor has started evacuating array elements on it 
2849                 dont let new immigrants in. If they arrive send them to what
2850                 would be their correct homePE.
2851                 Leave a record here mentioning the processor where it got sent
2852         */
2853         
2854         if(CkpvAccess(startedEvac)){
2855                 int newhomePE = getNextPE(idx);
2856                 DEBM((AA"Migrated into failed processor index size %s resent to %d \n"AB,idx2str(idx),newhomePE));      
2857                 CkLocMgr *mgr = rec->getLocMgr();
2858                 int targetPE=getNextPE(idx);
2859                 //set this flag so that load balancer is not informed when
2860                 //this element migrates
2861                 rec->AsyncMigrate(CmiTrue);
2862                 rec->Bounced(CmiTrue);
2863                 mgr->emigrate(rec,targetPE);
2864                 
2865         }
2866
2867         delete msg;
2868 }
2869
2870 void CkLocMgr::restore(const CkArrayIndex &idx, PUP::er &p)
2871 {
2872         //This is in broughtIntoMem during out-of-core emulation in BigSim,
2873         //informHome should not be called since such information is already
2874         //immediately updated real migration
2875 #if CMK_ERROR_CHECKING
2876         if(_BgOutOfCoreFlag!=2)
2877             CmiAbort("CkLocMgr::restore should only be used in out-of-core emulation for BigSim and be called when object is brought into memory!\n");
2878 #endif
2879         CkLocRec_local *rec=createLocal(idx,CmiFalse,CmiFalse,CmiFalse);
2880         
2881         //BIGSIM_OOC DEBUGGING
2882         //CkPrintf("Proc[%d]: Registering element %s with LDB\n", CkMyPe(), idx2str(idx));
2883
2884         //Create the new elements as we unpack the message
2885         pupElementsFor(p,rec,CkElementCreation_restore);
2886
2887         callMethod(rec,&CkMigratable::ckJustRestored);
2888 }
2889
2890
2891 /// Insert and unpack this array element from this checkpoint (e.g., from CkLocation::pup)
2892 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2893 void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p, CmiBool create, int dummy)
2894 {
2895         CkLocRec_local *rec;
2896         CkLocRec *recGlobal;    
2897
2898         if(create){
2899                 rec = createLocal(idx,CmiFalse,CmiFalse,CmiTrue && !dummy /* home doesn't know yet */,dummy );
2900         }else{
2901                 recGlobal = elementNrec(idx);
2902                 if(recGlobal == NULL) 
2903                         CmiAbort("Local object not found");
2904                 if(recGlobal->type() != CkLocRec::local)
2905                         CmiAbort("Local object not local, :P");
2906                 rec = (CkLocRec_local *)recGlobal;
2907         }
2908         
2909     pupElementsFor(p,rec,CkElementCreation_resume,create,dummy);
2910
2911     if(!dummy){
2912         callMethod(rec,&CkMigratable::ckJustMigrated);
2913     }
2914 }
2915 #else
2916 void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p, CmiBool notify)
2917 {
2918         CkLocRec_local *rec=createLocal(idx,CmiFalse,CmiFalse,notify /* home doesn't know yet */ );
2919
2920         //Create the new elements as we unpack the message
2921         pupElementsFor(p,rec,CkElementCreation_resume);
2922
2923         callMethod(rec,&CkMigratable::ckJustMigrated);
2924 }
2925 #endif
2926
2927 /********************* LocMgr: UTILITY ****************/
2928 void CkMagicNumber_impl::badMagicNumber(
2929         int expected,const char *file,int line,void *obj) const
2930 {
2931         CkError("FAILURE on pe %d, %s:%d> Expected %p's magic number "
2932                 "to be 0x%08x; but found 0x%08x!\n", CkMyPe(),file,line,obj,
2933                 expected, magic);
2934         CkAbort("Bad magic number detected!  This implies either\n"
2935                 "the heap or a message was corrupted!\n");
2936 }
2937 CkMagicNumber_impl::CkMagicNumber_impl(int m) :magic(m) { }
2938
2939 //Look up the object with this array index, or return NULL
2940 CkMigratable *CkLocMgr::lookup(const CkArrayIndex &idx,CkArrayID aid) {
2941         CkLocRec *rec=elementNrec(idx);
2942         if (rec==NULL) return NULL;
2943         else return rec->lookupElement(aid);
2944 }
2945 //"Last-known" location (returns a processor number)
2946 int CkLocMgr::lastKnown(const CkArrayIndex &idx) {
2947         CkLocMgr *vthis=(CkLocMgr *)this;//Cast away "const"
2948         CkLocRec *rec=vthis->elementNrec(idx);
2949         int pe=-1;
2950         if (rec!=NULL) pe=rec->lookupProcessor();
2951         if (pe==-1) return homePe(idx);
2952         else{
2953                 /*
2954                         FAULT_EVAC
2955                         if the lastKnownPE is invalid return homePE and delete this record
2956                 */
2957                 if(!CmiNodeAlive(pe)){
2958                         removeFromTable(idx);
2959                         return homePe(idx);
2960                 }
2961                 return pe;
2962         }       
2963 }
2964 /// Return true if this array element lives on another processor
2965 bool CkLocMgr::isRemote(const CkArrayIndex &idx,int *onPe) const
2966 {
2967         CkLocMgr *vthis=(CkLocMgr *)this;//Cast away "const"
2968         CkLocRec *rec=vthis->elementNrec(idx);
2969         if (rec==NULL || rec->type()!=CkLocRec::remote) 
2970                 return false; /* not definitely a remote element */
2971         else /* element is indeed remote */
2972         {
2973                 *onPe=rec->lookupProcessor();
2974                 return true;
2975         }
2976 }
2977
2978 static const char *rec2str[]={
2979     "base (INVALID)",//Base class (invalid type)
2980     "local",//Array element that lives on this Pe
2981     "remote",//Array element that lives on some other Pe
2982     "buffering",//Array element that was just created
2983     "dead"//Deleted element (for debugging)
2984 };
2985
2986 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2987 void CkLocMgr::setDuringMigration(CmiBool _duringMigration){
2988     duringMigration = _duringMigration;
2989 }
2990 #endif
2991
2992
2993 //Add given element array record at idx, replacing the existing record
2994 void CkLocMgr::insertRec(CkLocRec *rec,const CkArrayIndex &idx) {
2995         CkLocRec *old=elementNrec(idx);
2996         insertRecN(rec,idx);
2997         if (old!=NULL) {
2998                 DEBC((AA"  replaces old rec(%s) for %s\n"AB,rec2str[old->type()],idx2str(idx)));
2999                 //There was an old element at this location
3000                 if (old->type()==CkLocRec::local && rec->type()==CkLocRec::local) {
3001                     if (!CkInRestarting()) {    // ok if it is restarting
3002                         CkPrintf("ERROR! Duplicate array index: %s\n",idx2str(idx));
3003                         CkAbort("Duplicate array index used");
3004                     }
3005                 }
3006                 old->beenReplaced();
3007                 delete old;
3008         }
3009 }
3010
3011 //Add given record, when there is guarenteed to be no prior record
3012 void CkLocMgr::insertRecN(CkLocRec *rec,const CkArrayIndex &idx) {
3013         DEBC((AA"  adding new rec(%s) for %s\n"AB,rec2str[rec->type()],idx2str(idx)));
3014         CmiImmediateLock(hashImmLock);
3015         hash.put(*(CkArrayIndex *)&idx)=rec;
3016         CmiImmediateUnlock(hashImmLock);
3017 }
3018
3019 //Call this on an unrecognized array index
3020 static void abort_out_of_bounds(const CkArrayIndex &idx)
3021 {
3022   CkPrintf("ERROR! Unknown array index: %s\n",idx2str(idx));
3023   CkAbort("Array index out of bounds\n");
3024 }
3025
3026 //Look up array element in hash table.  Index out-of-bounds if not found.
3027 CkLocRec *CkLocMgr::elementRec(const CkArrayIndex &idx) {
3028 #if ! CMK_ERROR_CHECKING
3029 //Assume the element will be found
3030         return hash.getRef(*(CkArrayIndex *)&idx);
3031 #else
3032 //Include an out-of-bounds check if the element isn't found
3033         CkLocRec *rec=elementNrec(idx);
3034         if (rec==NULL) abort_out_of_bounds(idx);
3035         return rec;
3036 #endif
3037 }
3038
3039 //Look up array element in hash table.  Return NULL if not there.
3040 CkLocRec *CkLocMgr::elementNrec(const CkArrayIndex &idx) {
3041         return hash.get(*(CkArrayIndex *)&idx);
3042 }
3043
3044 struct LocalElementCounter :  public CkLocIterator
3045 {
3046     unsigned int count;
3047     LocalElementCounter() : count(0) {}
3048     void addLocation(CkLocation &loc)
3049         { ++count; }
3050 };
3051
3052 unsigned int CkLocMgr::numLocalElements()
3053 {
3054     LocalElementCounter c;
3055     iterate(c);
3056     return c.count;
3057 }
3058
3059
3060 /********************* LocMgr: LOAD BALANCE ****************/
3061
3062 #if !CMK_LBDB_ON
3063 //Empty versions of all load balancer calls
3064 void CkLocMgr::initLB(CkGroupID lbdbID_) {}
3065 void CkLocMgr::startInserting(void) {}
3066 void CkLocMgr::doneInserting(void) {}
3067 void CkLocMgr::dummyAtSync(void) {}
3068 #endif
3069
3070
3071 #if CMK_LBDB_ON
3072 void CkLocMgr::initLB(CkGroupID lbdbID_)
3073 { //Find and register with the load balancer
3074         the_lbdb = (LBDatabase *)CkLocalBranch(lbdbID_);
3075         if (the_lbdb == 0)
3076                 CkAbort("LBDatabase not yet created?\n");
3077         DEBL((AA"Connected to load balancer %p\n"AB,the_lbdb));
3078
3079         // Register myself as an object manager
3080         LDOMid myId;
3081         myId.id = thisgroup;
3082         LDCallbacks myCallbacks;
3083         myCallbacks.migrate = (LDMigrateFn)CkLocRec_local::staticMigrate;
3084         myCallbacks.setStats = NULL;
3085         myCallbacks.queryEstLoad = NULL;
3086         myLBHandle = the_lbdb->RegisterOM(myId,this,myCallbacks);
3087
3088         // Tell the lbdb that I'm registering objects
3089         the_lbdb->RegisteringObjects(myLBHandle);
3090
3091         /*Set up the dummy barrier-- the load balancer needs
3092           us to call Registering/DoneRegistering during each AtSync,
3093           and this is the only way to do so.
3094         */
3095         the_lbdb->AddLocalBarrierReceiver(
3096                 (LDBarrierFn)staticRecvAtSync,(void*)(this));
3097         dummyBarrierHandle = the_lbdb->AddLocalBarrierClient(
3098                 (LDResumeFn)staticDummyResumeFromSync,(void*)(this));
3099         dummyAtSync();
3100 }
3101 void CkLocMgr::dummyAtSync(void)
3102 {
3103         DEBL((AA"dummyAtSync called\n"AB));
3104         the_lbdb->AtLocalBarrier(dummyBarrierHandle);
3105 }
3106
3107 void CkLocMgr::staticDummyResumeFromSync(void* data)
3108 {      ((CkLocMgr*)data)->dummyResumeFromSync(); }
3109 void CkLocMgr::dummyResumeFromSync()
3110 {
3111         DEBL((AA"DummyResumeFromSync called\n"AB));
3112         the_lbdb->DoneRegisteringObjects(myLBHandle);
3113         dummyAtSync();
3114 }
3115 void CkLocMgr::staticRecvAtSync(void* data)
3116 {      ((CkLocMgr*)data)->recvAtSync(); }
3117 void CkLocMgr::recvAtSync()
3118 {
3119         DEBL((AA"recvAtSync called\n"AB));
3120         the_lbdb->RegisteringObjects(myLBHandle);
3121 }
3122
3123 void CkLocMgr::startInserting(void)
3124 {
3125         the_lbdb->RegisteringObjects(myLBHandle);
3126 }
3127 void CkLocMgr::doneInserting(void)
3128 {
3129         the_lbdb->DoneRegisteringObjects(myLBHandle);
3130 }
3131 #endif
3132
3133 #include "CkLocation.def.h"
3134
3135