Don't do 'using namespace std;' in an MSA header
[charm.git] / src / libs / ck-libs / multiphaseSharedArrays / msa-DistPageMgr.h
1 // emacs mode line -*- mode: c++; tab-width: 4 -*-
2
3 #ifndef MSA_DISTPAGEMGR_H
4 #define MSA_DISTPAGEMGR_H
5
6 #include <charm++.h>
7 #include <string.h>
8 #include <list>
9 #include <stack>
10 #include <map>
11 #include <set>
12 #include "msa-common.h"
13
14 // forward decl needed in msa-DistPageMgr.ci, i.e. in msa.decl.h
15
16 /// Stores a list of indices to be written out.
17 struct MSA_WriteSpan_t {
18     int start,end;
19     inline void pup(PUP::er &p) {
20         p|start; p|end;
21     }
22 };
23
24 template <class ENTRY, class MERGER,
25           unsigned int ENTRIES_PER_PAGE>
26          class MSA_PageT;
27 #include "msa.decl.h"
28
29 inline int mymin(int a, int b)
30 {
31     return (a<b)?a:b;
32 }
33
34 //=======================================================
35 // Utility Classes
36
37 /// Listens for some event on a page or set of pages.
38 class MSA_Listener {
39 public:
40         MSA_Listener() {}
41         virtual ~MSA_Listener();
42         /// Getting added to a lister list.
43         virtual void add(void) =0;
44         /// Event waiting for has occurred.
45         virtual void signal(unsigned int pageNo) =0;
46 };
47
48 /// Keeps a list of MSA_Listeners
49 class MSA_Listeners {
50         CkVec<MSA_Listener *> listeners;
51 public:
52         MSA_Listeners();
53         ~MSA_Listeners();
54         
55         /// Add this listener to your set.  Calls l->add().
56         void add(MSA_Listener *l);
57         
58         /// Return the number of listeners in our set.
59         unsigned int size(void) const {return listeners.size();}
60         
61         /// Signal all added listeners and remove them from the set.
62         void signal(unsigned int pageNo);
63 };
64
65
66 /** Resumes a thread once all needed pages have arrived */
67 class MSA_Thread_Listener : public MSA_Listener {
68         CthThread thread;    // the suspended thread of execution (0 if not suspended)
69         int count;  // number of pages we're still waiting for
70 public:
71         MSA_Thread_Listener() :thread(0), count(0) {}
72         
73         /// Wait for one more page.
74         void add(void);
75         
76         /// If we're waiting for any pages, suspend our thread.
77         void suspend(void);
78         
79         /// Another page arrived.
80         void signal(unsigned int pageNo);
81 };
82
83
84 /// Stores all housekeeping information about a cached copy of a page: 
85 ///   everything but the actual page data.
86 /// This is the non-templated superclass, which doesn't depend on the 
87 ///   page size or datatype.
88 class MSA_Page_State {
89 public:
90         /// e.g., Read_Fault for a read-only page.
91         MSA_Page_Fault_t state;
92         
93         /// If true, this page is locked in memory.
94         ///   Pages get locked so people can safely use the non-checking version of "get".
95         bool locked;
96         
97         /// Threads waiting for this page to be paged in from the network.
98         MSA_Listeners readRequests;
99         /// Threads waiting for this page to be paged out to the network.
100         MSA_Listeners writeRequests;
101         
102         /// Return true if this page can be safely written back.
103         bool canPageOut(void) const {
104                 return (!locked) && canDelete();
105         }
106         
107         /// Return true if this page can be safely purged from memory.
108         bool canDelete(void) const {
109                 return (readRequests.size()==0) && (writeRequests.size()==0);
110         }
111         
112         MSA_Page_State() :state(Uninit_State), locked(false) {}
113 };
114
115 /// Fast, fixed-size bitvector class.
116 template <unsigned int NUM_BITS>
117 class fixedlength_bitvector {
118 public:
119         /// Data type used to store actual bits in the vector.
120         typedef unsigned long store_t;
121         enum { store_bits=8*sizeof(store_t) };
122         
123         /// Number of store_t's in our vector.
124         enum { len=(NUM_BITS+(store_bits-1))/store_bits };
125         store_t store[len];
126         
127         /// Fill the entire vector with this value.
128         void fill(store_t s) {
129                 for (int i=0;i<len;i++) store[i]=s;
130         }
131         void clear(void) {fill(0);}
132         fixedlength_bitvector() {clear();}
133         
134         /// Set-to-1 bit i of the vector.
135         void set(unsigned int i) { store[i/store_bits] |= (1lu<<(i%store_bits)); }
136         /// Clear-to-0 bit i of the vector.
137         void clear(unsigned int i) { store[i/store_bits] &= ~(1lu<<(i%store_bits)); }
138         
139         /// Return the i'th bit of the vector.
140         bool get(unsigned int i) { return (store[i/store_bits] & (1lu<<(i%store_bits))); }
141 };
142
143 /// Templated page housekeeping class.  
144 template <class ENTRY, unsigned int ENTRIES_PER_PAGE>
145 class MSA_Page_StateT : public MSA_Page_State {
146 public:
147         /** Write tracking:
148            Somehow, we have to identify the entries in a page 
149          that have been written to.  Our method for doing this is
150          a bitvector: 0's indicate the entry hasn't been written; 
151          1's indicate the entry has been written.
152         */
153         typedef fixedlength_bitvector<ENTRIES_PER_PAGE> writes_t;
154         writes_t writes;
155         typedef typename writes_t::store_t writes_store_t;
156         enum {writes_bits=writes_t::store_bits};
157         
158         /// Tracking writes to our writes: a smaller vector, used to 
159         ///  avoid the large number of 0's in the writes vector.
160         ///  Bit i of writes2 indicates that store_t i of writes has 1's.
161         typedef fixedlength_bitvector<writes_t::len> writes2_t;
162         writes2_t writes2;
163         typedef typename writes2_t::store_t writes2_store_t;
164         enum {writes2_bits=writes2_t::store_bits*writes_t::store_bits};
165         
166         /// Write entry i of this page.
167         void write(unsigned int i) {
168                 writes.set(i);
169                 writes2.set(i/writes_t::store_bits);
170         }
171         
172         /// Clear the write list for this page.
173         void writeClear(void) {
174                 for (int i2=0;i2<writes2_t::len;i2++)
175                 if (writes2.store[i2]) { /* some bits set: clear them all */
176                         int o=i2*writes2_t::store_bits;
177                         for (int i=0;i<writes_t::len;i++) 
178                                 writes.store[o+i]=0;
179                         writes2.store[i2]=0;
180                 }
181         }
182         
183         /// Return the nearest multiple of m >= v.
184         inline int roundUp(int v,int m) {
185                 return (v+m-1)/m*m;
186         }
187         
188         /// Get a list of our written output values as this list of spans.
189         ///   Returns the total number of spans written to "span".
190         int writeSpans(MSA_WriteSpan_t *span) {
191                 int nSpans=0;
192                 
193                 int cur=0; // entry we're looking at
194                 while (true) {
195                         /* skip over unwritten space */
196                         while (true) { 
197                                 if (writes2.store[cur/writes2_bits]==(writes2_store_t)0) 
198                                         cur=roundUp(cur+1,writes2_bits);
199                                 else if (writes.store[cur/writes_bits]==(writes_store_t)0)
200                                         cur=roundUp(cur+1,writes_bits); 
201                                 else if (writes.get(cur)==false)
202                                         cur++;
203                                 else /* writes.get(cur)==true */
204                                         break;
205                                 if (cur>=ENTRIES_PER_PAGE) return nSpans;
206                         }
207                         /* now writes.get(cur)==true */
208                         span[nSpans].start=cur;
209                         /* skip over written space */
210                         while (true) { 
211                                 /* // 0-1 symmetry doesn't hold here, since writes2 may have 1's, but writes may still have some 0's...
212                                 if (writes2.store[cur/writes2_bits]==~(writes2_store_t)0) 
213                                         cur=roundUp(cur+1,writes2_bits);
214                                 else */
215                                 if (writes.store[cur/writes_bits]==~(writes_store_t)0)
216                                         cur=roundUp(cur+1,writes_bits); 
217                                 else if (writes.get(cur)==true)
218                                         cur++;
219                                 else /* writes.get(cur)==false */
220                                         break;
221                                 if (cur>=ENTRIES_PER_PAGE) {
222                                         span[nSpans++].end=ENTRIES_PER_PAGE; /* finish the last span */
223                                         return nSpans;
224                                 }
225                         }
226                         /* now writes.get(cur)==false */
227                         span[nSpans++].end=cur;
228                 }
229         }
230 };
231
232
233 //=======================================================
234 // Page-out policy
235
236 /**
237   class vmLRUPageReplacementPolicy
238   This class provides the functionality of least recently used page replacement policy.
239   It needs to be notified when a page is accessed using the pageAccessed() function and
240   a page can be selected for replacement using the selectPage() function.
241  
242   WARNING: a list is absolutely the wrong data structure for this class, 
243      because it makes *both* updating as well as searching for a page O(n),
244      where n is the number of pages.  A heap would be a better choice,
245      as both operations would then become O(lg(n))
246  */
247 class vmLRUReplacementPolicy
248 {
249 protected:
250     unsigned int nPages;            // number of pages
251     const page_ptr_t* pageTable; // actual data for pages (NULL means page is gone)
252     MSA_Page_State** pageState;  // state of each page
253     std::list<unsigned int> stackOfPages;
254     unsigned int lastPageAccessed;
255
256 public:
257     inline vmLRUReplacementPolicy(unsigned int nPages_,const page_ptr_t* pageTable_, MSA_Page_State ** pageState_)
258     : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), lastPageAccessed(MSA_INVALID_PAGE_NO) {}
259
260     inline void pageAccessed(unsigned int page)
261     {
262         if(page != lastPageAccessed)
263         {
264             lastPageAccessed = page;
265
266             // delete this page from the stack and push it at the top
267                         std::list<unsigned int>::iterator i;
268             for(i = stackOfPages.begin(); i != stackOfPages.end(); i++)
269                 if(*i == page)
270                     i = stackOfPages.erase(i);
271
272             stackOfPages.push_back(page);
273         }
274     }
275
276     inline unsigned int selectPage()
277     {
278         if(stackOfPages.size() == 0)
279             return MSA_INVALID_PAGE_NO;
280
281         // find a non-empty unlocked page to swap, delete all empty pages from the stack
282                 std::list<unsigned int>::iterator i = stackOfPages.begin();
283         while(i != stackOfPages.end())
284         {
285             if(pageTable[*i] == NULL) i = stackOfPages.erase(i);
286             else if(!pageState[*i]->canPageOut()) i++;
287             else break;
288         }
289
290         if(i != stackOfPages.end())
291             return *i;
292         else
293             return MSA_INVALID_PAGE_NO;
294     }
295 };
296
297 /**
298   class vmNRUPageReplacementPolicy
299   This class provides the functionality of not-recently-used page replacement policy.
300   It needs to be notified when a page is accessed using the pageAccessed() function and
301   a page can be selected for replacement using the selectPage() function.
302   
303   "not-recently-used" could replace any page that has not been used in the 
304   last K accesses; that is, it's a memory-limited version of LRU.
305   
306   pageAccessed is O(1).
307   selectPage best-case is O(K) (if we immediately find a doomed page); 
308              worst-case is O(K n) (if there are no doomed pages).
309  */
310 class vmNRUReplacementPolicy
311 {
312 protected:
313     unsigned int nPages;            // number of pages
314     const page_ptr_t* pageTable; // actual pages (NULL means page is gone)
315     MSA_Page_State** pageState;  // state of each page
316     enum {K=5}; // Number of distinct pages to remember
317     unsigned int last[K]; // pages that have been used recently
318     unsigned int Klast; // index into last array.
319     
320     unsigned int victim; // next page to throw out.
321     
322     bool recentlyUsed(unsigned int page) {
323         for (int k=0;k<K;k++) if (page==last[k]) return true;
324         return false;
325     }
326
327 public:
328     inline vmNRUReplacementPolicy(unsigned int nPages_,const page_ptr_t* pageTable_,MSA_Page_State ** pageState_)
329     : nPages(nPages_), pageTable(pageTable_), pageState(pageState_)
330     {
331         Klast=0;
332         for (int k=0;k<K;k++) last[k]=MSA_INVALID_PAGE_NO;
333         victim=0;
334     }
335
336     inline void pageAccessed(unsigned int page)
337     {
338         if (page!=last[Klast]) {
339             Klast++; if (Klast>=K) Klast=0;
340             last[Klast]=page;
341         }
342     }
343
344     inline unsigned int selectPage() {
345         unsigned int last_victim=victim;
346         do {
347             victim++; if (victim>=nPages) victim=0;
348             if (pageTable[victim]
349                 &&pageState[victim]->canPageOut()
350                 &&!recentlyUsed(victim)) {
351                 /* victim is an allocated, unlocked, non-recently-used page: page him out. */
352                 return victim;
353             }
354         } while (victim!=last_victim);
355         return MSA_INVALID_PAGE_NO;  /* nobody is pageable */
356     }
357 };
358
359 //================================================================
360
361 /**
362   UNUSED
363
364   Holds the untyped data for one MSA page.
365   This is the interface MSA_CacheGroup uses to access a cached page.
366   MSA_CacheGroup asks the templated code to create a MSA_Page
367   for each new page, then talks to the page directly.
368 */
369 class MSA_Page {
370 public:
371         virtual ~MSA_Page();
372
373         /**
374           Pack or unpack the data in this page.
375           Used to send and receive pages from the network
376           (or even disk, if somebody needs it.)
377         */
378         virtual void pup(PUP::er &p) =0;
379
380         /**
381           Merge this page's data into our own.
382           Only parts of this page may have been set.
383         */
384         virtual void merge(MSA_Page &otherPage) =0;
385 };
386
387 /**
388   Holds the typed data for one MSA page.
389   Implementation of puppedPage used by the templated code.
390 */
391 template <
392         class ENTRY, 
393         class MERGER=DefaultEntry<ENTRY>,
394         unsigned int ENTRIES_PER_PAGE=MSA_DEFAULT_ENTRIES_PER_PAGE
395 >
396 class MSA_PageT {
397     unsigned int n; // number of entries on this page.  Used to send page updates.
398         /** The contents of this page: array of ENTRIES_PER_PAGE items */
399         ENTRY *data;
400         /** Merger object */
401         MERGER m;
402   bool duplicate;
403
404 public:
405         MSA_PageT():duplicate(false), n(ENTRIES_PER_PAGE) {
406                 data=new ENTRY[ENTRIES_PER_PAGE];
407                 for (int i=0;i<ENTRIES_PER_PAGE;i++){
408                         data[i]=m.getIdentity();
409                 }
410                 //shouldn't n be set to ENTRIES_PER_PAGE
411                 n = ENTRIES_PER_PAGE;
412         }
413     // This constructor is used in PageArray to quickly convert an
414     // array of ENTRY into an MSA_PageT.  So we just make a copy of
415     // the pointer.  When it comes time to destruct the object, we
416     // need to ensure we do NOT delete the data but just discard the
417     // pointer.
418         MSA_PageT(ENTRY *d):data(d), duplicate(true), n(ENTRIES_PER_PAGE) {
419     }
420         MSA_PageT(ENTRY *d, unsigned int n_):data(d), duplicate(true), n(n_) {
421     }
422         virtual ~MSA_PageT() {
423                 if (!duplicate) {
424             delete [] data;
425         }
426         }
427
428         virtual void pup(PUP::er &p) {
429     p | n;
430                 /*this pup routine was broken, It didnt consider the case
431                         in which n > 0 and data = NULL. This is possible when  
432                         sending empty pages. It also doesnt seem to do any allocation
433                         for the data variable while unpacking which seems to be wrong
434                 */
435                 bool nulldata;
436                 if(!p.isUnpacking()){
437                                 nulldata = (data == NULL);
438                 }
439                 p | nulldata;
440                 if(nulldata){
441                                 data = NULL;
442                                 return;
443                 }
444                 if(p.isUnpacking()){
445                                 data = new ENTRY[n];
446                 }
447                 for (int i=0;i<n;i++){
448                         p|data[i];
449                 }       
450         }
451
452         virtual void merge(MSA_PageT<ENTRY, MERGER, ENTRIES_PER_PAGE> &otherPage) {
453                 for (int i=0;i<ENTRIES_PER_PAGE;i++)
454                         m.accumulate(data[i],otherPage.data[i]);
455         }
456
457         // These accessors might be used by the templated code.
458         inline ENTRY &operator[](int i) {return data[i];}
459         inline const ENTRY &operator[](int i) const {return data[i];}
460     inline ENTRY *getData() { return data; }
461 };
462
463 //=============================== Cache Manager =================================
464
465 template <class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE>
466 class MSA_CacheGroup : public Group
467 {
468     typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
469
470 protected:
471     ENTRY_OPS_CLASS *entryOpsObject;
472     unsigned int numberOfWorkerThreads;      // number of worker threads across all processors for this shared array
473     // @@ migration?
474     unsigned int numberLocalWorkerThreads;   // number of worker threads on THIS processor for this shared array
475     unsigned int enrollDoneq;                 // has enroll() been done on this processor?
476     MSA_Listeners enrollWaiters;
477     MSA_Listeners syncWaiters;
478     std::set<int> enrolledPEs;                          // which PEs are involved?
479
480     unsigned int nPages;            ///< number of pages
481     ENTRY_TYPE** pageTable;          ///< the page table for this PE: stores actual data.
482     typedef MSA_Page_StateT<ENTRY_TYPE,ENTRIES_PER_PAGE> pageState_t;
483     pageState_t **pageStateStorage; ///< Housekeeping information for each allocated page.
484     
485     /// Return the state for this page, returning NULL if no state available.
486     inline pageState_t *stateN(unsigned int pageNo) {
487         return pageStateStorage[pageNo];
488     }
489     
490     /// Return the state for this page, allocating if needed.
491     pageState_t *state(unsigned int pageNo) {
492         pageState_t *ret=pageStateStorage[pageNo];
493         if (ret==NULL) {
494             ret=new pageState_t;
495             pageStateStorage[pageNo]=ret;
496         }
497         return ret;
498     }
499
500     typedef CProxy_MSA_PageArray<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_PageArray_t;
501     CProxy_PageArray_t pageArray;     // a proxy to the page array
502     typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
503     CProxy_CacheGroup_t thisProxy; // a proxy to myself.
504
505     std::map<CthThread, MSA_Thread_Listener *> threadList;
506     /// Look up or create the listener for the current thread.
507     MSA_Thread_Listener *getListener(void) {
508         CthThread t=CthSelf();
509         MSA_Thread_Listener *l=threadList[t];
510                         if (l==NULL) {
511                 l=new MSA_Thread_Listener;
512                 threadList[t]=l;
513                         }
514                         return l;
515     }
516     /// Add our thread to this list and suspend
517     void addAndSuspend(MSA_Listeners &dest) {
518         MSA_Thread_Listener *l=getListener();
519                         dest.add(l);
520                         l->suspend();
521     }
522
523     std::stack<ENTRY_TYPE*> pagePool;     // a pool of unused pages
524     
525     typedef vmNRUReplacementPolicy vmPageReplacementPolicy;
526     vmPageReplacementPolicy* replacementPolicy;
527
528     // structure for the bounds of a single write
529     typedef struct { unsigned int begin; unsigned int end; } writebounds_t;
530
531     // a list of write bounds associated with a given page
532     typedef std::list<writebounds_t> writelist_t;
533
534     writelist_t** writes;           // the write lists for each page
535
536     unsigned int resident_pages;             // pages currently allocated
537     unsigned int max_resident_pages;         // max allowable pages to allocate
538     unsigned int nEntries;          // number of entries for this array
539     unsigned int syncAckCount;      // number of sync ack's we received
540     int outOfBufferInPrefetch;      // flag to indicate if the last prefetch ran out of buffers
541
542     int syncThreadCount;            // number of local threads that have issued Sync
543     
544     
545     // used during output
546     MSA_WriteSpan_t writeSpans[ENTRIES_PER_PAGE];
547     ENTRY_TYPE writeEntries[ENTRIES_PER_PAGE];
548
549     /*********************************************************************************/
550     /** these routines deal with managing the page queue **/
551
552     // increment the number of pages the thread is waiting on.
553     // also add thread to the page queue; then thread is woken when the page arrives.
554     inline void IncrementPagesWaiting(unsigned int page)
555     {
556         state(page)->readRequests.add(getListener());
557     }
558
559     inline void IncrementChangesWaiting(unsigned int page)
560     {
561         state(page)->writeRequests.add(getListener());
562     }
563
564 /************************* Page allocation and management **************************/
565     
566     /// Allocate a new page, removing old pages if we're over the limit.
567     /// Returns NULL if no buffer space is available
568     inline ENTRY_TYPE* tryBuffer(int async=0) // @@@
569     {
570         ENTRY_TYPE* nu = NULL;
571
572         // first try the page pool
573         if(!pagePool.empty())
574         {
575             nu = pagePool.top();
576             pagePool.pop();
577         }
578
579         // else try to allocate the buffer
580         if(nu == NULL && resident_pages < max_resident_pages)
581         {
582             nu = new ENTRY_TYPE[ENTRIES_PER_PAGE];
583             resident_pages++;
584         }
585
586         // else swap out one of the pages
587         if(nu == NULL)
588         {
589             int pageToSwap = replacementPolicy->selectPage();
590             if(pageToSwap != MSA_INVALID_PAGE_NO)
591             {
592                 CkAssert(pageTable[pageToSwap] != NULL);
593                 CkAssert(state(pageToSwap)->canPageOut() == true);
594                 
595                 relocatePage(pageToSwap, async);
596                 nu = pageTable[pageToSwap];
597                 pageTable[pageToSwap] = 0;
598                 delete pageStateStorage[pageToSwap];
599                 pageStateStorage[pageToSwap]=0;
600             }
601         }
602
603         // otherwise return NULL
604
605         return nu;
606     }
607     
608     /// Allocate storage for this page, if none has been allocated already.
609     ///  Update pageTable, and return the storage for the page.
610     inline ENTRY_TYPE* makePage(unsigned int page) // @@@
611     {
612         ENTRY_TYPE* nu=pageTable[page];
613         if (nu==0) {
614             nu=tryBuffer();
615             if (nu==0) CkAbort("MSA: No available space to create pages.\n");
616             pageTable[page]=nu;
617         }
618         return nu;
619     }
620     
621     /// Throw away this allocated page.
622     ///  Returns the page itself, for deletion or recycling.
623     ENTRY_TYPE* destroyPage(unsigned int page)
624     {
625         ENTRY_TYPE* nu=pageTable[page];
626         pageTable[page] = 0;
627         if (pageStateStorage[page]->canDelete()) {
628                 delete pageStateStorage[page];
629                 pageStateStorage[page]=0;
630         }
631         resident_pages--;
632         return nu;
633     }
634     
635     //MSA_CacheGroup::
636     void pageFault(unsigned int page, MSA_Page_Fault_t why)
637     {
638         // Write the page to the page table
639         state(page)->state = why;
640         if(why == Read_Fault)
641         { // Issue a remote request to fetch the new page
642             // If the page has not been requested already, then request it.
643             if (stateN(page)->readRequests.size()==0) {
644                 pageArray[page].GetPage(CkMyPe());
645                 //ckout << "Requesting page first time"<< endl;
646             } else {
647                 ;//ckout << "Requesting page next time.  Skipping request."<< endl;
648             }
649             MSA_Thread_Listener *l=getListener();
650             stateN(page)->readRequests.add(l);
651             l->suspend(); // Suspend until page arrives.
652         }
653         else {
654             // Build an empty buffer into which to create the new page
655             ENTRY_TYPE* nu = makePage(page);
656             writeIdentity(nu);
657         }
658     }
659     
660     /// Make sure this page is accessible, faulting the page in if needed.
661     // MSA_CacheGroup::
662     inline void accessPage(unsigned int page,MSA_Page_Fault_t access)
663     {
664         if (pageTable[page] == 0) {
665 //             ckout << "p" << CkMyPe() << ": Calling pageFault" << endl;
666             pageFault(page, access);
667         }
668 #ifndef CMK_OPTIMIZE
669         if (stateN(page)->state!=access) {
670             CkPrintf("page=%d mode=%d pagestate=%d", page, access, stateN(page)->state);
671             CkAbort("MSA Runtime error: Attempting to access a page that is still in another mode.");
672         }
673 #endif
674         replacementPolicy->pageAccessed(page);
675     }
676
677     // MSA_CacheGroup::
678     // Fill this page with identity values, to prepare for writes or 
679     //  accumulates.
680     void writeIdentity(ENTRY_TYPE* pagePtr)
681     {
682         for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
683             pagePtr[i] = entryOpsObject->getIdentity();
684     }
685     
686 /************* Page Flush and Writeback *********************/
687     bool shouldWriteback(unsigned int page) {
688         if (!pageTable[page]) return false;
689         return (stateN(page)->state == Write_Fault || stateN(page)->state == Accumulate_Fault);
690     }
691     
692     inline void relocatePage(unsigned int page, int async)
693     {
694         //CkAssert(pageTable[page]);
695         if(shouldWriteback(page))
696         {
697             // the page to be swapped is a writeable page. So inform any
698             // changes this node has made to the page manager
699             sendChangesToPageArray(page, async);
700         }
701     }
702
703     inline void sendChangesToPageArray(const unsigned int page, const int async)
704     {
705         sendRLEChangesToPageArray(page);
706         
707         MSA_Thread_Listener *l=getListener();
708         state(page)->writeRequests.add(l);
709         if (!async)
710             l->suspend(); // Suspend until page is really gone.
711         // TODO: Are write acknowledgements really necessary ?
712     }
713
714     // Send the page data as a contiguous block.
715     //   Note that this is INCORRECT when writes to pages overlap!
716     inline void sendNonRLEChangesToPageArray(const unsigned int page) // @@@
717     {
718         pageArray[page].PAReceivePage(pageTable[page], ENTRIES_PER_PAGE, CkMyPe(), stateN(page)->state);
719     }
720     
721     // Send the page data as an RLE block.
722     // this function assumes that there are no overlapping writes in the list
723     inline void sendRLEChangesToPageArray(const unsigned int page)
724     {
725         ENTRY_TYPE *writePage=pageTable[page];
726         int nSpans=stateN(page)->writeSpans(writeSpans);
727         if (nSpans==1) 
728         { /* common case: can make very fast */
729             int nEntries=writeSpans[0].end-writeSpans[0].start;
730             if (entryOpsObject->pupEveryElement()) {
731                 pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
732                     page_t(&writePage[writeSpans[0].start],nEntries),nEntries,
733                     CkMyPe(),stateN(page)->state);
734             } else {
735                 pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
736                     &writePage[writeSpans[0].start], nEntries,
737                     CkMyPe(),stateN(page)->state);
738             }
739         } 
740         else /* nSpans>1 */ 
741         { /* must copy separate spans into a single output buffer (luckily rare) */
742             int nEntries=0;
743             for (int s=0;s<nSpans;s++) {
744                 for (int i=writeSpans[s].start;i<writeSpans[s].end;i++)
745                     writeEntries[nEntries++]=writePage[i]; // calls assign
746             }
747             if (entryOpsObject->pupEveryElement()) {
748                 pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
749                                                         page_t(writeEntries,nEntries),nEntries,
750                                                         CkMyPe(),stateN(page)->state);
751             } else {
752                 pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
753                                                writeEntries,nEntries,
754                                                CkMyPe(),stateN(page)->state);
755             }
756         }
757     }
758
759 /*********************** Public Interface **********************/
760 public:
761     // 
762     //
763     // MSA_CacheGroup::
764     inline MSA_CacheGroup(unsigned int nPages_, CkArrayID pageArrayID,
765                       unsigned int max_bytes_, unsigned int nEntries_, unsigned int numberOfWorkerThreads_)
766         : nEntries(nEntries_), numberOfWorkerThreads(numberOfWorkerThreads_) 
767     {
768         numberLocalWorkerThreads = 0;  // populated after enroll
769         enrollDoneq = 0;  // populated after enroll
770
771         nPages = nPages_;
772         pageArray = pageArrayID;
773         thisProxy=thisgroup;
774         resident_pages = 0;
775         max_resident_pages = max_bytes_/(sizeof(ENTRY_TYPE)*ENTRIES_PER_PAGE);
776         
777         outOfBufferInPrefetch = 0;
778         entryOpsObject = new ENTRY_OPS_CLASS();
779
780         // initialize the page table
781         typedef ENTRY_TYPE* entry_type_ptr;
782         pageTable = new entry_type_ptr[nPages];
783         pageStateStorage = new pageState_t*[nPages];
784
785         // this is the number of sync ack's received till yet
786         syncAckCount = 0;
787         syncThreadCount = 0;
788
789         replacementPolicy = new vmPageReplacementPolicy(nPages,(page_ptr_t*) pageTable, (MSA_Page_State **)pageStateStorage);
790
791         for(unsigned int i = 0; i < nPages; i++)
792         {
793             pageTable[i] = 0;
794             pageStateStorage[i] = 0;
795         }
796                                 MSADEBPRINT(printf("MSA_CacheGroup nEntries %d \n",nEntries););
797     }
798
799     // MSA_CacheGroup::
800     inline ~MSA_CacheGroup()
801     {
802         FreeMem();
803         
804         delete[] pageTable; pageTable = 0;
805         delete[] pageStateStorage; pageStateStorage = 0;
806     }
807
808     /* To change the accumulate function TBD @@ race conditions */
809     inline void changeEntryOpsObject(ENTRY_OPS_CLASS *e) {
810         entryOpsObject = e;
811         pageArray.changeEntryOpsObject(e);
812     }
813
814     // MSA_CacheGroup::
815     inline const ENTRY_TYPE* readablePage(unsigned int page)
816     {
817         accessPage(page,Read_Fault);
818         
819         return pageTable[page];
820     }
821
822     // MSA_CacheGroup::
823     //
824     // known local page
825     inline const void* readablePage2(unsigned int page)
826     {
827         return pageTable[page];
828     }
829
830     // MSA_CacheGroup::
831     // Obtains a writable copy of the page.
832     inline ENTRY_TYPE* writeablePage(unsigned int page, unsigned int offset)
833     {
834         accessPage(page,Write_Fault);
835
836         // NOTE: Since we assume write once semantics, i.e. between two calls to sync,
837         // either there can be no write to a location or a single write to a location,
838         // a readable page will suffice as a writeable page too, because no one else
839         // is going to write to this location. In reality, two locations on the *same*
840         // page can be written by two different threads, in which case we will need
841         // to keep track of which parts of the page have been written, hence:
842         stateN(page)->write(offset);
843 //     ckout << "write:" << page*ENTRIES_PER_PAGE+offset << endl;
844         
845         return pageTable[page];
846     }
847
848     // MSA_CacheGroup::
849     inline ENTRY_TYPE &accumulate(unsigned int page, unsigned int offset)
850     {
851         accessPage(page,Accumulate_Fault);
852         stateN(page)->write(offset);
853         return pageTable[page][offset];
854     }
855
856     /// A requested page has arrived from the network.
857     ///  nEntriesInPage_ = num entries being sent (0 for empty page, num entries otherwise)
858     inline void ReceivePageWithPUP(unsigned int page, page_t &pageData, int size)
859     {
860         ReceivePage(page, pageData.getData(), size);
861     }
862
863     inline void ReceivePage(unsigned int page, ENTRY_TYPE* pageData, int size)
864     {
865         CkAssert(0==size || ENTRIES_PER_PAGE == size);
866         // the page we requested has been received
867         ENTRY_TYPE *nu=makePage(page);
868         if(size!=0)
869         {
870             for(unsigned int i = 0; i < size; i++)
871                 nu[i] = pageData[i]; // @@@, calls assignment operator
872         }
873         else /* isEmpty */
874         {
875             // the page we requested for is empty, so we can just initialize it.
876             writeIdentity(nu);
877         }
878         
879         state(page)->readRequests.signal(page);
880     }
881
882     // This EP is invoked during sync to acknowledge that a dirty page
883     // has been received and written back to the page owner.  We keep track
884     // of the number of ack's yet to arrive in nChangesWaiting.  Once
885     // all the dirty pages have been ack'd, we awaken the thread that
886     // flushed the page.
887     //
888     // It's not clear this is useful very often...
889     //
890     // MSA_CacheGroup::
891     inline void AckPage(unsigned int page)
892     {
893         state(page)->writeRequests.signal(page);
894     }
895
896     // MSA_CacheGroup::
897     // synchronize all the pages and also clear up the cache
898     inline void SyncReq(int single)
899     {
900           MSADEBPRINT(printf("SyncReq single %d\n",single););
901           if(single)
902         {
903             /*ask all the caches to send their updates to the page array, but we don't need to empty the caches on the other PEs*/
904             SingleSync();
905             EmptyCache();
906
907             getListener()->suspend();
908         }
909         else{
910             Sync();
911                                 }
912     }
913
914     // MSA_CacheGroup::
915     inline void FlushCache()
916     {
917         // flush the local cache
918         // for each writeable page, send that page to the array element
919         for(unsigned int i = 0; i < nPages; i++)
920         {
921             if(shouldWriteback(i)) {
922                 //ckout << "p" << CkMyPe() << "FlushCache: sending page " << i << endl;
923                 sendChangesToPageArray(i, 1);
924             }
925         }
926     }
927
928     // MSA_CacheGroup::
929     void EmptyCache()
930     {
931         /* just makes all the pages empty, assuming that the data in those pages has been flushed to the owners */
932         for(unsigned int i = 0; i < nPages; i++)
933         {
934             if(pageTable[i]) pagePool.push(destroyPage(i));
935         }
936     }
937
938 /************************ Enroll ********************/
939     /// Enroll phase 1: called by users.
940     // MSA_CacheGroup::
941     inline void enroll(unsigned int num_workers)
942     {
943         CkAssert(num_workers == numberOfWorkerThreads); // just to verify
944         CkAssert(enrollDoneq == 0);
945         numberLocalWorkerThreads++;
946         // @@ how to ensure that enroll is called only once?
947
948         //ckout << "[" << CkMyPe() << "] sending sync ack to PE 0" << endl;
949         thisProxy[0].enrollAck(CkMyPe());
950         //ckout << "[" << CkMyPe() << "] suspening thread in Sync() " << endl;
951         addAndSuspend(enrollWaiters);
952         //ckout << "[" << CkMyPe() << "] rsuming thread in Sync()" << endl;
953
954         CkAssert(enrollDoneq == 1);
955         return;
956     }
957
958     /// Enroll phase 2: called on PE 0 from everywhere
959     inline void enrollAck(int originator)
960     {
961         CkAssert(CkMyPe() == 0);  // enrollAck is only called on PE 0
962         CkAssert(enrollDoneq == 0);  // prevent multiple enroll operations
963         
964         syncAckCount++;
965         enrolledPEs.insert(originator);
966         //ckout << "[" << CkMyPe() << "] SyncAckcount = " << syncAckCount << endl;
967         if(syncAckCount == numberOfWorkerThreads) {
968 //             ckout << "[" << CkMyPe() << "]" << "Enroll operation is almost done" << endl;
969             syncAckCount = 0;
970             enrollDoneq = 1;
971             // What if fewer worker threads than pe's ?  Handled in
972             // enrollDone.
973             thisProxy.enrollDone();
974         }
975     }
976
977     /// Enroll phase 3: called everywhere by PE 0
978     inline void enrollDone()
979     {
980 //         ckout << "[" << CkMyPe() << "] enrollDone.  Waking threads."
981 //               <<  " numberOfWorkerThreads=" << numberOfWorkerThreads
982 //               <<  " local=" << numberLocalWorkerThreads << endl;
983         enrollDoneq = 1;
984         enrollWaiters.signal(0);
985     }
986
987 /******************************** Sync & writeback ***********************/
988     // MSA_CacheGroup::
989     inline void SingleSync()
990     {
991         /* a single thread issued a sync call with all = 1. The first thing to do is to flush the local cache */
992         FlushCache();
993     }
994
995     // MSA_CacheGroup::
996     inline void Sync()
997     {
998         syncThreadCount++;
999         //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << " " << numberLocalWorkerThreads << endl;
1000         //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << ", registered threads = " << getNumRegisteredThreads()
1001                 //  << ", number of suspended threads = " << getNumSuspendedThreads() << endl;
1002
1003         // First, all threads on this processor need to reach the sync
1004         // call; only then can we proceed with merging the data.  Only
1005         // the last thread on this processor needs to do the FlushCache,
1006         // etc.  Others just suspend until the sync is over.
1007                                 MSADEBPRINT(printf("Sync syncThreadCount %d \n",syncThreadCount););
1008         if(syncThreadCount < numberLocalWorkerThreads)
1009         {
1010                   MSADEBPRINT(printf("Sync addAndSuspend \n"););
1011             addAndSuspend(syncWaiters);
1012             return;
1013         }
1014                 
1015         //ckout << "[" << CkMyPe() << "] Sync started" << endl;
1016
1017         // flush the cache asynchronously and also empty it
1018         FlushCache();
1019         // idea: instead of invalidating the pages, switch it to read
1020         // mode. That will not work, since the page may have also been
1021         // modified by another thread.
1022         EmptyCache();
1023
1024         // Now, we suspend too (if we had at least one dirty page).
1025         // We will be awoken when all our dirty pages have been
1026         // written and acknowledged.
1027                 MSADEBPRINT(printf("Sync calling suspend on getListener\n"););
1028         getListener()->suspend();
1029                 MSADEBPRINT(printf("Sync awakening after suspend\n"););
1030
1031         // So far, the sync has been asynchronous, i.e. PE0 might be ahead
1032         // of PE1.  Next we basically do a barrier to ensure that all PE's
1033         // are synchronized.
1034
1035         // at this point, the sync's across the group should
1036         // synchronize among themselves by each one sending
1037         // a sync acknowledge message to PE 0. (this is like
1038         // a reduction over a group)
1039         if(CkMyPe() != 0)
1040         {
1041             thisProxy[0].SyncAck();
1042         }
1043         else /* I *am* PE 0 */
1044         {
1045             SyncAck();
1046         }
1047                                 MSADEBPRINT(printf("Sync all local threads done, going to addAndSuspend\n"););
1048         /* Wait until sync is reflected from PE 0 */
1049         addAndSuspend(syncWaiters);
1050                                 
1051                 MSADEBPRINT(printf("Sync all local threads done waking up after addAndSuspend\n"););
1052                 //ckout << "[" << CkMyPe() << "] Sync finished" << endl;        
1053     }
1054
1055     inline unsigned int getNumEntries() { return nEntries; }
1056     inline CProxy_PageArray_t getArray() { return pageArray; }
1057
1058     // TODO: Can this SyncAck and other simple Acks be made efficient?
1059     inline void SyncAck()
1060     {
1061         CkAssert(CkMyPe() == 0);  // SyncAck is only called on PE 0
1062         syncAckCount++;
1063         // DONE @@ what if fewer worker threads than pe's ?
1064         // @@ what if fewer worker threads than pe's and >1 threads on 1 pe?
1065         //if(syncAckCount == mymin(numberOfWorkerThreads, CkNumPes())){
1066                 if (syncAckCount == enrolledPEs.size()) {
1067                         MSADEBPRINT(printf("SyncAck starting reduction on pageArray of size %d number of pages %d\n",nEntries,nPages););
1068                         pageArray.Sync();
1069                 }               
1070     }
1071
1072     inline void SyncDone()
1073     {
1074         //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
1075         //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
1076         /* Reset for next sync */
1077         syncThreadCount = 0;
1078         syncAckCount = 0;
1079                                 MSADEBPRINT(printf("SyncDone syncWaiters signal to be called\n"););
1080         syncWaiters.signal(0);
1081     }
1082
1083     inline void FreeMem()
1084     {
1085         for(unsigned int i = 0; i < nPages; i++)
1086         {
1087             if(pageTable[i]) delete [] destroyPage(i);
1088         }
1089
1090         while(!pagePool.empty())
1091         {
1092             delete [] pagePool.top();  // @@@
1093             pagePool.pop();
1094         }
1095         
1096         resident_pages=0;
1097     }
1098
1099                 /** 
1100                         Deregister a client. Decremenet the number of local threads. If total number of local threads 
1101                         hits 0 FreeMem()
1102                 */
1103                 inline void unroll(){
1104         numberLocalWorkerThreads--;
1105                                 if(numberLocalWorkerThreads == 0){
1106                                         FreeMem();
1107                                 }
1108                 }
1109
1110     /**
1111      * Issue a prefetch request for the given range of pages. These pages will
1112      * be locked into the cache, so that they will not be swapped out.
1113      */
1114     inline void Prefetch(unsigned int pageStart, unsigned int pageEnd)
1115     {
1116         /* prefetching is feasible only if we we did not encounter an out
1117          * of buffer condition in the previous prefetch call
1118          */
1119         if(!outOfBufferInPrefetch)
1120         {
1121             //ckout << "prefetching pages " << pageStart << " through " << pageEnd << endl;
1122             for(unsigned int p = pageStart; p <= pageEnd; p++)
1123             {
1124                 if(NULL == pageTable[p])
1125                 {
1126
1127                     /* relocate the buffer asynchronously */
1128                     ENTRY_TYPE* nu = tryBuffer(1);
1129                     if(NULL == nu)
1130                     {
1131                         /* signal that sufficient buffer space is not available */
1132                         outOfBufferInPrefetch = 1;
1133                         break;
1134                     }
1135
1136                     pageTable[p] = nu;
1137                     state(p)->state = Read_Fault;
1138
1139                     pageArray[p].GetPage(CkMyPe());
1140                     IncrementPagesWaiting(p);
1141                     //ckout << "Prefetch page" << p << ", pages waiting = " << nPagesWaiting << endl;
1142                     /* don't suspend the thread */
1143                 }
1144
1145                 /* mark the page as being locked */
1146                 state(p)->locked = true;
1147             }
1148         }
1149     }
1150
1151     /**
1152      * Wait for all the prefetch pages to be fetched into the cache.
1153      * Returns: 0 if prefetch successful, 1 if not
1154      */
1155     inline int WaitAll(void)
1156     {
1157         if(outOfBufferInPrefetch)
1158         {
1159             // we encountered out of buffer in the previous prefetch call, return error
1160             outOfBufferInPrefetch = 0;
1161             getListener()->suspend();
1162             UnlockPages();
1163             return 1;
1164         }
1165         else
1166         {
1167             // prefetch requests have been successfully issued already, so suspend the
1168             // thread and wait for completion
1169             outOfBufferInPrefetch = 0;
1170             getListener()->suspend();
1171             return 0;
1172         }
1173     }
1174     
1175     inline void UnlockPage(unsigned int page) {
1176         pageState_t *s=stateN(page);
1177         if(s && s->locked) {
1178             replacementPolicy->pageAccessed(page);
1179             s->locked = false;
1180         }
1181     }
1182
1183     /**
1184      * Unlock all the pages locked in the cache
1185      */
1186     inline void UnlockPages()
1187     {
1188         // add all the locked pages to page replacement policy
1189         for(unsigned int page = 0; page < nPages; page++)
1190             UnlockPage(page);
1191     }
1192
1193     /**
1194      * Unlock the given pages: [startPage ... endPage]
1195      *  Note that the range is inclusive.
1196      */
1197     inline void UnlockPages(unsigned int startPage, unsigned int endPage)
1198     {
1199         for(unsigned int page = startPage; page <= endPage; page++)
1200             UnlockPage(page);
1201     }
1202
1203     /// Debugging routine
1204     inline void emitBufferValue(int ID, unsigned int pageNum, unsigned int offset)
1205     {
1206         CkAssert( pageNum < nPages );
1207         CkAssert( offset < ENTRIES_PER_PAGE );
1208
1209         //ckout << "p" << CkMyPe() << "ID" << ID;
1210 //         if (pageTable[pageNum] == 0)
1211 //             ckout << "emitBufferValue: page " << pageNum << " not available in local cache." << endl;
1212 //         else
1213 //             ckout << "emitBufferValue: [" << pageNum << "," << offset << "] = " << pageTable[pageNum][offset] << endl;
1214     }
1215 };
1216
1217 // each element of this array is responsible for managing
1218 // the information about a single page. It is in effect the
1219 // "owner" as well as the "manager" for that page.
1220 //
1221 template<class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE> 
1222 class MSA_PageArray : public ArrayElement1D
1223 {
1224     typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
1225     typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
1226     
1227 protected:
1228     ENTRY_TYPE *epage;
1229     ENTRY_OPS_CLASS entryOpsObject;
1230     CProxy_CacheGroup_t cache;
1231
1232     unsigned int pageNo() { return thisIndex; }
1233
1234     inline void allocatePage(MSA_Page_Fault_t access) // @@@
1235     {
1236         if(epage == NULL)
1237         {
1238             epage = new ENTRY_TYPE[ENTRIES_PER_PAGE];
1239             if (access==Accumulate_Fault)
1240                writeIdentity();
1241         }
1242     }
1243
1244     // begin and end are indexes into the page.
1245     inline void set(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
1246     {
1247         //ckout << "set: " << begin << "," << end << endl;
1248         for(unsigned int i = 0; i < (end - begin); i++) {
1249             epage[begin + i] = buffer[i]; // @@@, calls assignment operator
1250             //ckout << "set val[" << begin+i << "]=" << buffer[i] << endl;
1251         }
1252     }
1253
1254     // MSA_PageArray::
1255     inline void combine(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
1256     {
1257         ENTRY_TYPE* pagePtr = epage + begin;
1258         for(unsigned int i = 0; i < (end - begin); i++)
1259             entryOpsObject.accumulate(pagePtr[i], buffer[i]);
1260     }
1261
1262     // MSA_PageArray::
1263     inline void writeIdentity()
1264     {
1265         for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
1266             epage[i] = entryOpsObject.getIdentity();
1267     }
1268
1269 public:
1270     inline MSA_PageArray() : epage(NULL) { }
1271     inline MSA_PageArray(CkMigrateMessage* m) { delete m; }
1272     
1273     void setCacheProxy(CProxy_CacheGroup_t &cache_)
1274     {
1275        cache=cache_;
1276     }
1277     
1278     virtual void pup(PUP::er& p)
1279     {
1280         ArrayElement1D::pup(p);
1281         int epage_present=(epage!=0);
1282         p|epage_present;
1283         if (epage_present) {
1284           if(p.isUnpacking())
1285             allocatePage(Write_Fault);
1286           for (int i=0;i<ENTRIES_PER_PAGE;i++)
1287             p|epage[i];
1288         }
1289     }
1290     
1291     inline ~MSA_PageArray()
1292     {
1293         if(epage) delete [] epage;
1294     }
1295
1296     /// Request our page.
1297     ///   pe = to which to send page
1298     inline void GetPage(int pe)
1299     {
1300         if(epage == NULL) {
1301             // send empty page
1302             if (entryOpsObject.pupEveryElement())
1303                 cache[pe].ReceivePageWithPUP(pageNo(), page_t((ENTRY_TYPE*)NULL), 0);
1304             else
1305                 cache[pe].ReceivePage(pageNo(), (ENTRY_TYPE*)NULL, 0);
1306         } else {
1307             // send page with data
1308             if (entryOpsObject.pupEveryElement())
1309                 cache[pe].ReceivePageWithPUP(pageNo(), page_t(epage), ENTRIES_PER_PAGE);
1310             else
1311                 cache[pe].ReceivePage(pageNo(), epage, ENTRIES_PER_PAGE);  // send page with data                
1312         }
1313     }
1314
1315     /// Receive a non-runlength encoded page from the network:
1316     // @@ TBD: ERROR: This does not work for  varsize pages.
1317     inline void PAReceivePage(ENTRY_TYPE *pageData,
1318                             int pe, MSA_Page_Fault_t pageState)
1319     {
1320         allocatePage(pageState);
1321
1322         if(pageState == Write_Fault)
1323             set(pageData, 0, ENTRIES_PER_PAGE);
1324         else
1325             combine(pageData, 0, ENTRIES_PER_PAGE);
1326         
1327         // send the acknowledgement to the sender that we received the page
1328         //ckout << "Sending Ack to PE " << pe << endl;
1329         cache[pe].AckPage(thisIndex);
1330     }
1331
1332     /// Receive a runlength encoded page from the network:
1333     inline void PAReceiveRLEPageWithPup(
1334         const MSA_WriteSpan_t *spans, unsigned int nSpans, 
1335         MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> &entries, unsigned int nEntries, 
1336         int pe, MSA_Page_Fault_t pageState)
1337     {
1338         PAReceiveRLEPage(spans, nSpans, entries.getData(), nEntries, pe, pageState);
1339     }
1340
1341
1342     inline void PAReceiveRLEPage(
1343         const MSA_WriteSpan_t *spans, unsigned int nSpans, 
1344         const ENTRY_TYPE *entries, unsigned int nEntries, 
1345         int pe, MSA_Page_Fault_t pageState)
1346     {
1347         allocatePage(pageState);
1348         
1349         //ckout << "p" << CkMyPe() << "ReceiveRLEPage nSpans=" << nSpans << " nEntries=" << nEntries << endl;
1350         int e=0; /* consumed entries */
1351         for (int s=0;s<nSpans;s++) {
1352             if(pageState == Write_Fault)
1353                 set(&entries[e], spans[s].start,spans[s].end);
1354             else /* Accumulate_Fault */
1355                 combine(&entries[e], spans[s].start,spans[s].end);
1356             e+=spans[s].end-spans[s].start;
1357         } 
1358
1359         // send the acknowledgement to the sender that we received the page
1360         //ckout << "Sending AckRLE to PE " << pe << endl;
1361         cache[pe].AckPage(thisIndex);
1362     }
1363
1364     // MSA_PageArray::
1365     inline void Sync()
1366     {
1367                                 MSADEBPRINT(printf("MSA_PageArray::Sync about to call contribute \n");); 
1368         contribute(0, NULL, CkReduction::concat);
1369     }
1370
1371     inline void emit(int ID, int index)
1372     {
1373           //ckout << "p" << CkMyPe() << "ID" << ID;
1374 //         if(epage == NULL)
1375 //             ckout << "emit: epage is NULL" << endl;
1376 //         else
1377 //             ckout << "emit: " << epage[index] << endl;
1378     }
1379 };
1380
1381 #define CK_TEMPLATES_ONLY
1382 #include "msa.def.h"
1383 #undef CK_TEMPLATES_ONLY
1384
1385 #endif