Abstract the interface to the MSA page replacement policies
[charm.git] / src / libs / ck-libs / multiphaseSharedArrays / msa-DistPageMgr.h
1 // emacs mode line -*- mode: c++; tab-width: 4 -*-
2
3 #ifndef MSA_DISTPAGEMGR_H
4 #define MSA_DISTPAGEMGR_H
5
6 #include <charm++.h>
7 #include <string.h>
8 #include <list>
9 #include <stack>
10 #include <map>
11 #include <set>
12 #include <vector>
13 #include "msa-common.h"
14
15 // forward decl needed in msa-DistPageMgr.ci, i.e. in msa.decl.h
16
17 /// Stores a list of indices to be written out.
18 struct MSA_WriteSpan_t {
19     int start,end;
20     inline void pup(PUP::er &p) {
21         p|start; p|end;
22     }
23 };
24
25 template <class ENTRY, class MERGER,
26           unsigned int ENTRIES_PER_PAGE>
27          class MSA_PageT;
28 #include "msa.decl.h"
29
30 //=======================================================
31 // Utility Classes
32
33 /// Listens for some event on a page or set of pages.
34 class MSA_Listener {
35 public:
36         MSA_Listener() {}
37         virtual ~MSA_Listener();
38         /// Getting added to a lister list.
39         virtual void add(void) =0;
40         /// Event waiting for has occurred.
41         virtual void signal(unsigned int pageNo) =0;
42 };
43
44 /// Keeps a list of MSA_Listeners
45 class MSA_Listeners {
46         CkVec<MSA_Listener *> listeners;
47 public:
48         MSA_Listeners();
49         ~MSA_Listeners();
50         
51         /// Add this listener to your set.  Calls l->add().
52         void add(MSA_Listener *l);
53         
54         /// Return the number of listeners in our set.
55         unsigned int size(void) const {return listeners.size();}
56         
57         /// Signal all added listeners and remove them from the set.
58         void signal(unsigned int pageNo);
59 };
60
61
62 /** Resumes a thread once all needed pages have arrived */
63 class MSA_Thread_Listener : public MSA_Listener {
64         CthThread thread;    // the suspended thread of execution (0 if not suspended)
65         int count;  // number of pages we're still waiting for
66 public:
67         MSA_Thread_Listener() :thread(0), count(0) {}
68         
69         /// Wait for one more page.
70         void add(void);
71         
72         /// If we're waiting for any pages, suspend our thread.
73         void suspend(void);
74         
75         /// Another page arrived.
76         void signal(unsigned int pageNo);
77 };
78
79
80 /// Stores all housekeeping information about a cached copy of a page: 
81 ///   everything but the actual page data.
82 /// This is the non-templated superclass, which doesn't depend on the 
83 ///   page size or datatype.
84 class MSA_Page_State {
85 public:
86         /// e.g., Read_Fault for a read-only page.
87         MSA_Page_Fault_t state;
88         
89         /// If true, this page is locked in memory.
90         ///   Pages get locked so people can safely use the non-checking version of "get".
91         bool locked;
92         
93         /// Threads waiting for this page to be paged in from the network.
94         MSA_Listeners readRequests;
95         /// Threads waiting for this page to be paged out to the network.
96         MSA_Listeners writeRequests;
97         
98         /// Return true if this page can be safely written back.
99         bool canPageOut(void) const {
100                 return (!locked) && canDelete();
101         }
102         
103         /// Return true if this page can be safely purged from memory.
104         bool canDelete(void) const {
105                 return (readRequests.size()==0) && (writeRequests.size()==0);
106         }
107         
108         MSA_Page_State() :state(Uninit_State), locked(false) {}
109 };
110
111 /// Fast, fixed-size bitvector class.
112 // TODO: Replace with std::bitset
113 template <unsigned int NUM_BITS>
114 class fixedlength_bitvector {
115 public:
116         /// Data type used to store actual bits in the vector.
117         typedef unsigned long store_t;
118         enum { store_bits=8*sizeof(store_t) };
119         
120         /// Number of store_t's in our vector.
121         enum { len=(NUM_BITS+(store_bits-1))/store_bits };
122         store_t store[len];
123         
124         fixedlength_bitvector() {reset();}
125
126         /// Fill the entire vector with this value.
127         void fill(store_t s) {
128                 for (int i=0;i<len;i++) store[i]=s;
129         }
130
131         void reset(void) {fill(0);}
132         
133         /// Set-to-1 bit i of the vector.
134         void set(unsigned int i) { store[i/store_bits] |= (1lu<<(i%store_bits)); }
135
136         /// Clear-to-0 bit i of the vector.
137         void reset(unsigned int i) { store[i/store_bits] &= ~(1lu<<(i%store_bits)); }
138         
139         /// Return the i'th bit of the vector.
140         bool test(unsigned int i) { return (store[i/store_bits] & (1lu<<(i%store_bits))); }
141 };
142
143 /// Templated page housekeeping class.  
144 template <class ENTRY, unsigned int ENTRIES_PER_PAGE>
145 class MSA_Page_StateT : public MSA_Page_State {
146         /** Write tracking:
147            Somehow, we have to identify the entries in a page 
148          that have been written to.  Our method for doing this is
149          a bitvector: 0's indicate the entry hasn't been written; 
150          1's indicate the entry has been written.
151         */
152         typedef fixedlength_bitvector<ENTRIES_PER_PAGE> writes_t;
153         writes_t writes;
154         typedef typename writes_t::store_t writes_store_t;
155         enum {writes_bits=writes_t::store_bits};
156         
157         /// Tracking writes to our writes: a smaller vector, used to 
158         ///  avoid the large number of 0's in the writes vector.
159         ///  Bit i of writes2 indicates that store_t i of writes has 1's.
160         typedef fixedlength_bitvector<writes_t::len> writes2_t;
161         writes2_t writes2;
162         typedef typename writes2_t::store_t writes2_store_t;
163         enum {writes2_bits=writes2_t::store_bits*writes_t::store_bits};
164
165 public:
166         /// Write entry i of this page.
167         void write(unsigned int i) {
168                 writes.set(i);
169                 writes2.set(i/writes_t::store_bits);
170         }
171         
172         /// Clear the write list for this page.
173         void writeClear(void) {
174                 for (int i2=0;i2<writes2_t::len;i2++)
175                   if (writes2.store[i2]) { /* some bits set: clear them all */
176                         int o=i2*writes2_t::store_bits;
177                         for (int i=0;i<writes_t::len;i++) 
178                           writes.store[o+i]=0;
179                         writes2.store[i2]=0;
180                   }
181         }
182         
183         /// Return the nearest multiple of m >= v.
184         inline int roundUp(int v,int m) {
185                 return (v+m-1)/m*m;
186         }
187         
188         /// Get a list of our written output values as this list of spans.
189         ///   Returns the total number of spans written to "span".
190         int writeSpans(MSA_WriteSpan_t *span) {
191                 int nSpans=0;
192                 
193                 int cur=0; // entry we're looking at
194                 while (true) {
195                         /* skip over unwritten space */
196                         while (true) { 
197                                 if (writes2.store[cur/writes2_bits]==(writes2_store_t)0) 
198                                         cur=roundUp(cur+1,writes2_bits);
199                                 else if (writes.store[cur/writes_bits]==(writes_store_t)0)
200                                         cur=roundUp(cur+1,writes_bits); 
201                                 else if (writes.test(cur)==false)
202                                         cur++;
203                                 else /* writes.test(cur)==true */
204                                         break;
205                                 if (cur>=ENTRIES_PER_PAGE) return nSpans;
206                         }
207                         /* now writes.test(cur)==true */
208                         span[nSpans].start=cur;
209                         /* skip over written space */
210                         while (true) { 
211                                 /* // 0-1 symmetry doesn't hold here, since writes2 may have 1's, but writes may still have some 0's...
212                                 if (writes2.store[cur/writes2_bits]==~(writes2_store_t)0) 
213                                         cur=roundUp(cur+1,writes2_bits);
214                                 else */
215                                 if (writes.store[cur/writes_bits]==~(writes_store_t)0)
216                                         cur=roundUp(cur+1,writes_bits); 
217                                 else if (writes.test(cur)==true)
218                                         cur++;
219                                 else /* writes.test(cur)==false */
220                                         break;
221                                 if (cur>=ENTRIES_PER_PAGE) {
222                                         span[nSpans++].end=ENTRIES_PER_PAGE; /* finish the last span */
223                                         return nSpans;
224                                 }
225                         }
226                         /* now writes.test(cur)==false */
227                         span[nSpans++].end=cur;
228                 }
229         }
230 };
231
232
233 //=======================================================
234 // Page-out policy
235
236 /**
237    class vmPageReplacementPolicy
238    Abstract base class providing the interface to the various page
239    replacement policies available for use with an MSA
240 */
241 template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
242 class MSA_PageReplacementPolicy
243 {
244 public:
245   /// Note that a page was just accessed
246   virtual void pageAccessed(unsigned int page) = 0;
247
248   /// Ask for the index of a page to discard
249   virtual unsigned int selectPage() = 0;
250 };
251
252 /**
253   class vmLRUPageReplacementPolicy
254   This class provides the functionality of least recently used page replacement policy.
255   It needs to be notified when a page is accessed using the pageAccessed() function and
256   a page can be selected for replacement using the selectPage() function.
257  
258   WARNING: a list is absolutely the wrong data structure for this class, 
259      because it makes *both* updating as well as searching for a page O(n),
260      where n is the number of pages.  A heap would be a better choice,
261      as both operations would then become O(lg(n))
262  */
263 template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
264 class vmLRUReplacementPolicy : public MSA_PageReplacementPolicy <ENTRY_TYPE, ENTRIES_PER_PAGE>
265 {
266 protected:
267     unsigned int nPages;            // number of pages
268   const std::vector<ENTRY_TYPE *> &pageTable; // actual data for pages (NULL means page is gone)
269   typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
270   const std::vector<pageState_t *> &pageState;  // state of each page
271     std::list<unsigned int> stackOfPages;
272     unsigned int lastPageAccessed;
273
274 public:
275   inline vmLRUReplacementPolicy(unsigned int nPages_, const std::vector<ENTRY_TYPE *> &pageTable_, const std::vector<pageState_t *> &pageState_)
276     : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), lastPageAccessed(MSA_INVALID_PAGE_NO) {}
277
278     inline void pageAccessed(unsigned int page)
279     {
280         if(page != lastPageAccessed)
281         {
282             lastPageAccessed = page;
283
284             // delete this page from the stack and push it at the top
285                         std::list<unsigned int>::iterator i;
286             for(i = stackOfPages.begin(); i != stackOfPages.end(); i++)
287                 if(*i == page)
288                     i = stackOfPages.erase(i);
289
290             stackOfPages.push_back(page);
291         }
292     }
293
294     inline unsigned int selectPage()
295     {
296         if(stackOfPages.size() == 0)
297             return MSA_INVALID_PAGE_NO;
298
299         // find a non-empty unlocked page to swap, delete all empty pages from the stack
300                 std::list<unsigned int>::iterator i = stackOfPages.begin();
301         while(i != stackOfPages.end())
302         {
303             if(pageTable[*i] == NULL) i = stackOfPages.erase(i);
304             else if(!pageState[*i]->canPageOut()) i++;
305             else break;
306         }
307
308         if(i != stackOfPages.end())
309             return *i;
310         else
311             return MSA_INVALID_PAGE_NO;
312     }
313 };
314
315 /**
316   class vmNRUPageReplacementPolicy
317   This class provides the functionality of not-recently-used page replacement policy.
318   It needs to be notified when a page is accessed using the pageAccessed() function and
319   a page can be selected for replacement using the selectPage() function.
320   
321   "not-recently-used" could replace any page that has not been used in the 
322   last K accesses; that is, it's a memory-limited version of LRU.
323   
324   pageAccessed is O(1).
325   selectPage best-case is O(K) (if we immediately find a doomed page); 
326              worst-case is O(K n) (if there are no doomed pages).
327  */
328 template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
329 class vmNRUReplacementPolicy : public MSA_PageReplacementPolicy <ENTRY_TYPE, ENTRIES_PER_PAGE>
330 {
331 protected:
332   unsigned int nPages;            // number of pages
333   const std::vector<ENTRY_TYPE *> &pageTable; // actual pages (NULL means page is gone)
334   typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
335   const std::vector<pageState_t *> &pageState;  // state of each page
336     enum {K=5}; // Number of distinct pages to remember
337     unsigned int last[K]; // pages that have been used recently
338     unsigned int Klast; // index into last array.
339     
340     unsigned int victim; // next page to throw out.
341     
342     bool recentlyUsed(unsigned int page) {
343         for (int k=0;k<K;k++) if (page==last[k]) return true;
344         return false;
345     }
346
347 public:
348   inline vmNRUReplacementPolicy(unsigned int nPages_, const std::vector<ENTRY_TYPE *> &pageTable_, const std::vector<pageState_t *> &pageState_)
349     : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), Klast(0), victim(0)
350     {
351         for (int k=0;k<K;k++) last[k]=MSA_INVALID_PAGE_NO;
352     }
353
354     inline void pageAccessed(unsigned int page)
355     {
356         if (page!=last[Klast]) {
357             Klast++; if (Klast>=K) Klast=0;
358             last[Klast]=page;
359         }
360     }
361
362     inline unsigned int selectPage() {
363         unsigned int last_victim=victim;
364         do {
365             victim++; if (victim>=nPages) victim=0;
366             if (pageTable[victim]
367                 &&pageState[victim]->canPageOut()
368                 &&!recentlyUsed(victim)) {
369                 /* victim is an allocated, unlocked, non-recently-used page: page him out. */
370                 return victim;
371             }
372         } while (victim!=last_victim);
373         return MSA_INVALID_PAGE_NO;  /* nobody is pageable */
374     }
375 };
376
377 //================================================================
378
379 /**
380   UNUSED
381
382   Holds the untyped data for one MSA page.
383   This is the interface MSA_CacheGroup uses to access a cached page.
384   MSA_CacheGroup asks the templated code to create a MSA_Page
385   for each new page, then talks to the page directly.
386 */
387 class MSA_Page {
388 public:
389         virtual ~MSA_Page();
390
391         /**
392           Pack or unpack the data in this page.
393           Used to send and receive pages from the network
394           (or even disk, if somebody needs it.)
395         */
396         virtual void pup(PUP::er &p) =0;
397
398         /**
399           Merge this page's data into our own.
400           Only parts of this page may have been set.
401         */
402         virtual void merge(MSA_Page &otherPage) =0;
403 };
404
405 /**
406   Holds the typed data for one MSA page.
407   Implementation of puppedPage used by the templated code.
408 */
409 template <
410         class ENTRY, 
411         class MERGER=DefaultEntry<ENTRY>,
412         unsigned int ENTRIES_PER_PAGE=MSA_DEFAULT_ENTRIES_PER_PAGE
413 >
414 class MSA_PageT {
415     unsigned int n; // number of entries on this page.  Used to send page updates.
416         /** The contents of this page: array of ENTRIES_PER_PAGE items */
417         ENTRY *data;
418         /** Merger object */
419         MERGER m;
420   bool duplicate;
421
422 public:
423         MSA_PageT():duplicate(false), n(ENTRIES_PER_PAGE) {
424                 data=new ENTRY[ENTRIES_PER_PAGE];
425                 for (int i=0;i<ENTRIES_PER_PAGE;i++){
426                         data[i]=m.getIdentity();
427                 }
428                 //shouldn't n be set to ENTRIES_PER_PAGE
429                 n = ENTRIES_PER_PAGE;
430         }
431     // This constructor is used in PageArray to quickly convert an
432     // array of ENTRY into an MSA_PageT.  So we just make a copy of
433     // the pointer.  When it comes time to destruct the object, we
434     // need to ensure we do NOT delete the data but just discard the
435     // pointer.
436         MSA_PageT(ENTRY *d):data(d), duplicate(true), n(ENTRIES_PER_PAGE) {
437     }
438         MSA_PageT(ENTRY *d, unsigned int n_):data(d), duplicate(true), n(n_) {
439     }
440         virtual ~MSA_PageT() {
441                 if (!duplicate) {
442             delete [] data;
443         }
444         }
445
446         virtual void pup(PUP::er &p) {
447     p | n;
448                 /*this pup routine was broken, It didnt consider the case
449                         in which n > 0 and data = NULL. This is possible when  
450                         sending empty pages. It also doesnt seem to do any allocation
451                         for the data variable while unpacking which seems to be wrong
452                 */
453                 bool nulldata;
454                 if(!p.isUnpacking()){
455                                 nulldata = (data == NULL);
456                 }
457                 p | nulldata;
458                 if(nulldata){
459                                 data = NULL;
460                                 return;
461                 }
462                 if(p.isUnpacking()){
463                                 data = new ENTRY[n];
464                 }
465                 for (int i=0;i<n;i++){
466                         p|data[i];
467                 }       
468         }
469
470         virtual void merge(MSA_PageT<ENTRY, MERGER, ENTRIES_PER_PAGE> &otherPage) {
471                 for (int i=0;i<ENTRIES_PER_PAGE;i++)
472                         m.accumulate(data[i],otherPage.data[i]);
473         }
474
475         // These accessors might be used by the templated code.
476         inline ENTRY &operator[](int i) {return data[i];}
477         inline const ENTRY &operator[](int i) const {return data[i];}
478     inline ENTRY *getData() { return data; }
479 };
480
481 //=============================== Cache Manager =================================
482
483 template <class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE>
484 class MSA_CacheGroup : public Group
485 {
486     typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
487
488 protected:
489     ENTRY_OPS_CLASS *entryOpsObject;
490     unsigned int numberOfWorkerThreads;      // number of worker threads across all processors for this shared array
491     // @@ migration?
492     unsigned int numberLocalWorkerThreads;   // number of worker threads on THIS processor for this shared array
493     unsigned int enrollDoneq;                 // has enroll() been done on this processor?
494     MSA_Listeners enrollWaiters;
495     MSA_Listeners syncWaiters;
496     std::set<int> enrolledPEs;                          // which PEs are involved?
497
498     unsigned int nPages;            ///< number of pages
499   std::vector<ENTRY_TYPE*> pageTable;          ///< the page table for this PE: stores actual data.
500     typedef MSA_Page_StateT<ENTRY_TYPE,ENTRIES_PER_PAGE> pageState_t;
501   std::vector<pageState_t *> pageStateStorage; ///< Housekeeping information for each allocated page.
502     
503     /// Return the state for this page, returning NULL if no state available.
504     inline pageState_t *stateN(unsigned int pageNo) {
505         return pageStateStorage[pageNo];
506     }
507     
508     /// Return the state for this page, allocating if needed.
509     pageState_t *state(unsigned int pageNo) {
510         pageState_t *ret=pageStateStorage[pageNo];
511         if (ret==NULL) {
512             ret=new pageState_t;
513             pageStateStorage[pageNo]=ret;
514         }
515         return ret;
516     }
517
518     typedef CProxy_MSA_PageArray<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_PageArray_t;
519     CProxy_PageArray_t pageArray;     // a proxy to the page array
520     typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
521     CProxy_CacheGroup_t thisProxy; // a proxy to myself.
522
523     std::map<CthThread, MSA_Thread_Listener *> threadList;
524     /// Look up or create the listener for the current thread.
525     MSA_Thread_Listener *getListener(void) {
526         CthThread t=CthSelf();
527         MSA_Thread_Listener *l=threadList[t];
528                         if (l==NULL) {
529                 l=new MSA_Thread_Listener;
530                 threadList[t]=l;
531                         }
532                         return l;
533     }
534     /// Add our thread to this list and suspend
535     void addAndSuspend(MSA_Listeners &dest) {
536         MSA_Thread_Listener *l=getListener();
537                         dest.add(l);
538                         l->suspend();
539     }
540
541     std::stack<ENTRY_TYPE*> pagePool;     // a pool of unused pages
542     
543   typedef vmNRUReplacementPolicy<ENTRY_TYPE, ENTRIES_PER_PAGE> vmPageReplacementPolicy;
544     MSA_PageReplacementPolicy<ENTRY_TYPE, ENTRIES_PER_PAGE> *replacementPolicy;
545
546     // structure for the bounds of a single write
547     typedef struct { unsigned int begin; unsigned int end; } writebounds_t;
548
549     // a list of write bounds associated with a given page
550     typedef std::list<writebounds_t> writelist_t;
551
552     writelist_t** writes;           // the write lists for each page
553
554     unsigned int resident_pages;             // pages currently allocated
555     unsigned int max_resident_pages;         // max allowable pages to allocate
556     unsigned int nEntries;          // number of entries for this array
557     unsigned int syncAckCount;      // number of sync ack's we received
558     int outOfBufferInPrefetch;      // flag to indicate if the last prefetch ran out of buffers
559
560     int syncThreadCount;            // number of local threads that have issued Sync
561     
562     
563     // used during output
564     MSA_WriteSpan_t writeSpans[ENTRIES_PER_PAGE];
565     ENTRY_TYPE writeEntries[ENTRIES_PER_PAGE];
566
567     /*********************************************************************************/
568     /** these routines deal with managing the page queue **/
569
570     // increment the number of pages the thread is waiting on.
571     // also add thread to the page queue; then thread is woken when the page arrives.
572     inline void IncrementPagesWaiting(unsigned int page)
573     {
574         state(page)->readRequests.add(getListener());
575     }
576
577     inline void IncrementChangesWaiting(unsigned int page)
578     {
579         state(page)->writeRequests.add(getListener());
580     }
581
582 /************************* Page allocation and management **************************/
583     
584     /// Allocate a new page, removing old pages if we're over the limit.
585     /// Returns NULL if no buffer space is available
586     inline ENTRY_TYPE* tryBuffer(int async=0) // @@@
587     {
588         ENTRY_TYPE* nu = NULL;
589
590         // first try the page pool
591         if(!pagePool.empty())
592         {
593             nu = pagePool.top();
594             pagePool.pop();
595         }
596
597         // else try to allocate the buffer
598         if(nu == NULL && resident_pages < max_resident_pages)
599         {
600             nu = new ENTRY_TYPE[ENTRIES_PER_PAGE];
601             resident_pages++;
602         }
603
604         // else swap out one of the pages
605         if(nu == NULL)
606         {
607             int pageToSwap = replacementPolicy->selectPage();
608             if(pageToSwap != MSA_INVALID_PAGE_NO)
609             {
610                 CkAssert(pageTable[pageToSwap] != NULL);
611                 CkAssert(state(pageToSwap)->canPageOut() == true);
612                 
613                 relocatePage(pageToSwap, async);
614                 nu = pageTable[pageToSwap];
615                 pageTable[pageToSwap] = 0;
616                 delete pageStateStorage[pageToSwap];
617                 pageStateStorage[pageToSwap]=0;
618             }
619         }
620
621         // otherwise return NULL
622
623         return nu;
624     }
625     
626     /// Allocate storage for this page, if none has been allocated already.
627     ///  Update pageTable, and return the storage for the page.
628     inline ENTRY_TYPE* makePage(unsigned int page) // @@@
629     {
630         ENTRY_TYPE* nu=pageTable[page];
631         if (nu==0) {
632             nu=tryBuffer();
633             if (nu==0) CkAbort("MSA: No available space to create pages.\n");
634             pageTable[page]=nu;
635         }
636         return nu;
637     }
638     
639     /// Throw away this allocated page.
640     ///  Returns the page itself, for deletion or recycling.
641     ENTRY_TYPE* destroyPage(unsigned int page)
642     {
643         ENTRY_TYPE* nu=pageTable[page];
644         pageTable[page] = 0;
645         if (pageStateStorage[page]->canDelete()) {
646                 delete pageStateStorage[page];
647                 pageStateStorage[page]=0;
648         }
649         resident_pages--;
650         return nu;
651     }
652     
653     //MSA_CacheGroup::
654     void pageFault(unsigned int page, MSA_Page_Fault_t why)
655     {
656         // Write the page to the page table
657         state(page)->state = why;
658         if(why == Read_Fault)
659         { // Issue a remote request to fetch the new page
660             // If the page has not been requested already, then request it.
661             if (stateN(page)->readRequests.size()==0) {
662                 pageArray[page].GetPage(CkMyPe());
663                 //ckout << "Requesting page first time"<< endl;
664             } else {
665                 ;//ckout << "Requesting page next time.  Skipping request."<< endl;
666             }
667             MSA_Thread_Listener *l=getListener();
668             stateN(page)->readRequests.add(l);
669             l->suspend(); // Suspend until page arrives.
670         }
671         else {
672             // Build an empty buffer into which to create the new page
673             ENTRY_TYPE* nu = makePage(page);
674             writeIdentity(nu);
675         }
676     }
677     
678     /// Make sure this page is accessible, faulting the page in if needed.
679     // MSA_CacheGroup::
680     inline void accessPage(unsigned int page,MSA_Page_Fault_t access)
681     {
682         if (pageTable[page] == 0) {
683 //             ckout << "p" << CkMyPe() << ": Calling pageFault" << endl;
684             pageFault(page, access);
685         }
686 #ifndef CMK_OPTIMIZE
687         if (stateN(page)->state!=access) {
688             CkPrintf("page=%d mode=%d pagestate=%d", page, access, stateN(page)->state);
689             CkAbort("MSA Runtime error: Attempting to access a page that is still in another mode.");
690         }
691 #endif
692         replacementPolicy->pageAccessed(page);
693     }
694
695     // MSA_CacheGroup::
696     // Fill this page with identity values, to prepare for writes or 
697     //  accumulates.
698     void writeIdentity(ENTRY_TYPE* pagePtr)
699     {
700         for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
701             pagePtr[i] = entryOpsObject->getIdentity();
702     }
703     
704 /************* Page Flush and Writeback *********************/
705     bool shouldWriteback(unsigned int page) {
706         if (!pageTable[page]) return false;
707         return (stateN(page)->state == Write_Fault || stateN(page)->state == Accumulate_Fault);
708     }
709     
710     inline void relocatePage(unsigned int page, int async)
711     {
712         //CkAssert(pageTable[page]);
713         if(shouldWriteback(page))
714         {
715             // the page to be swapped is a writeable page. So inform any
716             // changes this node has made to the page manager
717             sendChangesToPageArray(page, async);
718         }
719     }
720
721     inline void sendChangesToPageArray(const unsigned int page, const int async)
722     {
723         sendRLEChangesToPageArray(page);
724         
725         MSA_Thread_Listener *l=getListener();
726         state(page)->writeRequests.add(l);
727         if (!async)
728             l->suspend(); // Suspend until page is really gone.
729         // TODO: Are write acknowledgements really necessary ?
730     }
731
732     // Send the page data as a contiguous block.
733     //   Note that this is INCORRECT when writes to pages overlap!
734     inline void sendNonRLEChangesToPageArray(const unsigned int page) // @@@
735     {
736         pageArray[page].PAReceivePage(pageTable[page], ENTRIES_PER_PAGE, CkMyPe(), stateN(page)->state);
737     }
738     
739     // Send the page data as an RLE block.
740     // this function assumes that there are no overlapping writes in the list
741     inline void sendRLEChangesToPageArray(const unsigned int page)
742     {
743         ENTRY_TYPE *writePage=pageTable[page];
744         int nSpans=stateN(page)->writeSpans(writeSpans);
745         if (nSpans==1) 
746         { /* common case: can make very fast */
747             int nEntries=writeSpans[0].end-writeSpans[0].start;
748             if (entryOpsObject->pupEveryElement()) {
749                 pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
750                     page_t(&writePage[writeSpans[0].start],nEntries),nEntries,
751                     CkMyPe(),stateN(page)->state);
752             } else {
753                 pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
754                     &writePage[writeSpans[0].start], nEntries,
755                     CkMyPe(),stateN(page)->state);
756             }
757         } 
758         else /* nSpans>1 */ 
759         { /* must copy separate spans into a single output buffer (luckily rare) */
760             int nEntries=0;
761             for (int s=0;s<nSpans;s++) {
762                 for (int i=writeSpans[s].start;i<writeSpans[s].end;i++)
763                     writeEntries[nEntries++]=writePage[i]; // calls assign
764             }
765             if (entryOpsObject->pupEveryElement()) {
766                 pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
767                                                         page_t(writeEntries,nEntries),nEntries,
768                                                         CkMyPe(),stateN(page)->state);
769             } else {
770                 pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
771                                                writeEntries,nEntries,
772                                                CkMyPe(),stateN(page)->state);
773             }
774         }
775     }
776
777 /*********************** Public Interface **********************/
778 public:
779     // 
780     //
781     // MSA_CacheGroup::
782   inline MSA_CacheGroup(unsigned int nPages_, CkArrayID pageArrayID,
783                                                 unsigned int max_bytes_, unsigned int nEntries_, 
784                                                 unsigned int numberOfWorkerThreads_)
785         : numberOfWorkerThreads(numberOfWorkerThreads_),
786           nPages(nPages_),
787           nEntries(nEntries_), 
788           pageTable(nPages, NULL),
789           pageStateStorage(nPages, NULL),
790           pageArray(pageArrayID),
791           thisProxy(thisgroup),
792           max_resident_pages(max_bytes_/(sizeof(ENTRY_TYPE)*ENTRIES_PER_PAGE)),
793           entryOpsObject(new ENTRY_OPS_CLASS),
794           replacementPolicy(new vmPageReplacementPolicy(nPages, pageTable, pageStateStorage)),
795           outOfBufferInPrefetch(0), syncAckCount(0),syncThreadCount(0),
796           resident_pages(0),numberLocalWorkerThreads(0), enrollDoneq(0)
797   {
798         MSADEBPRINT(printf("MSA_CacheGroup nEntries %d \n",nEntries););
799   }
800
801     // MSA_CacheGroup::
802     inline ~MSA_CacheGroup()
803     {
804         FreeMem();
805     }
806
807     /* To change the accumulate function TBD @@ race conditions */
808     inline void changeEntryOpsObject(ENTRY_OPS_CLASS *e) {
809         entryOpsObject = e;
810         pageArray.changeEntryOpsObject(e);
811     }
812
813     // MSA_CacheGroup::
814     inline const ENTRY_TYPE* readablePage(unsigned int page)
815     {
816         accessPage(page,Read_Fault);
817         
818         return pageTable[page];
819     }
820
821     // MSA_CacheGroup::
822     //
823     // known local page
824     inline const void* readablePage2(unsigned int page)
825     {
826         return pageTable[page];
827     }
828
829     // MSA_CacheGroup::
830     // Obtains a writable copy of the page.
831     inline ENTRY_TYPE* writeablePage(unsigned int page, unsigned int offset)
832     {
833         accessPage(page,Write_Fault);
834
835         // NOTE: Since we assume write once semantics, i.e. between two calls to sync,
836         // either there can be no write to a location or a single write to a location,
837         // a readable page will suffice as a writeable page too, because no one else
838         // is going to write to this location. In reality, two locations on the *same*
839         // page can be written by two different threads, in which case we will need
840         // to keep track of which parts of the page have been written, hence:
841         stateN(page)->write(offset);
842 //     ckout << "write:" << page*ENTRIES_PER_PAGE+offset << endl;
843         
844         return pageTable[page];
845     }
846
847     // MSA_CacheGroup::
848     inline ENTRY_TYPE &accumulate(unsigned int page, unsigned int offset)
849     {
850         accessPage(page,Accumulate_Fault);
851         stateN(page)->write(offset);
852         return pageTable[page][offset];
853     }
854
855     /// A requested page has arrived from the network.
856     ///  nEntriesInPage_ = num entries being sent (0 for empty page, num entries otherwise)
857     inline void ReceivePageWithPUP(unsigned int page, page_t &pageData, int size)
858     {
859         ReceivePage(page, pageData.getData(), size);
860     }
861
862     inline void ReceivePage(unsigned int page, ENTRY_TYPE* pageData, int size)
863     {
864         CkAssert(0==size || ENTRIES_PER_PAGE == size);
865         // the page we requested has been received
866         ENTRY_TYPE *nu=makePage(page);
867         if(size!=0)
868         {
869             for(unsigned int i = 0; i < size; i++)
870                 nu[i] = pageData[i]; // @@@, calls assignment operator
871         }
872         else /* isEmpty */
873         {
874             // the page we requested for is empty, so we can just initialize it.
875             writeIdentity(nu);
876         }
877         
878         state(page)->readRequests.signal(page);
879     }
880
881     // This EP is invoked during sync to acknowledge that a dirty page
882     // has been received and written back to the page owner.  We keep track
883     // of the number of ack's yet to arrive in nChangesWaiting.  Once
884     // all the dirty pages have been ack'd, we awaken the thread that
885     // flushed the page.
886     //
887     // It's not clear this is useful very often...
888     //
889     // MSA_CacheGroup::
890     inline void AckPage(unsigned int page)
891     {
892         state(page)->writeRequests.signal(page);
893     }
894
895     // MSA_CacheGroup::
896     // synchronize all the pages and also clear up the cache
897     inline void SyncReq(int single)
898     {
899           MSADEBPRINT(printf("SyncReq single %d\n",single););
900           if(single)
901         {
902             /*ask all the caches to send their updates to the page array, but we don't need to empty the caches on the other PEs*/
903             SingleSync();
904             EmptyCache();
905
906             getListener()->suspend();
907         }
908         else{
909             Sync();
910                                 }
911     }
912
913     // MSA_CacheGroup::
914     inline void FlushCache()
915     {
916         // flush the local cache
917         // for each writeable page, send that page to the array element
918         for(unsigned int i = 0; i < nPages; i++)
919         {
920             if(shouldWriteback(i)) {
921                 //ckout << "p" << CkMyPe() << "FlushCache: sending page " << i << endl;
922                 sendChangesToPageArray(i, 1);
923             }
924         }
925     }
926
927     // MSA_CacheGroup::
928     void EmptyCache()
929     {
930         /* just makes all the pages empty, assuming that the data in those pages has been flushed to the owners */
931         for(unsigned int i = 0; i < nPages; i++)
932         {
933             if(pageTable[i]) pagePool.push(destroyPage(i));
934         }
935     }
936
937 /************************ Enroll ********************/
938     /// Enroll phase 1: called by users.
939     // MSA_CacheGroup::
940     inline void enroll(unsigned int num_workers)
941     {
942         CkAssert(num_workers == numberOfWorkerThreads); // just to verify
943         CkAssert(enrollDoneq == 0);
944         numberLocalWorkerThreads++;
945         // @@ how to ensure that enroll is called only once?
946
947         //ckout << "[" << CkMyPe() << "] sending sync ack to PE 0" << endl;
948         thisProxy[0].enrollAck(CkMyPe());
949         //ckout << "[" << CkMyPe() << "] suspening thread in Sync() " << endl;
950         addAndSuspend(enrollWaiters);
951         //ckout << "[" << CkMyPe() << "] rsuming thread in Sync()" << endl;
952
953         CkAssert(enrollDoneq == 1);
954         return;
955     }
956
957     /// Enroll phase 2: called on PE 0 from everywhere
958     inline void enrollAck(int originator)
959     {
960         CkAssert(CkMyPe() == 0);  // enrollAck is only called on PE 0
961         CkAssert(enrollDoneq == 0);  // prevent multiple enroll operations
962         
963         syncAckCount++;
964         enrolledPEs.insert(originator);
965         //ckout << "[" << CkMyPe() << "] SyncAckcount = " << syncAckCount << endl;
966         if(syncAckCount == numberOfWorkerThreads) {
967 //             ckout << "[" << CkMyPe() << "]" << "Enroll operation is almost done" << endl;
968             syncAckCount = 0;
969             enrollDoneq = 1;
970             // What if fewer worker threads than pe's ?  Handled in
971             // enrollDone.
972             thisProxy.enrollDone();
973         }
974     }
975
976     /// Enroll phase 3: called everywhere by PE 0
977     inline void enrollDone()
978     {
979 //         ckout << "[" << CkMyPe() << "] enrollDone.  Waking threads."
980 //               <<  " numberOfWorkerThreads=" << numberOfWorkerThreads
981 //               <<  " local=" << numberLocalWorkerThreads << endl;
982         enrollDoneq = 1;
983         enrollWaiters.signal(0);
984     }
985
986 /******************************** Sync & writeback ***********************/
987     // MSA_CacheGroup::
988     inline void SingleSync()
989     {
990         /* a single thread issued a sync call with all = 1. The first thing to do is to flush the local cache */
991         FlushCache();
992     }
993
994     // MSA_CacheGroup::
995     inline void Sync()
996     {
997         syncThreadCount++;
998         //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << " " << numberLocalWorkerThreads << endl;
999         //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << ", registered threads = " << getNumRegisteredThreads()
1000                 //  << ", number of suspended threads = " << getNumSuspendedThreads() << endl;
1001
1002         // First, all threads on this processor need to reach the sync
1003         // call; only then can we proceed with merging the data.  Only
1004         // the last thread on this processor needs to do the FlushCache,
1005         // etc.  Others just suspend until the sync is over.
1006                                 MSADEBPRINT(printf("Sync syncThreadCount %d \n",syncThreadCount););
1007         if(syncThreadCount < numberLocalWorkerThreads)
1008         {
1009                   MSADEBPRINT(printf("Sync addAndSuspend \n"););
1010             addAndSuspend(syncWaiters);
1011             return;
1012         }
1013                 
1014         //ckout << "[" << CkMyPe() << "] Sync started" << endl;
1015
1016         // flush the cache asynchronously and also empty it
1017         FlushCache();
1018         // idea: instead of invalidating the pages, switch it to read
1019         // mode. That will not work, since the page may have also been
1020         // modified by another thread.
1021         EmptyCache();
1022
1023         // Now, we suspend too (if we had at least one dirty page).
1024         // We will be awoken when all our dirty pages have been
1025         // written and acknowledged.
1026                 MSADEBPRINT(printf("Sync calling suspend on getListener\n"););
1027         getListener()->suspend();
1028                 MSADEBPRINT(printf("Sync awakening after suspend\n"););
1029
1030         // So far, the sync has been asynchronous, i.e. PE0 might be ahead
1031         // of PE1.  Next we basically do a barrier to ensure that all PE's
1032         // are synchronized.
1033
1034         // at this point, the sync's across the group should
1035         // synchronize among themselves by each one sending
1036         // a sync acknowledge message to PE 0. (this is like
1037         // a reduction over a group)
1038         if(CkMyPe() != 0)
1039         {
1040             thisProxy[0].SyncAck();
1041         }
1042         else /* I *am* PE 0 */
1043         {
1044             SyncAck();
1045         }
1046                                 MSADEBPRINT(printf("Sync all local threads done, going to addAndSuspend\n"););
1047         /* Wait until sync is reflected from PE 0 */
1048         addAndSuspend(syncWaiters);
1049                                 
1050                 MSADEBPRINT(printf("Sync all local threads done waking up after addAndSuspend\n"););
1051                 //ckout << "[" << CkMyPe() << "] Sync finished" << endl;        
1052     }
1053
1054     inline unsigned int getNumEntries() { return nEntries; }
1055     inline CProxy_PageArray_t getArray() { return pageArray; }
1056
1057     // TODO: Can this SyncAck and other simple Acks be made efficient?
1058     inline void SyncAck()
1059     {
1060         CkAssert(CkMyPe() == 0);  // SyncAck is only called on PE 0
1061         syncAckCount++;
1062         // DONE @@ what if fewer worker threads than pe's ?
1063         // @@ what if fewer worker threads than pe's and >1 threads on 1 pe?
1064         //if(syncAckCount == min(numberOfWorkerThreads, CkNumPes())){
1065                 if (syncAckCount == enrolledPEs.size()) {
1066                         MSADEBPRINT(printf("SyncAck starting reduction on pageArray of size %d number of pages %d\n",nEntries,nPages););
1067                         pageArray.Sync();
1068                 }               
1069     }
1070
1071     inline void SyncDone()
1072     {
1073         //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
1074         //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
1075         /* Reset for next sync */
1076         syncThreadCount = 0;
1077         syncAckCount = 0;
1078                                 MSADEBPRINT(printf("SyncDone syncWaiters signal to be called\n"););
1079         syncWaiters.signal(0);
1080     }
1081
1082     inline void FreeMem()
1083     {
1084         for(unsigned int i = 0; i < nPages; i++)
1085         {
1086             if(pageTable[i]) delete [] destroyPage(i);
1087         }
1088
1089         while(!pagePool.empty())
1090         {
1091             delete [] pagePool.top();  // @@@
1092             pagePool.pop();
1093         }
1094         
1095         resident_pages=0;
1096     }
1097
1098                 /** 
1099                         Deregister a client. Decremenet the number of local threads. If total number of local threads 
1100                         hits 0 FreeMem()
1101                 */
1102                 inline void unroll(){
1103         numberLocalWorkerThreads--;
1104                                 if(numberLocalWorkerThreads == 0){
1105                                         FreeMem();
1106                                 }
1107                 }
1108
1109     /**
1110      * Issue a prefetch request for the given range of pages. These pages will
1111      * be locked into the cache, so that they will not be swapped out.
1112      */
1113     inline void Prefetch(unsigned int pageStart, unsigned int pageEnd)
1114     {
1115         /* prefetching is feasible only if we we did not encounter an out
1116          * of buffer condition in the previous prefetch call
1117          */
1118         if(!outOfBufferInPrefetch)
1119         {
1120             //ckout << "prefetching pages " << pageStart << " through " << pageEnd << endl;
1121             for(unsigned int p = pageStart; p <= pageEnd; p++)
1122             {
1123                 if(NULL == pageTable[p])
1124                 {
1125
1126                     /* relocate the buffer asynchronously */
1127                     ENTRY_TYPE* nu = tryBuffer(1);
1128                     if(NULL == nu)
1129                     {
1130                         /* signal that sufficient buffer space is not available */
1131                         outOfBufferInPrefetch = 1;
1132                         break;
1133                     }
1134
1135                     pageTable[p] = nu;
1136                     state(p)->state = Read_Fault;
1137
1138                     pageArray[p].GetPage(CkMyPe());
1139                     IncrementPagesWaiting(p);
1140                     //ckout << "Prefetch page" << p << ", pages waiting = " << nPagesWaiting << endl;
1141                     /* don't suspend the thread */
1142                 }
1143
1144                 /* mark the page as being locked */
1145                 state(p)->locked = true;
1146             }
1147         }
1148     }
1149
1150     /**
1151      * Wait for all the prefetch pages to be fetched into the cache.
1152      * Returns: 0 if prefetch successful, 1 if not
1153      */
1154     inline int WaitAll(void)
1155     {
1156         if(outOfBufferInPrefetch)
1157         {
1158             // we encountered out of buffer in the previous prefetch call, return error
1159             outOfBufferInPrefetch = 0;
1160             getListener()->suspend();
1161             UnlockPages();
1162             return 1;
1163         }
1164         else
1165         {
1166             // prefetch requests have been successfully issued already, so suspend the
1167             // thread and wait for completion
1168             outOfBufferInPrefetch = 0;
1169             getListener()->suspend();
1170             return 0;
1171         }
1172     }
1173     
1174     inline void UnlockPage(unsigned int page) {
1175         pageState_t *s=stateN(page);
1176         if(s && s->locked) {
1177             replacementPolicy->pageAccessed(page);
1178             s->locked = false;
1179         }
1180     }
1181
1182     /**
1183      * Unlock all the pages locked in the cache
1184      */
1185     inline void UnlockPages()
1186     {
1187         // add all the locked pages to page replacement policy
1188         for(unsigned int page = 0; page < nPages; page++)
1189             UnlockPage(page);
1190     }
1191
1192     /**
1193      * Unlock the given pages: [startPage ... endPage]
1194      *  Note that the range is inclusive.
1195      */
1196     inline void UnlockPages(unsigned int startPage, unsigned int endPage)
1197     {
1198         for(unsigned int page = startPage; page <= endPage; page++)
1199             UnlockPage(page);
1200     }
1201
1202     /// Debugging routine
1203     inline void emitBufferValue(int ID, unsigned int pageNum, unsigned int offset)
1204     {
1205         CkAssert( pageNum < nPages );
1206         CkAssert( offset < ENTRIES_PER_PAGE );
1207
1208         //ckout << "p" << CkMyPe() << "ID" << ID;
1209 //         if (pageTable[pageNum] == 0)
1210 //             ckout << "emitBufferValue: page " << pageNum << " not available in local cache." << endl;
1211 //         else
1212 //             ckout << "emitBufferValue: [" << pageNum << "," << offset << "] = " << pageTable[pageNum][offset] << endl;
1213     }
1214 };
1215
1216 // each element of this array is responsible for managing
1217 // the information about a single page. It is in effect the
1218 // "owner" as well as the "manager" for that page.
1219 //
1220 template<class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE> 
1221 class MSA_PageArray : public ArrayElement1D
1222 {
1223     typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
1224     typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
1225     
1226 protected:
1227     ENTRY_TYPE *epage;
1228     ENTRY_OPS_CLASS entryOpsObject;
1229     CProxy_CacheGroup_t cache;
1230
1231     unsigned int pageNo() { return thisIndex; }
1232
1233     inline void allocatePage(MSA_Page_Fault_t access) // @@@
1234     {
1235         if(epage == NULL)
1236         {
1237             epage = new ENTRY_TYPE[ENTRIES_PER_PAGE];
1238             if (access==Accumulate_Fault)
1239                writeIdentity();
1240         }
1241     }
1242
1243     // begin and end are indexes into the page.
1244     inline void set(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
1245     {
1246         //ckout << "set: " << begin << "," << end << endl;
1247         for(unsigned int i = 0; i < (end - begin); i++) {
1248             epage[begin + i] = buffer[i]; // @@@, calls assignment operator
1249             //ckout << "set val[" << begin+i << "]=" << buffer[i] << endl;
1250         }
1251     }
1252
1253     // MSA_PageArray::
1254     inline void combine(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
1255     {
1256         ENTRY_TYPE* pagePtr = epage + begin;
1257         for(unsigned int i = 0; i < (end - begin); i++)
1258             entryOpsObject.accumulate(pagePtr[i], buffer[i]);
1259     }
1260
1261     // MSA_PageArray::
1262     inline void writeIdentity()
1263     {
1264         for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
1265             epage[i] = entryOpsObject.getIdentity();
1266     }
1267
1268 public:
1269     inline MSA_PageArray() : epage(NULL) { }
1270     inline MSA_PageArray(CkMigrateMessage* m) { delete m; }
1271     
1272     void setCacheProxy(CProxy_CacheGroup_t &cache_)
1273     {
1274        cache=cache_;
1275     }
1276     
1277     virtual void pup(PUP::er& p)
1278     {
1279         ArrayElement1D::pup(p);
1280         int epage_present=(epage!=0);
1281         p|epage_present;
1282         if (epage_present) {
1283           if(p.isUnpacking())
1284             allocatePage(Write_Fault);
1285           for (int i=0;i<ENTRIES_PER_PAGE;i++)
1286             p|epage[i];
1287         }
1288     }
1289     
1290     inline ~MSA_PageArray()
1291     {
1292         if(epage) delete [] epage;
1293     }
1294
1295     /// Request our page.
1296     ///   pe = to which to send page
1297     inline void GetPage(int pe)
1298     {
1299         if(epage == NULL) {
1300             // send empty page
1301             if (entryOpsObject.pupEveryElement())
1302                 cache[pe].ReceivePageWithPUP(pageNo(), page_t((ENTRY_TYPE*)NULL), 0);
1303             else
1304                 cache[pe].ReceivePage(pageNo(), (ENTRY_TYPE*)NULL, 0);
1305         } else {
1306             // send page with data
1307             if (entryOpsObject.pupEveryElement())
1308                 cache[pe].ReceivePageWithPUP(pageNo(), page_t(epage), ENTRIES_PER_PAGE);
1309             else
1310                 cache[pe].ReceivePage(pageNo(), epage, ENTRIES_PER_PAGE);  // send page with data                
1311         }
1312     }
1313
1314     /// Receive a non-runlength encoded page from the network:
1315     // @@ TBD: ERROR: This does not work for  varsize pages.
1316     inline void PAReceivePage(ENTRY_TYPE *pageData,
1317                             int pe, MSA_Page_Fault_t pageState)
1318     {
1319         allocatePage(pageState);
1320
1321         if(pageState == Write_Fault)
1322             set(pageData, 0, ENTRIES_PER_PAGE);
1323         else
1324             combine(pageData, 0, ENTRIES_PER_PAGE);
1325         
1326         // send the acknowledgement to the sender that we received the page
1327         //ckout << "Sending Ack to PE " << pe << endl;
1328         cache[pe].AckPage(thisIndex);
1329     }
1330
1331     /// Receive a runlength encoded page from the network:
1332     inline void PAReceiveRLEPageWithPup(
1333         const MSA_WriteSpan_t *spans, unsigned int nSpans, 
1334         MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> &entries, unsigned int nEntries, 
1335         int pe, MSA_Page_Fault_t pageState)
1336     {
1337         PAReceiveRLEPage(spans, nSpans, entries.getData(), nEntries, pe, pageState);
1338     }
1339
1340
1341     inline void PAReceiveRLEPage(
1342         const MSA_WriteSpan_t *spans, unsigned int nSpans, 
1343         const ENTRY_TYPE *entries, unsigned int nEntries, 
1344         int pe, MSA_Page_Fault_t pageState)
1345     {
1346         allocatePage(pageState);
1347         
1348         //ckout << "p" << CkMyPe() << "ReceiveRLEPage nSpans=" << nSpans << " nEntries=" << nEntries << endl;
1349         int e=0; /* consumed entries */
1350         for (int s=0;s<nSpans;s++) {
1351             if(pageState == Write_Fault)
1352                 set(&entries[e], spans[s].start,spans[s].end);
1353             else /* Accumulate_Fault */
1354                 combine(&entries[e], spans[s].start,spans[s].end);
1355             e+=spans[s].end-spans[s].start;
1356         } 
1357
1358         // send the acknowledgement to the sender that we received the page
1359         //ckout << "Sending AckRLE to PE " << pe << endl;
1360         cache[pe].AckPage(thisIndex);
1361     }
1362
1363     // MSA_PageArray::
1364     inline void Sync()
1365     {
1366                                 MSADEBPRINT(printf("MSA_PageArray::Sync about to call contribute \n");); 
1367         contribute(0, NULL, CkReduction::concat);
1368     }
1369
1370     inline void emit(int ID, int index)
1371     {
1372           //ckout << "p" << CkMyPe() << "ID" << ID;
1373 //         if(epage == NULL)
1374 //             ckout << "emit: epage is NULL" << endl;
1375 //         else
1376 //             ckout << "emit: " << epage[index] << endl;
1377     }
1378 };
1379
1380 #define CK_TEMPLATES_ONLY
1381 #include "msa.def.h"
1382 #undef CK_TEMPLATES_ONLY
1383
1384 #endif