New version of communication library with learning capabilities. Learning and dynamic...
authorSameer Kumar <skumar2@uiuc.edu>
Wed, 6 Oct 2004 20:50:33 +0000 (20:50 +0000)
committerSameer Kumar <skumar2@uiuc.edu>
Wed, 6 Oct 2004 20:50:33 +0000 (20:50 +0000)
21 files changed:
src/ck-com/AAMLearner.C [new file with mode: 0644]
src/ck-com/AAMLearner.h [new file with mode: 0644]
src/ck-com/AAPLearner.C [new file with mode: 0644]
src/ck-com/AAPLearner.h [new file with mode: 0644]
src/ck-com/ComlibArrayListener.C
src/ck-com/ComlibArrayListener.h
src/ck-com/ComlibLearner.h [new file with mode: 0644]
src/ck-com/ComlibManager.C
src/ck-com/ComlibManager.ci
src/ck-com/ComlibManager.h
src/ck-com/ComlibSectionInfo.C
src/ck-com/ComlibStrategy.C
src/ck-com/ComlibStrategy.h
src/ck-com/DirectMulticastStrategy.C
src/ck-com/DirectMulticastStrategy.h
src/ck-com/EachToManyMulticastStrategy.C
src/ck-com/EachToManyMulticastStrategy.h
src/ck-com/MsgPacker.C
src/ck-com/PrioStreaming.h
src/ck-com/RingMulticastStrategy.C
src/ck-com/RingMulticastStrategy.h

diff --git a/src/ck-com/AAMLearner.C b/src/ck-com/AAMLearner.C
new file mode 100644 (file)
index 0000000..fc510fb
--- /dev/null
@@ -0,0 +1,116 @@
+#include "AAMLearner.h"
+#include "ComlibManager.h"
+
+#include "EachToManyMulticastStrategy.h"
+#include "RingMulticastStrategy.h"
+
+AAMLearner::AAMLearner() {
+   init();
+}
+
+void AAMLearner::init() {
+    alpha = ALPHA;
+    beta = BETA;
+    gamma = GAMMA;
+}
+
+Strategy *AAMLearner::optimizePattern(Strategy *strat, 
+                                           ComlibGlobalStats &stats) {
+    CharmStrategy *in_strat = (CharmStrategy *)strat;
+    double npes;              //, *pelist;
+    CharmStrategy *ostrat = NULL;
+
+    double degree = 0, msgsize = 0, nmsgs = 0;
+    stats.getAverageStats(strat->getInstance(), msgsize, nmsgs, degree, npes);
+
+    double dcost = computeDirect(npes, msgsize, degree);
+    double mcost = computeMesh(npes, msgsize, degree);
+    double gcost = computeGrid(npes, msgsize, degree);
+    double hcost = computeHypercube(npes, msgsize, degree);
+    double mincost = min4(dcost, mcost, gcost, hcost);
+
+    int minstrat = -1;
+    if(in_strat->getType() == ARRAY_STRATEGY) {
+        CkArrayID said, daid;
+        CkArrayIndexMax *sidxlist, *didxlist;
+        int nsrc, ndest;
+        
+        in_strat->ainfo.getSourceArray(said, sidxlist, nsrc);
+        in_strat->ainfo.getDestinationArray(daid, didxlist, ndest);
+               
+        if(dcost == mincost) 
+            minstrat = USE_DIRECT;        
+        
+        else if(mcost == mincost) 
+            minstrat = USE_MESH;                
+        else if(gcost == mincost) 
+            minstrat = USE_GRID;
+        else if(hcost == mincost) 
+            minstrat = USE_HYPERCUBE;               
+
+        CkPrintf("Choosing router %d, %g, %g, %g\n", minstrat, 
+                 mcost, hcost, dcost);
+        
+        if(minstrat != USE_DIRECT) {
+            ostrat = new EachToManyMulticastStrategy
+                (minstrat, said, daid,
+                 nsrc, sidxlist, ndest,
+                 didxlist);
+        }
+        else {
+            ostrat = new RingMulticastStrategy(said, daid);
+        }
+        
+        ostrat->setInstance(in_strat->getInstance());
+    }
+    else
+        CkAbort("Groups Not Implemented Yet\n");
+
+    //Group strategy implement later, foo bar !!
+    
+    return ostrat;
+}
+
+//P = number of processors, m = msgsize, d = degree
+double AAMLearner::computeDirect(double P, double m, double d) {
+    double cost = 0.0;
+    cost = d * alpha;
+    cost += d * m * beta;
+    
+    return cost;
+}
+
+/******************* CHECK EQUATIONS FOR AAM ***********/
+//P = number of processors, m = msgsize, d = degree
+double AAMLearner::computeMesh(double P, double m, double d) {
+    double cost = 0.0;
+    cost = 2 * sqrt((double) P) * alpha;
+    cost += d * m * (beta + gamma);
+    
+    return cost;
+}
+
+//P = number of processors, m = msgsize, d = degree
+double AAMLearner::computeHypercube(double P, double m, double d) {
+
+    if(P == 0)
+        return 0;
+
+    double cost = 0.0;
+    double log_2_P = log(P)/log(2.0);
+    
+    cost = log_2_P * alpha;
+    cost += d * m * (beta + gamma);
+
+    return cost;
+}
+
+//P = number of processors, m = msgsize, d = degree
+double AAMLearner::computeGrid(double P, double m, double d) {
+    double cost = 0.0;
+    cost = 3 * cbrt((double) P) * alpha;
+    cost += d * m * (beta + gamma);
+    
+    return cost;
+}
+
diff --git a/src/ck-com/AAMLearner.h b/src/ck-com/AAMLearner.h
new file mode 100644 (file)
index 0000000..7699e40
--- /dev/null
@@ -0,0 +1,34 @@
+
+#ifndef AAMLEARNER_H
+#define AAMLEARNER_H
+
+#include "ComlibManager.h"
+#include "ComlibLearner.h"
+#include "AAPLearner.h"
+
+#define GAMMA 2e-9
+
+class AAMLearner : public ComlibLearner {
+    //alpha network and cpu s/w overhead
+    //beta network transmission time
+    //gamma memory copy overhead
+    double alpha, beta, gamma;   
+
+    double computeDirect(double P, double m, double d);
+    double computeMesh(double P, double m, double d);
+    double computeHypercube(double P, double m, double d);
+    double computeGrid(double P, double m, double d);
+
+ public:
+    AAMLearner();    
+
+    void init();
+    Strategy* optimizePattern(Strategy* , ComlibGlobalStats &);
+    
+    Strategy ** optimizePattern(Strategy** , ComlibGlobalStats &) {
+        CkAbort("Not implemented\n");
+    }
+};
+
+
+#endif
diff --git a/src/ck-com/AAPLearner.C b/src/ck-com/AAPLearner.C
new file mode 100644 (file)
index 0000000..6a1741b
--- /dev/null
@@ -0,0 +1,133 @@
+#include "AAPLearner.h"
+#include "ComlibManager.h"
+#include "EachToManyMulticastStrategy.h"
+
+AAPLearner::AAPLearner() {
+   init();
+}
+
+void AAPLearner::init() {
+    alpha = ALPHA;
+    beta = BETA;
+}
+
+Strategy *AAPLearner::optimizePattern(Strategy *strat, 
+                                           ComlibGlobalStats &stats) {
+    CharmStrategy *in_strat = (CharmStrategy *)strat;
+    double npes;              //, *pelist;
+    CharmStrategy *ostrat = NULL;
+
+    /*
+      if(in_strat->getType() == ARRAY_STRATEGY) {
+      in_strat->ainfo.getCombinedPeList(pelist, npes);
+      }
+      
+      if(in_strat->getType() == GROUP_STRATEGY) {
+      CkGroupID gid;
+      //Convert to combined pelist
+      in_strat->ginfo.getSourceGroup(gid, pelist, npes);
+      }
+    */
+
+    double degree = 0, msgsize = 0, nmsgs = 0;
+    stats.getAverageStats(strat->getInstance(), msgsize, nmsgs, degree, npes);
+
+    double dcost = computeDirect(npes, msgsize, degree);
+    double mcost = computeMesh(npes, msgsize, degree);
+    double gcost = computeGrid(npes, msgsize, degree);
+    double hcost = computeHypercube(npes, msgsize, degree);
+    double mincost = min4(dcost, mcost, gcost, hcost);
+
+    int minstrat = -1;
+    if(dcost == mincost) 
+        minstrat = USE_DIRECT;
+    else if(mcost == mincost)                     
+        minstrat = USE_MESH;                
+    else if(gcost == mincost) 
+        minstrat = USE_GRID;
+    else if(hcost == mincost) 
+        minstrat = USE_HYPERCUBE;
+
+    CkPrintf("Choosing router %d, %g, %g, %g, : %g,%g,%g\n", minstrat, 
+             mcost, hcost, dcost, npes, msgsize, degree);
+    
+    if(in_strat->getType() == ARRAY_STRATEGY) {
+        CkArrayID said, daid;
+        CkArrayIndexMax *sidxlist, *didxlist;
+        int nsrc, ndest;
+        
+        in_strat->ainfo.getSourceArray(said, sidxlist, nsrc);
+        in_strat->ainfo.getDestinationArray(daid, didxlist, ndest);
+                
+        ostrat = new EachToManyMulticastStrategy
+            (minstrat, said, daid,
+             nsrc, sidxlist, ndest,
+             didxlist);
+
+        ostrat->setInstance(in_strat->getInstance());
+    }
+    
+    //Group strategy implement later, foo bar !!
+    if(in_strat->getType() == GROUP_STRATEGY) {
+        CkGroupID gid;
+        int src_npes, *src_pelist;
+        int dest_npes, *dest_pelist;
+        in_strat->ginfo.getSourceGroup(gid, src_pelist, src_npes);
+        in_strat->ginfo.getDestinationGroup(gid, dest_pelist, dest_npes); 
+
+        ostrat = new EachToManyMulticastStrategy
+            (minstrat, src_npes, src_pelist, dest_npes, dest_pelist);
+    }
+
+    return ostrat;
+}
+
+//P = number of processors, m = msgsize, d = degree
+double AAPLearner::computeDirect(double P, double m, double d) {
+    double cost = 0.0;
+    cost = d * alpha;
+    cost += d * m * beta;
+    
+    return cost;
+}
+
+//P = number of processors, m = msgsize, d = degree
+double AAPLearner::computeMesh(double P, double m, double d) {
+
+    double cost = 0.0;
+    cost = 2 * sqrt((double) P) * alpha;
+    cost += 2 * d * m * beta;
+    
+    return cost;
+}
+
+//P = number of processors, m = msgsize, d = degree
+double AAPLearner::computeHypercube(double P, double m, double d) {
+
+    if(P == 0)
+        return 0;
+
+    double cost = 0.0;
+    double log_2_P = log(P)/log(2.0);
+    
+    if(d >= P/2) {
+        cost = log_2_P * alpha;
+        cost += P/2 * log_2_P * m * beta;
+    }
+    else {
+        cost = log_2_P * alpha;
+        cost += log_2_P * d * m * beta;
+    }
+
+    return cost;
+}
+
+//P = number of processors, m = msgsize, d = degree
+double AAPLearner::computeGrid(double P, double m, double d) {
+    double cost = 0.0;
+    cost = 3 * cbrt((double) P) * alpha;
+    cost += 3 * d * m * beta;
+    
+    return cost;
+}
+
diff --git a/src/ck-com/AAPLearner.h b/src/ck-com/AAPLearner.h
new file mode 100644 (file)
index 0000000..a10e3e5
--- /dev/null
@@ -0,0 +1,40 @@
+
+#ifndef AAPLEARNER_H
+#define AAPLEARNER_H
+
+#include "ComlibManager.h"
+#include "ComlibLearner.h"
+
+#define ALPHA 1e-5
+#define BETA  7.8e-9
+
+#define min(x,y) ((x < y) ? x : y)
+
+inline double min4(double x, double y, double a, double b) {
+    double x1 = min(x,y);
+    double a1 = min(a,b);
+    
+    return min(x1,a1);
+} 
+
+class AAPLearner : public ComlibLearner {
+    double alpha, beta;
+
+    double computeDirect(double P, double m, double d);
+    double computeMesh(double P, double m, double d);
+    double computeHypercube(double P, double m, double d);
+    double computeGrid(double P, double m, double d);
+
+ public:
+    AAPLearner();    
+
+    void init();
+    Strategy* optimizePattern(Strategy* , ComlibGlobalStats &);
+    
+    Strategy ** optimizePattern(Strategy** , ComlibGlobalStats &) {
+        CkAbort("Not implemented\n");
+    }
+};
+
+
+#endif
index 05940b4e2ecb7ef1dde765b2247977232abe5fec..9b4f94a3fb668f34d745f0036c0c95adc019040a 100644 (file)
@@ -35,7 +35,6 @@ CmiBool ComlibArrayListener::ckElementArriving(ArrayElement *elt){
     return CmiTrue;
 }
 
-
 void ComlibArrayListener::addElement(ArrayElement *elt, 
                                      CmiBool migration_flag){
     if(nElements == 0)
index 08b2c94439ec98edecd31e5ae4ad7c6fb3f2b890..b5f28d07655e47d05cc228f7ffa66eaf597f724b 100644 (file)
@@ -24,8 +24,18 @@ class ComlibArrayListener : public CkArrayListener{
     void ckElementLeaving(ArrayElement *elt);
     CmiBool ckElementArriving(ArrayElement *elt);
     
+    //Add strategy to listening list, strategy will get an the number
+    //of array elements lying on that processor
     void registerStrategy(StrategyTableEntry *);
 
+    //remove strategy from table, and now it will not get updates
+    //about this array
+    void unregisterStrategy(StrategyTableEntry *entry) {
+        for(int count = 0; count < strategyList.size(); count++)
+            if(strategyList[count] == entry)
+                strategyList.remove(count);
+    }
+
     void getLocalIndices(CkVec<CkArrayIndexMax> &vec);
 
     void pup(PUP::er &p);
diff --git a/src/ck-com/ComlibLearner.h b/src/ck-com/ComlibLearner.h
new file mode 100644 (file)
index 0000000..5d60d29
--- /dev/null
@@ -0,0 +1,30 @@
+#ifndef COMLIBLEARNER_H
+#define COMLIBLEARNER_H
+
+#include "convcomlibstrategy.h"
+
+
+/* Communication library learner which takes a strategy or a list of
+   strategies as input along with the communication pattern of the
+   objects belonging to those strategies and returns new strategies to
+   replace the input strategies. These new strategies optimize the
+   communication pattern. */
+
+class ComlibGlobalStats;
+class ComlibLearner {
+ public:
+    //Configures parameters of the learner. Will be called by the
+    //communication library on every processor after the second
+    //barrier of the communication library.
+    virtual void init() {}
+    
+    //Optimizes a specific strategy. Returns a new optimized strategy
+    virtual Strategy* optimizePattern(Strategy *strat,ComlibGlobalStats &sdata){return NULL;}
+    
+    //Optimizes the communication pattern of a group of strategies
+    //together
+    virtual Strategy** optimizePattern(Strategy **strat, 
+                                           ComlibGlobalStats &sdata){return NULL;}
+};
+
+#endif
index ea1457efff8d9209f8051a6a318809d2146922d1..80f7f318d9297111e7effd55923a0c3fea77e777 100644 (file)
@@ -1,5 +1,5 @@
-#include "ComlibManager.h"
 
+#include "ComlibManager.h"
 #include "EachToManyMulticastStrategy.h"
 #include "DirectMulticastStrategy.h"
 #include "StreamingStrategy.h"
@@ -18,28 +18,31 @@ CkpvExtern(int, RecvdummyHandle);
 CkpvDeclare(int, RecvmsgHandle);
 CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
 CkpvDeclare(CkGroupID, cmgrID);
-CkpvExtern(ConvComlibManager *, conv_comm_ptr);
+CkpvExtern(ConvComlibManager *, conv_com_ptr);
 
 //handler to receive array messages
 void recv_array_msg(void *msg){
 
+    ComlibPrintf("%d:In recv_msg\n", CkMyPe());
+
     if(msg == NULL)
         return;
     
-    ComlibPrintf("%d:In recv_msg\n", CkMyPe());
-
     register envelope* env = (envelope *)msg;
     env->setUsed(0);
     env->getsetArrayHops()=1;
     CkUnpackMessage(&env);
 
-    /*
-    CProxyElement_ArrayBase ap(env->getsetArrayMgr(), env->getsetArrayIndex());
-    ComlibPrintf("%d:Array Base created\n", CkMyPe());
-    ap.ckSend((CkArrayMessage *)EnvToUsr(env), env->getsetArrayEp());
-    */
+    int srcPe = env->getSrcPe();
+    int sid = ((CmiMsgHeaderExt *) env)->stratid;
+
+    ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), 
+             sid, env->getTotalsize(), srcPe);
+
+    RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
     
     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
+    //if(!comm_debug)
     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
 
     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
@@ -67,12 +70,21 @@ void ComlibManager::init(){
     
     //comm_debug = 1;
     
+    numStatsReceived = 0;
+    curComlibController = 0;
+    clibIteration = 0;
+    
+    strategyCreated = CmiFalse;
+
+    CpvInitialize(ClibLocationTableType*, locationTable);
+    CpvAccess(locationTable) = new CkHashtableT <ClibGlobalArrayIndex, int>;
+
     CkpvInitialize(int, RecvmsgHandle);
-    CkpvAccess(RecvmsgHandle) = CkRegisterHandler((CmiHandler)recv_array_msg);
+    CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
 
     bcast_pelist = new int [CkNumPes()];
-    for(int bcount = 0; bcount < CkNumPes(); bcount++)
-        bcast_pelist[bcount] = bcount;
+    for(int brcount = 0; brcount < CkNumPes(); brcount++)
+        bcast_pelist[brcount] = brcount;
 
     CkpvInitialize(int, RecvCombinedShortMsgHdlrIdx);
     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
@@ -82,7 +94,6 @@ void ComlibManager::init(){
     
     npes = CkNumPes();
     pelist = NULL;
-    nstrats = 0;
 
     CkpvInitialize(CkGroupID, cmgrID);
     CkpvAccess(cmgrID) = thisgroup;
@@ -93,84 +104,107 @@ void ComlibManager::init(){
     prevStratID = -1;
     //prioEndIterationFlag = 1;
 
-    strategyTable = CkpvAccess(conv_comm_ptr)->getStrategyTable();
+    strategyTable = CkpvAccess(conv_com_ptr)->getStrategyTable();
     
     receivedTable = 0;
     flushTable = 0;
-    totalMsgCount = 0;
-    totalBytes = 0;
-    nIterations = 0;
+    //    totalMsgCount = 0;
+    //    totalBytes = 0;
+    //nIterations = 0;
     barrierReached = 0;
     barrier2Reached = 0;
 
+    bcount = b2count = 0;
+    lbUpdateReceived = CmiFalse;
+
     isRemote = 0;
     remotePe = -1;
 
+    CkpvInitialize(int, migrationDoneHandlerID);
+    CkpvAccess(migrationDoneHandlerID) = 
+        CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
+    
     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
-    cgproxy[0].barrier();
+    cgproxy[curComlibController].barrier();
 }
 
 //First barrier makes sure that the communication library group 
 //has been created on all processors
 void ComlibManager::barrier(){
-  static int bcount = 0;
-  ComlibPrintf("In barrier %d\n", bcount);
-  if(CkMyPe() == 0) {
-    bcount ++;
-    if(bcount == CkNumPes()){
-      barrierReached = 1;
-      doneCreating();
+    ComlibPrintf("In barrier %d\n", bcount);
+    if(CkMyPe() == 0) {
+        bcount ++;
+        if(bcount == CkNumPes()){
+            bcount = 0;
+            barrierReached = 1;
+            barrier2Reached = 0;
+
+            if(strategyCreated)
+                broadcastStrategies();
+        }
     }
-  }
 }
 
 //Has finished passing the strategy list to all the processors
 void ComlibManager::barrier2(){
-  static int bcount = 0;
-  if(CkMyPe() == 0) {
-    bcount ++;
-    ComlibPrintf("In barrier2 %d\n", bcount);
-    if(bcount == CkNumPes()) {
-        CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
-        cgproxy.resumeFromBarrier2();
+    if(CkMyPe() == 0) {
+        b2count ++;
+        ComlibPrintf("In barrier2 %d\n", bcount);
+        if(b2count == CkNumPes()) {
+            b2count = 0; 
+            CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
+            cgproxy.resumeFromBarrier2();
+        }
     }
-  }
 }
 
 //Registers a set of strategies with the communication library
 ComlibInstanceHandle ComlibManager::createInstance() {
   
-    ListOfStrategies.insertAtEnd(NULL);
-    nstrats++;
-    
-    ComlibInstanceHandle cinst(nstrats - 1, CkpvAccess(cmgrID));  
+    CkpvAccess(conv_com_ptr)->nstrats++;    
+    ComlibInstanceHandle cinst(CkpvAccess(conv_com_ptr)->nstrats -1,
+                               CkpvAccess(cmgrID));  
     return cinst;
 }
 
 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
-    ListOfStrategies[pos] = strat;
+    
+    strategyCreated = true;
+
+    ListOfStrategies.enq(strat);
+    strat->setInstance(pos);
 }
 
 //End of registering function, if barriers have been reached send them over
-void ComlibManager::doneCreating() {
+void ComlibManager::broadcastStrategies() {
     if(!barrierReached)
       return;    
 
-    ComlibPrintf("Sending Strategies %d, %d\n", nstrats, 
-                 ListOfStrategies.length());
+    lbUpdateReceived = CmiFalse;
+    barrierReached = 0;
 
-    if(nstrats == 0)
-        return;
+    ComlibPrintf("Sending Strategies %d, %d\n", 
+                 CkpvAccess(conv_com_ptr)->nstrats, 
+                 ListOfStrategies.length());
 
     StrategyWrapper sw;
-    sw.s_table = new Strategy* [nstrats];
-    sw.nstrats = nstrats;
-    
-    for (int count=0; count<nstrats; count++)
-        sw.s_table[count] = ListOfStrategies[count];
+    sw.total_nstrats = CkpvAccess(conv_com_ptr)->nstrats;
+
+    if(ListOfStrategies.length() > 0) {
+        int len = ListOfStrategies.length();
+        sw.s_table = new Strategy* [len];
+        sw.nstrats = len;
+        
+        for (int count=0; count < len; count++)
+            sw.s_table[count] = ListOfStrategies.deq();
+    }
+    else {
+        sw.nstrats = 0;
+        sw.s_table = 0;
+    }
 
     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
-    cgproxy.receiveTable(sw);
+    cgproxy.receiveTable(sw, *CpvAccess(locationTable));
 }
 
 //Called when the array/group element starts sending messages
@@ -191,8 +225,10 @@ void ComlibManager::endIteration(){
     //    prioEndIterationFlag = 1;
     prevStratID = -1;
     
-    ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), curStratID, 
-                 (* strategyTable)[curStratID].elementCount, (* strategyTable)[curStratID].numElements);
+    ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), 
+                 curStratID, 
+                 (* strategyTable)[curStratID].elementCount, 
+                 (* strategyTable)[curStratID].numElements);
 
     if(isRemote) {
         isRemote = 0;
@@ -213,13 +249,14 @@ void ComlibManager::endIteration(){
         
         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
         
-        nIterations ++;
-        
+        //nIterations ++;
+        /*
         if(nIterations == LEARNING_PERIOD) {
             //CkPrintf("Sending %d, %d\n", totalMsgCount, totalBytes);
             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
             cgproxy[0].learnPattern(totalMsgCount, totalBytes);
         }
+        */
         
         if(barrier2Reached) {      
            (* strategyTable)[curStratID].strategy->doneInserting();
@@ -233,89 +270,170 @@ void ComlibManager::endIteration(){
 
 //receive the list of strategies
 //Insert the strategies into the strategy table in converse comm lib.
-//CpvAccess(conv_comm_ptr) points to the converse commlib instance
-void ComlibManager::receiveTable(StrategyWrapper sw){
-    
-    ComlibPrintf("[%d] In receiveTable %d\n", CkMyPe(), sw.nstrats);
+//CkpvAccess(conv_com_ptr) points to the converse commlib instance
+void ComlibManager::receiveTable(StrategyWrapper &sw, 
+                                 CkHashtableT<ClibGlobalArrayIndex, int> 
+                                 &htable)
+{
+
+    ComlibPrintf("[%d] In receiveTable %d, ite=%d\n", CkMyPe(), sw.nstrats, 
+                 clibIteration);
 
+    clibIteration ++;
     receivedTable = 1;
-    nstrats = sw.nstrats;
+
+    delete CpvAccess(locationTable);
+    CpvAccess(locationTable) =  NULL;
+
+    CpvAccess(locationTable) = new CkHashtableT<ClibGlobalArrayIndex, int>;
+
+    CkHashtableIterator *ht_iterator = htable.iterator();
+    ht_iterator->seekStart();
+    while(ht_iterator->hasNext()){
+        ClibGlobalArrayIndex *idx;
+        int *pe;
+        pe = (int *)ht_iterator->next((void **)&idx);
+        
+        ComlibPrintf("[%d] HASH idx %d on %d\n", CkMyPe(), 
+                     idx->idx.data()[0], *pe);
+
+        CkpvAccess(locationTable)->put(*idx) = *pe;       
+    }
 
     CkArrayID st_aid;
     int st_nelements;
     CkArrayIndexMax *st_elem;
+    int temp_curstratid = curStratID;
+
+    CkpvAccess(conv_com_ptr)->nstrats = sw.total_nstrats;
+    clib_stats.setNstrats(sw.total_nstrats);
 
+    //First recreate strategies
     int count = 0;
-    for(count = 0; count < nstrats; count ++) {
+    for(count = 0; count < sw.nstrats; count ++) {
         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
         
         //set the instance to the current count
         //currently all strategies are being copied to all processors
         //later strategies will be selectively copied
-        cur_strategy->setInstance(count);  
-        CkpvAccess(conv_comm_ptr)->insertStrategy(cur_strategy);
         
-        ComlibPrintf("[%d] Inserting strategy \n", CkMyPe());       
+        //location of this strategy table entry in the strategy table
+        int loc = cur_strategy->getInstance();
+        
+        if(loc >= MAX_NUM_STRATS)
+            CkAbort("Strategy table is full \n");
+
+        CharmStrategy *old_strategy;
+
+        //If this is a learning decision and the old strategy has to
+        //be gotten rid of, finalize it here.
+        if((old_strategy = 
+            (CharmStrategy *)CkpvAccess(conv_com_ptr)->getStrategy(loc)) 
+           != NULL) {
+            old_strategy->finalizeProcessing();
+
+            //Unregister from array listener if array strategy
+            if(old_strategy->getType() == ARRAY_STRATEGY) {
+                ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
+                as.getSourceArray(st_aid, st_elem, st_nelements);
+
+                (* strategyTable)[loc].numElements = 0;
+                if(!st_aid.isZero()) {
+                    ComlibArrayListener *calistener = CkArrayID::
+                        CkLocalBranch(st_aid)->getComlibArrayListener();
+                    
+                    calistener->unregisterStrategy(&((*strategyTable)[loc]));
+                }
+            }
+        }
+        
+        //Insert strategy, frees an old strategy and sets the
+        //strategy_table entry to point to the new one
+        CkpvAccess(conv_com_ptr)->insertStrategy(cur_strategy, loc);
+        
+        ComlibPrintf("[%d] Inserting_strategy \n", CkMyPe());       
 
         if(cur_strategy->getType() == ARRAY_STRATEGY &&
            cur_strategy->isBracketed()){ 
 
             ComlibPrintf("Inserting Array Listener\n");
 
-            ComlibArrayInfo as = ((CharmStrategy *)cur_strategy)->ainfo;
+            ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
             as.getSourceArray(st_aid, st_elem, st_nelements);
             
-            if(st_aid.isZero())
-                CkAbort("Array ID is zero");
-            
-            ComlibArrayListener *calistener = 
-                CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
-            
-            calistener->registerStrategy(&((* strategyTable)[count]));
+            (* strategyTable)[loc].numElements = 0;
+            if(!st_aid.isZero()) {            
+                ComlibArrayListener *calistener = 
+                    CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
+                
+                calistener->registerStrategy(&((* strategyTable)[loc]));
+            }
         }              
-  
+        
         if(cur_strategy->getType() == GROUP_STRATEGY){
-            (* strategyTable)[count].numElements = 1;
+            (* strategyTable)[loc].numElements = 1;
         }
         
-        cur_strategy->beginProcessing((* strategyTable)[count].numElements); 
-        
+        (* strategyTable)[loc].elementCount = 0;
+        cur_strategy->beginProcessing((* strategyTable)[loc].numElements); 
+    }
+
+    //Resume all end iterarions. Newer strategies may have more 
+    //or fewer elements to expect for!!
+    for(count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count++) {
         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
                      CkMyPe(), count,
                      (* strategyTable)[count].nEndItr);
                          
         curStratID = count;
         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
-            endIteration();            
+            endIteration();  
+        
+        (* strategyTable)[count].nEndItr = 0;        
     }           
     
-    ComlibPrintf("receivedTable %d\n", nstrats);
+    curStratID = temp_curstratid;
+    ComlibPrintf("receivedTable %d\n", sw.nstrats);
     
     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
-    cgproxy[0].barrier2();
+    cgproxy[curComlibController].barrier2();
 }
 
 void ComlibManager::resumeFromBarrier2(){
     barrier2Reached = 1;
-    
-    ComlibPrintf("[%d] Barrier 2 reached\n", CkMyPe());
+    barrierReached = 0;
+
+    ComlibPrintf("[%d] Barrier 2 reached nstrats = %d, ite = %d\n", CkMyPe(), CkpvAccess(conv_com_ptr)->nstrats, clibIteration);
 
     //    if(flushTable) {
-    for (int count = 0; count < nstrats; count ++) {
+    for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
         if (!(* strategyTable)[count].tmplist.isEmpty()) {
             CharmMessageHolder *cptr;
-            while (!(* strategyTable)[count].tmplist.isEmpty())
-                (* strategyTable)[count].strategy->insertMessage
-                    ((* strategyTable)[count].tmplist.deq());
+            while (!(* strategyTable)[count].tmplist.isEmpty()) {
+                CharmMessageHolder *cmsg = (CharmMessageHolder *) 
+                    (* strategyTable)[count].tmplist.deq();
+                
+                if((*strategyTable)[count].strategy->getType() == 
+                   ARRAY_STRATEGY) {
+                    if(cmsg->dest_proc >= 0) {
+                        envelope *env  = UsrToEnv(cmsg->getCharmMessage()); 
+                        cmsg->dest_proc = getLastKnown(env->getsetArrayMgr(), 
+                                                       env->getsetArrayIndex());
+                    }
+                    //else
+                    //  CkAbort("NOT FIXED YET\n");                    
+                }                                
+                (* strategyTable)[count].strategy->insertMessage(cmsg);
+            }
         }
         
         if ((* strategyTable)[count].call_doneInserting) {
+            (* strategyTable)[count].call_doneInserting = 0;
             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
             (* strategyTable)[count].strategy->doneInserting();
         }
     }
-    //}
-    
+    //}    
     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
 }
 
@@ -325,24 +443,10 @@ void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg,
                               const CkArrayIndexMax &idx, CkArrayID a){
     
     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
-    /*
-    if(curStratID != prevStratID && prioEndIterationFlag) {        
-        CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
-        ComlibPrintf("[%d] Array Send calling prio end iteration\n", 
-                     CkMyPe());
-        PrioMsg *pmsg = new(8 * sizeof(int)) PrioMsg();
-        int mprio = -100;
-        *(int *)CkPriorityPtr(pmsg) = mprio;
-        pmsg->instID = curStratID;
-        CkSetQueueing(pmsg, CK_QUEUEING_BFIFO);
-        cgproxy[CkMyPe()].prioEndIteration(pmsg);
-        prioEndIterationFlag = 0;
-    }        
-    prevStratID = curStratID;            
-    */
 
     CkArrayIndexMax myidx = idx;
-    int dest_proc = CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
+    int dest_proc = getLastKnown(a, myidx); 
+    //CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
     
     //ComlibPrintf("Send Data %d %d %d %d\n", CkMyPe(), dest_proc, 
     //  UsrToEnv(msg)->getTotalsize(), receivedTable);
@@ -355,7 +459,10 @@ void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg,
     env->getsetArrayHops()=0;
     env->getsetArrayIndex()=idx;
     env->setUsed(0);
-    
+    ((CmiMsgHeaderExt *)env)->stratid = curStratID;
+
+    //RECORD_SEND_STATS(curStratID, env->getTotalsize(), dest_proc);
+
     CkPackMessage(&env);
     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
     
@@ -367,20 +474,22 @@ void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg,
         return;
     }
 
+    /*
     if(dest_proc == CkMyPe()){
         CProxyElement_ArrayBase ap(a,idx);
         ap.ckSend((CkArrayMessage *)msg, ep);
         return;
     }
+    */
 
-    totalMsgCount ++;
-    totalBytes += UsrToEnv(msg)->getTotalsize();
+    //totalMsgCount ++;
+    //totalBytes += UsrToEnv(msg)->getTotalsize();
 
     CharmMessageHolder *cmsg = new 
         CharmMessageHolder((char *)msg, dest_proc);
     //get rid of the new.
 
-    ComlibPrintf("Before Insert\n");
+    ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, receivedTable);
 
     if (receivedTable)
       (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
@@ -394,7 +503,7 @@ void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg,
 
 
 #include "qd.h"
-//CpvExtern(QdState*, _qd);
+//CkpvExtern(QdState*, _qd);
 
 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
     
@@ -423,7 +532,8 @@ void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, Ck
         return;
     }
     
-    CpvAccess(_qd)->create(1);
+    ((CmiMsgHeaderExt *)env)->stratid = curStratID;
+    CkpvAccess(_qd)->create(1);
 
     env->setMsgtype(ForBocMsg);
     env->setEpIdx(ep);
@@ -448,13 +558,16 @@ void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, Ck
 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
 
+    //Broken, add the processor list here.
+
     register envelope * env = UsrToEnv(m);
     env->getsetArrayMgr()=a;
     env->getsetArraySrcPe()=CkMyPe();
     env->getsetArrayEp()=ep;
     env->getsetArrayHops()=0;
     env->getsetArrayIndex()= dummyArrayIndex;
-    
+    ((CmiMsgHeaderExt *)env)->stratid = curStratID;
+
     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
 
     CkSectionInfo minfo;
@@ -465,17 +578,19 @@ void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a
     minfo.pe = CkMyPe();
     ((CkMcastBaseMsg *)m)->_cookie = minfo;       
 
+    //RECORD_SENDM_STATS(curStratID, env->getTotalsize(), dest_proc);
+
     CharmMessageHolder *cmsg = new 
-        CharmMessageHolder((char *)m, IS_MULTICAST);
-    cmsg->npes = CkNumPes();
-    cmsg->pelist = bcast_pelist;
+        CharmMessageHolder((char *)m, IS_BROADCAST);
+    cmsg->npes = 0;
+    cmsg->pelist = NULL;
     cmsg->sec_id = NULL;
 
     multicast(cmsg);
 }
 
-void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, CkArrayID a, 
-                                     CkSectionID &s) {
+void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
+                                     CkArrayID a, CkSectionID &s) {
 
 #ifndef CMK_OPTIMIZE
     traceUserEvent(section_send_event);
@@ -489,17 +604,19 @@ void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, CkArray
     env->getsetArrayEp()=ep;
     env->getsetArrayHops()=0;
     env->getsetArrayIndex()= dummyArrayIndex;
-    
+    ((CmiMsgHeaderExt *)env)->stratid = curStratID;
+
     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
     
     env->setUsed(0);    
     CkPackMessage(&env);
     
-    totalMsgCount ++;
-    totalBytes += env->getTotalsize();
+    //totalMsgCount ++;
+    //totalBytes += env->getTotalsize();
 
     //Provide a dummy dest proc as it does not matter for mulitcast 
-    CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_MULTICAST);
+    CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,
+                                                      IS_SECTION_MULTICAST);
     cmsg->npes = 0;
     cmsg->sec_id = &s;
 
@@ -519,19 +636,20 @@ void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, CkArray
 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
     register envelope * env = UsrToEnv(m);
 
-    CpvAccess(_qd)->create(1);
+    CkpvAccess(_qd)->create(1);
 
     env->setMsgtype(ForBocMsg);
     env->setEpIdx(ep);
     env->setGroupNum(g);
     env->setSrcPe(CkMyPe());
     env->setUsed(0);
+    ((CmiMsgHeaderExt *)env)->stratid = curStratID;
 
     CkPackMessage(&env);
     CmiSetHandler(env, _charmHandlerIdx);
     
     //Provide a dummy dest proc as it does not matter for mulitcast 
-    CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_MULTICAST);
+    CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST);
     
     cmsg->npes = 0;
     cmsg->pelist = NULL;
@@ -548,8 +666,8 @@ void ComlibManager::multicast(CharmMessageHolder *cmsg) {
     CkPackMessage(&env);
 
     //Will be used to detect multicast message for learning
-    totalMsgCount ++;
-    totalBytes += env->getTotalsize();
+    //totalMsgCount ++;
+    //totalBytes += env->getTotalsize();
     
     if (receivedTable)
        (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
@@ -562,82 +680,51 @@ void ComlibManager::multicast(CharmMessageHolder *cmsg) {
     ComlibPrintf("After multicast\n");
 }
 
-/*
-void ComlibManager::multicast(void *msg, int npes, int *pelist) {
-    register envelope * env = UsrToEnv(msg);
-    
-    ComlibPrintf("[%d]: In multicast\n", CkMyPe());
-
-    env->setUsed(0);    
-    CkPackMessage(&env);
-    CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
+//Collect statistics from all the processors, also gets the list of
+//array elements on each processor.
+void ComlibManager::collectStats(ComlibLocalStats &stat, int pe, 
+                                 CkVec<ClibGlobalArrayIndex> &idx_vec) {
     
-    totalMsgCount ++;
-    totalBytes += env->getTotalsize();
-
-    CharmMessageHolder *cmsg = new 
-    CharmMessageHolder((char *)msg,IS_MULTICAST);
-    cmsg->npes = npes;
-    cmsg->pelist = pelist;
-    //Provide a dummy dest proc as it does not matter for mulitcast 
-    //get rid of the new.
-    
-    if (receivedTable)
-       (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
-    else {
-        flushTable = 1;
-       ComlibPrintf("Enqueuing message in tmplist\n");
-        (* strategyTable)[curStratID].tmplist.enq(cmsg);
-    }
-
-    ComlibPrintf("After multicast\n");
-}
-*/
+    ComlibPrintf("%d: Collecting stats %d\n", CkMyPe(), numStatsReceived);
 
-
-void ComlibManager::learnPattern(int total_msg_count, int total_bytes) {
-    static int nrecvd = 0;
-    static double avg_message_count = 0;
-    static double avg_message_bytes = 0;
-
-    avg_message_count += ((double) total_msg_count) / LEARNING_PERIOD;
-    avg_message_bytes += ((double) total_bytes) /  LEARNING_PERIOD;
-
-    nrecvd ++;
+    numStatsReceived ++;
+    clib_gstats.updateStats(stat, pe);
     
-    if(nrecvd == CkNumPes()) {
-        //Number of messages and bytes a processor sends in each iteration
-        avg_message_count /= CkNumPes();
-        avg_message_bytes /= CkNumPes();
+    for(int count = 0; count < idx_vec.length(); count++) {
+        int old_pe = CkpvAccess(locationTable)->get(idx_vec[count]);
         
-        //CkPrintf("STATS = %5.3lf, %5.3lf", avg_message_count,
-        //avg_message_bytes);
-
-        //Learning, ignoring contention for now! 
-        double cost_dir, cost_mesh, cost_grid, cost_hyp;
-       double p=(double)CkNumPes();
-        cost_dir = ALPHA * avg_message_count + BETA * avg_message_bytes;
-        cost_mesh = ALPHA * 2 * sqrt(p) + BETA * avg_message_bytes * 2;
-        cost_grid = ALPHA * 3 * pow(p,1.0/3.0) + BETA * avg_message_bytes * 3;
-        cost_hyp =  (log(p)/log(2.0))*(ALPHA  + BETA * avg_message_bytes/2.0);
+        ComlibPrintf("Adding idx %d to %d\n", idx_vec[count].idx.data()[0], 
+                     pe);
         
-        // Find the one with the minimum cost!
-        int min_strat = USE_MESH; 
-        double min_cost = cost_mesh;
-        if(min_cost > cost_hyp)
-            min_strat = USE_HYPERCUBE;
-        if(min_cost > cost_grid)
-            min_strat = USE_GRID;
-
-        if(min_cost > cost_dir)
-            min_strat = USE_DIRECT;
-
-        switchStrategy(min_strat);        
-    }
-}
+        CkpvAccess(locationTable)->put(idx_vec[count]) = pe + CkNumPes();
+    }        
 
-void ComlibManager::switchStrategy(int strat){
-    //CkPrintf("Switching to %d\n", strat);
+    if(numStatsReceived == CkNumPes()) {
+        numStatsReceived = 0;
+
+        for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
+            count++ ){
+            Strategy* strat = CkpvAccess(conv_com_ptr)->getStrategy(count);
+            if(strat->getType() > CONVERSE_STRATEGY) {
+                CharmStrategy *cstrat = (CharmStrategy *)strat;
+                ComlibLearner *learner = cstrat->getLearner();
+                CharmStrategy *newstrat = NULL;
+                                
+                if(learner != NULL) {
+                    ComlibPrintf("Calling Learner\n");
+                    newstrat = (CharmStrategy *)learner->optimizePattern(strat, clib_gstats);
+                    if(newstrat != NULL)
+                        ListOfStrategies.enq(newstrat);
+                }
+            }
+        }
+        barrierReached = 1;
+        
+        //if(lbUpdateReceived) {
+        //lbUpdateReceived = CmiFalse;
+        broadcastStrategies();
+        //}
+    }
 }
 
 void ComlibManager::setRemote(int remote_pe){
@@ -688,14 +775,133 @@ void ComlibManager::sendRemote(){
 }
 
 
-/*
-void ComlibManager::prioEndIteration(PrioMsg *pmsg){
-    CkPrintf("[%d] In Prio End Iteration\n", CkMyPe());
-    setInstance(pmsg->instID);
-    endIteration();
-    delete pmsg;
+void ComlibManager::AtSync() {
+
+    //comm_debug = 1;
+    ComlibPrintf("[%d] In ComlibManager::Atsync, controller %d, ite %d\n", CkMyPe(), curComlibController, clibIteration);
+
+    barrier2Reached = 0;
+    receivedTable = 0;
+    barrierReached = 0;
+    CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
+
+    int pos = 0;
+
+    CkVec<ClibGlobalArrayIndex> gidx_vec;
+
+    CkVec<CkArrayID> tmp_vec;
+    for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
+        if((* strategyTable)[count].strategy->getType() == ARRAY_STRATEGY) {
+            CharmStrategy *cstrat = (CharmStrategy*)
+                ((* strategyTable)[count].strategy);
+            
+            CkArrayID src, dest;
+            CkArrayIndexMax *elements;
+            int nelem;
+            
+            cstrat->ainfo.getSourceArray(src, elements, nelem);
+            cstrat->ainfo.getDestinationArray(dest, elements, nelem);
+
+            CmiBool srcflag = CmiFalse;
+            CmiBool destflag = CmiFalse;
+            
+            if(src == dest || dest.isZero())
+                destflag = CmiTrue;
+
+            if(src.isZero())
+                srcflag = CmiTrue;                        
+
+            for(pos = 0; pos < tmp_vec.size(); pos++) {
+                if(tmp_vec[pos] == src)
+                    srcflag = CmiTrue;
+
+                if(tmp_vec[pos] == dest)
+                    destflag = CmiTrue;
+
+                if(srcflag && destflag)
+                    break;
+            }
+
+            if(!srcflag)
+                tmp_vec.insertAtEnd(src);
+
+            if(!destflag)
+                tmp_vec.insertAtEnd(dest);
+        }
+        
+        //cant do it here, done in receiveTable
+        //if((* strategyTable)[count].strategy->getType() > CONVERSE_STRATEGY)
+        //  (* strategyTable)[count].reset();
+    }
+
+    for(pos = 0; pos < tmp_vec.size(); pos++) {
+        CkArrayID aid = tmp_vec[pos];
+
+        ComlibArrayListener *calistener = 
+            CkArrayID::CkLocalBranch(aid)->getComlibArrayListener();
+
+        CkVec<CkArrayIndexMax> idx_vec;
+        calistener->getLocalIndices(idx_vec);
+
+        for(int idx_count = 0; idx_count < idx_vec.size(); idx_count++) {
+            ClibGlobalArrayIndex gindex;
+            gindex.aid = aid;
+            gindex.idx = idx_vec[idx_count];
+
+            gidx_vec.insertAtEnd(gindex);
+        }
+    }
+
+    cgproxy[curComlibController].collectStats(clib_stats, CkMyPe(), gidx_vec);
+    clib_stats.reset();
 }
-*/
+
+#include "lbdb.h"
+#include "CentralLB.h"
+
+/******** FOO BAR : NEEDS to be consistent with array manager *******/
+void LDObjID2IdxMax (LDObjid ld_id, CkArrayIndexMax &idx) {
+    if(OBJ_ID_SZ < CK_ARRAYINDEX_MAXLEN)
+        CkAbort("LDB OBJ ID smaller than array index\n");
+    
+    //values higher than CkArrayIndexMax should be 0
+    for(int count = 0; count < CK_ARRAYINDEX_MAXLEN; count ++) {
+        idx.data()[count] = ld_id.id[count];
+    }
+    idx.nInts = 1;
+}
+
+void ComlibManager::lbUpdate(LBMigrateMsg *msg) {
+    for(int count = 0; count < msg->n_moves; count ++) {
+        MigrateInfo m = msg->moves[count];
+
+        CkArrayID aid; CkArrayIndexMax idx;
+        aid = CkArrayID(m.obj.omhandle.id.id);
+        LDObjID2IdxMax(m.obj.id, idx);
+
+        ClibGlobalArrayIndex cid; 
+        cid.aid = aid;
+        cid.idx = idx;
+        
+        int pe = CkpvAccess(locationTable)->get(cid);
+
+        //Value exists in the table, so update it
+        if(pe != 0) {
+            pe = m.to_pe + CkNumPes();
+            CkpvAccess(locationTable)->getRef(cid) = pe;
+        }
+        //otherwise we dont care about these objects
+    }   
+
+    lbUpdateReceived = CmiTrue;
+    if(barrierReached) {
+        broadcastStrategies();
+        barrierReached = 0;
+    }
+
+    CkFreeMsg(msg);
+}
+
 
 void ComlibDelegateProxy(CProxy *proxy){
     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
@@ -720,7 +926,7 @@ ComlibInstanceHandle CkGetComlibInstance(int id) {
 
 void ComlibDoneCreating(){
     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
-    (cgproxy.ckLocalBranch())->doneCreating();
+    (cgproxy.ckLocalBranch())->broadcastStrategies();
 }
 
 char *router;
@@ -826,13 +1032,44 @@ void ComlibInitSectionID(CkSectionID &sid){
     sid.pelist = NULL;
 }
 
+void ComlibResetSectionProxy(CProxySection_ArrayBase *sproxy) {
+    CkSectionID &sid = sproxy->ckGetSectionID();
+    ComlibInitSectionID(sid);
+    sid._cookie.sInfo.cInfo.status = 0;
+}
+
 // for backward compatibility - for old name commlib
 void _registercommlib(void)
 {
-  static int _done = 0; if(_done) return; _done = 1;
+  static int _done = 0; 
+  if(_done) 
+      return; 
+  _done = 1;
   _registercomlib();
 }
 
+void ComlibAtSyncHandler(void *msg) {
+    CmiFree(msg);
+    CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
+    ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
+    if(cmgr_ptr)
+        cmgr_ptr->AtSync();    
+}
+
+void ComlibNotifyMigrationDoneHandler(void *msg) {
+    CmiFree(msg);
+    CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
+    ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
+    if(cmgr_ptr)
+        cmgr_ptr->AtSync();    
+}
+
+
+void ComlibLBMigrationUpdate(LBMigrateMsg *msg) {
+    CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
+    (cgproxy.ckLocalBranch())->lbUpdate(msg);
+}
+
 #include "comlib.def.h"
 
 
index 561117c3f1d65ee724bae043b66eefa94a3d6def..46f677bfcc665159c2af4efd6a19e19b2d50b6fa 100644 (file)
@@ -14,15 +14,12 @@ module comlib {
     entry void barrier(void);
     entry void barrier2(void);
     entry void resumeFromBarrier2(void);
+    entry void receiveTable(StrategyWrapper sw, 
+       CkHashtableT<ClibGlobalArrayIndex, int>);
 
-    //entry void beginIteration();
-    //entry void endIteration();
-    //entry void prioEndIteration(PrioMsg *);
-
-    entry void receiveTable(StrategyWrapper sw);
     entry void receiveRemoteSend(CkQ<CharmMessageHolder *> &remoteQ, int id);
-
-    entry void learnPattern(int, int);
+    entry void collectStats(ComlibLocalStats s, int src, 
+       CkVec<ClibGlobalArrayIndex>);
   }
 
   mainchare ComlibManagerMain {
index 71e7c33edeeacd71b22a4b127d96ebea0a1e685f..d9af80b5a9c4e77ee6bc805a24e1cca891c25d61 100644 (file)
                              //learning framework will discover 
                              //the appropriate strategy, not completely 
                              //implemented
-#define IS_MULTICAST -1
-
-#define ALPHA 5E-6
-#define BETA 3.33E-9
-
 PUPbytes(comID);
 
+#include "ComlibStats.h"
+
 #include "comlib.decl.h"
 
 //Dummy message to be sent incase there are no messages to send. 
@@ -58,8 +55,6 @@ class ComlibMulticastMsg : public CkMcastBaseMsg,
 
 class ComlibManager;
 
-extern CkGroupID cmgrID;
-
 //An Instance of the communication library.
 class ComlibInstanceHandle {
  private:    
@@ -95,6 +90,8 @@ class ComlibInstanceHandle {
     }
 };
 
+class LBMigrateMsg;
+
 class ComlibManager: public CkDelegateMgr {
     friend class ComlibInstanceHandle;
 
@@ -104,6 +101,7 @@ class ComlibManager: public CkDelegateMgr {
 
     int remotePe;
     CmiBool isRemote;
+    CmiBool strategyCreated;
 
     int npes;
     int *pelist;
@@ -116,25 +114,36 @@ class ComlibManager: public CkDelegateMgr {
     //Pointer to the converse comm lib strategy table
     StrategyTable *strategyTable;
 
-    CkVec<CharmStrategy *> ListOfStrategies; //temporary list of strategies
+    CkQ<CharmStrategy *> ListOfStrategies; //temporary list of strategies
     
     CkQ<CharmMessageHolder *> remoteQ;  //list of remote messages
                                         //after the object has
                                         //migrated
 
     //The number of strategies created by the user
-    int nstrats; 
+    //int nstrats; //now part of conv comlib
     
     int curStratID, prevStratID;      
     //Number of strategies created by the user.
 
     //flags
     int receivedTable, flushTable, barrierReached, barrier2Reached;
-    int totalMsgCount, totalBytes, nIterations;
+    CmiBool lbUpdateReceived;
+
+    int bcount , b2count;
+    //int totalMsgCount, totalBytes, nIterations;
 
     ComlibArrayListener *alistener;
     int prioEndIterationFlag;
 
+    ComlibGlobalStats clib_gstats; 
+    int    numStatsReceived;
+
+    int curComlibController;   //Processor where strategies are  recreated
+    int clibIteration;         //Number of such learning iterations,
+                               //each of which is triggered by a
+                               //loadbalancing operation
+
     void init(); //initialization function
 
     //charm_message for multicast for a section of that group
@@ -152,32 +161,42 @@ class ComlibManager: public CkDelegateMgr {
                                //the bracket.
     
     void setInstance(int id); 
+
     //void prioEndIteration(PrioMsg *pmsg);
     void registerStrategy(int pos, CharmStrategy *s);
 
  public:
+
+    ComlibLocalStats clib_stats;   //To store statistics of
+                                   //communication operations
+    
     ComlibManager();  //Recommended constructor
 
-    ComlibManager(CkMigrateMessage *m){ }
+    ComlibManager(CkMigrateMessage *m) { }
     int useDefCtor(void){ return 1; } //Use default constructor should
     //be pupped and store all the strategies.
     
     void barrier(void);
     void barrier2(void);
     void resumeFromBarrier2(void);
-    void receiveTable(StrategyWrapper sw); //Receive table of strategies.
 
-    void ArraySend(CkDelegateData *pd,int ep, void *msg, const CkArrayIndexMax &idx, 
-                   CkArrayID a);
+    //Receive table of strategies.
+    void receiveTable(StrategyWrapper &sw, 
+                      CkHashtableT <ClibGlobalArrayIndex, int>&); 
+
+    void ArraySend(CkDelegateData *pd,int ep, void *msg, 
+                   const CkArrayIndexMax &idx, CkArrayID a);
 
     void receiveRemoteSend(CkQ<CharmMessageHolder*> &rq, int id);
     void sendRemote();
 
-    void GroupSend(CkDelegateData *pd,int ep, void *msg, int onpe, CkGroupID gid);
+    void GroupSend(CkDelegateData *pd, int ep, void *msg, int onpe, 
+                   CkGroupID gid);
     
     virtual void ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a);
     virtual void GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g);
-    virtual void ArraySectionSend(CkDelegateData *pd,int ep,void *m,CkArrayID a,CkSectionID &s);
+    virtual void ArraySectionSend(CkDelegateData *pd, int ep ,void *m, 
+                                  CkArrayID a, CkSectionID &s);
 
     CharmStrategy *getStrategy(int instid)
         {return (CharmStrategy *)(* strategyTable)[instid].strategy;}
@@ -187,13 +206,41 @@ class ComlibManager: public CkDelegateMgr {
 
     //To create a new strategy, returns handle to the strategy table;
     ComlibInstanceHandle createInstance();  
-    void doneCreating();             //Done creating instances
+    void broadcastStrategies();             //Done creating instances
+
+    void AtSync();           //User program called loadbalancer
+    void lbUpdate(LBMigrateMsg *); //loadbalancing updates
 
     //Learning functions
-    void learnPattern(int totalMessageCount, int totalBytes);
-    void switchStrategy(int strat);
+    //void learnPattern(int totalMessageCount, int totalBytes);
+    //void switchStrategy(int strat);
 
     void setRemote(int remotePe);
+
+    void collectStats(ComlibLocalStats &s, int src,CkVec<ClibGlobalArrayIndex>&);
+
+    //Returns the processor on which the comlib sees the array element
+    //belonging to
+    inline int getLastKnown(CkArrayID a, CkArrayIndexMax &idx) {
+        /*
+        ClibGlobalArrayIndex cidx;
+        cidx.aid = a;
+        cidx.idx = idx;
+        int pe = locationTable->get(cidx);
+
+        if(pe == 0) {
+            //Array element does not exist in the table
+
+            CkArray *array = CkArrayID::CkLocalBranch(a);
+            pe = array->lastKnown(idx) + CkNumPes();
+            locationTable->put(cidx) = pe;
+        }
+        //CkPrintf("last pe = %d \n", pe - CkNumPes());
+
+        return pe - CkNumPes();
+        */
+        return ComlibGetLastKnown(a, idx);
+    }
 };
 
 void ComlibDelegateProxy(CProxy *proxy);
@@ -201,11 +248,36 @@ void ComlibDelegateProxy(CProxy *proxy);
 ComlibInstanceHandle CkCreateComlibInstance();
 ComlibInstanceHandle CkGetComlibInstance();
 ComlibInstanceHandle CkGetComlibInstance(int id);
+void ComlibResetSectionProxy(CProxySection_ArrayBase *sproxy);
+
 
 //Only Called when the strategies are not being created in main::main
 void ComlibDoneCreating(); 
 
 void ComlibInitSectionID(CkSectionID &sid);
-void ComlibDeleteSection();
+
+void ComlibAtSync(void *msg);
+void ComlibNotifyMigrationDoneHandler(void *msg);
+void ComlibLBMigrationUpdate(LBMigrateMsg *);
+
+#define RECORD_SEND_STATS(sid, bytes, dest) {             \
+        CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));               \
+        cgproxy.ckLocalBranch()->clib_stats.recordSend(sid, bytes, dest); \
+}\
+
+#define RECORD_RECV_STATS(sid, bytes, src) { \
+        CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); \
+        cgproxy.ckLocalBranch()->clib_stats.recordRecv(sid, bytes, src); \
+}\
+
+#define RECORD_SENDM_STATS(sid, bytes, dest_arr, ndest) {       \
+        CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); \
+        cgproxy.ckLocalBranch()->clib_stats.recordSendM(sid, bytes, dest_arr, ndest); \
+}\
+
+#define RECORD_RECVM_STATS(sid, bytes, src_arr, nsrc) {        \
+        CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); \
+        cgproxy.ckLocalBranch()->clib_stats.recordRecvM(sid, bytes, src_arr, nsrc); \
+}\
 
 #endif
index cf8191d0e204600b8f9783ec4728a4c3b8d47118..287a80a18237bd14bc6ed213c1ee3359d02bd8ec 100644 (file)
@@ -55,13 +55,20 @@ ComlibMulticastMsg * ComlibSectionInfo::getNewMulticastMessage
 void ComlibSectionInfo::unpack(envelope *cb_env, 
                                CkVec<CkArrayIndexMax> *&dest_indices, 
                                envelope *&env) {
-    
-    dest_indices = new CkVec<CkArrayIndexMax>;
-    
+        
+    dest_indices = NULL;    
     ComlibMulticastMsg *ccmsg = (ComlibMulticastMsg *)EnvToUsr(cb_env);
+    
+    if(ccmsg->nIndices > 0)
+        dest_indices = new CkVec<CkArrayIndexMax>;
+
     for(int count = 0; count < ccmsg->nIndices; count++){
         CkArrayIndexMax idx = ccmsg->indices[count];
-        int dest_proc = CkArrayID::CkLocalBranch(destArrayID)->lastKnown(idx);
+        
+        //This will work because. lastknown always knows if I have the
+        //element of not
+        int dest_proc = ComlibGetLastKnown(destArrayID, idx);
+        //CkArrayID::CkLocalBranch(destArrayID)->lastKnown(idx);
         
         if(dest_proc == CkMyPe())
             dest_indices->insertAtEnd(idx);                        
@@ -74,6 +81,9 @@ void ComlibSectionInfo::unpack(envelope *cb_env,
 
 
 void ComlibSectionInfo::processOldSectionMessage(CharmMessageHolder *cmsg) {
+
+    ComlibPrintf("Process Old Section Message \n");
+
     int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
 
     //Old section id, send the id with the message
@@ -93,8 +103,10 @@ void ComlibSectionInfo::initSectionID(CkSectionID *sid){
     int count = 0, acount = 0;
 
     for(acount = 0; acount < sid->_nElems; acount++){
-        int p = CkArrayID::CkLocalBranch(destArrayID)->
-            lastKnown(sid->_elems[acount]);
+
+        int p = ComlibGetLastKnown(destArrayID, sid->_elems[acount]);
+        //CkArrayID::CkLocalBranch(destArrayID)->
+        //lastKnown(sid->_elems[acount]);
         
         if(p == -1) CkAbort("Invalid Section\n");        
         for(count = 0; count < sid->npes; count ++)
index e42f2a5102db43f29007d134430f5131bf02ad90..d2d589468e6e666b6657906457613ec4a28754c8 100644 (file)
@@ -2,6 +2,11 @@
 #include "charm++.h"
 #include "envelope.h"
 
+//calls ComlibNotifyMigrationDone(). Even compiles when -module comlib
+//is not included. Hack to make loadbalancer work without comlib
+//currently.
+CkpvDeclare(int, migrationDoneHandlerID);
+
 //Class that defines the entry methods that a Charm level strategy
 //must define.  To write a new strategy inherit from this class and
 //define the virtual methods.  Every strategy can also define its own
@@ -60,44 +65,133 @@ void ComlibNodeGroupInfo::pup(PUP::er &p) {
 
 ComlibGroupInfo::ComlibGroupInfo() {
     
-    isGroup = 0;
+    isSrcGroup = 0;
+    isDestGroup = 0;
     nsrcpes = 0;
+    ndestpes = 0;
     srcpelist = NULL;
-    gid.setZero();
+    destpelist = NULL;
+    sgid.setZero();
+    dgid.setZero();
 };
 
+ComlibGroupInfo::~ComlibGroupInfo() {
+    if(nsrcpes > 0 && srcpelist != NULL)
+        delete [] srcpelist;
+
+    if(ndestpes > 0 && destpelist != NULL)
+        delete [] destpelist;
+}
+
 void ComlibGroupInfo::pup(PUP::er &p){
 
-    p | gid;
+    p | sgid;
+    p | dgid;
     p | nsrcpes;
+    p | ndestpes;
+
+    p | isSrcGroup;
+    p | isDestGroup;
 
     if(p.isUnpacking()) {
-        if(nsrcpes >0) 
+        if(nsrcpes > 0) 
             srcpelist = new int[nsrcpes];
+
+        if(ndestpes > 0) 
+            destpelist = new int[ndestpes];
     }
 
     if(nsrcpes > 0) 
         p(srcpelist, nsrcpes);
+
+    if(ndestpes > 0) 
+        p(destpelist, ndestpes);
 }
 
 void ComlibGroupInfo::setSourceGroup(CkGroupID gid, int *pelist, 
                                          int npes) {
-    this->gid = gid;
+    this->sgid = gid;
     srcpelist = pelist;
     nsrcpes = npes;
-    isGroup = 1;
+    isSrcGroup = 1;
+
+    if(nsrcpes == 0) {
+        nsrcpes = CkNumPes();
+        srcpelist = new int[nsrcpes];
+        for(int count =0; count < nsrcpes; count ++)
+            srcpelist[count] = count;
+    }
 }
 
 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid, int *&pelist, 
                                          int &npes){
-    gid = this->gid;
-
-    pelist = srcpelist;
+    gid = this->sgid;
     npes = nsrcpes;
+
+    pelist = new int [nsrcpes];
+    memcpy(pelist, srcpelist, npes * sizeof(int));
 }
 
 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid){
-    gid = this->gid;
+    gid = this->sgid;
+}
+
+void ComlibGroupInfo::setDestinationGroup(CkGroupID gid, int *pelist, 
+                                         int npes) {
+    this->dgid = gid;
+    destpelist = pelist;
+    ndestpes = npes;
+    isDestGroup = 1;
+
+    if(ndestpes == 0) {
+        ndestpes = CkNumPes();
+        destpelist = new int[ndestpes];
+        for(int count =0; count < ndestpes; count ++)
+            destpelist[count] = count;
+    }
+}
+
+void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid, int *&pelist, 
+                                         int &npes){
+    gid = this->dgid;
+    npes = ndestpes;
+
+    pelist = new int [ndestpes];
+    memcpy(pelist, destpelist, npes * sizeof(int));
+}
+
+void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid){
+    gid = this->dgid;
+}
+
+void ComlibGroupInfo::getCombinedPeList(int *&pelist, int &npes) {
+    int count = 0;        
+    pelist = 0;
+    npes = 0;
+
+    pelist = new int[CkNumPes()];
+    if(nsrcpes == 0 || ndestpes == 0) {
+        npes = CkNumPes();        
+        for(count = 0; count < CkNumPes(); count ++) 
+            pelist[count] = count;                         
+    }
+    else {        
+        npes = ndestpes;
+        memcpy(pelist, destpelist, npes * sizeof(int));
+        
+        //Add source processors to the destination processors
+        //already obtained
+        for(int count = 0; count < nsrcpes; count++) {
+            int p = srcpelist[count];
+
+            for(count = 0; count < npes; count ++)
+                if(pelist[count] == p)
+                    break;
+
+            if(count == npes)
+                pelist[npes ++] = p;
+        }                        
+    }
 }
 
 ComlibArrayInfo::ComlibArrayInfo() {
@@ -113,6 +207,16 @@ ComlibArrayInfo::ComlibArrayInfo() {
     isDestArray = 0;
 };
 
+ComlibArrayInfo::~ComlibArrayInfo() {
+    //CkPrintf("in comlibarrayinfo destructor\n");
+
+    if(nSrcIndices > 0)
+        delete [] src_elements;
+
+    if(nDestIndices > 0)
+        delete [] dest_elements;
+}
+
 void ComlibArrayInfo::setSourceArray(CkArrayID aid, 
                                          CkArrayIndexMax *e, int nind){
     src_aid = aid;
@@ -179,20 +283,7 @@ void ComlibArrayInfo::pup(PUP::er &p){
     else
         dest_elements = NULL;
 
-    //Insert all local elements into a vector
-    if(p.isUnpacking() && !dest_aid.isZero()) {
-        CkArray *dest_array = CkArrayID::CkLocalBranch(dest_aid);
-        
-        if(nDestIndices == 0){            
-            dest_array->getComlibArrayListener()->getLocalIndices
-                (localDestIndexVec);
-        }
-        else {
-            for(int count = 0; count < nDestIndices; count++) 
-                if(dest_array->lastKnown(dest_elements[count]) == CkMyPe())
-                    localDestIndexVec.insertAtEnd(dest_elements[count]);
-        }
-    }
+    localDestIndexVec.resize(0);
 }
 
 //Get the list of destination processors
@@ -224,8 +315,8 @@ void ComlibArrayInfo::getDestinationPeList(int *&destpelist, int &ndestpes) {
 
     //Find the last known processors of the array elements
     for(acount = 0; acount < nDestIndices; acount++) {
-        int p = CkArrayID::CkLocalBranch(dest_aid)->
-            lastKnown(dest_elements[acount]);        
+
+        int p = ComlibGetLastKnown(dest_aid, dest_elements[acount]); 
         
         for(count = 0; count < ndestpes; count ++)
             if(destpelist[count] == p)
@@ -260,8 +351,8 @@ void ComlibArrayInfo::getSourcePeList(int *&srcpelist, int &nsrcpes) {
 
     nsrcpes = 0;
     for(acount = 0; acount < nSrcIndices; acount++) {
-        int p = CkArrayID::CkLocalBranch(src_aid)->
-            lastKnown(src_elements[acount]);        
+        
+        int p = ComlibGetLastKnown(src_aid, src_elements[acount]); 
         
         for(count = 0; count < nsrcpes; count ++)
             if(srcpelist[count] == p)
@@ -308,9 +399,8 @@ void ComlibArrayInfo::getCombinedPeList(int *&pelist, int &npes) {
         //Add source processors to the destination processors
         //already obtained
         for(int acount = 0; acount < nSrcIndices; acount++) {
-            int p = CkArrayID::CkLocalBranch(src_aid)->
-                lastKnown(src_elements[acount]);
-            
+            int p = ComlibGetLastKnown(src_aid, src_elements[acount]);
+
             for(count = 0; count < npes; count ++)
                 if(pelist[count] == p)
                     break;
@@ -321,6 +411,23 @@ void ComlibArrayInfo::getCombinedPeList(int *&pelist, int &npes) {
 }
 
 void ComlibArrayInfo::localBroadcast(envelope *env) {
+    //Insert all local elements into a vector
+    if(localDestIndexVec.size()==0 && !dest_aid.isZero()) {
+        CkArray *dest_array = CkArrayID::CkLocalBranch(dest_aid);
+        
+        if(nDestIndices == 0){            
+            dest_array->getComlibArrayListener()->getLocalIndices
+                (localDestIndexVec);
+        }
+        else {
+            for(int count = 0; count < nDestIndices; count++) {
+                if(ComlibGetLastKnown(dest_aid, dest_elements[count])
+                   == CkMyPe())
+                    localDestIndexVec.insertAtEnd(dest_elements[count]);
+            }
+        }
+    }
+
     ComlibArrayInfo::localMulticast(&localDestIndexVec, env);
 }
 
@@ -385,3 +492,48 @@ void ComlibArrayInfo::deliver(envelope *env){
     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);    
 }
+
+void ComlibNotifyMigrationDone() {
+    if(CpvInitialized(migrationDoneHandlerID)) 
+        if(CkpvAccess(migrationDoneHandlerID) > 0) {
+            char *msg = (char *)CmiAlloc(CmiReservedHeaderSize);
+            CmiSetHandler(msg, CkpvAccess(migrationDoneHandlerID));
+            //CmiSyncSendAndFree(CkMyPe(), CmiReservedHeaderSize, msg);
+            CmiHandleMessage(msg);
+        }
+}
+
+
+//Stores the location of many array elements used by the
+//strategies.  Since hash table returns a reference to the object
+//and for an int that will be 0, the actual value stored is pe +
+//CkNumPes so 0 would mean processor -CkNumPes which is invalid.
+CpvDeclare(ClibLocationTableType *, locationTable);
+
+int ComlibGetLastKnown(CkArrayID aid, CkArrayIndexMax idx) {
+    //CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
+    //return (cgproxy.ckLocalBranch())->getLastKnown(aid, idx);
+
+    if(!CpvInitialized(locationTable)) {
+        CkAbort("Uninitialized table\n");
+    }
+
+    if(CpvAccess(locationTable) == NULL)
+        CkAbort("comlib location table is NULL\n");
+
+    ClibGlobalArrayIndex cidx;
+    cidx.aid = aid;
+    cidx.idx = idx;
+    int pe = CpvAccess(locationTable)->get(cidx);
+    
+    if(pe == 0) {
+        //Array element does not exist in the table
+        
+        CkArray *array = CkArrayID::CkLocalBranch(aid);
+        pe = array->lastKnown(idx) + CkNumPes();
+        CpvAccess(locationTable)->put(cidx) = pe;
+    }
+    //CkPrintf("last pe = %d \n", pe - CkNumPes());
+    
+    return pe - CkNumPes();
+}
index 3d6edf8c5f120608aeb55ac3104776fb08d42472..7ee83b91b2e5dc0748af88b43800315b7ce5ec94 100644 (file)
@@ -2,7 +2,11 @@
 #define COMMLIBSTRATEGY_H
 
 #include "charm++.h"
+#include "ckhashtable.h"
 #include "convcomlibstrategy.h"
+#include "ComlibLearner.h"
+
+CkpvExtern(int, migrationDoneHandlerID);
 
 //Class managing Charm++ messages in the communication library.
 //It is aware of envelopes, arrays, etc
@@ -22,6 +26,56 @@ class CharmMessageHolder : public MessageHolder{
     PUPable_decl(CharmMessageHolder);
 };
 
+
+//Struct to store the comlib location table info
+struct ClibGlobalArrayIndex {
+    CkArrayID aid;
+    CkArrayIndexMax idx;
+
+    //These routines allow ClibGlobalArrayIndex to be used in
+    //  a CkHashtableT
+    CkHashCode hash(void) const;
+    static CkHashCode staticHash(const void *a,size_t);
+    int compare(const ClibGlobalArrayIndex &ind) const;
+    static int staticCompare(const void *a,const void *b,size_t);
+};
+PUPbytes(ClibGlobalArrayIndex);
+
+/*********** CkHashTable functions ******************/
+inline CkHashCode ClibGlobalArrayIndex::hash(void) const
+{
+    register CkHashCode ret = idx.hash() | (CkGroupID(aid).idx << 16);
+    return ret;
+}
+
+inline int ClibGlobalArrayIndex::compare(const ClibGlobalArrayIndex &k2) const
+{
+    if(idx == k2.idx && aid == k2.aid)
+        return 1;
+    
+    return 0;
+}
+
+//ClibGlobalArrayIndex CODE
+inline int ClibGlobalArrayIndex::staticCompare(const void *k1, const void *k2, 
+                                                size_t ){
+    return ((const ClibGlobalArrayIndex *)k1)->
+                compare(*(const ClibGlobalArrayIndex *)k2);
+}
+
+inline CkHashCode ClibGlobalArrayIndex::staticHash(const void *v,size_t){
+    return ((const ClibGlobalArrayIndex *)v)->hash();
+}
+
+
+typedef CkHashtableT<ClibGlobalArrayIndex,int> ClibLocationTableType;
+    
+//Stores the location of many array elements used by the
+//strategies.  Since hash table returns a reference to the object
+//and for an int that will be 0, the actual value stored is pe +
+//CkNumPes so 0 would mean processor -CkNumPes which is invalid.
+CkpvExtern(ClibLocationTableType *, locationTable);
+
 //Info classes that help bracketed streategies manage objects
 //Each info class points to a list of source (or destination) objects
 //ArrayInfo also access the array listener interface
@@ -47,18 +101,28 @@ class ComlibNodeGroupInfo {
 
 class ComlibGroupInfo {
  protected:
-    CkGroupID gid;
+    CkGroupID sgid, dgid;
     int *srcpelist, nsrcpes; //src processors for the elements
-    int isGroup;   
+    int *destpelist, ndestpes;
+    int isSrcGroup;   
+    int isDestGroup;
 
  public:
     ComlibGroupInfo();
+    ~ComlibGroupInfo();
 
-    void setSourceGroup(CkGroupID gid, int *srcpelist=0, int nsrcpes=0);
-    int isSourceGroup(){return isGroup;}
+    int isSourceGroup(){return isSrcGroup;}
+    int isDestinationGroup(){return isDestGroup;}
+
+    void setSourceGroup(CkGroupID gid, int *srcpelist=0, int nsrcpes=0);    
     void getSourceGroup(CkGroupID &gid);
     void getSourceGroup(CkGroupID &gid, int *&pelist, int &npes);
-    
+
+    void setDestinationGroup(CkGroupID sgid,int *destpelist=0,int ndestpes=0);
+    void getDestinationGroup(CkGroupID &gid);
+    void getDestinationGroup(CkGroupID &dgid,int *&destpelist, int &ndestpes);
+
+    void getCombinedPeList(int *&pelist, int &npes);
     void pup(PUP::er &p);
 };
 
@@ -88,6 +152,7 @@ class ComlibArrayInfo {
     
  public:
     ComlibArrayInfo();
+    ~ComlibArrayInfo();
 
     void setSourceArray(CkArrayID aid, CkArrayIndexMax *e=0, int nind=0);
     int isSourceArray(){return isSrcArray;}
@@ -115,7 +180,9 @@ class ComlibArrayInfo {
    node groups, groups and arrays */
 
 class CharmStrategy : public Strategy {
+ protected:
     int forwardOnMigration;
+    ComlibLearner *learner;
 
  public:
     ComlibGroupInfo ginfo;
@@ -128,9 +195,12 @@ class CharmStrategy : public Strategy {
     CharmStrategy() : Strategy() {
         setType(GROUP_STRATEGY); 
         forwardOnMigration = 0;
+        learner = NULL;
     }
 
-    CharmStrategy(CkMigrateMessage *m) : Strategy(m){}
+    CharmStrategy(CkMigrateMessage *m) : Strategy(m){
+        learner = NULL;
+    }
 
     //Called for each message
     //Function inserts a Charm++ message
@@ -151,6 +221,15 @@ class CharmStrategy : public Strategy {
     //DOES NOT exist in Converse Strategies
     virtual void beginProcessing(int nelements){}
 
+    //Added a new call that is called after the strategy had be
+    //created on every processor.  DOES NOT exist in Converse
+    //Strategies. Called when the strategy is deactivated, possibly as
+    //a result of a learning decision
+    virtual void finalizeProcessing(){}
+
+    virtual ComlibLearner *getLearner() {return learner;}
+    virtual void setLearner(ComlibLearner *l) {learner = l;}
+
     virtual void pup(PUP::er &p);
     PUPable_decl(CharmStrategy);
 
@@ -163,4 +242,8 @@ class CharmStrategy : public Strategy {
     }
 };
 
+//API calls which will be valid when communication library is not linked
+int ComlibGetLastKnown(CkArrayID a, CkArrayIndexMax idx);
+void ComlibNotifyMigrationDone();
+
 #endif
index 03390567bab755ed0a1bdea7631242f12bd3e521..c3cc4b380f9a906b35da3655cc47be153c9140e7 100644 (file)
@@ -1,4 +1,5 @@
 #include "DirectMulticastStrategy.h"
+#include "AAMLearner.h"
 
 CkpvExtern(CkGroupID, cmgrID);
 
@@ -12,7 +13,9 @@ void *DMHandler(void *msg){
     nm_mgr = (DirectMulticastStrategy *) 
         CProxy_ComlibManager(CkpvAccess(cmgrID)).
         ckLocalBranch()->getStrategy(instid);
-    
+
+    envelope *env = (envelope *) msg;
+    RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
     nm_mgr->handleMulticastMessage(msg);
     return NULL;
 }
@@ -32,6 +35,7 @@ DirectMulticastStrategy::DirectMulticastStrategy(int ndest, int *pelist)
 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid)
     :  CharmStrategy() {
 
+    //ainfo.setSourceArray(aid);
     ainfo.setDestinationArray(aid);
     setType(ARRAY_STRATEGY);
     ndestpes = 0;
@@ -39,6 +43,17 @@ DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid)
     commonInit();
 }
 
+DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID said, CkArrayID daid)
+    :  CharmStrategy() {
+
+    ainfo.setSourceArray(said);
+    ainfo.setDestinationArray(daid);
+    setType(ARRAY_STRATEGY);
+    ndestpes = 0;
+    destpelist = 0;
+    commonInit();
+}
+
 void DirectMulticastStrategy::commonInit(){
 
     if(ndestpes == 0) {
@@ -49,6 +64,24 @@ void DirectMulticastStrategy::commonInit(){
     }
 }
 
+DirectMulticastStrategy::~DirectMulticastStrategy() {
+    if(ndestpes > 0)
+        delete [] destpelist;
+
+    if(getLearner() != NULL)
+        delete getLearner();
+        
+    CkHashtableIterator *ht_iterator = sec_ht.iterator();
+    ht_iterator->seekStart();
+    while(ht_iterator->hasNext()){
+        void **data;
+        data = (void **)ht_iterator->next();        
+        CkVec<CkArrayIndexMax> *a_vec = (CkVec<CkArrayIndexMax> *) (* data);
+        if(a_vec != NULL)
+            delete a_vec;
+    }
+}
+
 void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
     if(messageBuf == NULL) {
        CkPrintf("ERROR MESSAGE BUF IS NULL\n");
@@ -58,7 +91,7 @@ void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
     ComlibPrintf("[%d] Comlib Direct Multicast: insertMessage \n", 
                  CkMyPe());   
    
-    if(cmsg->dest_proc == IS_MULTICAST && cmsg->sec_id != NULL) {        
+    if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
         int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
 
         if(cur_sec_id > 0) {        
@@ -74,11 +107,12 @@ void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
             
             sinfo.initSectionID(sid);
 
-            cmsg = new CharmMessageHolder((char *)newmsg, IS_MULTICAST); 
+            cmsg = new CharmMessageHolder((char *)newmsg, 
+                                          IS_SECTION_MULTICAST); 
             cmsg->sec_id = sid;
         }        
     }
-    
+   
     messageBuf->enq(cmsg);
     if(!isBracketed())
         doneInserting();
@@ -95,7 +129,9 @@ void DirectMulticastStrategy::doneInserting(){
        CharmMessageHolder *cmsg = messageBuf->deq();
         char *msg = cmsg->getCharmMessage();
                
-        if(cmsg->dest_proc == IS_MULTICAST) {      
+        if(cmsg->dest_proc == IS_SECTION_MULTICAST || 
+           cmsg->dest_proc == IS_BROADCAST) {      
+
             if(getType() == ARRAY_STRATEGY)
                 CmiSetHandler(UsrToEnv(msg), handlerId);
             
@@ -106,13 +142,20 @@ void DirectMulticastStrategy::doneInserting(){
                 cur_npes = cmsg->sec_id->npes;
             }
             
+            //Collect Multicast Statistics
+            RECORD_SENDM_STATS(getInstance(), 
+                               ((envelope *)cmsg->getMessage())->getTotalsize(), 
+                               cur_map, cur_npes);
+
+
             ComlibPrintf("[%d] Calling Direct Multicast %d %d %d\n", CkMyPe(),
                          UsrToEnv(msg)->getTotalsize(), cur_npes, 
                          cmsg->dest_proc);
+
             /*
-            for(int i=0; i < cur_npes; i++)
-                CkPrintf("[%d] Sending to %d %d\n", CkMyPe(), 
-                         cur_map[i], cur_npes);
+              for(int i=0; i < cur_npes; i++)
+              CkPrintf("[%d] Sending to %d %d\n", CkMyPe(), 
+              cur_map[i], cur_npes);
             */
 
             CmiSyncListSendAndFree(cur_npes, cur_map, 
@@ -135,10 +178,22 @@ void DirectMulticastStrategy::pup(PUP::er &p){
     CharmStrategy::pup(p);
 
     p | ndestpes;
-    if(p.isUnpacking())
+    if(p.isUnpacking() && ndestpes > 0)
         destpelist = new int[ndestpes];
     
     p(destpelist, ndestpes);        
+    
+    if(p.isUnpacking()) {
+        CkArrayID src;
+        int nidx;
+        CkArrayIndexMax *idx_list;     
+        ainfo.getSourceArray(src, idx_list, nidx);
+        
+        if(!src.isZero()) {
+            AAMLearner *l = new AAMLearner();
+            setLearner(l);
+        }
+    }
 }
 
 void DirectMulticastStrategy::beginProcessing(int numElements){
index f23a802bbb02d0ad8150838766de34698648ff9e..941f41ca4e6dbc778b3ad363dfb1997ddce870f7 100644 (file)
@@ -9,12 +9,12 @@ void *DMHandler(void *msg);
 class DirectMulticastStrategy: public CharmStrategy {
  protected:
     CkQ <CharmMessageHolder*> *messageBuf;
-
+    
     int ndestpes, *destpelist; //Destination processors
     int handlerId;
     
     ComlibSectionInfo sinfo;
-
+    
     //Array section support
     CkHashtableT<ComlibSectionHashKey, void *> sec_ht; 
     
@@ -26,12 +26,17 @@ class DirectMulticastStrategy: public CharmStrategy {
     
     //Group constructor
     DirectMulticastStrategy(int ndestpes = 0, int *destpelist = 0);    
-
+    DirectMulticastStrategy(CkMigrateMessage *m): CharmStrategy(m){}
+                
     //Array constructor
     DirectMulticastStrategy(CkArrayID aid);
 
-    DirectMulticastStrategy(CkMigrateMessage *m): CharmStrategy(m){}
-    
+    //Array constructor
+    DirectMulticastStrategy(CkArrayID said, CkArrayID dest);
+        
+    //Destuctor
+    ~DirectMulticastStrategy();
+        
     virtual void insertMessage(CharmMessageHolder *msg);
     virtual void doneInserting();
 
index 012eb3fe223a1c1e43761ac76073b5d8ca0b989c..0a652496811d32ae85281baf76cdd4f18a07adbf 100644 (file)
@@ -3,6 +3,9 @@
 #include "string.h"
 #include "routerstrategy.h"
 
+#include "AAPLearner.h"
+#include "AAMLearner.h"
+
 //EachToManyMulticastStrategy CODE
 CkpvExtern(int, RecvdummyHandle);
 CkpvExtern(CkGroupID, cmgrID);
@@ -24,7 +27,7 @@ void *itrDoneHandler(void *msg){
     int nexpected = sentry->numElements;
     
     if(nexpected == 0) {             
-        //CkPrintf("[%d] Calling Dummy Done Inserting\n", CkMyPe());
+        ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", CkMyPe(), instid, nexpected);
         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
         nm_mgr->doneInserting();
     }
@@ -35,14 +38,17 @@ void *itrDoneHandler(void *msg){
 void *E2MHandler(void *msg){
     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
     EachToManyMulticastStrategy *nm_mgr;    
-    
-    CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
+
+    envelope *env = (envelope *)msg;
+    CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr(env);
     int instid = bmsg->_cookie.sInfo.cInfo.instId;
     
     nm_mgr = (EachToManyMulticastStrategy *) 
         CProxy_ComlibManager(CkpvAccess(cmgrID)).
         ckLocalBranch()->getStrategy(instid);
     
+    RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
+    
     nm_mgr->localMulticast(msg);
     return NULL;
 }
@@ -57,61 +63,14 @@ EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy,
     
     setType(GROUP_STRATEGY);
 
-    int count = 0;
-
-    if(n_srcpes == 0) {
-        n_srcpes = CkNumPes();
-        src_pelist = new int[n_srcpes];
-        for(count =0; count < n_srcpes; count ++)
-            src_pelist[count] = count;
-    }
-    
     CkGroupID gid;
     gid.setZero();
     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
+    ginfo.setDestinationGroup(gid, dest_pelist, n_destpes);
 
-    if(n_destpes == 0) {
-        ndestpes = CkNumPes();
-        destpelist = new int[ndestpes];
-        for(count =0; count < ndestpes; count ++)
-            destpelist[count] = count;
-    }
-    else {
-        ndestpes = n_destpes;
-        destpelist = dest_pelist;
-    }
-
-    if(n_srcpes == 0){
-        pelist = src_pelist;
-        npes = n_srcpes;
-
-        commonInit();
-        return;
-    }
-
-    if(n_destpes == 0) {
-        pelist = destpelist;
-        npes = ndestpes;
-        
-        commonInit();
-        return;
-    }
-    
-    //source and destination lists are both subsets
-    pelist = new int[CkNumPes()];
-    npes = n_srcpes;
-    memcpy(pelist, src_pelist, n_srcpes * sizeof(int));
-    
-    for(int dcount = 0; dcount < ndestpes; dcount++) {
-        int p = destpelist[dcount];
-        
-        for(count = 0; count < npes; count ++)
-            if(pelist[count] == p)
-                break;
-        
-        if(count == npes)
-            pelist[npes++] = p;
-    }    
+    //Written in this funny way to be symettric with the array case.
+    ginfo.getDestinationGroup(gid, destpelist, ndestpes);
+    ginfo.getCombinedPeList(pelist, npes);
 
     commonInit();
 }
@@ -135,11 +94,17 @@ EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy,
     ainfo.getDestinationPeList(destpelist, ndestpes);
     ainfo.getCombinedPeList(pelist, npes);
     
-    //    for(int count = 0; count < npes; count ++){
-    //CkPrintf("%d, ", pelist[count]);
-    //}    
-    //CkPrintf("\n");
-    
+    /*
+      char dump[1000];
+      char sdump[100];
+      sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
+      for(int count = 0; count < npes; count ++){
+      sprintf(sdump, "%d, ", pelist[count]);
+      strcat(dump, sdump);           
+      }    
+      ComlibPrintf("%s\n", dump);
+    */
+
     commonInit();
 }
 
@@ -150,6 +115,8 @@ void EachToManyMulticastStrategy::commonInit() {
     setBracketed();
     setForwardOnMigration(1);
 
+    mflag = CmiFalse;
+
     if(CkMyPe() == 0 && router != NULL){
         if(strcmp(router, "USE_MESH") == 0)
             routerID = USE_MESH;
@@ -166,13 +133,25 @@ void EachToManyMulticastStrategy::commonInit() {
     rstrat = NULL;
 }
 
+EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
+    CkHashtableIterator *ht_iterator = sec_ht.iterator();
+    ht_iterator->seekStart();
+    while(ht_iterator->hasNext()){
+        void **data;
+        data = (void **)ht_iterator->next();        
+        CkVec<CkArrayIndexMax> *a_vec = (CkVec<CkArrayIndexMax> *) (* data);
+        if(a_vec != NULL)
+            delete a_vec;
+    }
+}
+
 
 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
 
     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
                  CkMyPe());   
 
-    if(cmsg->dest_proc == IS_MULTICAST && cmsg->sec_id != NULL) {        
+    if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
         int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
 
         if(cur_sec_id > 0) {        
@@ -185,7 +164,8 @@ void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
             CkSectionID *sid = cmsg->sec_id;
             delete cmsg;
             
-            cmsg = new CharmMessageHolder((char *)newmsg, IS_MULTICAST); 
+            cmsg = new CharmMessageHolder((char *)newmsg,
+                                          IS_SECTION_MULTICAST); 
             cmsg->sec_id = sid;
             initSectionID(cmsg->sec_id);
         }        
@@ -196,16 +176,58 @@ void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
         }        
     }
 
+    if(cmsg->dest_proc == IS_BROADCAST) {
+        cmsg->npes = ndestpes;
+        cmsg->pelist = destpelist;
+    }
+
     //For section multicasts and broadcasts
-    if(cmsg->dest_proc == IS_MULTICAST)
+    if(cmsg->dest_proc == IS_SECTION_MULTICAST 
+       || cmsg->dest_proc == IS_BROADCAST ) {
+        
+        //Use Multicast Learner (Foobar will not work for combinations
+        //of personalized and multicast messages
+        
+        if(!mflag) {
+            ComlibLearner *l = getLearner();
+            if(l != NULL) {
+                delete l;
+                l = NULL;
+            }
+            
+            AAMLearner *alearner = new AAMLearner();
+            setLearner(alearner);
+            mflag = CmiTrue;
+        }
+
         CmiSetHandler(UsrToEnv(cmsg->getCharmMessage()), handlerId);
+
+        //Collect Multicast Statistics
+        RECORD_SENDM_STATS(getInstance(), 
+                           ((envelope *)cmsg->getMessage())->getTotalsize(), 
+                           cmsg->pelist, cmsg->npes);
+    }
+    else {
+        //Collect Statistics
+        RECORD_SEND_STATS(getInstance(), 
+                          ((envelope *)cmsg->getMessage())->getTotalsize(), 
+                          cmsg->dest_proc);
+    }
     
     rstrat->insertMessage(cmsg);
 }
 
 void EachToManyMulticastStrategy::doneInserting(){
-    ComlibPrintf("%d: DoneInserting \n", CkMyPe());
-    
+
+    StrategyTableEntry *sentry = 
+        CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
+        ->getStrategyTableEntry(getInstance());
+    int nexpected = sentry->numElements;
+
+    if(routerID == USE_DIRECT && nexpected == 0)
+        return;
+
+    //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
     rstrat->doneInserting();
 }
 
@@ -219,6 +241,7 @@ void EachToManyMulticastStrategy::pup(PUP::er &p){
 
     p | routerID; 
     p | npes; p | ndestpes;     
+    p | mflag;
     
     if(p.isUnpacking() && npes > 0) {
         pelist = new int[npes];    
@@ -236,8 +259,7 @@ void EachToManyMulticastStrategy::pup(PUP::er &p){
 
     if(p.isUnpacking()){
        handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
-        int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);
-        
+        int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);        
         
         rstrat = new RouterStrategy(routerID, handler, npes, pelist);
         setConverseStrategy(rstrat);
@@ -248,11 +270,19 @@ void EachToManyMulticastStrategy::pup(PUP::er &p){
 }
 
 void EachToManyMulticastStrategy::beginProcessing(int numElements){
-
-    ComlibPrintf("Begin processing %d\n", numElements);
+    
+    ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
+    
+    char dump[1000];
+    char sdump[100];
+    sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
+    for(int count = 0; count < npes; count ++){
+        sprintf(sdump, "%d, ", pelist[count]);
+        strcat(dump, sdump);           
+    }    
+    ComlibPrintf("%s\n", dump);
 
     int expectedDeposits = 0;
-    MaxSectionID = 0;
 
     rstrat->setInstance(getInstance());
 
@@ -286,6 +316,9 @@ void EachToManyMulticastStrategy::beginProcessing(int numElements){
     ainfo.getDestinationArray(dest, idx_list, nidx);
     sinfo = ComlibSectionInfo(dest, myInstanceID);
     
+    AAPLearner *alearner = new AAPLearner();
+    setLearner(alearner);
+
     if(expectedDeposits > 0)
         return;
     
@@ -294,6 +327,21 @@ void EachToManyMulticastStrategy::beginProcessing(int numElements){
         ConvComlibScheduleDoneInserting(myInstanceID);
 }
 
+void EachToManyMulticastStrategy::finalizeProcessing() {
+    if(npes > 0)
+        delete [] pelist;
+    
+    if(ndestpes > 0)
+        delete [] destpelist;
+
+    if(rstrat)
+        delete rstrat;
+
+    if(getLearner() != NULL)
+        delete getLearner();
+}
+
+
 void EachToManyMulticastStrategy::localMulticast(void *msg){
     register envelope *env = (envelope *)msg;
     CkUnpackMessage(&env);
@@ -345,7 +393,7 @@ void EachToManyMulticastStrategy::initSectionID(CkSectionID *sid){
     //Convert real processor numbers to virtual processors in the all
     //to all multicast group
     for(int count = 0; count < sid->npes; count ++) {
-        sid->pelist[count] = rstrat->getProcMap()[sid->pelist[count]];        
+        sid->pelist[count] = rstrat->getProcMap()[sid->pelist[count]]; 
         if(sid->pelist[count] == -1) CkAbort("Invalid Section\n");
     }
 }
index c50869d7a47f6ab60e9d41753519770f12e77880..e31809bf627734cd35c4f9ba62283d75ec1b4b2b 100644 (file)
@@ -11,8 +11,9 @@ class EachToManyMulticastStrategy: public CharmStrategy {
     int npes, *pelist; //Domain of the topology
     int MyPe;          //My id in that domain
 
-    int ndestpes, *destpelist, *destMap; //Destination processors
+    CmiBool mflag;
 
+    int ndestpes, *destpelist; //Destination processors
     int handlerId;
     
     //Dynamically set by the application
@@ -22,8 +23,6 @@ class EachToManyMulticastStrategy: public CharmStrategy {
     virtual void commonInit();
     virtual void initSectionID(CkSectionID *sid);
 
-    int MaxSectionID;
-
     RouterStrategy *rstrat;
     ComlibSectionInfo sinfo;
 
@@ -39,7 +38,9 @@ class EachToManyMulticastStrategy: public CharmStrategy {
                                 CkArrayIndexMax *srcelements=0, int ndest=0, 
                                 CkArrayIndexMax *destelements=0);
     
-    EachToManyMulticastStrategy(CkMigrateMessage *m) : CharmStrategy(m){}
+    EachToManyMulticastStrategy(CkMigrateMessage *m) : CharmStrategy(m){};
+    
+    ~EachToManyMulticastStrategy();
 
     //Basic function, subclasses should not have to change it
     virtual void insertMessage(CharmMessageHolder *msg);
@@ -48,6 +49,7 @@ class EachToManyMulticastStrategy: public CharmStrategy {
 
     virtual void pup(PUP::er &p);    
     virtual void beginProcessing(int nelements);
+    virtual void finalizeProcessing();
     virtual void localMulticast(void *msg);
     
     PUPable_decl(EachToManyMulticastStrategy);
index 2e3cb27bd7b5f1fa0f1ab9d505dc73d706b19eef..0bf75196b868b5074f324a74362101aea11df7bc 100644 (file)
@@ -58,7 +58,7 @@ MsgPacker::MsgPacker(CkQ<CharmMessageHolder *> &msgq, int n_msgs){
         msgList[count].idx = env->getsetArrayIndex();
         msgList[count].data = cmsg->getCharmMessage();
 
-        if(msgList[count].size > MAX_MESSAGE_SIZE)
+        if(msgList[count].size >= MAX_MESSAGE_SIZE-1)
             CkAbort("Can't send messges larger than 64KB\n");
 
         delete cmsg;
index 7ac39406f17e19889ebdf7ea4d0358f604cca460..8af78b2f11efa024217f281293250dbd936b7e83 100644 (file)
@@ -27,7 +27,7 @@ class PrioStreaming : public StreamingStrategy {
     */
 
     PrioStreaming(int periodMs=10, int bufferMax=1000, int prio=0);
-    PrioStreaming(CkMigrateMessage *){}
+    PrioStreaming(CkMigrateMessage *m) : StreamingStrategy(m) {}
     
     virtual void insertMessage(CharmMessageHolder *msg);
 
index 6c11630f32958cd81c3f27f962c877d6c54d94f4..9ea046aaef3021f6a5a06c95c58493ac959c2396 100644 (file)
@@ -12,10 +12,52 @@ RingMulticastStrategy::RingMulticastStrategy(CkArrayID dest_aid)
     commonRingInit();    
 }
 
+//Array Constructor
+RingMulticastStrategy::RingMulticastStrategy(CkArrayID src, CkArrayID dest)
+    : DirectMulticastStrategy(src, dest){
+    commonRingInit();    
+}
+
 void RingMulticastStrategy::commonRingInit(){
     //Sort destpelist
 }
 
+
+void RingMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
+    if(messageBuf == NULL) {
+       CkPrintf("ERROR MESSAGE BUF IS NULL\n");
+       return;
+    }
+    
+    ComlibPrintf("[%d] Comlib Direct Multicast: insertMessage \n", 
+                 CkMyPe());   
+    
+    if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
+        int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
+
+        if(cur_sec_id > 0) {        
+            sinfo.processOldSectionMessage(cmsg);
+        }
+        else {
+            CkSectionID *sid = cmsg->sec_id;
+
+            //New sec id, so send it along with the message
+            void *newmsg = sinfo.getNewMulticastMessage(cmsg);
+            CkFreeMsg(cmsg->getCharmMessage());
+            delete cmsg;
+            
+            initSectionID(sid);
+            cmsg = new CharmMessageHolder((char *)newmsg, 
+                                          IS_SECTION_MULTICAST); 
+            cmsg->sec_id = sid;
+        }        
+    }
+    
+    messageBuf->enq(cmsg);
+    if(!isBracketed())
+        doneInserting();
+}
+
 extern int _charmHandlerIdx;
 void RingMulticastStrategy::doneInserting(){
     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
@@ -32,22 +74,22 @@ void RingMulticastStrategy::doneInserting(){
         ComlibPrintf("[%d] Calling Ring %d %d %d\n", CkMyPe(),
                      env->getTotalsize(), ndestpes, cmsg->dest_proc);
                
-        if(cmsg->dest_proc == IS_MULTICAST) {      
+        if(cmsg->dest_proc == IS_SECTION_MULTICAST ||
+           cmsg->dest_proc == IS_BROADCAST) {      
+            
             CmiSetHandler(env, handlerId);
             
             int dest_pe = -1;
             RingMulticastHashObject *robj;
-
+            
             if(cmsg->sec_id == NULL)
                 dest_pe = nextPE;
             else {
                 robj = getHashObject(CkMyPe(), 
                                      cmsg->sec_id->_cookie.sInfo.cInfo.id);
-                                
-                ComlibPrintf("Gotten has obect %d\n",  robj);
-
-                CkAssert(robj != NULL);
-
+                
+                ComlibPrintf("Gotten has obect %d\n",  robj);                
+                CkAssert(robj != NULL);                
                 dest_pe = robj->nextPE;
             }
             
@@ -182,7 +224,7 @@ void RingMulticastStrategy::handleMulticastMessage(void *msg){
 
 void RingMulticastStrategy::initSectionID(CkSectionID *sid){
 
-    CkPrintf("Ring Init section ID\n");
+    ComlibPrintf("Ring Init section ID\n");
     sid->pelist = NULL;
     sid->npes = 0;
 
@@ -209,7 +251,8 @@ RingMulticastHashObject *RingMulticastStrategy::createHashObject
         CkArrayIndexMax *idx_list;        
         ainfo.getDestinationArray(dest, idx_list, nidx);
 
-        int p = CkArrayID::CkLocalBranch(dest)->lastKnown(elements[acount]);
+        int p = ComlibGetLastKnown(dest, elements[acount]);
+        //CkArrayID::CkLocalBranch(dest)->lastKnown(elements[acount]);
         
         if(p < min_dest)
             min_dest = p;
index d4a60cd4a64040acf28204bc84c2fef1a41af1f7..03fd8a7bbfa73544bf369e93f512f6b56ae4e015 100644 (file)
@@ -28,9 +28,29 @@ class RingMulticastStrategy: public DirectMulticastStrategy {
     
     //Array constructor
     RingMulticastStrategy(CkArrayID dest_id);    
+    RingMulticastStrategy(CkArrayID src, CkArrayID dest);    
     RingMulticastStrategy(CkMigrateMessage *m) {}
+
+    //Destructor
+    ~RingMulticastStrategy() { 
+        
+        CkHashtableIterator *ht_iterator = sec_ht.iterator();
+        ht_iterator->seekStart();
+        while(ht_iterator->hasNext()){
+            void **data;
+            data = (void **)ht_iterator->next();        
+            RingMulticastHashObject *robj = 
+                (RingMulticastHashObject*)(* data);
+
+            *data = NULL;
+            if(robj)
+                delete robj;
+        }
+
+        sec_ht.empty();
+    }
     
-    //void insertMessage(CharmMessageHolder *msg);
+    void insertMessage(CharmMessageHolder *msg);
     void doneInserting();
     void handleMulticastMessage(void *msg);