Bug fix in the ring building algorithms
authorSameer Kumar <skumar2@uiuc.edu>
Fri, 22 Apr 2005 13:37:19 +0000 (13:37 +0000)
committerSameer Kumar <skumar2@uiuc.edu>
Fri, 22 Apr 2005 13:37:19 +0000 (13:37 +0000)
src/ck-com/MultiRingMulticast.C

index b3ce3031afd16c8c8ea5fa8c4ad8fcfad531a87c..4f0f21e78946d8053d0a219752e9de1114126b41 100644 (file)
@@ -25,6 +25,9 @@ inline int getMyId(int *pelist, int npes, int mype) {
         if(mype == pelist[count])
             myid = count;
 
         if(mype == pelist[count])
             myid = count;
 
+    if(myid == -1)
+        CkPrintf("Warning myid = -1\n");
+
     return myid;
 }
 
     return myid;
 }
 
@@ -54,11 +57,16 @@ inline int getMidPe(int *pelist, int npes, int src_pe) {
     
     my_id = getMyId(pelist, npes, src_pe);    
     
     
     my_id = getMyId(pelist, npes, src_pe);    
     
+    CkAssert(my_id >= 0 && my_id < npes);
+
     if(my_id < npes/2)
         mid_pe = pelist[npes/2 + my_id];        
     else
         mid_pe = pelist[my_id % (npes/2)];
     
     if(my_id < npes/2)
         mid_pe = pelist[npes/2 + my_id];        
     else
         mid_pe = pelist[my_id % (npes/2)];
     
+    if(mid_pe == -1)
+        CkPrintf("Warning midpe = -1\n");
+
     return mid_pe;
 }
 
     return mid_pe;
 }
 
@@ -68,17 +76,17 @@ inline int getMidPe(int *pelist, int npes, int src_pe) {
 ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
 (int nelements, CkArrayIndexMax *elements){
 
 ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
 (int nelements, CkArrayIndexMax *elements){
 
-    ComlibSectionHashObject *obj = new ComlibSectionHashObject;
+    ComlibSectionHashObject *obj = new ComlibSectionHashObject();
 
     obj->npes = 0;
     obj->pelist = 0;
 
 
     obj->npes = 0;
     obj->pelist = 0;
 
-    sinfo.getLocalIndices(nelements, elements, obj->indices);
-
     int *pelist;
     int npes;
     sinfo.getRemotePelist(nelements, elements, npes, pelist);
     
     int *pelist;
     int npes;
     sinfo.getRemotePelist(nelements, elements, npes, pelist);
     
+    sinfo.getLocalIndices(nelements, elements, obj->indices);
+
     if(npes == 0)
         return obj;
 
     if(npes == 0)
         return obj;
 
@@ -99,7 +107,18 @@ ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
     pelist[npes ++] = CkMyPe();
     qsort(pelist, npes, sizeof(int), intCompare);
 
     pelist[npes ++] = CkMyPe();
     qsort(pelist, npes, sizeof(int), intCompare);
 
+    char dump[2560];
+    sprintf(dump, "Section on %d : ", CkMyPe());
+    for(int count = 0; count < npes; count ++) {
+        sprintf(dump, "%s, %d", dump, pelist[count]);
+    }
+    
+    CkPrintf("%s\n\n", dump);
+
     int myid = getMyId(pelist, npes, CkMyPe());    
     int myid = getMyId(pelist, npes, CkMyPe());    
+
+    CkAssert(myid >= 0 && myid < npes);
+
     int nextpe = -1;
     
     if(myid < npes / 2) 
     int nextpe = -1;
     
     if(myid < npes / 2) 
@@ -136,9 +155,7 @@ ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
 ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe
 (int nelements, CkArrayIndexMax *elements, int src_pe){
 
 ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe
 (int nelements, CkArrayIndexMax *elements, int src_pe){
 
-    ComlibSectionHashObject *obj = new ComlibSectionHashObject;
-
-    sinfo.getLocalIndices(nelements, elements, obj->indices);
+    ComlibSectionHashObject *obj = new ComlibSectionHashObject();
 
     int *pelist;
     int npes;
 
     int *pelist;
     int npes;
@@ -146,6 +163,7 @@ ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe
     
     obj->pelist = 0;
     obj->npes = 0;   
     
     obj->pelist = 0;
     obj->npes = 0;   
+    sinfo.getLocalIndices(nelements, elements, obj->indices);
 
     pelist[npes ++] = CkMyPe();
 
 
     pelist[npes ++] = CkMyPe();
 
@@ -156,11 +174,22 @@ ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe
     
     int myid = getMyId(pelist, npes, CkMyPe());
     
     
     int myid = getMyId(pelist, npes, CkMyPe());
     
-    int nextpe = -1;
+    CkAssert(myid >= 0 && myid < npes);
+
+    int src_id = getMyId(pelist, npes, src_pe);
+    
+    if(src_id == -1) { //Src isnt the receipient of the multicast
+        pelist[npes ++] = src_pe;
+        qsort(pelist, npes, sizeof(int), intCompare);
+        src_id = getMyId(pelist, npes, src_pe);
+        myid = getMyId(pelist, npes, CkMyPe());
+        
+        CkAssert(src_id >= 0 && src_id < npes);
+    }
 
 
+    int nextpe = -1;
     int new_src_pe = src_pe;
     int mid_pe = getMidPe(pelist, npes, src_pe);
     int new_src_pe = src_pe;
     int mid_pe = getMidPe(pelist, npes, src_pe);
-    int src_id = getMyId(pelist, npes, src_pe);
     
     if(myid < npes / 2) {
         getNextPe(pelist, npes/2, CkMyPe(), nextpe);
     
     if(myid < npes / 2) {
         getNextPe(pelist, npes/2, CkMyPe(), nextpe);
@@ -181,18 +210,16 @@ ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe
 
     bool end_flag = isEndOfRing(nextpe, new_src_pe);
 
 
     bool end_flag = isEndOfRing(nextpe, new_src_pe);
 
-    if((nextpe != CkMyPe()) && ((CkMyPe() == mid_pe) || !end_flag)) {
+    if(!end_flag) {
         obj->pelist = new int[1];
         obj->npes = 1;
         obj->pelist[0] = nextpe;
     }
     
         obj->pelist = new int[1];
         obj->npes = 1;
         obj->pelist[0] = nextpe;
     }
     
-    CkPrintf("%d: Src = %d Next = %d end = %d\n", CkMyPe(), src_pe, 
-             nextpe, end_flag);    
-    
+    CkPrintf("%d: Src = %d Next = %d end = %d Midpe = %d\n", CkMyPe(), src_pe, 
+             nextpe, end_flag, mid_pe);    
     
     delete [] pelist;
     
     delete [] pelist;
-
     return obj;
 }
 
     return obj;
 }
 
@@ -205,16 +232,9 @@ int MultiRingMulticast::isEndOfRing(int next_pe, int src_pe){
     
     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
     
     
     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
     
-    if(next_pe > CkMyPe()){
-        if(src_pe <= next_pe && src_pe > CkMyPe())
-            return 1;
-        
+    if((next_pe != CkMyPe()) && (next_pe != src_pe)) 
         return 0;
         return 0;
-    }
-    
-    if(src_pe > CkMyPe() || src_pe <= next_pe)
-        return 1;
     
     
-    return 0;
+    return 1;
 }
 
 }