NDMeshStreamer: added pup routine for array client; changed interface to
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 26 Mar 2012 23:39:54 +0000 (18:39 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 26 Mar 2012 23:45:46 +0000 (18:45 -0500)
require specifying number of contributions when calling associateCallback;
changed setup to reinitialize list of local clients and clear the cache of
array id to destination pe mappings every time associateCallback is called
in order to support chare migration in between calls to associateCallback

src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 231fb0fc7d9d9a4844354faa11a1c9744f769e22..6cc789bbce13e800fa774646eb410635ad510f9d 100644 (file)
@@ -23,7 +23,8 @@ module NDMeshStreamer {
     entry void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
     entry void flushDirect();
     entry void finish();
-    entry void associateCallback(CkCallback startCb, CkCallback endCb, 
+    entry void associateCallback(int numContributors, 
+                                CkCallback startCb, CkCallback endCb, 
                                 CProxy_CompletionDetector detector);
   };
 
index 0ffcdbf4f3cf883d0b7d09341e3fa02aecb07f7e..8d0c8b47decb45068206e33b08fd9dabb74abe06 100644 (file)
@@ -84,6 +84,9 @@ class MeshStreamerArrayClient :  public CBase_MeshStreamerArrayClient<dtype>,
     MeshStreamerClient<dtype>::detectorLocalObj_->consume();
     process(data);
   }
+  void pup(PUP::er &p) {
+    CBase_MeshStreamerArrayClient<dtype>::pup(p);
+  }
 
 };
 
@@ -133,7 +136,7 @@ private:
 
     virtual int numElementsInClient() = 0;
 
-    virtual void setDetectorInClient() = 0;
+    virtual void initLocalClients() = 0;
 
     void flushLargestBuffer();
 
@@ -162,7 +165,8 @@ public:
     }
     virtual void insertData(dtype &dataItem, int destinationPe); 
     void insertData(void *dataItemHandle, int destinationPe);
-    void associateCallback(CkCallback startCb, CkCallback endCb, 
+    void associateCallback(int numContributors, 
+                          CkCallback startCb, CkCallback endCb, 
                           CProxy_CompletionDetector detector);
     void flushAllBuffers();
     void registerPeriodicProgressFunction();
@@ -174,8 +178,8 @@ public:
       registerPeriodicProgressFunction();
     }
 
-    void done() {
-      detectorLocalObj_->done();
+    void done(int numContributorsFinished = 1) {
+      detectorLocalObj_->done(numContributorsFinished);
     }
 
 };
@@ -416,15 +420,17 @@ void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
 
 template <class dtype>
 void MeshStreamer<dtype>::associateCallback(
+                         int numContributors,
                          CkCallback startCb, CkCallback endCb, 
                          CProxy_CompletionDetector detector) {
   userCallback_ = endCb; 
-  static CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), this->thisProxy);
+  static CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
+                          this->thisProxy);
   detector_ = detector;      
   detectorLocalObj_ = detector_.ckLocalBranch();
-  setDetectorInClient();
-  detectorLocalObj_->start_detection(numElementsInClient(), 
-                                    startCb, finish , 0);
+  initLocalClients();
+
+  detectorLocalObj_->start_detection(numContributors, startCb, finish , 0);
   
   if (progressPeriodInMs_ <= 0) {
     CkPrintf("Using completion detection in NDMeshStreamer requires"
@@ -628,7 +634,7 @@ private:
     return CkNumPes();
   }
 
-  void setDetectorInClient() {
+  void initLocalClients() {
     clientObj_->setDetector(MeshStreamer<dtype>::detectorLocalObj_);
   }
 
@@ -685,8 +691,15 @@ private:
     return numArrayElements_;
   }
 
-  void setDetectorInClient() {
+  void initLocalClients() {
+
+#ifdef CACHE_ARRAY_METADATA
+    std::fill(isCachedArrayMetadata_, 
+             isCachedArrayMetadata_ + numArrayElements_, false);
+#endif
+
     for (int i = 0; i < numArrayElements_; i++) {
+      clientObjs_[i] = clientProxy_[i].ckLocal();
       if (clientObjs_[i] != NULL) {
        clientObjs_[i]->setDetector(
                         MeshStreamer<ArrayDataItem<dtype> >::detectorLocalObj_);
@@ -715,10 +728,6 @@ public:
     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
 
     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
-    for (int i = 0; i < numArrayElements_; i++) {
-      clientObjs_[i] = clientProxy_[i].ckLocal();
-    }
-
 #ifdef CACHE_ARRAY_METADATA
     destinationPes_ = new int[numArrayElements_];
     isCachedArrayMetadata_ = new bool[numArrayElements_];