Two changes, initComlibManager is no longer an initproc call
authorSameer Kumar <skumar2@uiuc.edu>
Mon, 11 Oct 2004 15:50:13 +0000 (15:50 +0000)
committerSameer Kumar <skumar2@uiuc.edu>
Mon, 11 Oct 2004 15:50:13 +0000 (15:50 +0000)
2) PrioStreaming now flushes queues when priority changes.

src/ck-com/ComlibManager.C
src/ck-com/ComlibManager.ci
src/ck-com/PrioStreaming.C
src/ck-com/PrioStreaming.h

index 24f8544c71c2e4e404e021eb4a46bac11e6a3144..3ee55f9d4ad39ad320642db8079760177bf19c04 100644 (file)
@@ -65,6 +65,8 @@ ComlibManager::ComlibManager(){
 
 void ComlibManager::init(){
     
+    initComlibManager();
+
     PUPable_reg(CharmStrategy);
     PUPable_reg(CharmMessageHolder);
     
index 46f677bfcc665159c2af4efd6a19e19b2d50b6fa..2a17dc5565d3576b96f17d894e5a349588697e24 100644 (file)
@@ -1,5 +1,5 @@
 module comlib {
-  initproc void initComlibManager();
+  //  initproc void initComlibManager();
 
   message ComlibDummyMsg;
   //  message PrioMsg;
index c32d950abf7e433f788da0ec907bada29869109c..9a68a83835759e515bdfe683aea43cde86967a05 100644 (file)
@@ -12,25 +12,30 @@ void PrioStreaming::insertMessage(CharmMessageHolder *cmsg) {
                  PERIOD, bufferMax);
 
     int pe=cmsg->dest_proc;
-    streamingMsgBuf[pe].enq(cmsg);
-    streamingMsgCount[pe]++;
-    if (streamingMsgCount[pe] > bufferMax) {
-        flushPE(pe);
-        return;
-    }
-    
     char* msg = cmsg->getCharmMessage();
     envelope *env = UsrToEnv(msg);
     int msg_prio = *(int*)env->getPrioPtr();
-    
+
+    if(streamingMsgCount[pe] == 0) 
+        minPrioVec[pe] = msg_prio;
+    else if(minPrioVec[pe] > msg_prio)
+        minPrioVec[pe] = msg_prio;
+
+    streamingMsgBuf[pe].enq(cmsg);
+    streamingMsgCount[pe]++;   
+
     if(msg_prio <= basePriority)
         flushPE(pe);
+
+    if (streamingMsgCount[pe] > bufferMax) 
+        flushPE(pe);
 }
 
 void PrioStreaming::pup(PUP::er &p){
 
     StreamingStrategy::pup(p);
-
     p | basePriority;
 
+    if(p.isUnpacking())
+        minPrioVec.resize(CkNumPes());
 }
index 8af78b2f11efa024217f281293250dbd936b7e83..714aeb459a210382c3497215f5bb35c0b13ec85c 100644 (file)
@@ -9,6 +9,7 @@
 class PrioStreaming : public StreamingStrategy {
  protected:
     int basePriority;
+    CkVec<int> minPrioVec;
     
  public:
     /**
@@ -31,7 +32,14 @@ class PrioStreaming : public StreamingStrategy {
     
     virtual void insertMessage(CharmMessageHolder *msg);
 
+    //If new priority is greater than current priority, 
+    //then flush all queues which have relatively high priority messages
     inline void setBasePriority(int p) {
+        if(p > basePriority) {
+            for(int count =0; count < CkNumPes(); count++)
+                if(minPrioVec[count] <= p)
+                    flushPE(count);
+        }        
         basePriority = p;
     }