e65e942a26fe1b22a877b3720943d659b0493c80
[charm.git] / src / libs / ck-libs / multiphaseSharedArrays / msa-DistPageMgr.h
1 // emacs mode line -*- mode: c++; tab-width: 4 -*-
2
3 #ifndef MSA_DISTPAGEMGR_H
4 #define MSA_DISTPAGEMGR_H
5
6 #include <charm++.h>
7 #include <string.h>
8 #include <list>
9 #include <stack>
10 #include <map>
11 #include <set>
12 #include "msa-common.h"
13
14 // forward decl needed in msa-DistPageMgr.ci, i.e. in msa.decl.h
15
16 /// Stores a list of indices to be written out.
17 struct MSA_WriteSpan_t {
18     int start,end;
19     inline void pup(PUP::er &p) {
20         p|start; p|end;
21     }
22 };
23
24 template <class ENTRY, class MERGER,
25           unsigned int ENTRIES_PER_PAGE>
26          class MSA_PageT;
27 #include "msa.decl.h"
28
29 using namespace std;
30
31 inline int mymin(int a, int b)
32 {
33     return (a<b)?a:b;
34 }
35
36 //=======================================================
37 // Utility Classes
38
39 /// Listens for some event on a page or set of pages.
40 class MSA_Listener {
41 public:
42         MSA_Listener() {}
43         virtual ~MSA_Listener();
44         /// Getting added to a lister list.
45         virtual void add(void) =0;
46         /// Event waiting for has occurred.
47         virtual void signal(unsigned int pageNo) =0;
48 };
49
50 /// Keeps a list of MSA_Listeners
51 class MSA_Listeners {
52         CkVec<MSA_Listener *> listeners;
53 public:
54         MSA_Listeners();
55         ~MSA_Listeners();
56         
57         /// Add this listener to your set.  Calls l->add().
58         void add(MSA_Listener *l);
59         
60         /// Return the number of listeners in our set.
61         unsigned int size(void) const {return listeners.size();}
62         
63         /// Signal all added listeners and remove them from the set.
64         void signal(unsigned int pageNo);
65 };
66
67
68 /** Resumes a thread once all needed pages have arrived */
69 class MSA_Thread_Listener : public MSA_Listener {
70         CthThread thread;    // the suspended thread of execution (0 if not suspended)
71         int count;  // number of pages we're still waiting for
72 public:
73         MSA_Thread_Listener() :thread(0), count(0) {}
74         
75         /// Wait for one more page.
76         void add(void);
77         
78         /// If we're waiting for any pages, suspend our thread.
79         void suspend(void);
80         
81         /// Another page arrived.
82         void signal(unsigned int pageNo);
83 };
84
85
86 /// Stores all housekeeping information about a cached copy of a page: 
87 ///   everything but the actual page data.
88 /// This is the non-templated superclass, which doesn't depend on the 
89 ///   page size or datatype.
90 class MSA_Page_State {
91 public:
92         /// e.g., Read_Fault for a read-only page.
93         MSA_Page_Fault_t state;
94         
95         /// If true, this page is locked in memory.
96         ///   Pages get locked so people can safely use the non-checking version of "get".
97         bool locked;
98         
99         /// Threads waiting for this page to be paged in from the network.
100         MSA_Listeners readRequests;
101         /// Threads waiting for this page to be paged out to the network.
102         MSA_Listeners writeRequests;
103         
104         /// Return true if this page can be safely written back.
105         bool canPageOut(void) const {
106                 return (!locked) && canDelete();
107         }
108         
109         /// Return true if this page can be safely purged from memory.
110         bool canDelete(void) const {
111                 return (readRequests.size()==0) && (writeRequests.size()==0);
112         }
113         
114         MSA_Page_State() :state(Uninit_State), locked(false) {}
115 };
116
117 /// Fast, fixed-size bitvector class.
118 template <unsigned int NUM_BITS>
119 class fixedlength_bitvector {
120 public:
121         /// Data type used to store actual bits in the vector.
122         typedef unsigned long store_t;
123         enum { store_bits=8*sizeof(store_t) };
124         
125         /// Number of store_t's in our vector.
126         enum { len=(NUM_BITS+(store_bits-1))/store_bits };
127         store_t store[len];
128         
129         /// Fill the entire vector with this value.
130         void fill(store_t s) {
131                 for (int i=0;i<len;i++) store[i]=s;
132         }
133         void clear(void) {fill(0);}
134         fixedlength_bitvector() {clear();}
135         
136         /// Set-to-1 bit i of the vector.
137         void set(unsigned int i) { store[i/store_bits] |= (1lu<<(i%store_bits)); }
138         /// Clear-to-0 bit i of the vector.
139         void clear(unsigned int i) { store[i/store_bits] &= ~(1lu<<(i%store_bits)); }
140         
141         /// Return the i'th bit of the vector.
142         bool get(unsigned int i) { return (store[i/store_bits] & (1lu<<(i%store_bits))); }
143 };
144
145 /// Templated page housekeeping class.  
146 template <class ENTRY, unsigned int ENTRIES_PER_PAGE>
147 class MSA_Page_StateT : public MSA_Page_State {
148 public:
149         /** Write tracking:
150            Somehow, we have to identify the entries in a page 
151          that have been written to.  Our method for doing this is
152          a bitvector: 0's indicate the entry hasn't been written; 
153          1's indicate the entry has been written.
154         */
155         typedef fixedlength_bitvector<ENTRIES_PER_PAGE> writes_t;
156         writes_t writes;
157         typedef typename writes_t::store_t writes_store_t;
158         enum {writes_bits=writes_t::store_bits};
159         
160         /// Tracking writes to our writes: a smaller vector, used to 
161         ///  avoid the large number of 0's in the writes vector.
162         ///  Bit i of writes2 indicates that store_t i of writes has 1's.
163         typedef fixedlength_bitvector<writes_t::len> writes2_t;
164         writes2_t writes2;
165         typedef typename writes2_t::store_t writes2_store_t;
166         enum {writes2_bits=writes2_t::store_bits*writes_t::store_bits};
167         
168         /// Write entry i of this page.
169         void write(unsigned int i) {
170                 writes.set(i);
171                 writes2.set(i/writes_t::store_bits);
172         }
173         
174         /// Clear the write list for this page.
175         void writeClear(void) {
176                 for (int i2=0;i2<writes2_t::len;i2++)
177                 if (writes2.store[i2]) { /* some bits set: clear them all */
178                         int o=i2*writes2_t::store_bits;
179                         for (int i=0;i<writes_t::len;i++) 
180                                 writes.store[o+i]=0;
181                         writes2.store[i2]=0;
182                 }
183         }
184         
185         /// Return the nearest multiple of m >= v.
186         inline int roundUp(int v,int m) {
187                 return (v+m-1)/m*m;
188         }
189         
190         /// Get a list of our written output values as this list of spans.
191         ///   Returns the total number of spans written to "span".
192         int writeSpans(MSA_WriteSpan_t *span) {
193                 int nSpans=0;
194                 
195                 int cur=0; // entry we're looking at
196                 while (true) {
197                         /* skip over unwritten space */
198                         while (true) { 
199                                 if (writes2.store[cur/writes2_bits]==(writes2_store_t)0) 
200                                         cur=roundUp(cur+1,writes2_bits);
201                                 else if (writes.store[cur/writes_bits]==(writes_store_t)0)
202                                         cur=roundUp(cur+1,writes_bits); 
203                                 else if (writes.get(cur)==false)
204                                         cur++;
205                                 else /* writes.get(cur)==true */
206                                         break;
207                                 if (cur>=ENTRIES_PER_PAGE) return nSpans;
208                         }
209                         /* now writes.get(cur)==true */
210                         span[nSpans].start=cur;
211                         /* skip over written space */
212                         while (true) { 
213                                 /* // 0-1 symmetry doesn't hold here, since writes2 may have 1's, but writes may still have some 0's...
214                                 if (writes2.store[cur/writes2_bits]==~(writes2_store_t)0) 
215                                         cur=roundUp(cur+1,writes2_bits);
216                                 else */
217                                 if (writes.store[cur/writes_bits]==~(writes_store_t)0)
218                                         cur=roundUp(cur+1,writes_bits); 
219                                 else if (writes.get(cur)==true)
220                                         cur++;
221                                 else /* writes.get(cur)==false */
222                                         break;
223                                 if (cur>=ENTRIES_PER_PAGE) {
224                                         span[nSpans++].end=ENTRIES_PER_PAGE; /* finish the last span */
225                                         return nSpans;
226                                 }
227                         }
228                         /* now writes.get(cur)==false */
229                         span[nSpans++].end=cur;
230                 }
231         }
232 };
233
234
235 //=======================================================
236 // Page-out policy
237
238 /**
239   class vmLRUPageReplacementPolicy
240   This class provides the functionality of least recently used page replacement policy.
241   It needs to be notified when a page is accessed using the pageAccessed() function and
242   a page can be selected for replacement using the selectPage() function.
243  
244   WARNING: a list is absolutely the wrong data structure for this class, 
245      because it makes *both* updating as well as searching for a page O(n),
246      where n is the number of pages.  A heap would be a better choice,
247      as both operations would then become O(lg(n))
248  */
249 class vmLRUReplacementPolicy
250 {
251 protected:
252     unsigned int nPages;            // number of pages
253     const page_ptr_t* pageTable; // actual data for pages (NULL means page is gone)
254     MSA_Page_State** pageState;  // state of each page
255     list<unsigned int> stackOfPages;
256     unsigned int lastPageAccessed;
257
258 public:
259     inline vmLRUReplacementPolicy(unsigned int nPages_,const page_ptr_t* pageTable_, MSA_Page_State ** pageState_)
260     : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), lastPageAccessed(MSA_INVALID_PAGE_NO) {}
261
262     inline void pageAccessed(unsigned int page)
263     {
264         if(page != lastPageAccessed)
265         {
266             lastPageAccessed = page;
267
268             // delete this page from the stack and push it at the top
269             list<unsigned int>::iterator i;
270             for(i = stackOfPages.begin(); i != stackOfPages.end(); i++)
271                 if(*i == page)
272                     i = stackOfPages.erase(i);
273
274             stackOfPages.push_back(page);
275         }
276     }
277
278     inline unsigned int selectPage()
279     {
280         if(stackOfPages.size() == 0)
281             return MSA_INVALID_PAGE_NO;
282
283         // find a non-empty unlocked page to swap, delete all empty pages from the stack
284         list<unsigned int>::iterator i = stackOfPages.begin();
285         while(i != stackOfPages.end())
286         {
287             if(pageTable[*i] == NULL) i = stackOfPages.erase(i);
288             else if(!pageState[*i]->canPageOut()) i++;
289             else break;
290         }
291
292         if(i != stackOfPages.end())
293             return *i;
294         else
295             return MSA_INVALID_PAGE_NO;
296     }
297 };
298
299 /**
300   class vmNRUPageReplacementPolicy
301   This class provides the functionality of not-recently-used page replacement policy.
302   It needs to be notified when a page is accessed using the pageAccessed() function and
303   a page can be selected for replacement using the selectPage() function.
304   
305   "not-recently-used" could replace any page that has not been used in the 
306   last K accesses; that is, it's a memory-limited version of LRU.
307   
308   pageAccessed is O(1).
309   selectPage best-case is O(K) (if we immediately find a doomed page); 
310              worst-case is O(K n) (if there are no doomed pages).
311  */
312 class vmNRUReplacementPolicy
313 {
314 protected:
315     unsigned int nPages;            // number of pages
316     const page_ptr_t* pageTable; // actual pages (NULL means page is gone)
317     MSA_Page_State** pageState;  // state of each page
318     enum {K=5}; // Number of distinct pages to remember
319     unsigned int last[K]; // pages that have been used recently
320     unsigned int Klast; // index into last array.
321     
322     unsigned int victim; // next page to throw out.
323     
324     bool recentlyUsed(unsigned int page) {
325         for (int k=0;k<K;k++) if (page==last[k]) return true;
326         return false;
327     }
328
329 public:
330     inline vmNRUReplacementPolicy(unsigned int nPages_,const page_ptr_t* pageTable_,MSA_Page_State ** pageState_)
331     : nPages(nPages_), pageTable(pageTable_), pageState(pageState_)
332     {
333         Klast=0;
334         for (int k=0;k<K;k++) last[k]=MSA_INVALID_PAGE_NO;
335         victim=0;
336     }
337
338     inline void pageAccessed(unsigned int page)
339     {
340         if (page!=last[Klast]) {
341             Klast++; if (Klast>=K) Klast=0;
342             last[Klast]=page;
343         }
344     }
345
346     inline unsigned int selectPage() {
347         unsigned int last_victim=victim;
348         do {
349             victim++; if (victim>=nPages) victim=0;
350             if (pageTable[victim]
351                 &&pageState[victim]->canPageOut()
352                 &&!recentlyUsed(victim)) {
353                 /* victim is an allocated, unlocked, non-recently-used page: page him out. */
354                 return victim;
355             }
356         } while (victim!=last_victim);
357         return MSA_INVALID_PAGE_NO;  /* nobody is pageable */
358     }
359 };
360
361 //================================================================
362
363 /**
364   UNUSED
365
366   Holds the untyped data for one MSA page.
367   This is the interface MSA_CacheGroup uses to access a cached page.
368   MSA_CacheGroup asks the templated code to create a MSA_Page
369   for each new page, then talks to the page directly.
370 */
371 class MSA_Page {
372 public:
373         virtual ~MSA_Page();
374
375         /**
376           Pack or unpack the data in this page.
377           Used to send and receive pages from the network
378           (or even disk, if somebody needs it.)
379         */
380         virtual void pup(PUP::er &p) =0;
381
382         /**
383           Merge this page's data into our own.
384           Only parts of this page may have been set.
385         */
386         virtual void merge(MSA_Page &otherPage) =0;
387 };
388
389 /**
390   Holds the typed data for one MSA page.
391   Implementation of puppedPage used by the templated code.
392 */
393 template <
394         class ENTRY, 
395         class MERGER=DefaultEntry<ENTRY>,
396         unsigned int ENTRIES_PER_PAGE=MSA_DEFAULT_ENTRIES_PER_PAGE
397 >
398 class MSA_PageT {
399     unsigned int n; // number of entries on this page.  Used to send page updates.
400         /** The contents of this page: array of ENTRIES_PER_PAGE items */
401         ENTRY *data;
402         /** Merger object */
403         MERGER m;
404   bool duplicate;
405
406 public:
407         MSA_PageT():duplicate(false), n(ENTRIES_PER_PAGE) {
408                 data=new ENTRY[ENTRIES_PER_PAGE];
409                 for (int i=0;i<ENTRIES_PER_PAGE;i++){
410                         data[i]=m.getIdentity();
411                 }
412                 //shouldn't n be set to ENTRIES_PER_PAGE
413                 n = ENTRIES_PER_PAGE;
414         }
415     // This constructor is used in PageArray to quickly convert an
416     // array of ENTRY into an MSA_PageT.  So we just make a copy of
417     // the pointer.  When it comes time to destruct the object, we
418     // need to ensure we do NOT delete the data but just discard the
419     // pointer.
420         MSA_PageT(ENTRY *d):data(d), duplicate(true), n(ENTRIES_PER_PAGE) {
421     }
422         MSA_PageT(ENTRY *d, unsigned int n_):data(d), duplicate(true), n(n_) {
423     }
424         virtual ~MSA_PageT() {
425                 if (!duplicate) {
426             delete [] data;
427         }
428         }
429
430         virtual void pup(PUP::er &p) {
431     p | n;
432                 /*this pup routine was broken, It didnt consider the case
433                         in which n > 0 and data = NULL. This is possible when  
434                         sending empty pages. It also doesnt seem to do any allocation
435                         for the data variable while unpacking which seems to be wrong
436                 */
437                 bool nulldata;
438                 if(!p.isUnpacking()){
439                                 nulldata = (data == NULL);
440                 }
441                 p | nulldata;
442                 if(nulldata){
443                                 data = NULL;
444                                 return;
445                 }
446                 if(p.isUnpacking()){
447                                 data = new ENTRY[n];
448                 }
449                 for (int i=0;i<n;i++){
450                         p|data[i];
451                 }       
452         }
453
454         virtual void merge(MSA_PageT<ENTRY, MERGER, ENTRIES_PER_PAGE> &otherPage) {
455                 for (int i=0;i<ENTRIES_PER_PAGE;i++)
456                         m.accumulate(data[i],otherPage.data[i]);
457         }
458
459         // These accessors might be used by the templated code.
460         inline ENTRY &operator[](int i) {return data[i];}
461         inline const ENTRY &operator[](int i) const {return data[i];}
462     inline ENTRY *getData() { return data; }
463 };
464
465 //=============================== Cache Manager =================================
466
467 template <class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE>
468 class MSA_CacheGroup : public Group
469 {
470     typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
471
472 protected:
473     ENTRY_OPS_CLASS *entryOpsObject;
474     unsigned int numberOfWorkerThreads;      // number of worker threads across all processors for this shared array
475     // @@ migration?
476     unsigned int numberLocalWorkerThreads;   // number of worker threads on THIS processor for this shared array
477     unsigned int enrollDoneq;                 // has enroll() been done on this processor?
478     MSA_Listeners enrollWaiters;
479     MSA_Listeners syncWaiters;
480     std::set<int> enrolledPEs;                          // which PEs are involved?
481
482     unsigned int nPages;            ///< number of pages
483     ENTRY_TYPE** pageTable;          ///< the page table for this PE: stores actual data.
484     typedef MSA_Page_StateT<ENTRY_TYPE,ENTRIES_PER_PAGE> pageState_t;
485     pageState_t **pageStateStorage; ///< Housekeeping information for each allocated page.
486     
487     /// Return the state for this page, returning NULL if no state available.
488     inline pageState_t *stateN(unsigned int pageNo) {
489         return pageStateStorage[pageNo];
490     }
491     
492     /// Return the state for this page, allocating if needed.
493     pageState_t *state(unsigned int pageNo) {
494         pageState_t *ret=pageStateStorage[pageNo];
495         if (ret==NULL) {
496             ret=new pageState_t;
497             pageStateStorage[pageNo]=ret;
498         }
499         return ret;
500     }
501
502     typedef CProxy_MSA_PageArray<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_PageArray_t;
503     CProxy_PageArray_t pageArray;     // a proxy to the page array
504     typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
505     CProxy_CacheGroup_t thisProxy; // a proxy to myself.
506
507     std::map<CthThread, MSA_Thread_Listener *> threadList;
508     /// Look up or create the listener for the current thread.
509     MSA_Thread_Listener *getListener(void) {
510         CthThread t=CthSelf();
511         MSA_Thread_Listener *l=threadList[t];
512                         if (l==NULL) {
513                 l=new MSA_Thread_Listener;
514                 threadList[t]=l;
515                         }
516                         return l;
517     }
518     /// Add our thread to this list and suspend
519     void addAndSuspend(MSA_Listeners &dest) {
520         MSA_Thread_Listener *l=getListener();
521                         dest.add(l);
522                         l->suspend();
523     }
524
525     stack<ENTRY_TYPE*> pagePool;     // a pool of unused pages
526     
527     typedef vmNRUReplacementPolicy vmPageReplacementPolicy;
528     vmPageReplacementPolicy* replacementPolicy;
529
530     // structure for the bounds of a single write
531     typedef struct { unsigned int begin; unsigned int end; } writebounds_t;
532
533     // a list of write bounds associated with a given page
534     typedef list<writebounds_t> writelist_t;
535
536     writelist_t** writes;           // the write lists for each page
537
538     unsigned int resident_pages;             // pages currently allocated
539     unsigned int max_resident_pages;         // max allowable pages to allocate
540     unsigned int nEntries;          // number of entries for this array
541     unsigned int syncAckCount;      // number of sync ack's we received
542     int outOfBufferInPrefetch;      // flag to indicate if the last prefetch ran out of buffers
543
544     int syncThreadCount;            // number of local threads that have issued Sync
545     
546     
547     // used during output
548     MSA_WriteSpan_t writeSpans[ENTRIES_PER_PAGE];
549     ENTRY_TYPE writeEntries[ENTRIES_PER_PAGE];
550
551     /*********************************************************************************/
552     /** these routines deal with managing the page queue **/
553
554     // increment the number of pages the thread is waiting on.
555     // also add thread to the page queue; then thread is woken when the page arrives.
556     inline void IncrementPagesWaiting(unsigned int page)
557     {
558         state(page)->readRequests.add(getListener());
559     }
560
561     inline void IncrementChangesWaiting(unsigned int page)
562     {
563         state(page)->writeRequests.add(getListener());
564     }
565
566 /************************* Page allocation and management **************************/
567     
568     /// Allocate a new page, removing old pages if we're over the limit.
569     /// Returns NULL if no buffer space is available
570     inline ENTRY_TYPE* tryBuffer(int async=0) // @@@
571     {
572         ENTRY_TYPE* nu = NULL;
573
574         // first try the page pool
575         if(!pagePool.empty())
576         {
577             nu = pagePool.top();
578             pagePool.pop();
579         }
580
581         // else try to allocate the buffer
582         if(nu == NULL && resident_pages < max_resident_pages)
583         {
584             nu = new ENTRY_TYPE[ENTRIES_PER_PAGE];
585             resident_pages++;
586         }
587
588         // else swap out one of the pages
589         if(nu == NULL)
590         {
591             int pageToSwap = replacementPolicy->selectPage();
592             if(pageToSwap != MSA_INVALID_PAGE_NO)
593             {
594                 CkAssert(pageTable[pageToSwap] != NULL);
595                 CkAssert(state(pageToSwap)->canPageOut() == true);
596                 
597                 relocatePage(pageToSwap, async);
598                 nu = pageTable[pageToSwap];
599                 pageTable[pageToSwap] = 0;
600                 delete pageStateStorage[pageToSwap];
601                 pageStateStorage[pageToSwap]=0;
602             }
603         }
604
605         // otherwise return NULL
606
607         return nu;
608     }
609     
610     /// Allocate storage for this page, if none has been allocated already.
611     ///  Update pageTable, and return the storage for the page.
612     inline ENTRY_TYPE* makePage(unsigned int page) // @@@
613     {
614         ENTRY_TYPE* nu=pageTable[page];
615         if (nu==0) {
616             nu=tryBuffer();
617             if (nu==0) CkAbort("MSA: No available space to create pages.\n");
618             pageTable[page]=nu;
619         }
620         return nu;
621     }
622     
623     /// Throw away this allocated page.
624     ///  Returns the page itself, for deletion or recycling.
625     ENTRY_TYPE* destroyPage(unsigned int page)
626     {
627         ENTRY_TYPE* nu=pageTable[page];
628         pageTable[page] = 0;
629         if (pageStateStorage[page]->canDelete()) {
630                 delete pageStateStorage[page];
631                 pageStateStorage[page]=0;
632         }
633         resident_pages--;
634         return nu;
635     }
636     
637     //MSA_CacheGroup::
638     void pageFault(unsigned int page, MSA_Page_Fault_t why)
639     {
640         // Write the page to the page table
641         state(page)->state = why;
642         if(why == Read_Fault)
643         { // Issue a remote request to fetch the new page
644             // If the page has not been requested already, then request it.
645             if (stateN(page)->readRequests.size()==0) {
646                 pageArray[page].GetPage(CkMyPe());
647                 //ckout << "Requesting page first time"<< endl;
648             } else {
649                 ;//ckout << "Requesting page next time.  Skipping request."<< endl;
650             }
651             MSA_Thread_Listener *l=getListener();
652             stateN(page)->readRequests.add(l);
653             l->suspend(); // Suspend until page arrives.
654         }
655         else {
656             // Build an empty buffer into which to create the new page
657             ENTRY_TYPE* nu = makePage(page);
658             writeIdentity(nu);
659         }
660     }
661     
662     /// Make sure this page is accessible, faulting the page in if needed.
663     // MSA_CacheGroup::
664     inline void accessPage(unsigned int page,MSA_Page_Fault_t access)
665     {
666         if (pageTable[page] == 0) {
667 //             ckout << "p" << CkMyPe() << ": Calling pageFault" << endl;
668             pageFault(page, access);
669         }
670 #ifndef CMK_OPTIMIZE
671         if (stateN(page)->state!=access) {
672             CkPrintf("page=%d mode=%d pagestate=%d", page, access, stateN(page)->state);
673             CkAbort("MSA Runtime error: Attempting to access a page that is still in another mode.");
674         }
675 #endif
676         replacementPolicy->pageAccessed(page);
677     }
678
679     // MSA_CacheGroup::
680     // Fill this page with identity values, to prepare for writes or 
681     //  accumulates.
682     void writeIdentity(ENTRY_TYPE* pagePtr)
683     {
684         for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
685             pagePtr[i] = entryOpsObject->getIdentity();
686     }
687     
688 /************* Page Flush and Writeback *********************/
689     bool shouldWriteback(unsigned int page) {
690         if (!pageTable[page]) return false;
691         return (stateN(page)->state == Write_Fault || stateN(page)->state == Accumulate_Fault);
692     }
693     
694     inline void relocatePage(unsigned int page, int async)
695     {
696         //CkAssert(pageTable[page]);
697         if(shouldWriteback(page))
698         {
699             // the page to be swapped is a writeable page. So inform any
700             // changes this node has made to the page manager
701             sendChangesToPageArray(page, async);
702         }
703     }
704
705     inline void sendChangesToPageArray(const unsigned int page, const int async)
706     {
707         sendRLEChangesToPageArray(page);
708         
709         MSA_Thread_Listener *l=getListener();
710         state(page)->writeRequests.add(l);
711         if (!async)
712             l->suspend(); // Suspend until page is really gone.
713         // TODO: Are write acknowledgements really necessary ?
714     }
715
716     // Send the page data as a contiguous block.
717     //   Note that this is INCORRECT when writes to pages overlap!
718     inline void sendNonRLEChangesToPageArray(const unsigned int page) // @@@
719     {
720         pageArray[page].PAReceivePage(pageTable[page], ENTRIES_PER_PAGE, CkMyPe(), stateN(page)->state);
721     }
722     
723     // Send the page data as an RLE block.
724     // this function assumes that there are no overlapping writes in the list
725     inline void sendRLEChangesToPageArray(const unsigned int page)
726     {
727         ENTRY_TYPE *writePage=pageTable[page];
728         int nSpans=stateN(page)->writeSpans(writeSpans);
729         if (nSpans==1) 
730         { /* common case: can make very fast */
731             int nEntries=writeSpans[0].end-writeSpans[0].start;
732             if (entryOpsObject->pupEveryElement()) {
733                 pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
734                     page_t(&writePage[writeSpans[0].start],nEntries),nEntries,
735                     CkMyPe(),stateN(page)->state);
736             } else {
737                 pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
738                     &writePage[writeSpans[0].start], nEntries,
739                     CkMyPe(),stateN(page)->state);
740             }
741         } 
742         else /* nSpans>1 */ 
743         { /* must copy separate spans into a single output buffer (luckily rare) */
744             int nEntries=0;
745             for (int s=0;s<nSpans;s++) {
746                 for (int i=writeSpans[s].start;i<writeSpans[s].end;i++)
747                     writeEntries[nEntries++]=writePage[i]; // calls assign
748             }
749             if (entryOpsObject->pupEveryElement()) {
750                 pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
751                                                         page_t(writeEntries,nEntries),nEntries,
752                                                         CkMyPe(),stateN(page)->state);
753             } else {
754                 pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
755                                                writeEntries,nEntries,
756                                                CkMyPe(),stateN(page)->state);
757             }
758         }
759     }
760
761 /*********************** Public Interface **********************/
762 public:
763     // 
764     //
765     // MSA_CacheGroup::
766     inline MSA_CacheGroup(unsigned int nPages_, CkArrayID pageArrayID,
767                       unsigned int max_bytes_, unsigned int nEntries_, unsigned int numberOfWorkerThreads_)
768         : nEntries(nEntries_), numberOfWorkerThreads(numberOfWorkerThreads_) 
769     {
770         numberLocalWorkerThreads = 0;  // populated after enroll
771         enrollDoneq = 0;  // populated after enroll
772
773         nPages = nPages_;
774         pageArray = pageArrayID;
775         thisProxy=thisgroup;
776         resident_pages = 0;
777         max_resident_pages = max_bytes_/(sizeof(ENTRY_TYPE)*ENTRIES_PER_PAGE);
778         
779         outOfBufferInPrefetch = 0;
780         entryOpsObject = new ENTRY_OPS_CLASS();
781
782         // initialize the page table
783         typedef ENTRY_TYPE* entry_type_ptr;
784         pageTable = new entry_type_ptr[nPages];
785         pageStateStorage = new pageState_t*[nPages];
786
787         // this is the number of sync ack's received till yet
788         syncAckCount = 0;
789         syncThreadCount = 0;
790
791         replacementPolicy = new vmPageReplacementPolicy(nPages,(page_ptr_t*) pageTable, (MSA_Page_State **)pageStateStorage);
792
793         for(unsigned int i = 0; i < nPages; i++)
794         {
795             pageTable[i] = 0;
796             pageStateStorage[i] = 0;
797         }
798                                 MSADEBPRINT(printf("MSA_CacheGroup nEntries %d \n",nEntries););
799     }
800
801     // MSA_CacheGroup::
802     inline ~MSA_CacheGroup()
803     {
804         FreeMem();
805         
806         delete[] pageTable; pageTable = 0;
807         delete[] pageStateStorage; pageStateStorage = 0;
808     }
809
810     /* To change the accumulate function TBD @@ race conditions */
811     inline void changeEntryOpsObject(ENTRY_OPS_CLASS *e) {
812         entryOpsObject = e;
813         pageArray.changeEntryOpsObject(e);
814     }
815
816     // MSA_CacheGroup::
817     inline const ENTRY_TYPE* readablePage(unsigned int page)
818     {
819         accessPage(page,Read_Fault);
820         
821         return pageTable[page];
822     }
823
824     // MSA_CacheGroup::
825     //
826     // known local page
827     inline const void* readablePage2(unsigned int page)
828     {
829         return pageTable[page];
830     }
831
832     // MSA_CacheGroup::
833     // Obtains a writable copy of the page.
834     inline ENTRY_TYPE* writeablePage(unsigned int page, unsigned int offset)
835     {
836         accessPage(page,Write_Fault);
837
838         // NOTE: Since we assume write once semantics, i.e. between two calls to sync,
839         // either there can be no write to a location or a single write to a location,
840         // a readable page will suffice as a writeable page too, because no one else
841         // is going to write to this location. In reality, two locations on the *same*
842         // page can be written by two different threads, in which case we will need
843         // to keep track of which parts of the page have been written, hence:
844         stateN(page)->write(offset);
845 //     ckout << "write:" << page*ENTRIES_PER_PAGE+offset << endl;
846         
847         return pageTable[page];
848     }
849
850     // MSA_CacheGroup::
851     inline ENTRY_TYPE &accumulate(unsigned int page, unsigned int offset)
852     {
853         accessPage(page,Accumulate_Fault);
854         stateN(page)->write(offset);
855         return pageTable[page][offset];
856     }
857
858     /// A requested page has arrived from the network.
859     ///  nEntriesInPage_ = num entries being sent (0 for empty page, num entries otherwise)
860     inline void ReceivePageWithPUP(unsigned int page, page_t &pageData, int size)
861     {
862         ReceivePage(page, pageData.getData(), size);
863     }
864
865     inline void ReceivePage(unsigned int page, ENTRY_TYPE* pageData, int size)
866     {
867         CkAssert(0==size || ENTRIES_PER_PAGE == size);
868         // the page we requested has been received
869         ENTRY_TYPE *nu=makePage(page);
870         if(size!=0)
871         {
872             for(unsigned int i = 0; i < size; i++)
873                 nu[i] = pageData[i]; // @@@, calls assignment operator
874         }
875         else /* isEmpty */
876         {
877             // the page we requested for is empty, so we can just initialize it.
878             writeIdentity(nu);
879         }
880         
881         state(page)->readRequests.signal(page);
882     }
883
884     // This EP is invoked during sync to acknowledge that a dirty page
885     // has been received and written back to the page owner.  We keep track
886     // of the number of ack's yet to arrive in nChangesWaiting.  Once
887     // all the dirty pages have been ack'd, we awaken the thread that
888     // flushed the page.
889     //
890     // It's not clear this is useful very often...
891     //
892     // MSA_CacheGroup::
893     inline void AckPage(unsigned int page)
894     {
895         state(page)->writeRequests.signal(page);
896     }
897
898     // MSA_CacheGroup::
899     // synchronize all the pages and also clear up the cache
900     inline void SyncReq(int single)
901     {
902           MSADEBPRINT(printf("SyncReq single %d\n",single););
903           if(single)
904         {
905             /*ask all the caches to send their updates to the page array, but we don't need to empty the caches on the other PEs*/
906             SingleSync();
907             EmptyCache();
908
909             getListener()->suspend();
910         }
911         else{
912             Sync();
913                                 }
914     }
915
916     // MSA_CacheGroup::
917     inline void FlushCache()
918     {
919         // flush the local cache
920         // for each writeable page, send that page to the array element
921         for(unsigned int i = 0; i < nPages; i++)
922         {
923             if(shouldWriteback(i)) {
924                 //ckout << "p" << CkMyPe() << "FlushCache: sending page " << i << endl;
925                 sendChangesToPageArray(i, 1);
926             }
927         }
928     }
929
930     // MSA_CacheGroup::
931     void EmptyCache()
932     {
933         /* just makes all the pages empty, assuming that the data in those pages has been flushed to the owners */
934         for(unsigned int i = 0; i < nPages; i++)
935         {
936             if(pageTable[i]) pagePool.push(destroyPage(i));
937         }
938     }
939
940 /************************ Enroll ********************/
941     /// Enroll phase 1: called by users.
942     // MSA_CacheGroup::
943     inline void enroll(unsigned int num_workers)
944     {
945         CkAssert(num_workers == numberOfWorkerThreads); // just to verify
946         CkAssert(enrollDoneq == 0);
947         numberLocalWorkerThreads++;
948         // @@ how to ensure that enroll is called only once?
949
950         //ckout << "[" << CkMyPe() << "] sending sync ack to PE 0" << endl;
951         thisProxy[0].enrollAck(CkMyPe());
952         //ckout << "[" << CkMyPe() << "] suspening thread in Sync() " << endl;
953         addAndSuspend(enrollWaiters);
954         //ckout << "[" << CkMyPe() << "] rsuming thread in Sync()" << endl;
955
956         CkAssert(enrollDoneq == 1);
957         return;
958     }
959
960     /// Enroll phase 2: called on PE 0 from everywhere
961     inline void enrollAck(int originator)
962     {
963         CkAssert(CkMyPe() == 0);  // enrollAck is only called on PE 0
964         CkAssert(enrollDoneq == 0);  // prevent multiple enroll operations
965         
966         syncAckCount++;
967         enrolledPEs.insert(originator);
968         //ckout << "[" << CkMyPe() << "] SyncAckcount = " << syncAckCount << endl;
969         if(syncAckCount == numberOfWorkerThreads) {
970 //             ckout << "[" << CkMyPe() << "]" << "Enroll operation is almost done" << endl;
971             syncAckCount = 0;
972             enrollDoneq = 1;
973             // What if fewer worker threads than pe's ?  Handled in
974             // enrollDone.
975             thisProxy.enrollDone();
976         }
977     }
978
979     /// Enroll phase 3: called everywhere by PE 0
980     inline void enrollDone()
981     {
982 //         ckout << "[" << CkMyPe() << "] enrollDone.  Waking threads."
983 //               <<  " numberOfWorkerThreads=" << numberOfWorkerThreads
984 //               <<  " local=" << numberLocalWorkerThreads << endl;
985         enrollDoneq = 1;
986         enrollWaiters.signal(0);
987     }
988
989 /******************************** Sync & writeback ***********************/
990     // MSA_CacheGroup::
991     inline void SingleSync()
992     {
993         /* a single thread issued a sync call with all = 1. The first thing to do is to flush the local cache */
994         FlushCache();
995     }
996
997     // MSA_CacheGroup::
998     inline void Sync()
999     {
1000         syncThreadCount++;
1001         //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << " " << numberLocalWorkerThreads << endl;
1002         //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << ", registered threads = " << getNumRegisteredThreads()
1003                 //  << ", number of suspended threads = " << getNumSuspendedThreads() << endl;
1004
1005         // First, all threads on this processor need to reach the sync
1006         // call; only then can we proceed with merging the data.  Only
1007         // the last thread on this processor needs to do the FlushCache,
1008         // etc.  Others just suspend until the sync is over.
1009                                 MSADEBPRINT(printf("Sync syncThreadCount %d \n",syncThreadCount););
1010         if(syncThreadCount < numberLocalWorkerThreads)
1011         {
1012                   MSADEBPRINT(printf("Sync addAndSuspend \n"););
1013             addAndSuspend(syncWaiters);
1014             return;
1015         }
1016                 
1017         //ckout << "[" << CkMyPe() << "] Sync started" << endl;
1018
1019         // flush the cache asynchronously and also empty it
1020         FlushCache();
1021         // idea: instead of invalidating the pages, switch it to read
1022         // mode. That will not work, since the page may have also been
1023         // modified by another thread.
1024         EmptyCache();
1025
1026         // Now, we suspend too (if we had at least one dirty page).
1027         // We will be awoken when all our dirty pages have been
1028         // written and acknowledged.
1029                 MSADEBPRINT(printf("Sync calling suspend on getListener\n"););
1030         getListener()->suspend();
1031                 MSADEBPRINT(printf("Sync awakening after suspend\n"););
1032
1033         // So far, the sync has been asynchronous, i.e. PE0 might be ahead
1034         // of PE1.  Next we basically do a barrier to ensure that all PE's
1035         // are synchronized.
1036
1037         // at this point, the sync's across the group should
1038         // synchronize among themselves by each one sending
1039         // a sync acknowledge message to PE 0. (this is like
1040         // a reduction over a group)
1041         if(CkMyPe() != 0)
1042         {
1043             thisProxy[0].SyncAck();
1044         }
1045         else /* I *am* PE 0 */
1046         {
1047             SyncAck();
1048         }
1049                                 MSADEBPRINT(printf("Sync all local threads done, going to addAndSuspend\n"););
1050         /* Wait until sync is reflected from PE 0 */
1051         addAndSuspend(syncWaiters);
1052                                 
1053                 MSADEBPRINT(printf("Sync all local threads done waking up after addAndSuspend\n"););
1054                 //ckout << "[" << CkMyPe() << "] Sync finished" << endl;        
1055     }
1056
1057     inline unsigned int getNumEntries() { return nEntries; }
1058     inline CProxy_PageArray_t getArray() { return pageArray; }
1059
1060     // TODO: Can this SyncAck and other simple Acks be made efficient?
1061     inline void SyncAck()
1062     {
1063         CkAssert(CkMyPe() == 0);  // SyncAck is only called on PE 0
1064         syncAckCount++;
1065         // DONE @@ what if fewer worker threads than pe's ?
1066         // @@ what if fewer worker threads than pe's and >1 threads on 1 pe?
1067         //if(syncAckCount == mymin(numberOfWorkerThreads, CkNumPes())){
1068                 if (syncAckCount == enrolledPEs.size()) {
1069                         MSADEBPRINT(printf("SyncAck starting reduction on pageArray of size %d number of pages %d\n",nEntries,nPages););
1070                         pageArray.Sync();
1071                 }               
1072     }
1073
1074     inline void SyncDone()
1075     {
1076         //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
1077         //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
1078         /* Reset for next sync */
1079         syncThreadCount = 0;
1080         syncAckCount = 0;
1081                                 MSADEBPRINT(printf("SyncDone syncWaiters signal to be called\n"););
1082         syncWaiters.signal(0);
1083     }
1084
1085     inline void FreeMem()
1086     {
1087         for(unsigned int i = 0; i < nPages; i++)
1088         {
1089             if(pageTable[i]) delete [] destroyPage(i);
1090         }
1091
1092         while(!pagePool.empty())
1093         {
1094             delete [] pagePool.top();  // @@@
1095             pagePool.pop();
1096         }
1097         
1098         resident_pages=0;
1099     }
1100
1101                 /** 
1102                         Deregister a client. Decremenet the number of local threads. If total number of local threads 
1103                         hits 0 FreeMem()
1104                 */
1105                 inline void unroll(){
1106         numberLocalWorkerThreads--;
1107                                 if(numberLocalWorkerThreads == 0){
1108                                         FreeMem();
1109                                 }
1110                 }
1111
1112     /**
1113      * Issue a prefetch request for the given range of pages. These pages will
1114      * be locked into the cache, so that they will not be swapped out.
1115      */
1116     inline void Prefetch(unsigned int pageStart, unsigned int pageEnd)
1117     {
1118         /* prefetching is feasible only if we we did not encounter an out
1119          * of buffer condition in the previous prefetch call
1120          */
1121         if(!outOfBufferInPrefetch)
1122         {
1123             //ckout << "prefetching pages " << pageStart << " through " << pageEnd << endl;
1124             for(unsigned int p = pageStart; p <= pageEnd; p++)
1125             {
1126                 if(NULL == pageTable[p])
1127                 {
1128
1129                     /* relocate the buffer asynchronously */
1130                     ENTRY_TYPE* nu = tryBuffer(1);
1131                     if(NULL == nu)
1132                     {
1133                         /* signal that sufficient buffer space is not available */
1134                         outOfBufferInPrefetch = 1;
1135                         break;
1136                     }
1137
1138                     pageTable[p] = nu;
1139                     state(p)->state = Read_Fault;
1140
1141                     pageArray[p].GetPage(CkMyPe());
1142                     IncrementPagesWaiting(p);
1143                     //ckout << "Prefetch page" << p << ", pages waiting = " << nPagesWaiting << endl;
1144                     /* don't suspend the thread */
1145                 }
1146
1147                 /* mark the page as being locked */
1148                 state(p)->locked = true;
1149             }
1150         }
1151     }
1152
1153     /**
1154      * Wait for all the prefetch pages to be fetched into the cache.
1155      * Returns: 0 if prefetch successful, 1 if not
1156      */
1157     inline int WaitAll(void)
1158     {
1159         if(outOfBufferInPrefetch)
1160         {
1161             // we encountered out of buffer in the previous prefetch call, return error
1162             outOfBufferInPrefetch = 0;
1163             getListener()->suspend();
1164             UnlockPages();
1165             return 1;
1166         }
1167         else
1168         {
1169             // prefetch requests have been successfully issued already, so suspend the
1170             // thread and wait for completion
1171             outOfBufferInPrefetch = 0;
1172             getListener()->suspend();
1173             return 0;
1174         }
1175     }
1176     
1177     inline void UnlockPage(unsigned int page) {
1178         pageState_t *s=stateN(page);
1179         if(s && s->locked) {
1180             replacementPolicy->pageAccessed(page);
1181             s->locked = false;
1182         }
1183     }
1184
1185     /**
1186      * Unlock all the pages locked in the cache
1187      */
1188     inline void UnlockPages()
1189     {
1190         // add all the locked pages to page replacement policy
1191         for(unsigned int page = 0; page < nPages; page++)
1192             UnlockPage(page);
1193     }
1194
1195     /**
1196      * Unlock the given pages: [startPage ... endPage]
1197      *  Note that the range is inclusive.
1198      */
1199     inline void UnlockPages(unsigned int startPage, unsigned int endPage)
1200     {
1201         for(unsigned int page = startPage; page <= endPage; page++)
1202             UnlockPage(page);
1203     }
1204
1205     /// Debugging routine
1206     inline void emitBufferValue(int ID, unsigned int pageNum, unsigned int offset)
1207     {
1208         CkAssert( pageNum < nPages );
1209         CkAssert( offset < ENTRIES_PER_PAGE );
1210
1211         //ckout << "p" << CkMyPe() << "ID" << ID;
1212 //         if (pageTable[pageNum] == 0)
1213 //             ckout << "emitBufferValue: page " << pageNum << " not available in local cache." << endl;
1214 //         else
1215 //             ckout << "emitBufferValue: [" << pageNum << "," << offset << "] = " << pageTable[pageNum][offset] << endl;
1216     }
1217 };
1218
1219 // each element of this array is responsible for managing
1220 // the information about a single page. It is in effect the
1221 // "owner" as well as the "manager" for that page.
1222 //
1223 template<class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE> 
1224 class MSA_PageArray : public ArrayElement1D
1225 {
1226     typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
1227     typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
1228     
1229 protected:
1230     ENTRY_TYPE *epage;
1231     ENTRY_OPS_CLASS entryOpsObject;
1232     CProxy_CacheGroup_t cache;
1233
1234     unsigned int pageNo() { return thisIndex; }
1235
1236     inline void allocatePage(MSA_Page_Fault_t access) // @@@
1237     {
1238         if(epage == NULL)
1239         {
1240             epage = new ENTRY_TYPE[ENTRIES_PER_PAGE];
1241             if (access==Accumulate_Fault)
1242                writeIdentity();
1243         }
1244     }
1245
1246     // begin and end are indexes into the page.
1247     inline void set(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
1248     {
1249         //ckout << "set: " << begin << "," << end << endl;
1250         for(unsigned int i = 0; i < (end - begin); i++) {
1251             epage[begin + i] = buffer[i]; // @@@, calls assignment operator
1252             //ckout << "set val[" << begin+i << "]=" << buffer[i] << endl;
1253         }
1254     }
1255
1256     // MSA_PageArray::
1257     inline void combine(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
1258     {
1259         ENTRY_TYPE* pagePtr = epage + begin;
1260         for(unsigned int i = 0; i < (end - begin); i++)
1261             entryOpsObject.accumulate(pagePtr[i], buffer[i]);
1262     }
1263
1264     // MSA_PageArray::
1265     inline void writeIdentity()
1266     {
1267         for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
1268             epage[i] = entryOpsObject.getIdentity();
1269     }
1270
1271 public:
1272     inline MSA_PageArray() : epage(NULL) { }
1273     inline MSA_PageArray(CkMigrateMessage* m) { delete m; }
1274     
1275     void setCacheProxy(CProxy_CacheGroup_t &cache_)
1276     {
1277        cache=cache_;
1278     }
1279     
1280     virtual void pup(PUP::er& p)
1281     {
1282         ArrayElement1D::pup(p);
1283         int epage_present=(epage!=0);
1284         p|epage_present;
1285         if (epage_present) {
1286           if(p.isUnpacking())
1287             allocatePage(Write_Fault);
1288           for (int i=0;i<ENTRIES_PER_PAGE;i++)
1289             p|epage[i];
1290         }
1291     }
1292     
1293     inline ~MSA_PageArray()
1294     {
1295         if(epage) delete [] epage;
1296     }
1297
1298     /// Request our page.
1299     ///   pe = to which to send page
1300     inline void GetPage(int pe)
1301     {
1302         if(epage == NULL) {
1303             // send empty page
1304             if (entryOpsObject.pupEveryElement())
1305                 cache[pe].ReceivePageWithPUP(pageNo(), page_t((ENTRY_TYPE*)NULL), 0);
1306             else
1307                 cache[pe].ReceivePage(pageNo(), (ENTRY_TYPE*)NULL, 0);
1308         } else {
1309             // send page with data
1310             if (entryOpsObject.pupEveryElement())
1311                 cache[pe].ReceivePageWithPUP(pageNo(), page_t(epage), ENTRIES_PER_PAGE);
1312             else
1313                 cache[pe].ReceivePage(pageNo(), epage, ENTRIES_PER_PAGE);  // send page with data                
1314         }
1315     }
1316
1317     /// Receive a non-runlength encoded page from the network:
1318     // @@ TBD: ERROR: This does not work for  varsize pages.
1319     inline void PAReceivePage(ENTRY_TYPE *pageData,
1320                             int pe, MSA_Page_Fault_t pageState)
1321     {
1322         allocatePage(pageState);
1323
1324         if(pageState == Write_Fault)
1325             set(pageData, 0, ENTRIES_PER_PAGE);
1326         else
1327             combine(pageData, 0, ENTRIES_PER_PAGE);
1328         
1329         // send the acknowledgement to the sender that we received the page
1330         //ckout << "Sending Ack to PE " << pe << endl;
1331         cache[pe].AckPage(thisIndex);
1332     }
1333
1334     /// Receive a runlength encoded page from the network:
1335     inline void PAReceiveRLEPageWithPup(
1336         const MSA_WriteSpan_t *spans, unsigned int nSpans, 
1337         MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> &entries, unsigned int nEntries, 
1338         int pe, MSA_Page_Fault_t pageState)
1339     {
1340         PAReceiveRLEPage(spans, nSpans, entries.getData(), nEntries, pe, pageState);
1341     }
1342
1343
1344     inline void PAReceiveRLEPage(
1345         const MSA_WriteSpan_t *spans, unsigned int nSpans, 
1346         const ENTRY_TYPE *entries, unsigned int nEntries, 
1347         int pe, MSA_Page_Fault_t pageState)
1348     {
1349         allocatePage(pageState);
1350         
1351         //ckout << "p" << CkMyPe() << "ReceiveRLEPage nSpans=" << nSpans << " nEntries=" << nEntries << endl;
1352         int e=0; /* consumed entries */
1353         for (int s=0;s<nSpans;s++) {
1354             if(pageState == Write_Fault)
1355                 set(&entries[e], spans[s].start,spans[s].end);
1356             else /* Accumulate_Fault */
1357                 combine(&entries[e], spans[s].start,spans[s].end);
1358             e+=spans[s].end-spans[s].start;
1359         } 
1360
1361         // send the acknowledgement to the sender that we received the page
1362         //ckout << "Sending AckRLE to PE " << pe << endl;
1363         cache[pe].AckPage(thisIndex);
1364     }
1365
1366     // MSA_PageArray::
1367     inline void Sync()
1368     {
1369                                 MSADEBPRINT(printf("MSA_PageArray::Sync about to call contribute \n");); 
1370         contribute(0, NULL, CkReduction::concat);
1371     }
1372
1373     inline void emit(int ID, int index)
1374     {
1375           //ckout << "p" << CkMyPe() << "ID" << ID;
1376 //         if(epage == NULL)
1377 //             ckout << "emit: epage is NULL" << endl;
1378 //         else
1379 //             ckout << "emit: " << epage[index] << endl;
1380     }
1381 };
1382
1383 #define CK_TEMPLATES_ONLY
1384 #include "msa.def.h"
1385 #undef CK_TEMPLATES_ONLY
1386
1387 #endif