Reformat so that I don't claw out my eyes
authorPhil Miller <mille121@illinois.edu>
Fri, 6 Feb 2009 21:53:55 +0000 (15:53 -0600)
committerPhil Miller <mille121@illinois.edu>
Thu, 10 Dec 2009 22:22:35 +0000 (16:22 -0600)
- Format msa-DistPageMgr.h in stroustrup style, and tell emacs to
  maintain that.
- Fix a spelling error and some absurdly long lines. 80 is not enough,
  but the 120 I can fit across my smaller monitor ought to be.

src/libs/ck-libs/multiphaseSharedArrays/msa-DistPageMgr.h

index c2fc129b503dcf0c0bf42655d09029e4467349ac..13a0fa2f312975360e4ec6623699380e5d070522 100644 (file)
@@ -1,4 +1,4 @@
-// emacs mode line -*- mode: c++; tab-width: 4 -*-
+// emacs mode line -*- mode: c++; tab-width: 4 ; c-basic-style: stroustrup -*-
 
 #ifndef MSA_DISTPAGEMGR_H
 #define MSA_DISTPAGEMGR_H
@@ -24,7 +24,7 @@ struct MSA_WriteSpan_t {
 
 template <class ENTRY, class MERGER,
           unsigned int ENTRIES_PER_PAGE>
-         class MSA_PageT;
+class MSA_PageT;
 #include "msa.decl.h"
 
 //=======================================================
@@ -115,10 +115,10 @@ template <class ENTRY, unsigned int ENTRIES_PER_PAGE>
 class MSA_Page_StateT
 {
        /** Write tracking:
-          Somehow, we have to identify the entries in a page 
-        that have been written to.  Our method for doing this is
-        a bitvector: 0's indicate the entry hasn't been written; 
-        1's indicate the entry has been written.
+               Somehow, we have to identify the entries in a page 
+               that have been written to.  Our method for doing this is
+               a bitvector: 0's indicate the entry hasn't been written; 
+               1's indicate the entry has been written.
        */
        typedef fixedlength_bitvector<ENTRIES_PER_PAGE> writes_t;
        writes_t writes;
@@ -157,9 +157,9 @@ public:
        }
        
        MSA_Page_StateT()
-         : writes(), writes2(), state(Uninit_State), locked(false),
-               readRequests(), writeRequests()
-    { }
+               : writes(), writes2(), state(Uninit_State), locked(false),
+                 readRequests(), writeRequests()
+               { }
 
        /// Write entry i of this page.
        void write(unsigned int i) {
@@ -170,12 +170,12 @@ public:
        /// Clear the write list for this page.
        void writeClear(void) {
                for (int i2=0;i2<writes2_t::len;i2++)
-                 if (writes2.store[i2]) { /* some bits set: clear them all */
-                       int o=i2*writes2_t::store_bits;
-                       for (int i=0;i<writes_t::len;i++) 
-                         writes.store[o+i]=0;
-                       writes2.store[i2]=0;
-                 }
+                       if (writes2.store[i2]) { /* some bits set: clear them all */
+                               int o=i2*writes2_t::store_bits;
+                               for (int i=0;i<writes_t::len;i++) 
+                                       writes.store[o+i]=0;
+                               writes2.store[i2]=0;
+                       }
        }
        
        /// Return the nearest multiple of m >= v.
@@ -207,9 +207,9 @@ public:
                        /* skip over written space */
                        while (true) { 
                                /* // 0-1 symmetry doesn't hold here, since writes2 may have 1's, but writes may still have some 0's...
-                               if (writes2.store[cur/writes2_bits]==~(writes2_store_t)0) 
-                                       cur=roundUp(cur+1,writes2_bits);
-                               else */
+                                  if (writes2.store[cur/writes2_bits]==~(writes2_store_t)0) 
+                                  cur=roundUp(cur+1,writes2_bits);
+                                  else */
                                if (writes.store[cur/writes_bits]==~(writes_store_t)0)
                                        cur=roundUp(cur+1,writes_bits); 
                                else if (writes.test(cur)==true)
@@ -240,97 +240,99 @@ template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
 class MSA_PageReplacementPolicy
 {
 public:
-  /// Note that a page was just accessed
-  virtual void pageAccessed(unsigned int page) = 0;
+       /// Note that a page was just accessed
+       virtual void pageAccessed(unsigned int page) = 0;
 
-  /// Ask for the index of a page to discard
-  virtual unsigned int selectPage() = 0;
+       /// Ask for the index of a page to discard
+       virtual unsigned int selectPage() = 0;
 };
 
 /**
-  class vmLRUPageReplacementPolicy
-  This class provides the functionality of least recently used page replacement policy.
-  It needs to be notified when a page is accessed using the pageAccessed() function and
-  a page can be selected for replacement using the selectPage() function.
+   class vmLRUPageReplacementPolicy
+   This class provides the functionality of least recently used page replacement policy.
+   It needs to be notified when a page is accessed using the pageAccessed() function and
+   a page can be selected for replacement using the selectPage() function.
  
-  WARNING: a list is absolutely the wrong data structure for this class, 
-     because it makes *both* updating as well as searching for a page O(n),
-     where n is the number of pages.  A heap would be a better choice,
-     as both operations would then become O(lg(n))
- */
+   WARNING: a list is absolutely the wrong data structure for this class, 
+   because it makes *both* updating as well as searching for a page O(n),
+   where n is the number of pages.  A heap would be a better choice,
+   as both operations would then become O(lg(n))
+*/
 template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
 class vmLRUReplacementPolicy : public MSA_PageReplacementPolicy <ENTRY_TYPE, ENTRIES_PER_PAGE>
 {
 protected:
     unsigned int nPages;            // number of pages
-  const std::vector<ENTRY_TYPE *> &pageTable; // actual data for pages (NULL means page is gone)
-  typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
-  const std::vector<pageState_t *> &pageState;  // state of each page
+       const std::vector<ENTRY_TYPE *> &pageTable; // actual data for pages (NULL means page is gone)
+       typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
+       const std::vector<pageState_t *> &pageState;  // state of each page
     std::list<unsigned int> stackOfPages;
     unsigned int lastPageAccessed;
 
 public:
-  inline vmLRUReplacementPolicy(unsigned int nPages_, const std::vector<ENTRY_TYPE *> &pageTable_, const std::vector<pageState_t *> &pageState_)
-    : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), lastPageAccessed(MSA_INVALID_PAGE_NO) {}
+       inline vmLRUReplacementPolicy(unsigned int nPages_, 
+                                                                 const std::vector<ENTRY_TYPE *> &pageTable_, 
+                                                                 const std::vector<pageState_t *> &pageState_)
+               : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), lastPageAccessed(MSA_INVALID_PAGE_NO) {}
 
     inline void pageAccessed(unsigned int page)
-    {
-        if(page != lastPageAccessed)
-        {
-            lastPageAccessed = page;
-
-            // delete this page from the stack and push it at the top
-                       std::list<unsigned int>::iterator i;
-            for(i = stackOfPages.begin(); i != stackOfPages.end(); i++)
-                if(*i == page)
-                    i = stackOfPages.erase(i);
-
-            stackOfPages.push_back(page);
-        }
-    }
+               {
+                       if(page != lastPageAccessed)
+                       {
+                               lastPageAccessed = page;
+
+                               // delete this page from the stack and push it at the top
+                               std::list<unsigned int>::iterator i;
+                               for(i = stackOfPages.begin(); i != stackOfPages.end(); i++)
+                                       if(*i == page)
+                                               i = stackOfPages.erase(i);
+
+                               stackOfPages.push_back(page);
+                       }
+               }
 
     inline unsigned int selectPage()
-    {
-        if(stackOfPages.size() == 0)
-            return MSA_INVALID_PAGE_NO;
-
-        // find a non-empty unlocked page to swap, delete all empty pages from the stack
-               std::list<unsigned int>::iterator i = stackOfPages.begin();
-        while(i != stackOfPages.end())
-        {
-            if(pageTable[*i] == NULL) i = stackOfPages.erase(i);
-            else if(!pageState[*i]->canPageOut()) i++;
-            else break;
-        }
+               {
+                       if(stackOfPages.size() == 0)
+                               return MSA_INVALID_PAGE_NO;
+
+                       // find a non-empty unlocked page to swap, delete all empty pages from the stack
+                       std::list<unsigned int>::iterator i = stackOfPages.begin();
+                       while(i != stackOfPages.end())
+                       {
+                               if(pageTable[*i] == NULL) i = stackOfPages.erase(i);
+                               else if(!pageState[*i]->canPageOut()) i++;
+                               else break;
+                       }
 
-        if(i != stackOfPages.end())
-            return *i;
-        else
-            return MSA_INVALID_PAGE_NO;
-    }
+                       if(i != stackOfPages.end())
+                               return *i;
+                       else
+                               return MSA_INVALID_PAGE_NO;
+               }
 };
 
 /**
-  class vmNRUPageReplacementPolicy
-  This class provides the functionality of not-recently-used page replacement policy.
-  It needs to be notified when a page is accessed using the pageAccessed() function and
-  a page can be selected for replacement using the selectPage() function.
+   class vmNRUPageReplacementPolicy
+   This class provides the functionality of not-recently-used page replacement policy.
+   It needs to be notified when a page is accessed using the pageAccessed() function and
+   a page can be selected for replacement using the selectPage() function.
   
-  "not-recently-used" could replace any page that has not been used in the 
-  last K accesses; that is, it's a memory-limited version of LRU.
+   "not-recently-used" could replace any page that has not been used in the 
+   last K accesses; that is, it's a memory-limited version of LRU.
   
-  pageAccessed is O(1).
-  selectPage best-case is O(K) (if we immediately find a doomed page); 
-             worst-case is O(K n) (if there are no doomed pages).
- */
+   pageAccessed is O(1).
+   selectPage best-case is O(K) (if we immediately find a doomed page); 
+   worst-case is O(K n) (if there are no doomed pages).
+*/
 template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
 class vmNRUReplacementPolicy : public MSA_PageReplacementPolicy <ENTRY_TYPE, ENTRIES_PER_PAGE>
 {
 protected:
-  unsigned int nPages;            // number of pages
-  const std::vector<ENTRY_TYPE *> &pageTable; // actual pages (NULL means page is gone)
-  typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
-  const std::vector<pageState_t *> &pageState;  // state of each page
+       unsigned int nPages;            // number of pages
+       const std::vector<ENTRY_TYPE *> &pageTable; // actual pages (NULL means page is gone)
+       typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
+       const std::vector<pageState_t *> &pageState;  // state of each page
     enum {K=5}; // Number of distinct pages to remember
     unsigned int last[K]; // pages that have been used recently
     unsigned int Klast; // index into last array.
@@ -343,19 +345,21 @@ protected:
     }
 
 public:
-  inline vmNRUReplacementPolicy(unsigned int nPages_, const std::vector<ENTRY_TYPE *> &pageTable_, const std::vector<pageState_t *> &pageState_)
-    : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), Klast(0), victim(0)
-    {
-        for (int k=0;k<K;k++) last[k]=MSA_INVALID_PAGE_NO;
-    }
+       inline vmNRUReplacementPolicy(unsigned int nPages_, 
+                                                                 const std::vector<ENTRY_TYPE *> &pageTable_, 
+                                                                 const std::vector<pageState_t *> &pageState_)
+               : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), Klast(0), victim(0)
+               {
+                       for (int k=0;k<K;k++) last[k]=MSA_INVALID_PAGE_NO;
+               }
 
     inline void pageAccessed(unsigned int page)
-    {
-        if (page!=last[Klast]) {
-            Klast++; if (Klast>=K) Klast=0;
-            last[Klast]=page;
-        }
-    }
+               {
+                       if (page!=last[Klast]) {
+                               Klast++; if (Klast>=K) Klast=0;
+                               last[Klast]=page;
+                       }
+               }
 
     inline unsigned int selectPage() {
         unsigned int last_victim=victim;
@@ -375,57 +379,57 @@ public:
 //================================================================
 
 /**
-  UNUSED
+   UNUSED
 
-  Holds the untyped data for one MSA page.
-  This is the interface MSA_CacheGroup uses to access a cached page.
-  MSA_CacheGroup asks the templated code to create a MSA_Page
-  for each new page, then talks to the page directly.
+   Holds the untyped data for one MSA page.
+   This is the interface MSA_CacheGroup uses to access a cached page.
+   MSA_CacheGroup asks the templated code to create a MSA_Page
+   for each new page, then talks to the page directly.
 */
 class MSA_Page {
 public:
        virtual ~MSA_Page();
 
        /**
-         Pack or unpack the data in this page.
-         Used to send and receive pages from the network
-         (or even disk, if somebody needs it.)
+          Pack or unpack the data in this page.
+          Used to send and receive pages from the network
+          (or even disk, if somebody needs it.)
        */
        virtual void pup(PUP::er &p) =0;
 
        /**
-         Merge this page's data into our own.
-         Only parts of this page may have been set.
+          Merge this page's data into our own.
+          Only parts of this page may have been set.
        */
        virtual void merge(MSA_Page &otherPage) =0;
 };
 
 /**
-  Holds the typed data for one MSA page.
-  Implementation of puppedPage used by the templated code.
+   Holds the typed data for one MSA page.
+   Implementation of puppedPage used by the templated code.
 */
 template <
        class ENTRY, 
        class MERGER=DefaultEntry<ENTRY>,
        unsigned int ENTRIES_PER_PAGE=MSA_DEFAULT_ENTRIES_PER_PAGE
->
+       >
 class MSA_PageT {
     unsigned int n; // number of entries on this page.  Used to send page updates.
        /** The contents of this page: array of ENTRIES_PER_PAGE items */
        ENTRY *data;
        /** Merger object */
        MERGER m;
-  bool duplicate;
+       bool duplicate;
 
 public:
 
-  MSA_PageT()
-  : n(ENTRIES_PER_PAGE), data(new ENTRY[ENTRIES_PER_PAGE]), duplicate(false)
-  {
-       for (int i=0;i<ENTRIES_PER_PAGE;i++){
-         data[i]=m.getIdentity();
-       }
-  }
+       MSA_PageT()
+               : n(ENTRIES_PER_PAGE), data(new ENTRY[ENTRIES_PER_PAGE]), duplicate(false)
+               {
+                       for (int i=0;i<ENTRIES_PER_PAGE;i++){
+                               data[i]=m.getIdentity();
+                       }
+               }
 
     // This constructor is used in PageArray to quickly convert an
     // array of ENTRY into an MSA_PageT.  So we just make a copy of
@@ -443,23 +447,23 @@ public:
        }
 
        virtual void pup(PUP::er &p) {
-    p | n;
+               p | n;
                /*this pup routine was broken, It didnt consider the case
-                       in which n > 0 and data = NULL. This is possible when  
-                       sending empty pages. It also doesnt seem to do any allocation
-                       for the data variable while unpacking which seems to be wrong
+                 in which n > 0 and data = NULL. This is possible when  
+                 sending empty pages. It also doesnt seem to do any allocation
+                 for the data variable while unpacking which seems to be wrong
                */
                bool nulldata;
                if(!p.isUnpacking()){
-                               nulldata = (data == NULL);
+                       nulldata = (data == NULL);
                }
                p | nulldata;
                if(nulldata){
-                               data = NULL;
-                               return;
+                       data = NULL;
+                       return;
                }
                if(p.isUnpacking()){
-                               data = new ENTRY[n];
+                       data = new ENTRY[n];
                }
                for (int i=0;i<n;i++){
                        p|data[i];
@@ -495,13 +499,13 @@ protected:
     std::set<int> enrolledPEs;                         // which PEs are involved?
 
     unsigned int nPages;            ///< number of pages
-  std::vector<ENTRY_TYPE*> pageTable;          ///< the page table for this PE: stores actual data.
+       std::vector<ENTRY_TYPE*> pageTable;          ///< the page table for this PE: stores actual data.
     typedef MSA_Page_StateT<ENTRY_TYPE,ENTRIES_PER_PAGE> pageState_t;
-  std::vector<pageState_t *> pageStateStorage; ///< Housekeeping information for each allocated page.
+       std::vector<pageState_t *> pageStateStorage; ///< Housekeeping information for each allocated page.
     
     std::stack<ENTRY_TYPE*> pagePool;     // a pool of unused pages
     
-  typedef vmNRUReplacementPolicy<ENTRY_TYPE, ENTRIES_PER_PAGE> vmPageReplacementPolicy;
+       typedef vmNRUReplacementPolicy<ENTRY_TYPE, ENTRIES_PER_PAGE> vmPageReplacementPolicy;
     MSA_PageReplacementPolicy<ENTRY_TYPE, ENTRIES_PER_PAGE> *replacementPolicy;
 
     // structure for the bounds of a single write
@@ -537,33 +541,33 @@ protected:
         return pageStateStorage[pageNo];
     }
     
-  /// Return the state for this page, allocating if needed.
-  pageState_t *state(unsigned int pageNo)
-  {
-       pageState_t *ret=pageStateStorage[pageNo];
-       if (ret==NULL)
-         {
-               ret=new pageState_t;
-               pageStateStorage[pageNo]=ret;
-         }
-       return ret;
-  }
+       /// Return the state for this page, allocating if needed.
+       pageState_t *state(unsigned int pageNo)
+               {
+                       pageState_t *ret=pageStateStorage[pageNo];
+                       if (ret==NULL)
+                       {
+                               ret=new pageState_t;
+                               pageStateStorage[pageNo]=ret;
+                       }
+                       return ret;
+               }
 
     /// Look up or create the listener for the current thread.
     MSA_Thread_Listener *getListener(void) {
        CthThread t=CthSelf();
        MSA_Thread_Listener *l=threadList[t];
-                       if (l==NULL) {
+               if (l==NULL) {
                l=new MSA_Thread_Listener;
                threadList[t]=l;
-                       }
-                       return l;
+               }
+               return l;
     }
     /// Add our thread to this list and suspend
     void addAndSuspend(MSA_Listeners &dest) {
        MSA_Thread_Listener *l=getListener();
-                       dest.add(l);
-                       l->suspend();
+               dest.add(l);
+               l->suspend();
     }
 
     /*********************************************************************************/
@@ -572,239 +576,239 @@ protected:
     // increment the number of pages the thread is waiting on.
     // also add thread to the page queue; then thread is woken when the page arrives.
     inline void IncrementPagesWaiting(unsigned int page)
-    {
-        state(page)->readRequests.add(getListener());
-    }
+               {
+                       state(page)->readRequests.add(getListener());
+               }
 
     inline void IncrementChangesWaiting(unsigned int page)
-    {
-        state(page)->writeRequests.add(getListener());
-    }
+               {
+                       state(page)->writeRequests.add(getListener());
+               }
 
 /************************* Page allocation and management **************************/
     
     /// Allocate a new page, removing old pages if we're over the limit.
     /// Returns NULL if no buffer space is available
     inline ENTRY_TYPE* tryBuffer(int async=0) // @@@
-    {
-        ENTRY_TYPE* nu = NULL;
-
-        // first try the page pool
-        if(!pagePool.empty())
-        {
-            nu = pagePool.top();
-            pagePool.pop();
-        }
+               {
+                       ENTRY_TYPE* nu = NULL;
+
+                       // first try the page pool
+                       if(!pagePool.empty())
+                       {
+                               nu = pagePool.top();
+                               pagePool.pop();
+                       }
 
-        // else try to allocate the buffer
-        if(nu == NULL && resident_pages < max_resident_pages)
-        {
-            nu = new ENTRY_TYPE[ENTRIES_PER_PAGE];
-            resident_pages++;
-        }
+                       // else try to allocate the buffer
+                       if(nu == NULL && resident_pages < max_resident_pages)
+                       {
+                               nu = new ENTRY_TYPE[ENTRIES_PER_PAGE];
+                               resident_pages++;
+                       }
 
-        // else swap out one of the pages
-        if(nu == NULL)
-        {
-            int pageToSwap = replacementPolicy->selectPage();
-            if(pageToSwap != MSA_INVALID_PAGE_NO)
-            {
-                CkAssert(pageTable[pageToSwap] != NULL);
-                CkAssert(state(pageToSwap)->canPageOut() == true);
+                       // else swap out one of the pages
+                       if(nu == NULL)
+                       {
+                               int pageToSwap = replacementPolicy->selectPage();
+                               if(pageToSwap != MSA_INVALID_PAGE_NO)
+                               {
+                                       CkAssert(pageTable[pageToSwap] != NULL);
+                                       CkAssert(state(pageToSwap)->canPageOut() == true);
                
-                relocatePage(pageToSwap, async);
-                nu = pageTable[pageToSwap];
-                pageTable[pageToSwap] = 0;
-                delete pageStateStorage[pageToSwap];
-                pageStateStorage[pageToSwap]=0;
-            }
-        }
+                                       relocatePage(pageToSwap, async);
+                                       nu = pageTable[pageToSwap];
+                                       pageTable[pageToSwap] = 0;
+                                       delete pageStateStorage[pageToSwap];
+                                       pageStateStorage[pageToSwap]=0;
+                               }
+                       }
 
-        // otherwise return NULL
+                       // otherwise return NULL
 
-        return nu;
-    }
+                       return nu;
+               }
     
     /// Allocate storage for this page, if none has been allocated already.
     ///  Update pageTable, and return the storage for the page.
     inline ENTRY_TYPE* makePage(unsigned int page) // @@@
-    {
-        ENTRY_TYPE* nu=pageTable[page];
-        if (nu==0) {
-            nu=tryBuffer();
-            if (nu==0) CkAbort("MSA: No available space to create pages.\n");
-            pageTable[page]=nu;
-        }
-        return nu;
-    }
+               {
+                       ENTRY_TYPE* nu=pageTable[page];
+                       if (nu==0) {
+                               nu=tryBuffer();
+                               if (nu==0) CkAbort("MSA: No available space to create pages.\n");
+                               pageTable[page]=nu;
+                       }
+                       return nu;
+               }
     
     /// Throw away this allocated page.
     ///  Returns the page itself, for deletion or recycling.
     ENTRY_TYPE* destroyPage(unsigned int page)
-    {
-        ENTRY_TYPE* nu=pageTable[page];
-        pageTable[page] = 0;
-        if (pageStateStorage[page]->canDelete()) {
-               delete pageStateStorage[page];
-               pageStateStorage[page]=0;
-        }
-        resident_pages--;
-        return nu;
-    }
+               {
+                       ENTRY_TYPE* nu=pageTable[page];
+                       pageTable[page] = 0;
+                       if (pageStateStorage[page]->canDelete()) {
+                               delete pageStateStorage[page];
+                               pageStateStorage[page]=0;
+                       }
+                       resident_pages--;
+                       return nu;
+               }
     
     //MSA_CacheGroup::
     void pageFault(unsigned int page, MSA_Page_Fault_t why)
-    {
-        // Write the page to the page table
-        state(page)->state = why;
-        if(why == Read_Fault)
-        { // Issue a remote request to fetch the new page
-            // If the page has not been requested already, then request it.
-            if (stateN(page)->readRequests.size()==0) {
-                pageArray[page].GetPage(CkMyPe());
-                //ckout << "Requesting page first time"<< endl;
-            } else {
-                ;//ckout << "Requesting page next time.  Skipping request."<< endl;
-           }
-           MSA_Thread_Listener *l=getListener();
-            stateN(page)->readRequests.add(l);
-           l->suspend(); // Suspend until page arrives.
-        }
-        else {
-            // Build an empty buffer into which to create the new page
-            ENTRY_TYPE* nu = makePage(page);
-           writeIdentity(nu);
-       }
-    }
+               {
+                       // Write the page to the page table
+                       state(page)->state = why;
+                       if(why == Read_Fault)
+                       { // Issue a remote request to fetch the new page
+                               // If the page has not been requested already, then request it.
+                               if (stateN(page)->readRequests.size()==0) {
+                                       pageArray[page].GetPage(CkMyPe());
+                                       //ckout << "Requesting page first time"<< endl;
+                               } else {
+                                       ;//ckout << "Requesting page next time.  Skipping request."<< endl;
+                               }
+                               MSA_Thread_Listener *l=getListener();
+                               stateN(page)->readRequests.add(l);
+                               l->suspend(); // Suspend until page arrives.
+                       }
+                       else {
+                               // Build an empty buffer into which to create the new page
+                               ENTRY_TYPE* nu = makePage(page);
+                               writeIdentity(nu);
+                       }
+               }
     
     /// Make sure this page is accessible, faulting the page in if needed.
     // MSA_CacheGroup::
     inline void accessPage(unsigned int page,MSA_Page_Fault_t access)
-    {
-        if (pageTable[page] == 0) {
+               {
+                       if (pageTable[page] == 0) {
 //             ckout << "p" << CkMyPe() << ": Calling pageFault" << endl;
-            pageFault(page, access);
-        }
+                               pageFault(page, access);
+                       }
 #ifndef CMK_OPTIMIZE
-        if (stateN(page)->state!=access) {
-            CkPrintf("page=%d mode=%d pagestate=%d", page, access, stateN(page)->state);
-            CkAbort("MSA Runtime error: Attempting to access a page that is still in another mode.");
-        }
+                       if (stateN(page)->state!=access) {
+                               CkPrintf("page=%d mode=%d pagestate=%d", page, access, stateN(page)->state);
+                               CkAbort("MSA Runtime error: Attempting to access a page that is still in another mode.");
+                       }
 #endif
-        replacementPolicy->pageAccessed(page);
-    }
+                       replacementPolicy->pageAccessed(page);
+               }
 
     // MSA_CacheGroup::
     // Fill this page with identity values, to prepare for writes or 
     //  accumulates.
     void writeIdentity(ENTRY_TYPE* pagePtr)
-    {
-        for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
-            pagePtr[i] = entryOpsObject->getIdentity();
-    }
+               {
+                       for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
+                               pagePtr[i] = entryOpsObject->getIdentity();
+               }
     
 /************* Page Flush and Writeback *********************/
     bool shouldWriteback(unsigned int page) {
         if (!pageTable[page]) return false;
-       return (stateN(page)->state == Write_Fault || stateN(page)->state == Accumulate_Fault);
+               return (stateN(page)->state == Write_Fault || stateN(page)->state == Accumulate_Fault);
     }
     
     inline void relocatePage(unsigned int page, int async)
-    {
-        //CkAssert(pageTable[page]);
-        if(shouldWriteback(page))
-        {
-            // the page to be swapped is a writeable page. So inform any
-            // changes this node has made to the page manager
-            sendChangesToPageArray(page, async);
-        }
-    }
+               {
+                       //CkAssert(pageTable[page]);
+                       if(shouldWriteback(page))
+                       {
+                               // the page to be swapped is a writeable page. So inform any
+                               // changes this node has made to the page manager
+                               sendChangesToPageArray(page, async);
+                       }
+               }
 
     inline void sendChangesToPageArray(const unsigned int page, const int async)
-    {
-        sendRLEChangesToPageArray(page);
+               {
+                       sendRLEChangesToPageArray(page);
        
-        MSA_Thread_Listener *l=getListener();
-        state(page)->writeRequests.add(l);
-        if (!async)
-            l->suspend(); // Suspend until page is really gone.
-        // TODO: Are write acknowledgements really necessary ?
-    }
+                       MSA_Thread_Listener *l=getListener();
+                       state(page)->writeRequests.add(l);
+                       if (!async)
+                               l->suspend(); // Suspend until page is really gone.
+                       // TODO: Are write acknowledgements really necessary ?
+               }
 
     // Send the page data as a contiguous block.
     //   Note that this is INCORRECT when writes to pages overlap!
     inline void sendNonRLEChangesToPageArray(const unsigned int page) // @@@
-    {
-        pageArray[page].PAReceivePage(pageTable[page], ENTRIES_PER_PAGE, CkMyPe(), stateN(page)->state);
-    }
+               {
+                       pageArray[page].PAReceivePage(pageTable[page], ENTRIES_PER_PAGE, CkMyPe(), stateN(page)->state);
+               }
     
     // Send the page data as an RLE block.
     // this function assumes that there are no overlapping writes in the list
     inline void sendRLEChangesToPageArray(const unsigned int page)
-    {
-        ENTRY_TYPE *writePage=pageTable[page];
-        int nSpans=stateN(page)->writeSpans(writeSpans);
-        if (nSpans==1) 
-        { /* common case: can make very fast */
-            int nEntries=writeSpans[0].end-writeSpans[0].start;
-            if (entryOpsObject->pupEveryElement()) {
-                pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
-                    page_t(&writePage[writeSpans[0].start],nEntries),nEntries,
-                    CkMyPe(),stateN(page)->state);
-            } else {
-                pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
-                    &writePage[writeSpans[0].start], nEntries,
-                    CkMyPe(),stateN(page)->state);
-            }
-        } 
-        else /* nSpans>1 */ 
-        { /* must copy separate spans into a single output buffer (luckily rare) */
-            int nEntries=0;
-            for (int s=0;s<nSpans;s++) {
-                for (int i=writeSpans[s].start;i<writeSpans[s].end;i++)
-                    writeEntries[nEntries++]=writePage[i]; // calls assign
-            }
-            if (entryOpsObject->pupEveryElement()) {
-                pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
-                                                        page_t(writeEntries,nEntries),nEntries,
-                                                        CkMyPe(),stateN(page)->state);
-            } else {
-                pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
-                                               writeEntries,nEntries,
-                                               CkMyPe(),stateN(page)->state);
-            }
-        }
-    }
+               {
+                       ENTRY_TYPE *writePage=pageTable[page];
+                       int nSpans=stateN(page)->writeSpans(writeSpans);
+                       if (nSpans==1) 
+                       { /* common case: can make very fast */
+                               int nEntries=writeSpans[0].end-writeSpans[0].start;
+                               if (entryOpsObject->pupEveryElement()) {
+                                       pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
+                                                                                                                       page_t(&writePage[writeSpans[0].start],nEntries),nEntries,
+                                                                                                                       CkMyPe(),stateN(page)->state);
+                               } else {
+                                       pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
+                                                                                                        &writePage[writeSpans[0].start], nEntries,
+                                                                                                        CkMyPe(),stateN(page)->state);
+                               }
+                       
+                       else /* nSpans>1 */ 
+                       { /* must copy separate spans into a single output buffer (luckily rare) */
+                               int nEntries=0;
+                               for (int s=0;s<nSpans;s++) {
+                                       for (int i=writeSpans[s].start;i<writeSpans[s].end;i++)
+                                               writeEntries[nEntries++]=writePage[i]; // calls assign
+                               }
+                               if (entryOpsObject->pupEveryElement()) {
+                                       pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
+                                                                                                                       page_t(writeEntries,nEntries),nEntries,
+                                                                                                                       CkMyPe(),stateN(page)->state);
+                               } else {
+                                       pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
+                                                                                                        writeEntries,nEntries,
+                                                                                                        CkMyPe(),stateN(page)->state);
+                               }
+                       }
+               }
 
 /*********************** Public Interface **********************/
 public:
     // 
     //
     // MSA_CacheGroup::
-  inline MSA_CacheGroup(unsigned int nPages_, CkArrayID pageArrayID,
-                                               unsigned int max_bytes_, unsigned int nEntries_, 
-                                               unsigned int numberOfWorkerThreads_)
-       : numberOfWorkerThreads(numberOfWorkerThreads_),
-         nPages(nPages_),
-         nEntries(nEntries_), 
-         pageTable(nPages, NULL),
-         pageStateStorage(nPages, NULL),
-         pageArray(pageArrayID),
-         thisProxy(thisgroup),
-         max_resident_pages(max_bytes_/(sizeof(ENTRY_TYPE)*ENTRIES_PER_PAGE)),
-         entryOpsObject(new ENTRY_OPS_CLASS),
-         replacementPolicy(new vmPageReplacementPolicy(nPages, pageTable, pageStateStorage)),
-         outOfBufferInPrefetch(0), syncAckCount(0),syncThreadCount(0),
-         resident_pages(0),numberLocalWorkerThreads(0), enrollDoneq(0)
-  {
-       MSADEBPRINT(printf("MSA_CacheGroup nEntries %d \n",nEntries););
-  }
+       inline MSA_CacheGroup(unsigned int nPages_, CkArrayID pageArrayID,
+                                                 unsigned int max_bytes_, unsigned int nEntries_, 
+                                                 unsigned int numberOfWorkerThreads_)
+               : numberOfWorkerThreads(numberOfWorkerThreads_),
+                 nPages(nPages_),
+                 nEntries(nEntries_), 
+                 pageTable(nPages, NULL),
+                 pageStateStorage(nPages, NULL),
+                 pageArray(pageArrayID),
+                 thisProxy(thisgroup),
+                 max_resident_pages(max_bytes_/(sizeof(ENTRY_TYPE)*ENTRIES_PER_PAGE)),
+                 entryOpsObject(new ENTRY_OPS_CLASS),
+                 replacementPolicy(new vmPageReplacementPolicy(nPages, pageTable, pageStateStorage)),
+                 outOfBufferInPrefetch(0), syncAckCount(0),syncThreadCount(0),
+                 resident_pages(0),numberLocalWorkerThreads(0), enrollDoneq(0)
+               {
+                       MSADEBPRINT(printf("MSA_CacheGroup nEntries %d \n",nEntries););
+               }
 
     // MSA_CacheGroup::
     inline ~MSA_CacheGroup()
-    {
-        FreeMem();
-    }
+               {
+                       FreeMem();
+               }
 
     /* To change the accumulate function TBD @@ race conditions */
     inline void changeEntryOpsObject(ENTRY_OPS_CLASS *e) {
@@ -814,71 +818,71 @@ public:
 
     // MSA_CacheGroup::
     inline const ENTRY_TYPE* readablePage(unsigned int page)
-    {
-        accessPage(page,Read_Fault);
+               {
+                       accessPage(page,Read_Fault);
        
-        return pageTable[page];
-    }
+                       return pageTable[page];
+               }
 
     // MSA_CacheGroup::
     //
     // known local page
     inline const void* readablePage2(unsigned int page)
-    {
-        return pageTable[page];
-    }
+               {
+                       return pageTable[page];
+               }
 
     // MSA_CacheGroup::
     // Obtains a writable copy of the page.
     inline ENTRY_TYPE* writeablePage(unsigned int page, unsigned int offset)
-    {
-        accessPage(page,Write_Fault);
-
-        // NOTE: Since we assume write once semantics, i.e. between two calls to sync,
-        // either there can be no write to a location or a single write to a location,
-        // a readable page will suffice as a writeable page too, because no one else
-        // is going to write to this location. In reality, two locations on the *same*
-        // page can be written by two different threads, in which case we will need
-        // to keep track of which parts of the page have been written, hence:
-        stateN(page)->write(offset);
+               {
+                       accessPage(page,Write_Fault);
+
+                       // NOTE: Since we assume write once semantics, i.e. between two calls to sync,
+                       // either there can be no write to a location or a single write to a location,
+                       // a readable page will suffice as a writeable page too, because no one else
+                       // is going to write to this location. In reality, two locations on the *same*
+                       // page can be written by two different threads, in which case we will need
+                       // to keep track of which parts of the page have been written, hence:
+                       stateN(page)->write(offset);
 //     ckout << "write:" << page*ENTRIES_PER_PAGE+offset << endl;
        
-        return pageTable[page];
-    }
+                       return pageTable[page];
+               }
 
     // MSA_CacheGroup::
     inline ENTRY_TYPE &accumulate(unsigned int page, unsigned int offset)
-    {
-        accessPage(page,Accumulate_Fault);
-        stateN(page)->write(offset);
-       return pageTable[page][offset];
-    }
+               {
+                       accessPage(page,Accumulate_Fault);
+                       stateN(page)->write(offset);
+                       return pageTable[page][offset];
+               }
 
     /// A requested page has arrived from the network.
     ///  nEntriesInPage_ = num entries being sent (0 for empty page, num entries otherwise)
     inline void ReceivePageWithPUP(unsigned int page, page_t &pageData, int size)
-    {
-        ReceivePage(page, pageData.getData(), size);
-    }
+               {
+                       ReceivePage(page, pageData.getData(), size);
+               }
 
     inline void ReceivePage(unsigned int page, ENTRY_TYPE* pageData, int size)
-    {
-        CkAssert(0==size || ENTRIES_PER_PAGE == size);
-        // the page we requested has been received
-        ENTRY_TYPE *nu=makePage(page);
-        if(size!=0)
-        {
-            for(unsigned int i = 0; i < size; i++)
-                nu[i] = pageData[i]; // @@@, calls assignment operator
-        }
-        else /* isEmpty */
-        {
-            // the page we requested for is empty, so we can just initialize it.
-            writeIdentity(nu);
-        }
+               {
+                       CkAssert(0==size || ENTRIES_PER_PAGE == size);
+                       // the page we requested has been received
+                       ENTRY_TYPE *nu=makePage(page);
+                       if(size!=0)
+                       {
+                               for(unsigned int i = 0; i < size; i++)
+                                       nu[i] = pageData[i]; // @@@, calls assignment operator
+                       }
+                       else /* isEmpty */
+                       {
+                               // the page we requested for is empty, so we can just initialize it.
+                               writeIdentity(nu);
+                       }
        
-        state(page)->readRequests.signal(page);
-    }
+                       state(page)->readRequests.signal(page);
+               }
 
     // This EP is invoked during sync to acknowledge that a dirty page
     // has been received and written back to the page owner.  We keep track
@@ -890,329 +894,334 @@ public:
     //
     // MSA_CacheGroup::
     inline void AckPage(unsigned int page)
-    {
-        state(page)->writeRequests.signal(page);
-    }
+               {
+                       state(page)->writeRequests.signal(page);
+               }
 
     // MSA_CacheGroup::
     // synchronize all the pages and also clear up the cache
     inline void SyncReq(int single)
-    {
-         MSADEBPRINT(printf("SyncReq single %d\n",single););
-         if(single)
-        {
-            /*ask all the caches to send their updates to the page array, but we don't need to empty the caches on the other PEs*/
-            SingleSync();
-            EmptyCache();
-
-            getListener()->suspend();
-        }
-        else{
-            Sync();
-                               }
-    }
+               {
+                       MSADEBPRINT(printf("SyncReq single %d\n",single););
+                       if(single)
+                       {
+                               /*ask all the caches to send their updates to the page
+                                * array, but we don't need to empty the caches on the
+                                * other PEs*/
+                               SingleSync();
+                               EmptyCache();
+
+                               getListener()->suspend();
+                       }
+                       else{
+                               Sync();
+                       }
+               }
 
     // MSA_CacheGroup::
     inline void FlushCache()
-    {
-        // flush the local cache
-        // for each writeable page, send that page to the array element
-        for(unsigned int i = 0; i < nPages; i++)
-        {
-            if(shouldWriteback(i)) {
-                //ckout << "p" << CkMyPe() << "FlushCache: sending page " << i << endl;
-                sendChangesToPageArray(i, 1);
-            }
-        }
-    }
+               {
+                       // flush the local cache
+                       // for each writeable page, send that page to the array element
+                       for(unsigned int i = 0; i < nPages; i++)
+                       {
+                               if(shouldWriteback(i)) {
+                                       //ckout << "p" << CkMyPe() << "FlushCache: sending page " << i << endl;
+                                       sendChangesToPageArray(i, 1);
+                               }
+                       }
+               }
 
     // MSA_CacheGroup::
     void EmptyCache()
-    {
-        /* just makes all the pages empty, assuming that the data in those pages has been flushed to the owners */
-        for(unsigned int i = 0; i < nPages; i++)
-        {
-            if(pageTable[i]) pagePool.push(destroyPage(i));
-        }
-    }
+               {
+                       /* just makes all the pages empty, assuming that the data
+                        * in those pages has been flushed to the owners */
+                       for(unsigned int i = 0; i < nPages; i++)
+                       {
+                               if(pageTable[i]) pagePool.push(destroyPage(i));
+                       }
+               }
 
 /************************ Enroll ********************/
     /// Enroll phase 1: called by users.
     // MSA_CacheGroup::
     inline void enroll(unsigned int num_workers)
-    {
-        CkAssert(num_workers == numberOfWorkerThreads); // just to verify
-        CkAssert(enrollDoneq == 0);
-        numberLocalWorkerThreads++;
-        // @@ how to ensure that enroll is called only once?
-
-        //ckout << "[" << CkMyPe() << "] sending sync ack to PE 0" << endl;
-        thisProxy[0].enrollAck(CkMyPe());
-        //ckout << "[" << CkMyPe() << "] suspening thread in Sync() " << endl;
-        addAndSuspend(enrollWaiters);
-        //ckout << "[" << CkMyPe() << "] rsuming thread in Sync()" << endl;
-
-        CkAssert(enrollDoneq == 1);
-        return;
-    }
+               {
+                       CkAssert(num_workers == numberOfWorkerThreads); // just to verify
+                       CkAssert(enrollDoneq == 0);
+                       numberLocalWorkerThreads++;
+                       // @@ how to ensure that enroll is called only once?
+
+                       //ckout << "[" << CkMyPe() << "] sending sync ack to PE 0" << endl;
+                       thisProxy[0].enrollAck(CkMyPe());
+                       //ckout << "[" << CkMyPe() << "] suspening thread in Sync() " << endl;
+                       addAndSuspend(enrollWaiters);
+                       //ckout << "[" << CkMyPe() << "] rsuming thread in Sync()" << endl;
+
+                       CkAssert(enrollDoneq == 1);
+                       return;
+               }
 
     /// Enroll phase 2: called on PE 0 from everywhere
     inline void enrollAck(int originator)
-    {
-        CkAssert(CkMyPe() == 0);  // enrollAck is only called on PE 0
-        CkAssert(enrollDoneq == 0);  // prevent multiple enroll operations
+               {
+                       CkAssert(CkMyPe() == 0);  // enrollAck is only called on PE 0
+                       CkAssert(enrollDoneq == 0);  // prevent multiple enroll operations
         
-        syncAckCount++;
-        enrolledPEs.insert(originator);
-        //ckout << "[" << CkMyPe() << "] SyncAckcount = " << syncAckCount << endl;
-        if(syncAckCount == numberOfWorkerThreads) {
+                       syncAckCount++;
+                       enrolledPEs.insert(originator);
+                       //ckout << "[" << CkMyPe() << "] SyncAckcount = " << syncAckCount << endl;
+                       if(syncAckCount == numberOfWorkerThreads) {
 //             ckout << "[" << CkMyPe() << "]" << "Enroll operation is almost done" << endl;
-            syncAckCount = 0;
-            enrollDoneq = 1;
-            // What if fewer worker threads than pe's ?  Handled in
-            // enrollDone.
-            thisProxy.enrollDone();
-        }
-    }
+                               syncAckCount = 0;
+                               enrollDoneq = 1;
+                               // What if fewer worker threads than pe's ?  Handled in
+                               // enrollDone.
+                               thisProxy.enrollDone();
+                       }
+               }
 
     /// Enroll phase 3: called everywhere by PE 0
     inline void enrollDone()
-    {
+               {
 //         ckout << "[" << CkMyPe() << "] enrollDone.  Waking threads."
 //               <<  " numberOfWorkerThreads=" << numberOfWorkerThreads
 //               <<  " local=" << numberLocalWorkerThreads << endl;
-        enrollDoneq = 1;
-        enrollWaiters.signal(0);
-    }
+                       enrollDoneq = 1;
+                       enrollWaiters.signal(0);
+               }
 
 /******************************** Sync & writeback ***********************/
     // MSA_CacheGroup::
     inline void SingleSync()
-    {
-        /* a single thread issued a sync call with all = 1. The first thing to do is to flush the local cache */
-        FlushCache();
-    }
+               {
+                       /* a single thread issued a sync call with all = 1. The
+                        * first thing to do is to flush the local cache */
+                       FlushCache();
+               }
 
     // MSA_CacheGroup::
     inline void Sync()
-    {
-        syncThreadCount++;
-        //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << " " << numberLocalWorkerThreads << endl;
-        //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << ", registered threads = " << getNumRegisteredThreads()
-               //  << ", number of suspended threads = " << getNumSuspendedThreads() << endl;
-
-        // First, all threads on this processor need to reach the sync
-        // call; only then can we proceed with merging the data.  Only
-        // the last thread on this processor needs to do the FlushCache,
-        // etc.  Others just suspend until the sync is over.
-                               MSADEBPRINT(printf("Sync syncThreadCount %d \n",syncThreadCount););
-        if(syncThreadCount < numberLocalWorkerThreads)
-        {
-                 MSADEBPRINT(printf("Sync addAndSuspend \n"););
-            addAndSuspend(syncWaiters);
-            return;
-        }
+               {
+                       syncThreadCount++;
+                       //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << " " << numberLocalWorkerThreads << endl;
+                       //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << ", registered threads = " << getNumRegisteredThreads()
+                       //  << ", number of suspended threads = " << getNumSuspendedThreads() << endl;
+
+                       // First, all threads on this processor need to reach the sync
+                       // call; only then can we proceed with merging the data.  Only
+                       // the last thread on this processor needs to do the FlushCache,
+                       // etc.  Others just suspend until the sync is over.
+                       MSADEBPRINT(printf("Sync syncThreadCount %d \n",syncThreadCount););
+                       if(syncThreadCount < numberLocalWorkerThreads)
+                       {
+                               MSADEBPRINT(printf("Sync addAndSuspend \n"););
+                               addAndSuspend(syncWaiters);
+                               return;
+                       }
                
-        //ckout << "[" << CkMyPe() << "] Sync started" << endl;
-
-        // flush the cache asynchronously and also empty it
-        FlushCache();
-        // idea: instead of invalidating the pages, switch it to read
-        // mode. That will not work, since the page may have also been
-        // modified by another thread.
-        EmptyCache();
-
-        // Now, we suspend too (if we had at least one dirty page).
-        // We will be awoken when all our dirty pages have been
-        // written and acknowledged.
-               MSADEBPRINT(printf("Sync calling suspend on getListener\n"););
-        getListener()->suspend();
-               MSADEBPRINT(printf("Sync awakening after suspend\n"););
-
-        // So far, the sync has been asynchronous, i.e. PE0 might be ahead
-        // of PE1.  Next we basically do a barrier to ensure that all PE's
-        // are synchronized.
-
-        // at this point, the sync's across the group should
-        // synchronize among themselves by each one sending
-        // a sync acknowledge message to PE 0. (this is like
-        // a reduction over a group)
-        if(CkMyPe() != 0)
-        {
-            thisProxy[0].SyncAck();
-        }
-        else /* I *am* PE 0 */
-        {
-            SyncAck();
-        }
-                               MSADEBPRINT(printf("Sync all local threads done, going to addAndSuspend\n"););
-        /* Wait until sync is reflected from PE 0 */
-        addAndSuspend(syncWaiters);
+                       //ckout << "[" << CkMyPe() << "] Sync started" << endl;
+
+                       // flush the cache asynchronously and also empty it
+                       FlushCache();
+                       // idea: instead of invalidating the pages, switch it to read
+                       // mode. That will not work, since the page may have also been
+                       // modified by another thread.
+                       EmptyCache();
+
+                       // Now, we suspend too (if we had at least one dirty page).
+                       // We will be awoken when all our dirty pages have been
+                       // written and acknowledged.
+                       MSADEBPRINT(printf("Sync calling suspend on getListener\n"););
+                       getListener()->suspend();
+                       MSADEBPRINT(printf("Sync awakening after suspend\n"););
+
+                       // So far, the sync has been asynchronous, i.e. PE0 might be ahead
+                       // of PE1.  Next we basically do a barrier to ensure that all PE's
+                       // are synchronized.
+
+                       // at this point, the sync's across the group should
+                       // synchronize among themselves by each one sending
+                       // a sync acknowledge message to PE 0. (this is like
+                       // a reduction over a group)
+                       if(CkMyPe() != 0)
+                       {
+                               thisProxy[0].SyncAck();
+                       }
+                       else /* I *am* PE 0 */
+                       {
+                               SyncAck();
+                       }
+                       MSADEBPRINT(printf("Sync all local threads done, going to addAndSuspend\n"););
+                       /* Wait until sync is reflected from PE 0 */
+                       addAndSuspend(syncWaiters);
                                
-               MSADEBPRINT(printf("Sync all local threads done waking up after addAndSuspend\n"););
-               //ckout << "[" << CkMyPe() << "] Sync finished" << endl;        
-    }
+                       MSADEBPRINT(printf("Sync all local threads done waking up after addAndSuspend\n"););
+                       //ckout << "[" << CkMyPe() << "] Sync finished" << endl;        
+               }
 
     inline unsigned int getNumEntries() { return nEntries; }
     inline CProxy_PageArray_t getArray() { return pageArray; }
 
     // TODO: Can this SyncAck and other simple Acks be made efficient?
     inline void SyncAck()
-    {
-        CkAssert(CkMyPe() == 0);  // SyncAck is only called on PE 0
-        syncAckCount++;
-        // DONE @@ what if fewer worker threads than pe's ?
-        // @@ what if fewer worker threads than pe's and >1 threads on 1 pe?
-        //if(syncAckCount == min(numberOfWorkerThreads, CkNumPes())){
-               if (syncAckCount == enrolledPEs.size()) {
-                       MSADEBPRINT(printf("SyncAck starting reduction on pageArray of size %d number of pages %d\n",nEntries,nPages););
-                       pageArray.Sync();
-               }               
-    }
+               {
+                       CkAssert(CkMyPe() == 0);  // SyncAck is only called on PE 0
+                       syncAckCount++;
+                       // DONE @@ what if fewer worker threads than pe's ?
+                       // @@ what if fewer worker threads than pe's and >1 threads on 1 pe?
+                       //if(syncAckCount == min(numberOfWorkerThreads, CkNumPes())){
+                       if (syncAckCount == enrolledPEs.size()) {
+                               MSADEBPRINT(printf("SyncAck starting reduction on pageArray of size %d number of pages %d\n",
+                                                                  nEntries, nPages););
+                               pageArray.Sync();
+                       }               
+               }
 
     inline void SyncDone()
-    {
-        //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
-        //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
-        /* Reset for next sync */
-        syncThreadCount = 0;
-        syncAckCount = 0;
-                               MSADEBPRINT(printf("SyncDone syncWaiters signal to be called\n"););
-        syncWaiters.signal(0);
-    }
+               {
+                       //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
+                       //ckout << "[" << CkMyPe() << "] Sync Done indication" << endl;
+                       /* Reset for next sync */
+                       syncThreadCount = 0;
+                       syncAckCount = 0;
+                       MSADEBPRINT(printf("SyncDone syncWaiters signal to be called\n"););
+                       syncWaiters.signal(0);
+               }
 
     inline void FreeMem()
-    {
-        for(unsigned int i = 0; i < nPages; i++)
-        {
-            if(pageTable[i]) delete [] destroyPage(i);
-        }
+               {
+                       for(unsigned int i = 0; i < nPages; i++)
+                       {
+                               if(pageTable[i]) delete [] destroyPage(i);
+                       }
 
-        while(!pagePool.empty())
-        {
-            delete [] pagePool.top();  // @@@
-            pagePool.pop();
-        }
+                       while(!pagePool.empty())
+                       {
+                               delete [] pagePool.top();  // @@@
+                               pagePool.pop();
+                       }
        
-       resident_pages=0;
-    }
+                       resident_pages=0;
+               }
 
-               /** 
-                       Deregister a client. Decremenet the number of local threads. If total number of local threads 
-                       hits 0 FreeMem()
-               */
-               inline void unroll(){
-        numberLocalWorkerThreads--;
-                               if(numberLocalWorkerThreads == 0){
-                                       FreeMem();
-                               }
+       /** 
+               Deregister a client. Decrement the number of local threads. If total number of local threads 
+               hits 0 FreeMem()
+       */
+       inline void unroll() {
+               numberLocalWorkerThreads--;
+               if(numberLocalWorkerThreads == 0){
+                       FreeMem();
                }
+       }
 
     /**
      * Issue a prefetch request for the given range of pages. These pages will
      * be locked into the cache, so that they will not be swapped out.
      */
     inline void Prefetch(unsigned int pageStart, unsigned int pageEnd)
-    {
-        /* prefetching is feasible only if we we did not encounter an out
-         * of buffer condition in the previous prefetch call
-         */
-        if(!outOfBufferInPrefetch)
-        {
-            //ckout << "prefetching pages " << pageStart << " through " << pageEnd << endl;
-            for(unsigned int p = pageStart; p <= pageEnd; p++)
-            {
-                if(NULL == pageTable[p])
-                {
-
-                    /* relocate the buffer asynchronously */
-                    ENTRY_TYPE* nu = tryBuffer(1);
-                    if(NULL == nu)
-                    {
-                        /* signal that sufficient buffer space is not available */
-                        outOfBufferInPrefetch = 1;
-                        break;
-                    }
-
-                    pageTable[p] = nu;
-                    state(p)->state = Read_Fault;
-
-                    pageArray[p].GetPage(CkMyPe());
-                    IncrementPagesWaiting(p);
-                    //ckout << "Prefetch page" << p << ", pages waiting = " << nPagesWaiting << endl;
-                    /* don't suspend the thread */
-                }
-
-                /* mark the page as being locked */
-                state(p)->locked = true;
-            }
-        }
-    }
+               {
+                       /* prefetching is feasible only if we we did not encounter an out
+                        * of buffer condition in the previous prefetch call
+                        */
+                       if(!outOfBufferInPrefetch)
+                       {
+                               //ckout << "prefetching pages " << pageStart << " through " << pageEnd << endl;
+                               for(unsigned int p = pageStart; p <= pageEnd; p++)
+                               {
+                                       if(NULL == pageTable[p])
+                                       {
+
+                                               /* relocate the buffer asynchronously */
+                                               ENTRY_TYPE* nu = tryBuffer(1);
+                                               if(NULL == nu)
+                                               {
+                                                       /* signal that sufficient buffer space is not available */
+                                                       outOfBufferInPrefetch = 1;
+                                                       break;
+                                               }
+
+                                               pageTable[p] = nu;
+                                               state(p)->state = Read_Fault;
+
+                                               pageArray[p].GetPage(CkMyPe());
+                                               IncrementPagesWaiting(p);
+                                               //ckout << "Prefetch page" << p << ", pages waiting = " << nPagesWaiting << endl;
+                                               /* don't suspend the thread */
+                                       }
+
+                                       /* mark the page as being locked */
+                                       state(p)->locked = true;
+                               }
+                       }
+               }
 
     /**
      * Wait for all the prefetch pages to be fetched into the cache.
      * Returns: 0 if prefetch successful, 1 if not
      */
     inline int WaitAll(void)
-    {
-        if(outOfBufferInPrefetch)
-        {
-            // we encountered out of buffer in the previous prefetch call, return error
-            outOfBufferInPrefetch = 0;
-           getListener()->suspend();
-            UnlockPages();
-            return 1;
-        }
-        else
-        {
-            // prefetch requests have been successfully issued already, so suspend the
-            // thread and wait for completion
-            outOfBufferInPrefetch = 0;
-           getListener()->suspend();
-            return 0;
-        }
-    }
+               {
+                       if(outOfBufferInPrefetch)
+                       {
+                               // we encountered out of buffer in the previous prefetch call, return error
+                               outOfBufferInPrefetch = 0;
+                               getListener()->suspend();
+                               UnlockPages();
+                               return 1;
+                       }
+                       else
+                       {
+                               // prefetch requests have been successfully issued already, so suspend the
+                               // thread and wait for completion
+                               outOfBufferInPrefetch = 0;
+                               getListener()->suspend();
+                               return 0;
+                       }
+               }
     
     inline void UnlockPage(unsigned int page) {
         pageState_t *s=stateN(page);
-       if(s && s->locked) {
+               if(s && s->locked) {
             replacementPolicy->pageAccessed(page);
             s->locked = false;
-       }
+               }
     }
 
     /**
      * Unlock all the pages locked in the cache
      */
     inline void UnlockPages()
-    {
-        // add all the locked pages to page replacement policy
-        for(unsigned int page = 0; page < nPages; page++)
-           UnlockPage(page);
-    }
+               {
+                       // add all the locked pages to page replacement policy
+                       for(unsigned int page = 0; page < nPages; page++)
+                               UnlockPage(page);
+               }
 
     /**
      * Unlock the given pages: [startPage ... endPage]
      *  Note that the range is inclusive.
      */
     inline void UnlockPages(unsigned int startPage, unsigned int endPage)
-    {
-        for(unsigned int page = startPage; page <= endPage; page++)
-           UnlockPage(page);
-    }
+               {
+                       for(unsigned int page = startPage; page <= endPage; page++)
+                               UnlockPage(page);
+               }
 
     /// Debugging routine
     inline void emitBufferValue(int ID, unsigned int pageNum, unsigned int offset)
-    {
-        CkAssert( pageNum < nPages );
-        CkAssert( offset < ENTRIES_PER_PAGE );
+               {
+                       CkAssert( pageNum < nPages );
+                       CkAssert( offset < ENTRIES_PER_PAGE );
 
-        //ckout << "p" << CkMyPe() << "ID" << ID;
+                       //ckout << "p" << CkMyPe() << "ID" << ID;
 //         if (pageTable[pageNum] == 0)
 //             ckout << "emitBufferValue: page " << pageNum << " not available in local cache." << endl;
 //         else
 //             ckout << "emitBufferValue: [" << pageNum << "," << offset << "] = " << pageTable[pageNum][offset] << endl;
-    }
+               }
 };
 
 // each element of this array is responsible for managing
@@ -1233,150 +1242,150 @@ protected:
     unsigned int pageNo() { return thisIndex; }
 
     inline void allocatePage(MSA_Page_Fault_t access) // @@@
-    {
-        if(epage == NULL)
-        {
-            epage = new ENTRY_TYPE[ENTRIES_PER_PAGE];
-            if (access==Accumulate_Fault)
-              writeIdentity();
-        }
-    }
+               {
+                       if(epage == NULL)
+                       {
+                               epage = new ENTRY_TYPE[ENTRIES_PER_PAGE];
+                               if (access==Accumulate_Fault)
+                                       writeIdentity();
+                       }
+               }
 
     // begin and end are indexes into the page.
     inline void set(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
-    {
-        //ckout << "set: " << begin << "," << end << endl;
-        for(unsigned int i = 0; i < (end - begin); i++) {
-            epage[begin + i] = buffer[i]; // @@@, calls assignment operator
-            //ckout << "set val[" << begin+i << "]=" << buffer[i] << endl;
-        }
-    }
+               {
+                       //ckout << "set: " << begin << "," << end << endl;
+                       for(unsigned int i = 0; i < (end - begin); i++) {
+                               epage[begin + i] = buffer[i]; // @@@, calls assignment operator
+                               //ckout << "set val[" << begin+i << "]=" << buffer[i] << endl;
+                       }
+               }
 
     // MSA_PageArray::
     inline void combine(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
-    {
-        ENTRY_TYPE* pagePtr = epage + begin;
-        for(unsigned int i = 0; i < (end - begin); i++)
-            entryOpsObject.accumulate(pagePtr[i], buffer[i]);
-    }
+               {
+                       ENTRY_TYPE* pagePtr = epage + begin;
+                       for(unsigned int i = 0; i < (end - begin); i++)
+                               entryOpsObject.accumulate(pagePtr[i], buffer[i]);
+               }
 
     // MSA_PageArray::
     inline void writeIdentity()
-    {
-        for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
-            epage[i] = entryOpsObject.getIdentity();
-    }
+               {
+                       for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
+                               epage[i] = entryOpsObject.getIdentity();
+               }
 
 public:
     inline MSA_PageArray() : epage(NULL) { }
     inline MSA_PageArray(CkMigrateMessage* m) { delete m; }
     
     void setCacheProxy(CProxy_CacheGroup_t &cache_)
-    {
-       cache=cache_;
-    }
+               {
+                       cache=cache_;
+               }
     
     virtual void pup(PUP::er& p)
-    {
-        ArrayElement1D::pup(p);
-       int epage_present=(epage!=0);
-       p|epage_present;
-       if (epage_present) {
-          if(p.isUnpacking())
-            allocatePage(Write_Fault);
-          for (int i=0;i<ENTRIES_PER_PAGE;i++)
-           p|epage[i];
-       }
-    }
+               {
+                       ArrayElement1D::pup(p);
+                       int epage_present=(epage!=0);
+                       p|epage_present;
+                       if (epage_present) {
+                               if(p.isUnpacking())
+                                       allocatePage(Write_Fault);
+                               for (int i=0;i<ENTRIES_PER_PAGE;i++)
+                                       p|epage[i];
+                       }
+               }
     
     inline ~MSA_PageArray()
-    {
-        if(epage) delete [] epage;
-    }
+               {
+                       if(epage) delete [] epage;
+               }
 
     /// Request our page.
     ///   pe = to which to send page
     inline void GetPage(int pe)
-    {
-        if(epage == NULL) {
-            // send empty page
-            if (entryOpsObject.pupEveryElement())
-                cache[pe].ReceivePageWithPUP(pageNo(), page_t((ENTRY_TYPE*)NULL), 0);
-            else
-                cache[pe].ReceivePage(pageNo(), (ENTRY_TYPE*)NULL, 0);
-        } else {
-            // send page with data
-            if (entryOpsObject.pupEveryElement())
-                cache[pe].ReceivePageWithPUP(pageNo(), page_t(epage), ENTRIES_PER_PAGE);
-            else
-                cache[pe].ReceivePage(pageNo(), epage, ENTRIES_PER_PAGE);  // send page with data                
-        }
-    }
+               {
+                       if(epage == NULL) {
+                               // send empty page
+                               if (entryOpsObject.pupEveryElement())
+                                       cache[pe].ReceivePageWithPUP(pageNo(), page_t((ENTRY_TYPE*)NULL), 0);
+                               else
+                                       cache[pe].ReceivePage(pageNo(), (ENTRY_TYPE*)NULL, 0);
+                       } else {
+                               // send page with data
+                               if (entryOpsObject.pupEveryElement())
+                                       cache[pe].ReceivePageWithPUP(pageNo(), page_t(epage), ENTRIES_PER_PAGE);
+                               else
+                                       cache[pe].ReceivePage(pageNo(), epage, ENTRIES_PER_PAGE);  // send page with data                
+                       }
+               }
 
     /// Receive a non-runlength encoded page from the network:
     // @@ TBD: ERROR: This does not work for  varsize pages.
     inline void PAReceivePage(ENTRY_TYPE *pageData,
-                            int pe, MSA_Page_Fault_t pageState)
-    {
-        allocatePage(pageState);
-
-        if(pageState == Write_Fault)
-            set(pageData, 0, ENTRIES_PER_PAGE);
-        else
-            combine(pageData, 0, ENTRIES_PER_PAGE);
+                                                         int pe, MSA_Page_Fault_t pageState)
+               {
+                       allocatePage(pageState);
+
+                       if(pageState == Write_Fault)
+                               set(pageData, 0, ENTRIES_PER_PAGE);
+                       else
+                               combine(pageData, 0, ENTRIES_PER_PAGE);
        
-        // send the acknowledgement to the sender that we received the page
-        //ckout << "Sending Ack to PE " << pe << endl;
-        cache[pe].AckPage(thisIndex);
-    }
+                       // send the acknowledgement to the sender that we received the page
+                       //ckout << "Sending Ack to PE " << pe << endl;
+                       cache[pe].AckPage(thisIndex);
+               }
 
     /// Receive a runlength encoded page from the network:
     inline void PAReceiveRLEPageWithPup(
        const MSA_WriteSpan_t *spans, unsigned int nSpans, 
         MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> &entries, unsigned int nEntries, 
         int pe, MSA_Page_Fault_t pageState)
-    {
-        PAReceiveRLEPage(spans, nSpans, entries.getData(), nEntries, pe, pageState);
-    }
+               {
+                       PAReceiveRLEPage(spans, nSpans, entries.getData(), nEntries, pe, pageState);
+               }
 
 
     inline void PAReceiveRLEPage(
        const MSA_WriteSpan_t *spans, unsigned int nSpans, 
         const ENTRY_TYPE *entries, unsigned int nEntries, 
         int pe, MSA_Page_Fault_t pageState)
-    {
-        allocatePage(pageState);
+               {
+                       allocatePage(pageState);
        
-        //ckout << "p" << CkMyPe() << "ReceiveRLEPage nSpans=" << nSpans << " nEntries=" << nEntries << endl;
-        int e=0; /* consumed entries */
-        for (int s=0;s<nSpans;s++) {
-            if(pageState == Write_Fault)
-                set(&entries[e], spans[s].start,spans[s].end);
-            else /* Accumulate_Fault */
-                combine(&entries[e], spans[s].start,spans[s].end);
-            e+=spans[s].end-spans[s].start;
-        } 
-
-        // send the acknowledgement to the sender that we received the page
-        //ckout << "Sending AckRLE to PE " << pe << endl;
-        cache[pe].AckPage(thisIndex);
-    }
+                       //ckout << "p" << CkMyPe() << "ReceiveRLEPage nSpans=" << nSpans << " nEntries=" << nEntries << endl;
+                       int e=0; /* consumed entries */
+                       for (int s=0;s<nSpans;s++) {
+                               if(pageState == Write_Fault)
+                                       set(&entries[e], spans[s].start,spans[s].end);
+                               else /* Accumulate_Fault */
+                                       combine(&entries[e], spans[s].start,spans[s].end);
+                               e+=spans[s].end-spans[s].start;
+                       
+
+                       // send the acknowledgement to the sender that we received the page
+                       //ckout << "Sending AckRLE to PE " << pe << endl;
+                       cache[pe].AckPage(thisIndex);
+               }
 
     // MSA_PageArray::
     inline void Sync()
-    {
-                               MSADEBPRINT(printf("MSA_PageArray::Sync about to call contribute \n");); 
-        contribute(0, NULL, CkReduction::concat);
-    }
+               {
+                       MSADEBPRINT(printf("MSA_PageArray::Sync about to call contribute \n");); 
+                       contribute(0, NULL, CkReduction::concat);
+               }
 
     inline void emit(int ID, int index)
-    {
-         //ckout << "p" << CkMyPe() << "ID" << ID;
+               {
+                       //ckout << "p" << CkMyPe() << "ID" << ID;
 //         if(epage == NULL)
 //             ckout << "emit: epage is NULL" << endl;
 //         else
 //             ckout << "emit: " << epage[index] << endl;
-    }
+               }
 };
 
 #define CK_TEMPLATES_ONLY