MSA: Set PageArray's synchronization reduction client at contribute time
[charm.git] / src / libs / ck-libs / multiphaseSharedArrays / msa-DistPageMgr.h
1 // emacs mode line -*- mode: c++; tab-width: 4 ; c-basic-style: stroustrup -*-
2
3 #ifndef MSA_DISTPAGEMGR_H
4 #define MSA_DISTPAGEMGR_H
5
6 #include <charm++.h>
7 #include <string.h>
8 #include <list>
9 #include <stack>
10 #include <map>
11 #include <set>
12 #include <vector>
13 #include "msa-common.h"
14
15 // forward decl needed in msa-DistPageMgr.ci, i.e. in msa.decl.h
16
/// A half-open span [start,end) of entry indices within one page that
/// were written; used to ship only the modified parts of a page.
struct MSA_WriteSpan_t {
    int start,end;   // first written entry; one past the last written entry
    /// Serialize for Charm++ messaging.
    inline void pup(PUP::er &p) {
        p|start; p|end;
    }
};
24
25 template <class ENTRY, class MERGER,
26           unsigned int ENTRIES_PER_PAGE>
27 class MSA_PageT;
28 #include "msa.decl.h"
29
30 //=======================================================
31 // Utility Classes
32
/// Abstract interface: listens for some event on a page or set of pages.
class MSA_Listener {
public:
    MSA_Listener() {}
    virtual ~MSA_Listener();
    /// Called when this listener is added to an MSA_Listeners set.
    virtual void add(void) =0;
    /// Called when the awaited event has occurred on page pageNo.
    virtual void signal(unsigned int pageNo) =0;
};
43
/// Keeps a set of MSA_Listeners and signals them as a group.
class MSA_Listeners {
    CkVec<MSA_Listener *> listeners;
public:
    MSA_Listeners();
    ~MSA_Listeners();

    /// Add this listener to your set.  Calls l->add().
    void add(MSA_Listener *l);

    /// Return the number of listeners in our set.
    unsigned int size(void) const {return listeners.size();}

    /// Signal all added listeners and remove them from the set.
    void signal(unsigned int pageNo);
};
60
61
/** Resumes a suspended thread once all the pages it is waiting on
    have arrived.  add() bumps the wait count, signal() decrements it
    and wakes the thread when it reaches zero. */
class MSA_Thread_Listener : public MSA_Listener {
    CthThread thread;    // the suspended thread of execution (0 if not suspended)
    int count;  // number of pages we're still waiting for
public:
    MSA_Thread_Listener() :thread(0), count(0) {}

    /// Wait for one more page.
    void add(void);

    /// If we're waiting for any pages, suspend our thread.
    void suspend(void);

    /// Another page arrived.
    void signal(unsigned int pageNo);
};
78
79
80
/// Fast, fixed-size bitvector: NUM_BITS bits packed into an array of
/// unsigned longs.  No bounds checking; indices must be < NUM_BITS.
template <unsigned int NUM_BITS>
class fixedlength_bitvector {
public:
    /// Data type used to store actual bits in the vector.
    typedef unsigned long store_t;
    enum { store_bits=8*sizeof(store_t) };

    /// Number of store_t's in our vector (rounded up to cover NUM_BITS).
    enum { len=(NUM_BITS+(store_bits-1))/store_bits };
    store_t store[len];

    /// Start out with every bit cleared.
    fixedlength_bitvector() { reset(); }

    /// Fill every word of the vector with this value.
    void fill(store_t s) {
        for (int w=0; w<len; ++w)
            store[w] = s;
    }

    /// Clear the whole vector to 0.
    void reset(void) { fill(0); }

    /// Set-to-1 bit i of the vector.
    void set(unsigned int i) {
        store[word(i)] |= mask(i);
    }

    /// Clear-to-0 bit i of the vector.
    void reset(unsigned int i) {
        store[word(i)] &= ~mask(i);
    }

    /// Return the i'th bit of the vector.
    bool test(unsigned int i) {
        return (store[word(i)] & mask(i)) != 0;
    }

private:
    /// Index of the word holding bit i.
    static unsigned int word(unsigned int i) { return i/store_bits; }
    /// Single-bit mask for bit i within its word.
    static store_t mask(unsigned int i) { return (1lu<<(i%store_bits)); }
};
111
/// Stores all housekeeping information about a cached copy of a page:
///   everything but the actual page data.
template <class ENTRY, unsigned int ENTRIES_PER_PAGE>
class MSA_Page_StateT
{
    /** Write tracking:
        Somehow, we have to identify the entries in a page
        that have been written to.  Our method for doing this is
        a bitvector: 0's indicate the entry hasn't been written;
        1's indicate the entry has been written.
    */
    typedef fixedlength_bitvector<ENTRIES_PER_PAGE> writes_t;
    writes_t writes;
    typedef typename writes_t::store_t writes_store_t;
    enum {writes_bits=writes_t::store_bits};

    /// Tracking writes to our writes: a smaller vector, used to
    ///  avoid the large number of 0's in the writes vector.
    ///  Bit i of writes2 indicates that store_t i of writes has 1's.
    typedef fixedlength_bitvector<writes_t::len> writes2_t;
    writes2_t writes2;
    typedef typename writes2_t::store_t writes2_store_t;
    // Number of page entries covered by one store_t word of writes2.
    enum {writes2_bits=writes2_t::store_bits*writes_t::store_bits};

public:
    /// Access mode this page was last faulted in under,
    /// e.g., Read_Fault for a read-only page.
    MSA_Page_Fault_t state;

    /// If true, this page is locked in memory.
    ///   Pages get locked so people can safely use the non-checking version of "get".
    bool locked;

    /// Threads waiting for this page to be paged in from the network.
    MSA_Listeners readRequests;
    /// Threads waiting for this page to be paged out to the network.
    MSA_Listeners writeRequests;

    /// Return true if this page can be safely written back
    /// (not locked and nobody is waiting on it).
    bool canPageOut(void) const {
        return (!locked) && canDelete();
    }

    /// Return true if this page can be safely purged from memory
    /// (no thread is waiting on it in either direction).
    bool canDelete(void) const {
        return (readRequests.size()==0) && (writeRequests.size()==0);
    }

    MSA_Page_StateT()
        : writes(), writes2(), state(Uninit_State), locked(false),
          readRequests(), writeRequests()
        { }

    /// Record that entry i of this page was written: set bit i of
    /// writes, and flag the writes-word containing it in writes2.
    void write(unsigned int i) {
        writes.set(i);
        writes2.set(i/writes_t::store_bits);
    }

    /// Clear the write list for this page, touching only the regions
    /// writes2 marks as dirty.
    /// NOTE(review): the inner loop bound looks like it should be
    /// writes2_t::store_bits (capped at writes_t::len); as written,
    /// once writes2_t::len > 1 (ENTRIES_PER_PAGE > store_bits^2) the
    /// index o+i can run past the end of writes.store -- confirm.
    void writeClear(void) {
        for (int i2=0;i2<writes2_t::len;i2++)
            if (writes2.store[i2]) { /* some bits set: clear them all */
                int o=i2*writes2_t::store_bits;
                for (int i=0;i<writes_t::len;i++)
                    writes.store[o+i]=0;
                writes2.store[i2]=0;
            }
    }

    /// Return the nearest multiple of m >= v.
    inline int roundUp(int v,int m) {
        return (v+m-1)/m*m;
    }

    /// Get a list of our written output values as this list of spans.
    ///   Returns the total number of spans written to "span".
    ///   Each span is a half-open range [start,end) of written entries;
    ///   callers pass an array of ENTRIES_PER_PAGE spans, which covers
    ///   the worst case of alternating written/unwritten entries.
    int writeSpans(MSA_WriteSpan_t *span) {
        int nSpans=0;

        int cur=0; // entry we're looking at
        while (true) {
            /* skip over unwritten space */
            while (true) {
                // Jump a whole writes2 region or a whole writes word at
                // a time while they are all-zero; else advance one entry.
                if (writes2.store[cur/writes2_bits]==(writes2_store_t)0)
                    cur=roundUp(cur+1,writes2_bits);
                else if (writes.store[cur/writes_bits]==(writes_store_t)0)
                    cur=roundUp(cur+1,writes_bits);
                else if (writes.test(cur)==false)
                    cur++;
                else /* writes.test(cur)==true */
                    break;
                if (cur>=ENTRIES_PER_PAGE) return nSpans;
            }
            /* now writes.test(cur)==true */
            span[nSpans].start=cur;
            /* skip over written space */
            while (true) {
                /* // 0-1 symmetry doesn't hold here, since writes2 may have 1's, but writes may still have some 0's...
                   if (writes2.store[cur/writes2_bits]==~(writes2_store_t)0)
                   cur=roundUp(cur+1,writes2_bits);
                   else */
                if (writes.store[cur/writes_bits]==~(writes_store_t)0)
                    cur=roundUp(cur+1,writes_bits);
                else if (writes.test(cur)==true)
                    cur++;
                else /* writes.test(cur)==false */
                    break;
                if (cur>=ENTRIES_PER_PAGE) {
                    span[nSpans++].end=ENTRIES_PER_PAGE; /* finish the last span */
                    return nSpans;
                }
            }
            /* now writes.test(cur)==false */
            span[nSpans++].end=cur;
        }
    }
};
229
230
231 //=======================================================
232 // Page-out policy
233
/**
   class MSA_PageReplacementPolicy
   Abstract base class providing the interface to the various page
   replacement policies available for use with an MSA
*/
template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
class MSA_PageReplacementPolicy
{
public:
    /// Virtual destructor: concrete policies are held and (potentially)
    /// deleted through a base-class pointer, which without this is
    /// undefined behavior.
    virtual ~MSA_PageReplacementPolicy() {}

    /// Note that a page was just accessed
    virtual void pageAccessed(unsigned int page) = 0;

    /// Ask for the index of a page to discard
    virtual unsigned int selectPage() = 0;
};
249
250 /**
251    class vmLRUPageReplacementPolicy
252    This class provides the functionality of least recently used page replacement policy.
253    It needs to be notified when a page is accessed using the pageAccessed() function and
254    a page can be selected for replacement using the selectPage() function.
255  
256    WARNING: a list is absolutely the wrong data structure for this class, 
257    because it makes *both* updating as well as searching for a page O(n),
258    where n is the number of pages.  A heap would be a better choice,
259    as both operations would then become O(lg(n))
260 */
261 template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
262 class vmLRUReplacementPolicy : public MSA_PageReplacementPolicy <ENTRY_TYPE, ENTRIES_PER_PAGE>
263 {
264 protected:
265     unsigned int nPages;            // number of pages
266         const std::vector<ENTRY_TYPE *> &pageTable; // actual data for pages (NULL means page is gone)
267         typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
268         const std::vector<pageState_t *> &pageState;  // state of each page
269     std::list<unsigned int> stackOfPages;
270     unsigned int lastPageAccessed;
271
272 public:
273         inline vmLRUReplacementPolicy(unsigned int nPages_, 
274                                                                   const std::vector<ENTRY_TYPE *> &pageTable_, 
275                                                                   const std::vector<pageState_t *> &pageState_)
276                 : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), lastPageAccessed(MSA_INVALID_PAGE_NO) {}
277
278     inline void pageAccessed(unsigned int page)
279                 {
280                         if(page != lastPageAccessed)
281                         {
282                                 lastPageAccessed = page;
283
284                                 // delete this page from the stack and push it at the top
285                                 std::list<unsigned int>::iterator i;
286                                 for(i = stackOfPages.begin(); i != stackOfPages.end(); i++)
287                                         if(*i == page)
288                                                 i = stackOfPages.erase(i);
289
290                                 stackOfPages.push_back(page);
291                         }
292                 }
293
294     inline unsigned int selectPage()
295                 {
296                         if(stackOfPages.size() == 0)
297                                 return MSA_INVALID_PAGE_NO;
298
299                         // find a non-empty unlocked page to swap, delete all empty pages from the stack
300                         std::list<unsigned int>::iterator i = stackOfPages.begin();
301                         while(i != stackOfPages.end())
302                         {
303                                 if(pageTable[*i] == NULL) i = stackOfPages.erase(i);
304                                 else if(!pageState[*i]->canPageOut()) i++;
305                                 else break;
306                         }
307
308                         if(i != stackOfPages.end())
309                                 return *i;
310                         else
311                                 return MSA_INVALID_PAGE_NO;
312                 }
313 };
314
315 /**
316    class vmNRUPageReplacementPolicy
317    This class provides the functionality of not-recently-used page replacement policy.
318    It needs to be notified when a page is accessed using the pageAccessed() function and
319    a page can be selected for replacement using the selectPage() function.
320   
321    "not-recently-used" could replace any page that has not been used in the 
322    last K accesses; that is, it's a memory-limited version of LRU.
323   
324    pageAccessed is O(1).
325    selectPage best-case is O(K) (if we immediately find a doomed page); 
326    worst-case is O(K n) (if there are no doomed pages).
327 */
328 template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
329 class vmNRUReplacementPolicy : public MSA_PageReplacementPolicy <ENTRY_TYPE, ENTRIES_PER_PAGE>
330 {
331 protected:
332         unsigned int nPages;            // number of pages
333         const std::vector<ENTRY_TYPE *> &pageTable; // actual pages (NULL means page is gone)
334         typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
335         const std::vector<pageState_t *> &pageState;  // state of each page
336     enum {K=5}; // Number of distinct pages to remember
337     unsigned int last[K]; // pages that have been used recently
338     unsigned int Klast; // index into last array.
339     
340     unsigned int victim; // next page to throw out.
341     
342     bool recentlyUsed(unsigned int page) {
343         for (int k=0;k<K;k++) if (page==last[k]) return true;
344         return false;
345     }
346
347 public:
348         inline vmNRUReplacementPolicy(unsigned int nPages_, 
349                                                                   const std::vector<ENTRY_TYPE *> &pageTable_, 
350                                                                   const std::vector<pageState_t *> &pageState_)
351                 : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), Klast(0), victim(0)
352                 {
353                         for (int k=0;k<K;k++) last[k]=MSA_INVALID_PAGE_NO;
354                 }
355
356     inline void pageAccessed(unsigned int page)
357                 {
358                         if (page!=last[Klast]) {
359                                 Klast++; if (Klast>=K) Klast=0;
360                                 last[Klast]=page;
361                         }
362                 }
363
364     inline unsigned int selectPage() {
365         unsigned int last_victim=victim;
366         do {
367             victim++; if (victim>=nPages) victim=0;
368             if (pageTable[victim]
369                 &&pageState[victim]->canPageOut()
370                 &&!recentlyUsed(victim)) {
371                 /* victim is an allocated, unlocked, non-recently-used page: page him out. */
372                 return victim;
373             }
374         } while (victim!=last_victim);
375         return MSA_INVALID_PAGE_NO;  /* nobody is pageable */
376     }
377 };
378
379 //================================================================
380
381 /**
382    Holds the typed data for one MSA page.
383    Implementation of puppedPage used by the templated code.
384 */
385 template <
386         class ENTRY, 
387         class MERGER=DefaultEntry<ENTRY>,
388         unsigned int ENTRIES_PER_PAGE=MSA_DEFAULT_ENTRIES_PER_PAGE
389         >
390 class MSA_PageT {
391     unsigned int n; // number of entries on this page.  Used to send page updates.
392         /** The contents of this page: array of ENTRIES_PER_PAGE items */
393         ENTRY *data;
394         /** Merger object */
395         MERGER m;
396         bool duplicate;
397
398 public:
399
400         MSA_PageT()
401                 : n(ENTRIES_PER_PAGE), data(new ENTRY[ENTRIES_PER_PAGE]), duplicate(false)
402                 {
403                         for (int i=0;i<ENTRIES_PER_PAGE;i++){
404                                 data[i]=m.getIdentity();
405                         }
406                 }
407
408     // This constructor is used in PageArray to quickly convert an
409     // array of ENTRY into an MSA_PageT.  So we just make a copy of
410     // the pointer.  When it comes time to destruct the object, we
411     // need to ensure we do NOT delete the data but just discard the
412     // pointer.
413         MSA_PageT(ENTRY *d):data(d), duplicate(true), n(ENTRIES_PER_PAGE) {
414     }
415         MSA_PageT(ENTRY *d, unsigned int n_):data(d), duplicate(true), n(n_) {
416     }
417         virtual ~MSA_PageT() {
418                 if (!duplicate) {
419             delete [] data;
420         }
421         }
422
423         virtual void pup(PUP::er &p) {
424                 p | n;
425                 /*this pup routine was broken, It didnt consider the case
426                   in which n > 0 and data = NULL. This is possible when  
427                   sending empty pages. It also doesnt seem to do any allocation
428                   for the data variable while unpacking which seems to be wrong
429                 */
430                 bool nulldata;
431                 if(!p.isUnpacking()){
432                         nulldata = (data == NULL);
433                 }
434                 p | nulldata;
435                 if(nulldata){
436                         data = NULL;
437                         return;
438                 }
439                 if(p.isUnpacking()){
440                         data = new ENTRY[n];
441                 }
442                 for (int i=0;i<n;i++){
443                         p|data[i];
444                 }       
445         }
446
447         virtual void merge(MSA_PageT<ENTRY, MERGER, ENTRIES_PER_PAGE> &otherPage) {
448                 for (int i=0;i<ENTRIES_PER_PAGE;i++)
449                         m.accumulate(data[i],otherPage.data[i]);
450         }
451
452         // These accessors might be used by the templated code.
453         inline ENTRY &operator[](int i) {return data[i];}
454         inline const ENTRY &operator[](int i) const {return data[i];}
455     inline ENTRY *getData() { return data; }
456 };
457
458 //=============================== Cache Manager =================================
459
460 template <class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE>
461 class MSA_CacheGroup : public Group
462 {
463     typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
464
465 protected:
466     ENTRY_OPS_CLASS *entryOpsObject;
467     unsigned int numberOfWorkerThreads;      // number of worker threads across all processors for this shared array
468     // @@ migration?
469     unsigned int numberLocalWorkerThreads;   // number of worker threads on THIS processor for this shared array
470         unsigned int numberLocalWorkerThreadsActive;
471     unsigned int enrollDoneq;                 // has enroll() been done on this processor?
472     MSA_Listeners enrollWaiters;
473     MSA_Listeners syncWaiters;
474     std::set<int> enrolledPEs;                          // which PEs are involved?
475
476     unsigned int nPages;            ///< number of pages
477         std::vector<ENTRY_TYPE*> pageTable;          ///< the page table for this PE: stores actual data.
478     typedef MSA_Page_StateT<ENTRY_TYPE,ENTRIES_PER_PAGE> pageState_t;
479         std::vector<pageState_t *> pageStateStorage; ///< Housekeeping information for each allocated page.
480     
481     std::stack<ENTRY_TYPE*> pagePool;     // a pool of unused pages
482     
483         typedef vmNRUReplacementPolicy<ENTRY_TYPE, ENTRIES_PER_PAGE> vmPageReplacementPolicy;
484     MSA_PageReplacementPolicy<ENTRY_TYPE, ENTRIES_PER_PAGE> *replacementPolicy;
485
486     // structure for the bounds of a single write
487     typedef struct { unsigned int begin; unsigned int end; } writebounds_t;
488
489     // a list of write bounds associated with a given page
490     typedef std::list<writebounds_t> writelist_t;
491
492     writelist_t** writes;           // the write lists for each page
493
494     unsigned int resident_pages;             // pages currently allocated
495     unsigned int max_resident_pages;         // max allowable pages to allocate
496     unsigned int nEntries;          // number of entries for this array
497     unsigned int syncAckCount;      // number of sync ack's we received
498     int outOfBufferInPrefetch;      // flag to indicate if the last prefetch ran out of buffers
499
500     int syncThreadCount;            // number of local threads that have issued Sync
501     
502     
503     // used during output
504     MSA_WriteSpan_t writeSpans[ENTRIES_PER_PAGE];
505     ENTRY_TYPE writeEntries[ENTRIES_PER_PAGE];
506
507     typedef CProxy_MSA_PageArray<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_PageArray_t;
508     CProxy_PageArray_t pageArray;     // a proxy to the page array
509     typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
510     CProxy_CacheGroup_t thisProxy; // a proxy to myself.
511
512     std::map<CthThread, MSA_Thread_Listener *> threadList;
513
    /// Return the state for this page, or NULL if none has been
    /// allocated yet (non-allocating counterpart of state()).
    inline pageState_t *stateN(unsigned int pageNo) {
        return pageStateStorage[pageNo];
    }
518     
    /// Return the state for this page, allocating and caching a fresh
    /// record on first touch.
    pageState_t *state(unsigned int pageNo)
    {
        pageState_t *ret=pageStateStorage[pageNo];
        if (ret==NULL)
        {
            // First touch on this PE: default state is Uninit_State,
            // unlocked, with empty listener sets.
            ret=new pageState_t;
            pageStateStorage[pageNo]=ret;
        }
        return ret;
    }
530
    /// Look up or create the listener for the current thread.
    MSA_Thread_Listener *getListener(void) {
        CthThread t=CthSelf();
        // map::operator[] default-inserts NULL on first lookup, which
        // the branch below then replaces with a real listener.
        MSA_Thread_Listener *l=threadList[t];
        if (l==NULL) {
            l=new MSA_Thread_Listener;
            threadList[t]=l;
        }
        return l;
    }
    /// Add the current thread's listener to this list, then suspend
    /// until the listener is signalled.
    void addAndSuspend(MSA_Listeners &dest) {
        MSA_Thread_Listener *l=getListener();
        dest.add(l);
        l->suspend();
    }
547
548     /*********************************************************************************/
549     /** these routines deal with managing the page queue **/
550
    // Increment the number of pages the current thread is waiting on,
    // by registering its listener on this page's read-request queue;
    // the thread is woken when the page arrives.
    inline void IncrementPagesWaiting(unsigned int page)
    {
        state(page)->readRequests.add(getListener());
    }
557
    // Same as IncrementPagesWaiting, but for pending write-backs: the
    // thread is woken when this page's changes have been flushed out.
    inline void IncrementChangesWaiting(unsigned int page)
    {
        state(page)->writeRequests.add(getListener());
    }
562
563 /************************* Page allocation and management **************************/
564     
    /// Allocate a new page buffer, removing old pages if we're over the limit.
    /// Returns NULL if no buffer space is available.
    /// Priority order: (1) recycle from pagePool, (2) heap-allocate while
    /// under max_resident_pages, (3) evict a victim chosen by the
    /// replacement policy and steal its buffer.
    inline ENTRY_TYPE* tryBuffer(int async=0) // @@@
    {
        ENTRY_TYPE* nu = NULL;

        // first try the page pool
        if(!pagePool.empty())
        {
            nu = pagePool.top();
            CkAssert(nu != NULL);
            pagePool.pop();
        }

        // else try to allocate the buffer
        if(nu == NULL && resident_pages < max_resident_pages)
        {
            nu = new ENTRY_TYPE[ENTRIES_PER_PAGE];
            resident_pages++;
        }

        // else swap out one of the pages
        if(nu == NULL)
        {
            // NOTE(review): pageToSwap is int while selectPage() returns
            // unsigned; the comparison with MSA_INVALID_PAGE_NO relies on
            // matching bit patterns -- confirm the sentinel's definition.
            int pageToSwap = replacementPolicy->selectPage();
            if(pageToSwap != MSA_INVALID_PAGE_NO)
            {
                CkAssert(pageTable[pageToSwap] != NULL);
                CkAssert(state(pageToSwap)->canPageOut() == true);

                // Write the victim back (if dirty), then take its buffer.
                relocatePage(pageToSwap, async);
                nu = pageTable[pageToSwap];
                pageTable[pageToSwap] = 0;
                delete pageStateStorage[pageToSwap];
                pageStateStorage[pageToSwap]=0;
            }
        }

        // otherwise return NULL

        return nu;
    }
607     
    /// Allocate storage for this page, if none has been allocated already.
    ///  Update pageTable, and return the storage for the page.
    ///  Aborts the run if no buffer can be found or evicted.
    inline ENTRY_TYPE* makePage(unsigned int page) // @@@
    {
        ENTRY_TYPE* nu=pageTable[page];
        if (nu==0) {
            nu=tryBuffer();
            if (nu==0) CkAbort("MSA: No available space to create pages.\n");
            pageTable[page]=nu;
        }
        return nu;
    }
620     
    /// Throw away this allocated page.
    ///  Returns the page itself, for deletion or recycling.
    ///  NOTE(review): when threads are still waiting on the page
    ///  (canDelete()==false), the state record is intentionally left in
    ///  pageStateStorage even though the data pointer is cleared --
    ///  presumably the waiters are drained elsewhere; confirm callers.
    ENTRY_TYPE* destroyPage(unsigned int page)
    {
        ENTRY_TYPE* nu=pageTable[page];
        pageTable[page] = 0;
        if (pageStateStorage[page]->canDelete()) {
            delete pageStateStorage[page];
            pageStateStorage[page]=0;
        }
        resident_pages--;
        return nu;
    }
634     
    //MSA_CacheGroup::
    /// Handle a miss on "page": for Read_Fault, request the page from
    /// the PageArray and block this thread until it arrives; for write
    /// or accumulate faults, build an identity-filled local buffer.
    void pageFault(unsigned int page, MSA_Page_Fault_t why)
    {
        // Record the access mode under which this page was faulted in.
        state(page)->state = why;
        if(why == Read_Fault)
        { // Issue a remote request to fetch the new page
            // If the page has not been requested already, then request it.
            if (stateN(page)->readRequests.size()==0) {
                pageArray[page].GetPage(CkMyPe());
                //ckout << "Requesting page first time"<< endl;
            } else {
                ;//ckout << "Requesting page next time.  Skipping request."<< endl;
            }
            MSA_Thread_Listener *l=getListener();
            stateN(page)->readRequests.add(l);
            l->suspend(); // Suspend until page arrives.
        }
        else {
            // Build an empty buffer into which to create the new page
            ENTRY_TYPE* nu = makePage(page);
            writeIdentity(nu);
        }
    }
659     
660     /// Make sure this page is accessible, faulting the page in if needed.
661     // MSA_CacheGroup::
662     inline void accessPage(unsigned int page,MSA_Page_Fault_t access)
663                 {
664                         if (pageTable[page] == 0) {
665 //             ckout << "p" << CkMyPe() << ": Calling pageFault" << endl;
666                                 pageFault(page, access);
667                         }
668 #ifndef CMK_OPTIMIZE
669                         if (stateN(page)->state!=access) {
670                                 CkPrintf("page=%d mode=%d pagestate=%d", page, access, stateN(page)->state);
671                                 CkAbort("MSA Runtime error: Attempting to access a page that is still in another mode.");
672                         }
673 #endif
674                         replacementPolicy->pageAccessed(page);
675                 }
676
    // MSA_CacheGroup::
    // Fill this page with identity values, to prepare for writes or
    //  accumulates (so that merging unwritten entries is a no-op).
    void writeIdentity(ENTRY_TYPE* pagePtr)
    {
        for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
            pagePtr[i] = entryOpsObject->getIdentity();
    }
685     
686 /************* Page Flush and Writeback *********************/
    /// True if this page holds local modifications (write or accumulate
    /// mode) that must be flushed to the PageArray before eviction.
    bool shouldWriteback(unsigned int page) {
        if (!pageTable[page]) return false;
        return (stateN(page)->state == Write_Fault || stateN(page)->state == Accumulate_Fault);
    }
691     
    /// Prepare this page for eviction: if it carries local writes,
    /// push the changes to the page's home in the PageArray first.
    inline void relocatePage(unsigned int page, int async)
    {
        //CkAssert(pageTable[page]);
        if(shouldWriteback(page))
        {
            // the page to be swapped is a writeable page. So inform any
            // changes this node has made to the page manager
            sendChangesToPageArray(page, async);
        }
    }
702
    /// Flush this page's written spans to the PageArray.  Unless async,
    /// block the calling thread until the write-back is acknowledged.
    inline void sendChangesToPageArray(const unsigned int page, const int async)
    {
        sendRLEChangesToPageArray(page);

        MSA_Thread_Listener *l=getListener();
        state(page)->writeRequests.add(l);
        if (!async)
            l->suspend(); // Suspend until page is really gone.
        // TODO: Are write acknowledgements really necessary ?
    }
713
714     // Send the page data as a contiguous block.
715     //   Note that this is INCORRECT when writes to pages overlap!
716     inline void sendNonRLEChangesToPageArray(const unsigned int page) // @@@
717                 {
718                         pageArray[page].PAReceivePage(pageTable[page], ENTRIES_PER_PAGE, CkMyPe(), stateN(page)->state);
719                 }
720     
    // Send the page data as an RLE block.
    // this function assumes that there are no overlapping writes in the list
    // Only the spans actually written are shipped.  The single-span
    // case sends directly from the page buffer; multiple spans are
    // first compacted into the staging buffer writeEntries.
    inline void sendRLEChangesToPageArray(const unsigned int page)
		{
			ENTRY_TYPE *writePage=pageTable[page];
			int nSpans=stateN(page)->writeSpans(writeSpans);
			if (nSpans==1) 
			{ /* common case: can make very fast */
				int nEntries=writeSpans[0].end-writeSpans[0].start;
				if (entryOpsObject->pupEveryElement()) {
					pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
															page_t(&writePage[writeSpans[0].start],nEntries),nEntries,
															CkMyPe(),stateN(page)->state);
				} else {
					pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
													 &writePage[writeSpans[0].start], nEntries,
													 CkMyPe(),stateN(page)->state);
				}
			} 
			else /* nSpans>1 */ 
			{ /* must copy separate spans into a single output buffer (luckily rare) */
				int nEntries=0;
				for (int s=0;s<nSpans;s++) {
					for (int i=writeSpans[s].start;i<writeSpans[s].end;i++)
						writeEntries[nEntries++]=writePage[i]; // calls assign
				}
				if (entryOpsObject->pupEveryElement()) {
					pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
															page_t(writeEntries,nEntries),nEntries,
															CkMyPe(),stateN(page)->state);
				} else {
					pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
													 writeEntries,nEntries,
													 CkMyPe(),stateN(page)->state);
				}
			}
		}
758
759 /*********************** Public Interface **********************/
760 public:
761     // 
762     //
763     // MSA_CacheGroup::
	// MSA_CacheGroup::
	// Per-PE cache constructor.
	//   nPages_        : total pages in the shared array
	//   pageArrayID    : the owning MSA_PageArray
	//   max_bytes_     : cache-size budget, converted to a page count below
	//   nEntries_      : total entries in the array
	//   numberOfWorkerThreads_ : global worker count (checked in enroll())
	inline MSA_CacheGroup(unsigned int nPages_, CkArrayID pageArrayID,
						  unsigned int max_bytes_, unsigned int nEntries_, 
						  unsigned int numberOfWorkerThreads_)
		: numberOfWorkerThreads(numberOfWorkerThreads_),
		  nPages(nPages_),
		  nEntries(nEntries_), 
		  pageTable(nPages, NULL),
		  pageStateStorage(nPages, NULL),
		  pageArray(pageArrayID),
		  thisProxy(thisgroup),
		  max_resident_pages(max_bytes_/(sizeof(ENTRY_TYPE)*ENTRIES_PER_PAGE)),
		  entryOpsObject(new ENTRY_OPS_CLASS),
		  replacementPolicy(new vmPageReplacementPolicy(nPages, pageTable, pageStateStorage)),
		  outOfBufferInPrefetch(0), syncAckCount(0),syncThreadCount(0),
		  resident_pages(0), numberLocalWorkerThreads(0), 
		  numberLocalWorkerThreadsActive(0), enrollDoneq(0)
		{
			MSADEBPRINT(printf("MSA_CacheGroup nEntries %d \n",nEntries););
		}
783
    // MSA_CacheGroup::
    // Destructor: release all cached pages and pooled buffers.
    inline ~MSA_CacheGroup()
		{
			FreeMem();
		}

    /* To change the accumulate function TBD @@ race conditions */
    // Swap in a new entry-operations object locally and broadcast the
    // change to every page-array element.
    inline void changeEntryOpsObject(ENTRY_OPS_CLASS *e) {
        entryOpsObject = e;
        pageArray.changeEntryOpsObject(e);
    }
795
796     // MSA_CacheGroup::
797     inline const ENTRY_TYPE* readablePage(unsigned int page)
798                 {
799                         accessPage(page,Read_Fault);
800         
801                         return pageTable[page];
802                 }
803
    // MSA_CacheGroup::
    //
    // known local page
    // Fast path: returns the cached buffer without any fault handling.
    // NULL if the page is not actually resident -- presumably callers
    // guarantee residency; TODO confirm.
    inline const void* readablePage2(unsigned int page)
		{
			return pageTable[page];
		}
811
    // MSA_CacheGroup::
    // Obtains a writable copy of the page.  Faults the page in write
    // mode, records 'offset' as dirty, and returns the cached buffer.
    inline ENTRY_TYPE* writeablePage(unsigned int page, unsigned int offset)
		{
			accessPage(page,Write_Fault);

			// NOTE: Since we assume write once semantics, i.e. between two calls to sync,
			// either there can be no write to a location or a single write to a location,
			// a readable page will suffice as a writeable page too, because no one else
			// is going to write to this location. In reality, two locations on the *same*
			// page can be written by two different threads, in which case we will need
			// to keep track of which parts of the page have been written, hence:
			stateN(page)->write(offset);
//     ckout << "write:" << page*ENTRIES_PER_PAGE+offset << endl;
        
			return pageTable[page];
		}
829
830     // MSA_CacheGroup::
831     inline ENTRY_TYPE &accumulate(unsigned int page, unsigned int offset)
832                 {
833                         accessPage(page,Accumulate_Fault);
834                         stateN(page)->write(offset);
835                         return pageTable[page][offset];
836                 }
837
838     /// A requested page has arrived from the network.
839     ///  nEntriesInPage_ = num entries being sent (0 for empty page, num entries otherwise)
840     inline void ReceivePageWithPUP(unsigned int page, page_t &pageData, int size)
841                 {
842                         ReceivePage(page, pageData.getData(), size);
843                 }
844
845     inline void ReceivePage(unsigned int page, ENTRY_TYPE* pageData, int size)
846                 {
847                         CkAssert(0==size || ENTRIES_PER_PAGE == size);
848                         // the page we requested has been received
849                         ENTRY_TYPE *nu=makePage(page);
850                         if(size!=0)
851                         {
852                                 for(unsigned int i = 0; i < size; i++)
853                                         nu[i] = pageData[i]; // @@@, calls assignment operator
854                         }
855                         else /* isEmpty */
856                         {
857                                 // the page we requested for is empty, so we can just initialize it.
858                                 writeIdentity(nu);
859                         }
860         
861                         state(page)->readRequests.signal(page);
862                 }
863
864     // This EP is invoked during sync to acknowledge that a dirty page
865     // has been received and written back to the page owner.  We keep track
866     // of the number of ack's yet to arrive in nChangesWaiting.  Once
867     // all the dirty pages have been ack'd, we awaken the thread that
868     // flushed the page.
869     //
870     // It's not clear this is useful very often...
871     //
872     // MSA_CacheGroup::
    // Signal the listener waiting on this page's write-back; once all
    // of a thread's dirty-page acks have arrived, it resumes.
    inline void AckPage(unsigned int page)
		{
			state(page)->writeRequests.signal(page);
		}
877
878     // MSA_CacheGroup::
879     // synchronize all the pages and also clear up the cache
880     inline void SyncReq(int single)
881                 {
882                         MSADEBPRINT(printf("SyncReq single %d\n",single););
883                         if(single)
884                         {
885                                 /*ask all the caches to send their updates to the page
886                                  * array, but we don't need to empty the caches on the
887                                  * other PEs*/
888                                 SingleSync();
889                                 EmptyCache();
890
891                                 getListener()->suspend();
892                         }
893                         else{
894                                 Sync();
895                         }
896                 }
897
898     // MSA_CacheGroup::
899     inline void FlushCache()
900                 {
901                         // flush the local cache
902                         // for each writeable page, send that page to the array element
903                         for(unsigned int i = 0; i < nPages; i++)
904                         {
905                                 if(shouldWriteback(i)) {
906                                         //ckout << "p" << CkMyPe() << "FlushCache: sending page " << i << endl;
907                                         sendChangesToPageArray(i, 1);
908                                 }
909                         }
910                 }
911
912     // MSA_CacheGroup::
913     void EmptyCache()
914                 {
915                         /* just makes all the pages empty, assuming that the data
916                          * in those pages has been flushed to the owners */
917                         for(unsigned int i = 0; i < nPages; i++)
918                         {
919                                 if(pageTable[i]) pagePool.push(destroyPage(i));
920                         }
921                 }
922
923 /************************ Enroll ********************/
    /// Enroll phase 1: called by users.
    /// Each worker thread registers locally, notifies PE 0, and
    /// suspends until PE 0 broadcasts enrollDone().
    // MSA_CacheGroup::
    inline void enroll(unsigned int num_workers)
		{
			CkAssert(num_workers == numberOfWorkerThreads); // just to verify
			CkAssert(enrollDoneq == 0);
			numberLocalWorkerThreads++;
			numberLocalWorkerThreadsActive++;
			// @@ how to ensure that enroll is called only once?

			//ckout << "[" << CkMyPe() << "] sending sync ack to PE 0" << endl;
			thisProxy[0].enrollAck(CkMyPe());
			//ckout << "[" << CkMyPe() << "] suspending thread in Sync() " << endl;
			addAndSuspend(enrollWaiters);
			//ckout << "[" << CkMyPe() << "] resuming thread in Sync()" << endl;

			CkAssert(enrollDoneq == 1);
			return;
		}

    /// Enroll phase 2: called on PE 0 from everywhere
    /// Counts one ack per worker thread; when all have checked in,
    /// broadcasts enrollDone() to every PE.
    inline void enrollAck(int originator)
		{
			CkAssert(CkMyPe() == 0);  // enrollAck is only called on PE 0
			CkAssert(enrollDoneq == 0);  // prevent multiple enroll operations
        
			syncAckCount++;
			enrolledPEs.insert(originator); // remember which PEs participate
			//ckout << "[" << CkMyPe() << "] SyncAckcount = " << syncAckCount << endl;
			if(syncAckCount == numberOfWorkerThreads) {
//             ckout << "[" << CkMyPe() << "]" << "Enroll operation is almost done" << endl;
				syncAckCount = 0;
				enrollDoneq = 1;
				// What if fewer worker threads than pe's ?  Handled in
				// enrollDone.
				thisProxy.enrollDone();
			}
		}

    /// Enroll phase 3: called everywhere by PE 0
    /// Marks enrollment complete and wakes all threads parked in enroll().
    inline void enrollDone()
		{
//         ckout << "[" << CkMyPe() << "] enrollDone.  Waking threads."
//               <<  " numberOfWorkerThreads=" << numberOfWorkerThreads
//               <<  " local=" << numberLocalWorkerThreads << endl;
			enrollDoneq = 1;
			enrollWaiters.signal(0);
		}

/******************************** Sync & writeback ***********************/
    // MSA_CacheGroup::
    // Single-thread sync step: just flush this PE's dirty pages.
    inline void SingleSync()
		{
			/* a single thread issued a sync call with all = 1. The
			 * first thing to do is to flush the local cache */
			FlushCache();
		}
981
982         void SyncRelease()
983                 {
984                         numberLocalWorkerThreadsActive--;
985
986                         syncDebug();
987                         
988                         if(syncThreadCount < numberLocalWorkerThreadsActive)
989                         {
990                                 return;
991                         }
992
993                         thisProxy[CkMyPe()].FinishSync();
994                 }
995
	// Debug helper: print local thread / sync progress counters.
	void syncDebug()
		{
			MSADEBPRINT(printf("Sync  (Total threads: %d, Active: %d, Synced: %d)\n", 
							   numberLocalWorkerThreads, numberLocalWorkerThreadsActive, syncThreadCount));
		}

	// Mark one more local worker thread as active (the inverse of the
	// decrement in SyncRelease()).
	void activate()
		{
			numberLocalWorkerThreadsActive++;
			
			CkAssert(numberLocalWorkerThreadsActive <= numberLocalWorkerThreads);
		}
1008
	// Final phase of a sync on this PE: flush + empty the cache, wait
	// for all write-back acks, rendezvous with PE 0, then suspend
	// until PE 0's page-array reduction completes (SyncDone()).
	void FinishSync()
		{
			//ckout << "[" << CkMyPe() << "] Sync started" << endl;

			// flush the cache asynchronously and also empty it
			FlushCache();
			// idea: instead of invalidating the pages, switch it to read
			// mode. That will not work, since the page may have also been
			// modified by another thread.
			EmptyCache();

			// Now, we suspend too (if we had at least one dirty page).
			// We will be awoken when all our dirty pages have been
			// written and acknowledged.
			MSADEBPRINT(printf("Sync calling suspend on getListener\n"););
			getListener()->suspend();
			MSADEBPRINT(printf("Sync awakening after suspend\n"););

			// So far, the sync has been asynchronous, i.e. PE0 might be ahead
			// of PE1.  Next we basically do a barrier to ensure that all PE's
			// are synchronized.

			// at this point, the sync's across the group should
			// synchronize among themselves by each one sending
			// a sync acknowledge message to PE 0. (this is like
			// a reduction over a group)
			if(CkMyPe() != 0)
			{
				thisProxy[0].SyncAck();
			}
			else /* I *am* PE 0 */
			{
				SyncAck();
			}
			MSADEBPRINT(printf("Sync all local threads done, going to addAndSuspend\n"););
			/* Wait until sync is reflected from PE 0 */
			addAndSuspend(syncWaiters);
				
			MSADEBPRINT(printf("Sync all local threads done waking up after addAndSuspend\n"););
			//ckout << "[" << CkMyPe() << "] Sync finished" << endl;                        
		}

    // MSA_CacheGroup::
    // Collective sync entry point: every local worker thread calls
    // this.  All but the last thread simply suspend; the last one to
    // arrive runs FinishSync() on behalf of the whole PE.
    inline void Sync()
		{
			syncThreadCount++;
			//ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << " " << numberLocalWorkerThreads << endl;
			//ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << ", registered threads = " << getNumRegisteredThreads()
			//  << ", number of suspended threads = " << getNumSuspendedThreads() << endl;

			syncDebug();

			// First, all threads on this processor need to reach the sync
			// call; only then can we proceed with merging the data.  Only
			// the last thread on this processor needs to do the FlushCache,
			// etc.  Others just suspend until the sync is over.
			MSADEBPRINT(printf("Sync  (Total threads: %d, Active: %d, Synced: %d)\n", 
							   numberLocalWorkerThreads, numberLocalWorkerThreadsActive, syncThreadCount));
			if(syncThreadCount < numberLocalWorkerThreadsActive)
			{
				MSADEBPRINT(printf("Sync addAndSuspend\n"));
				addAndSuspend(syncWaiters);
				return;
			}

			FinishSync();
		}

    /// Total number of entries in the array (as configured at construction).
    inline unsigned int getNumEntries() { return nEntries; }
    /// Proxy for the underlying page array.
    inline CProxy_PageArray_t getArray() { return pageArray; }

    // TODO: Can this SyncAck and other simple Acks be made efficient?
// Yes - Replace calls to this with contributes to a reduction that calls pageArray.Sync()
    // Per-PE barrier contribution, collected on PE 0.  When every
    // enrolled PE has checked in, start the page-array reduction whose
    // completion is broadcast back via SyncDone().
    inline void SyncAck()
		{
			CkAssert(CkMyPe() == 0);  // SyncAck is only called on PE 0
			syncAckCount++;
			// DONE @@ what if fewer worker threads than pe's ?
			// @@ what if fewer worker threads than pe's and >1 threads on 1 pe?
			//if(syncAckCount == min(numberOfWorkerThreads, CkNumPes())){
			if (syncAckCount == enrolledPEs.size()) {
				MSADEBPRINT(printf("SyncAck starting reduction on pageArray of size %d number of pages %d\n",
								   nEntries, nPages););
				pageArray.Sync();
			}               
		}
1095
1096     inline void SyncDone(CkReductionMsg *m)
1097                 {
1098                         delete m;
1099                         //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
1100                         //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
1101                         /* Reset for next sync */
1102                         syncThreadCount = 0;
1103                         syncAckCount = 0;
1104                         MSADEBPRINT(printf("SyncDone syncWaiters signal to be called\n"););
1105                         syncWaiters.signal(0);
1106                 }
1107
1108     inline void FreeMem()
1109                 {
1110                         for(unsigned int i = 0; i < nPages; i++)
1111                         {
1112                                 if(pageTable[i]) delete [] destroyPage(i);
1113                         }
1114
1115                         while(!pagePool.empty())
1116                         {
1117                                 delete [] pagePool.top();  // @@@
1118                                 pagePool.pop();
1119                         }
1120         
1121                         resident_pages=0;
1122                 }
1123
1124         /** 
1125                 Deregister a client. Decrement the number of local threads. If total number of local threads 
1126                 hits 0 FreeMem()
1127         */
1128         inline void unroll() {
1129                 numberLocalWorkerThreads--;
1130                 if(numberLocalWorkerThreads == 0){
1131                         FreeMem();
1132                 }
1133         }
1134
1135     /**
1136      * Issue a prefetch request for the given range of pages. These pages will
1137      * be locked into the cache, so that they will not be swapped out.
1138      */
1139     inline void Prefetch(unsigned int pageStart, unsigned int pageEnd)
1140                 {
1141                         /* prefetching is feasible only if we we did not encounter an out
1142                          * of buffer condition in the previous prefetch call
1143                          */
1144                         if(!outOfBufferInPrefetch)
1145                         {
1146                                 //ckout << "prefetching pages " << pageStart << " through " << pageEnd << endl;
1147                                 for(unsigned int p = pageStart; p <= pageEnd; p++)
1148                                 {
1149                                         if(NULL == pageTable[p])
1150                                         {
1151
1152                                                 /* relocate the buffer asynchronously */
1153                                                 ENTRY_TYPE* nu = tryBuffer(1);
1154                                                 if(NULL == nu)
1155                                                 {
1156                                                         /* signal that sufficient buffer space is not available */
1157                                                         outOfBufferInPrefetch = 1;
1158                                                         break;
1159                                                 }
1160
1161                                                 pageTable[p] = nu;
1162                                                 state(p)->state = Read_Fault;
1163
1164                                                 pageArray[p].GetPage(CkMyPe());
1165                                                 IncrementPagesWaiting(p);
1166                                                 //ckout << "Prefetch page" << p << ", pages waiting = " << nPagesWaiting << endl;
1167                                                 /* don't suspend the thread */
1168                                         }
1169
1170                                         /* mark the page as being locked */
1171                                         state(p)->locked = true;
1172                                 }
1173                         }
1174                 }
1175
1176     /**
1177      * Wait for all the prefetch pages to be fetched into the cache.
1178      * Returns: 0 if prefetch successful, 1 if not
1179      */
1180     inline int WaitAll(void)
1181                 {
1182                         if(outOfBufferInPrefetch)
1183                         {
1184                                 // we encountered out of buffer in the previous prefetch call, return error
1185                                 outOfBufferInPrefetch = 0;
1186                                 getListener()->suspend();
1187                                 UnlockPages();
1188                                 return 1;
1189                         }
1190                         else
1191                         {
1192                                 // prefetch requests have been successfully issued already, so suspend the
1193                                 // thread and wait for completion
1194                                 outOfBufferInPrefetch = 0;
1195                                 getListener()->suspend();
1196                                 return 0;
1197                         }
1198                 }
1199     
1200     inline void UnlockPage(unsigned int page) {
1201         pageState_t *s=stateN(page);
1202                 if(s && s->locked) {
1203             replacementPolicy->pageAccessed(page);
1204             s->locked = false;
1205                 }
1206     }
1207
1208     /**
1209      * Unlock all the pages locked in the cache
1210      */
1211     inline void UnlockPages()
1212                 {
1213                         // add all the locked pages to page replacement policy
1214                         for(unsigned int page = 0; page < nPages; page++)
1215                                 UnlockPage(page);
1216                 }
1217
1218     /**
1219      * Unlock the given pages: [startPage ... endPage]
1220      *  Note that the range is inclusive.
1221      */
1222     inline void UnlockPages(unsigned int startPage, unsigned int endPage)
1223                 {
1224                         for(unsigned int page = startPage; page <= endPage; page++)
1225                                 UnlockPage(page);
1226                 }
1227
    /// Debugging routine: sanity-check a (page, offset) coordinate.
    /// The actual value printout is currently commented out, so only
    /// the bounds assertions remain active.
    inline void emitBufferValue(int ID, unsigned int pageNum, unsigned int offset)
		{
			CkAssert( pageNum < nPages );
			CkAssert( offset < ENTRIES_PER_PAGE );

			//ckout << "p" << CkMyPe() << "ID" << ID;
//         if (pageTable[pageNum] == 0)
//             ckout << "emitBufferValue: page " << pageNum << " not available in local cache." << endl;
//         else
//             ckout << "emitBufferValue: [" << pageNum << "," << offset << "] = " << pageTable[pageNum][offset] << endl;
		}
1240 };
1241
1242 // each element of this array is responsible for managing
1243 // the information about a single page. It is in effect the
1244 // "owner" as well as the "manager" for that page.
1245 //
1246 template<class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE> 
1247 class MSA_PageArray : public ArrayElement1D
1248 {
1249     typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
1250     typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
1251     
1252 protected:
1253     ENTRY_TYPE *epage;
1254     ENTRY_OPS_CLASS entryOpsObject;
1255     CProxy_CacheGroup_t cache;
1256
1257     unsigned int pageNo() { return thisIndex; }
1258
1259     inline void allocatePage(MSA_Page_Fault_t access) // @@@
1260                 {
1261                         if(epage == NULL)
1262                         {
1263                                 epage = new ENTRY_TYPE[ENTRIES_PER_PAGE];
1264                                 writeIdentity();
1265                         }
1266                 }
1267
1268     // begin and end are indexes into the page.
1269     inline void set(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
1270                 {
1271                         //ckout << "set: " << begin << "," << end << endl;
1272                         for(unsigned int i = 0; i < (end - begin); i++) {
1273                                 epage[begin + i] = buffer[i]; // @@@, calls assignment operator
1274                                 //ckout << "set val[" << begin+i << "]=" << buffer[i] << endl;
1275                         }
1276                 }
1277
1278     // MSA_PageArray::
1279     inline void combine(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
1280                 {
1281                         ENTRY_TYPE* pagePtr = epage + begin;
1282                         for(unsigned int i = 0; i < (end - begin); i++)
1283                                 entryOpsObject.accumulate(pagePtr[i], buffer[i]);
1284                 }
1285
1286     // MSA_PageArray::
1287     inline void writeIdentity()
1288                 {
1289                         for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
1290                                 epage[i] = entryOpsObject.getIdentity();
1291                 }
1292
1293 public:
1294     inline MSA_PageArray() : epage(NULL) { }
1295     inline MSA_PageArray(CkMigrateMessage* m) { delete m; }
1296     
    /// Tell this page element which cache group to talk back to
    /// (page deliveries and write-back acks).
    void setCacheProxy(CProxy_CacheGroup_t &cache_)
		{
			cache=cache_;
		}
1301     
1302     virtual void pup(PUP::er& p)
1303                 {
1304                         ArrayElement1D::pup(p);
1305                         int epage_present=(epage!=0);
1306                         p|epage_present;
1307                         if (epage_present) {
1308                                 if(p.isUnpacking())
1309                                         allocatePage(Write_Fault);
1310                                 for (int i=0;i<ENTRIES_PER_PAGE;i++)
1311                                         p|epage[i];
1312                         }
1313                 }
1314     
1315     inline ~MSA_PageArray()
1316                 {
1317                         if(epage) delete [] epage;
1318                 }
1319
    /// Request our page.
    ///   pe = to which to send page
    /// If this element was never written (epage==NULL), a size-0
    /// "empty page" is sent so the cache initializes it to identity.
    inline void GetPage(int pe)
		{
			if(epage == NULL) {
				// send empty page
				if (entryOpsObject.pupEveryElement())
					cache[pe].ReceivePageWithPUP(pageNo(), page_t((ENTRY_TYPE*)NULL), 0);
				else
					cache[pe].ReceivePage(pageNo(), (ENTRY_TYPE*)NULL, 0);
			} else {
				// send page with data
				if (entryOpsObject.pupEveryElement())
					cache[pe].ReceivePageWithPUP(pageNo(), page_t(epage), ENTRIES_PER_PAGE);
				else
					cache[pe].ReceivePage(pageNo(), epage, ENTRIES_PER_PAGE);  // send page with data                
			}
		}

    /// Receive a non-runlength encoded page from the network:
    // @@ TBD: ERROR: This does not work for  varsize pages.
    /// Write_Fault data replaces our copy; other states are accumulated
    /// in.  The sending PE is ack'd so its flushing thread can resume.
    inline void PAReceivePage(ENTRY_TYPE *pageData,
							  int pe, MSA_Page_Fault_t pageState)
		{
			allocatePage(pageState);

			if(pageState == Write_Fault)
				set(pageData, 0, ENTRIES_PER_PAGE);
			else
				combine(pageData, 0, ENTRIES_PER_PAGE);
        
			// send the acknowledgement to the sender that we received the page
			//ckout << "Sending Ack to PE " << pe << endl;
			cache[pe].AckPage(thisIndex);
		}

    /// Receive a runlength encoded page from the network:
    /// PUP'd wrapper -- unwraps the entry buffer and forwards.
    inline void PAReceiveRLEPageWithPup(
        const MSA_WriteSpan_t *spans, unsigned int nSpans, 
        page_t &entries, unsigned int nEntries, 
        int pe, MSA_Page_Fault_t pageState)
		{
			PAReceiveRLEPage(spans, nSpans, entries.getData(), nEntries, pe, pageState);
		}
1364
1365
1366     inline void PAReceiveRLEPage(
1367         const MSA_WriteSpan_t *spans, unsigned int nSpans, 
1368         const ENTRY_TYPE *entries, unsigned int nEntries, 
1369         int pe, MSA_Page_Fault_t pageState)
1370                 {
1371                         allocatePage(pageState);
1372         
1373                         //ckout << "p" << CkMyPe() << "ReceiveRLEPage nSpans=" << nSpans << " nEntries=" << nEntries << endl;
1374                         int e=0; /* consumed entries */
1375                         for (int s=0;s<nSpans;s++) {
1376                                 if(pageState == Write_Fault)
1377                                         set(&entries[e], spans[s].start,spans[s].end);
1378                                 else /* Accumulate_Fault */
1379                                         combine(&entries[e], spans[s].start,spans[s].end);
1380                                 e+=spans[s].end-spans[s].start;
1381                         } 
1382
1383                         // send the acknowledgement to the sender that we received the page
1384                         //ckout << "Sending AckRLE to PE " << pe << endl;
1385                         cache[pe].AckPage(thisIndex);
1386                 }
1387
1388     // MSA_PageArray::
1389     inline void Sync()
1390                 {
1391                         MSADEBPRINT(printf("MSA_PageArray::Sync about to call contribute \n"););
1392                         CkCallback cb(CkIndex_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE>::SyncDone(NULL), cache);
1393                         contribute(0, NULL, CkReduction::concat, cb);
1394                 }
1395
1396     inline void emit(int ID, int index)
1397                 {
1398                         //ckout << "p" << CkMyPe() << "ID" << ID;
1399 //         if(epage == NULL)
1400 //             ckout << "emit: epage is NULL" << endl;
1401 //         else
1402 //             ckout << "emit: " << epage[index] << endl;
1403                 }
1404 };
1405
1406 #define CK_TEMPLATES_ONLY
1407 #include "msa.def.h"
1408 #undef CK_TEMPLATES_ONLY
1409
1410 #endif