MSA: Partial split-phase synchronization support for 3D arrays
author Phil Miller <mille121@illinois.edu>
Mon, 13 Jul 2009 17:40:26 +0000 (12:40 -0500)
committer Phil Miller <mille121@illinois.edu>
Thu, 10 Dec 2009 22:22:57 +0000 (16:22 -0600)
src/libs/ck-libs/multiphaseSharedArrays/msa-DistPageMgr.ci
src/libs/ck-libs/multiphaseSharedArrays/msa-DistPageMgr.h
src/libs/ck-libs/multiphaseSharedArrays/msa-distArray.h

index 4a6353dbd29d3a3930c38b12a9cd5ce79c624e30..9510fa95fc16849beff4d47b10231b84b6a4e159 100644 (file)
@@ -17,6 +17,9 @@ module msa
         entry void SyncAck();
         entry void SyncDone();
 
+        // Push data to its home and wait for notice that everyone else has caught up
+               entry [threaded] void FinishSync();
+
         // for debugging purposes only
         entry void emitBufferValue(int ID, unsigned int pageNum, unsigned int offset);
     };
index 7ea63f9c3147c51907e50cf845cab2df7fc40a21..00c4085c260bec98ca3111abc57ee274f636e26e 100644 (file)
@@ -493,6 +493,7 @@ protected:
     unsigned int numberOfWorkerThreads;      // number of worker threads across all processors for this shared array
     // @@ migration?
     unsigned int numberLocalWorkerThreads;   // number of worker threads on THIS processor for this shared array
+       unsigned int numberLocalWorkerThreadsActive;
     unsigned int enrollDoneq;                 // has enroll() been done on this processor?
     MSA_Listeners enrollWaiters;
     MSA_Listeners syncWaiters;
@@ -799,7 +800,8 @@ public:
                  entryOpsObject(new ENTRY_OPS_CLASS),
                  replacementPolicy(new vmPageReplacementPolicy(nPages, pageTable, pageStateStorage)),
                  outOfBufferInPrefetch(0), syncAckCount(0),syncThreadCount(0),
-                 resident_pages(0),numberLocalWorkerThreads(0), enrollDoneq(0)
+                 resident_pages(0), numberLocalWorkerThreads(0), 
+                 numberLocalWorkerThreadsActive(0), enrollDoneq(0)
                {
                        MSADEBPRINT(printf("MSA_CacheGroup nEntries %d \n",nEntries););
                }
@@ -951,6 +953,7 @@ public:
                        CkAssert(num_workers == numberOfWorkerThreads); // just to verify
                        CkAssert(enrollDoneq == 0);
                        numberLocalWorkerThreads++;
+                       numberLocalWorkerThreadsActive++;
                        // @@ how to ensure that enroll is called only once?
 
                        //ckout << "[" << CkMyPe() << "] sending sync ack to PE 0" << endl;
@@ -1003,38 +1006,33 @@ public:
 
        void SyncRelease()
                {
-                       syncThreadCount++;
+                       numberLocalWorkerThreadsActive--;
 
-                       MSADEBPRINT(printf("Sync syncThreadCount %d \n",syncThreadCount););
-                       if(syncThreadCount < numberLocalWorkerThreads)
+                       syncDebug();
+                       
+                       if(syncThreadCount < numberLocalWorkerThreadsActive)
                        {
                                return;
                        }
 
-                       FlushCache();
-                       EmptyCache();
+                       thisProxy[CkMyPe()].FinishSync();
                }
 
-    // MSA_CacheGroup::
-    inline void Sync()
+       void syncDebug()
                {
-                       syncThreadCount++;
-                       //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << " " << numberLocalWorkerThreads << endl;
-                       //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << ", registered threads = " << getNumRegisteredThreads()
-                       //  << ", number of suspended threads = " << getNumSuspendedThreads() << endl;
+                       MSADEBPRINT(printf("Sync  (Total threads: %d, Active: %d, Synced: %d)\n", 
+                                                          numberLocalWorkerThreads, numberLocalWorkerThreadsActive, syncThreadCount));
+               }
 
-                       // First, all threads on this processor need to reach the sync
-                       // call; only then can we proceed with merging the data.  Only
-                       // the last thread on this processor needs to do the FlushCache,
-                       // etc.  Others just suspend until the sync is over.
-                       MSADEBPRINT(printf("Sync syncThreadCount %d \n",syncThreadCount););
-                       if(syncThreadCount < numberLocalWorkerThreads)
-                       {
-                               MSADEBPRINT(printf("Sync addAndSuspend \n"););
-                               addAndSuspend(syncWaiters);
-                               return;
-                       }
-               
+       void activate()
+               {
+                       numberLocalWorkerThreadsActive++;
+                       
+                       CkAssert(numberLocalWorkerThreadsActive <= numberLocalWorkerThreads);
+               }
+
+       void FinishSync()
+               {
                        //ckout << "[" << CkMyPe() << "] Sync started" << endl;
 
                        // flush the cache asynchronously and also empty it
@@ -1072,7 +1070,33 @@ public:
                        addAndSuspend(syncWaiters);
                                
                        MSADEBPRINT(printf("Sync all local threads done waking up after addAndSuspend\n"););
-                       //ckout << "[" << CkMyPe() << "] Sync finished" << endl;        
+                       //ckout << "[" << CkMyPe() << "] Sync finished" << endl;                        
+               }
+
+    // MSA_CacheGroup::
+    inline void Sync()
+               {
+                       syncThreadCount++;
+                       //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << " " << numberLocalWorkerThreads << endl;
+                       //ckout << "[" << CkMyPe() << "] syncThreadCount = " << syncThreadCount << ", registered threads = " << getNumRegisteredThreads()
+                       //  << ", number of suspended threads = " << getNumSuspendedThreads() << endl;
+
+                       syncDebug();
+
+                       // First, all threads on this processor need to reach the sync
+                       // call; only then can we proceed with merging the data.  Only
+                       // the last thread on this processor needs to do the FlushCache,
+                       // etc.  Others just suspend until the sync is over.
+                       MSADEBPRINT(printf("Sync  (Total threads: %d, Active: %d, Synced: %d)\n", 
+                                                          numberLocalWorkerThreads, numberLocalWorkerThreadsActive, syncThreadCount));
+                       if(syncThreadCount < numberLocalWorkerThreadsActive)
+                       {
+                               MSADEBPRINT(printf("Sync addAndSuspend\n"));
+                               addAndSuspend(syncWaiters);
+                               return;
+                       }
+
+                       FinishSync();
                }
 
     inline unsigned int getNumEntries() { return nEntries; }
index 80d6554c092db7e231ee32acdd6cd54c607c68f6..1a238921ed90b4aea9954155d89c0df322e813cf 100644 (file)
@@ -899,7 +899,7 @@ public:
     */
     inline MSA3D(unsigned x, unsigned y, unsigned z, unsigned int num_wrkrs, 
                  unsigned int maxBytes=MSA_DEFAULT_MAX_BYTES)
-        : dim_x(x), dim_y(y), dim_z(z), initHandleGiven(false)
+        : dim_x(x), dim_y(y), dim_z(z), initHandleGiven(false), active(false)
     {
         unsigned nEntries = x*y*z;
         unsigned int nPages = (nEntries + ENTRIES_PER_PAGE - 1)/ENTRIES_PER_PAGE;
@@ -974,6 +974,7 @@ public:
         //
         // @@ What if a MSA3D thread migrates?
         cache->enroll(num_workers);
+        active = true;
     }
 
     // idx is the element to be read/written
@@ -1030,11 +1031,17 @@ public:
 
     static const int DEFAULT_SYNC_SINGLE = 0;
 
+    bool active;
+    
     inline void syncRelease(Handle &m)
     {
         m.checkInvalidate(this);
         delete &m;
-        cache->SyncRelease();
+        if (active)
+            cache->SyncRelease();
+        else
+            CmiAbort("sync from an inactive thread!\n");
+        active = false;
     }
 
     inline Read &syncToRead(Handle &m, int single = DEFAULT_SYNC_SINGLE)
@@ -1144,6 +1151,9 @@ protected:
     /// Synchronize reads and writes across the entire array.
     inline void sync(int single=0)
     {
+        if (!active)
+            cache->activate();
+        active = true;
         cache->SyncReq(single); 
     }
 };