doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-com / MultiRingMulticast.C
index a5e391b2198a38c03dc903686d1e52289b462241..3594a6358a21b19884c8169a15ab80cd1fcc9e6e 100644 (file)
+/**
+   @addtogroup ComlibCharmStrategy
+   @{
+   
+   @file 
+*/
 
 #include "MultiRingMulticast.h"
 
-//Array Constructor
-MultiRingMulticast::MultiRingMulticast(CkArrayID dest_aid, int flag)
-    : DirectMulticastStrategy(dest_aid, flag){
-}
-
-
-void MultiRingMulticast::pup(PUP::er &p){
-
-    DirectMulticastStrategy::pup(p);
-}
-
-void MultiRingMulticast::beginProcessing(int  nelements){
-
-    DirectMulticastStrategy::beginProcessing(nelements);
-}
-
-
-inline int getMyId(int *pelist, int npes, int mype) {
-    int myid = -1;
-
-    for(int count = 0; count < npes; count ++)
-        if(mype == pelist[count])
-            myid = count;
-
-    //if(myid == -1)
-    //  CkPrintf("Warning myid = -1\n");
-
-    return myid;
-}
-
-
-//Assumes a sorted input. Returns the next processor greater a given
-//current pe
-inline void getNextPe(int *pelist, int npes, int mype, int &nextpe) {
-    
-    nextpe = pelist[0];
-    
-    int count= 0;
-    for(count = 0; count < npes; count++)
-        if(pelist[count] > mype)
-            break;
-    
-    if(count < npes) 
-        nextpe = pelist[count];
-    
-    return;
-}
-
-
-inline int getMidPe(int *pelist, int npes, int src_pe) {
-    
-    int my_id = 0;
-    int mid_pe = -1;
-    
-    my_id = getMyId(pelist, npes, src_pe);    
-    
-    CkAssert(my_id >= 0 && my_id < npes);
-
-    if(my_id < npes/2)
-        mid_pe = pelist[npes/2 + my_id];        
-    else
-        mid_pe = pelist[my_id % (npes/2)];
-    
-    //if(mid_pe == -1)
-    //  CkPrintf("Warning midpe = -1\n");
-
-    return mid_pe;
-}
 
 //Unlike ring the source here sends two or more messages while all
 //elements along the ring only send one.
 
-ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
-(int nelements, CkArrayIndexMax *elements){
-
-    ComlibSectionHashObject *obj = new ComlibSectionHashObject();
+void MultiRingMulticastStrategy::createObjectOnSrcPe(ComlibSectionHashObject *obj, int npes, ComlibMulticastIndexCount *pelist) {
 
-    obj->npes = 0;
-    obj->pelist = 0;
-
-    int *pelist;
-    int npes;
-    sinfo.getRemotePelist(nelements, elements, npes, pelist);
-    
-    sinfo.getLocalIndices(nelements, elements, obj->indices);
+  obj->npes = 0;
+  obj->pelist = 0;
 
-    if(npes == 0)
-        return obj;
+  if(npes == 0) return;
 
-    if(npes == 1) {        
-        obj->npes = 1;        
-        obj->pelist = pelist;
-
-        return obj;
-    }
-
-    if(npes == 2) {        
-        obj->npes = 2;        
-        obj->pelist = pelist;
-        
-        return obj;
-    }
-    
-    pelist[npes ++] = CkMyPe();
-    qsort(pelist, npes, sizeof(int), intCompare);
-
-    /*
-      char dump[2560];
-      sprintf(dump, "Section on %d : ", CkMyPe());
-      for(int count = 0; count < npes; count ++) {
-      sprintf(dump, "%s, %d", dump, pelist[count]);
+  if(npes < 4) {
+    // direct sending, take out ourself from the list!
+    obj->npes = npes;
+    for (int i=0; i<npes; ++i) if (pelist[i].pe == CkMyPe()) obj->npes --;
+    obj->pelist = new int[obj->npes];
+    for (int i=0, count=0; i<npes; ++i) {
+      if (pelist[i].pe != CkMyPe()) {
+       obj->pelist[count] = pelist[i].pe;
+       count++;
       }
+    }
+    return;
+  }
     
-      CkPrintf("%s\n\n", dump);
-    */
-    
-    int myid = getMyId(pelist, npes, CkMyPe());    
-
-    CkAssert(myid >= 0 && myid < npes);
-
-    int nextpe = -1;
-    
-    if(myid < npes / 2) 
-        getNextPe(pelist, npes/2, CkMyPe(), nextpe);
-    else 
-        getNextPe(pelist+npes/2, npes - npes/2, CkMyPe(), nextpe);
-    
-    int mid_pe = -1;
-    mid_pe = getMidPe(pelist, npes, CkMyPe());
-    
-    delete [] pelist;
-
-    if(nextpe != CkMyPe()) {
-        obj->pelist = new int[2];
-        obj->npes = 2;
-        
-        obj->pelist[0] = nextpe;
-        obj->pelist[1] = mid_pe;
+  int myid = -1; // getMyId(pelist, npes, CkMyPe());    
+  for (int i=0; i<npes; ++i) {
+    if (pelist[i].pe == CkMyPe()) {
+      myid = i;
+      break;
     }
-    else {
-        CkPrintf("Warning Should not be here !!!!!!!!!\n");
-        obj->pelist = new int[1];
-        obj->npes = 1;
+  }
+
+  int breaking = npes/2; /* 0 : breaking-1    is the first ring
+                           breaking : npes-1 is the second ring
+                        */
+
+  int next_id = myid + 1;
+  // wrap nextpe around the ring
+  if(myid < breaking) {
+    if (next_id >= breaking) next_id = 0;
+  } else {
+    if (next_id >= npes) next_id = breaking;
+  }
+    
+  int mid_id;
+  if (myid < breaking) {
+    mid_id = myid + breaking;
+    if (mid_id < breaking) mid_id = breaking;
+  } else {
+    mid_id = myid - breaking;
+    if (mid_id >= breaking) mid_id = 0;
+  }
+  //mid_pe = getMidPe(pelist, npes, CkMyPe());
+    
+  if(pelist[next_id].pe != CkMyPe()) {
+    obj->pelist = new int[2];
+    obj->npes = 2;
         
-        obj->pelist[0] = mid_pe;
-    }
-    
-    //CkPrintf("%d Src = %d Next = %d Mid Pe =%d\n", CkMyPe(), CkMyPe(), nextpe, mid_pe);    
-    
-    return obj;
+    obj->pelist[0] = pelist[next_id].pe;
+    obj->pelist[1] = pelist[mid_id].pe;
+  }
+  else {
+    CkAbort("Warning Should not be here !!!!!!!!!\n");
+  }
+  
+  //CkPrintf("%d Src = %d Next = %d Mid Pe =%d\n", CkMyPe(), CkMyPe(), pelist[next_id], pelist[mid_id]);
+  
+  return;
 }
 
+void MultiRingMulticastStrategy::createObjectOnIntermediatePe(ComlibSectionHashObject *obj, int npes, ComlibMulticastIndexCount *counts, int srcpe) {
 
-ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe
-(int nelements, CkArrayIndexMax *elements, int src_pe){
+  obj->pelist = 0;
+  obj->npes = 0;
 
-    ComlibSectionHashObject *obj = new ComlibSectionHashObject();
+  if(npes < 4) return;
 
-    int *pelist;
-    int npes;
-    sinfo.getRemotePelist(nelements, elements, npes, pelist);
-    
-    obj->pelist = 0;
-    obj->npes = 0;   
-    sinfo.getLocalIndices(nelements, elements, obj->indices);
-
-    pelist[npes ++] = CkMyPe();
-
-    if(npes <= 3)
-        return obj;
-    
-    qsort(pelist, npes, sizeof(int), intCompare);
-    
-    int myid = getMyId(pelist, npes, CkMyPe());
-    
-    CkAssert(myid >= 0 && myid < npes);
-
-    int src_id = getMyId(pelist, npes, src_pe);
-    
-    if(src_id == -1) { //Src isnt the receipient of the multicast
-        pelist[npes ++] = src_pe;
-        qsort(pelist, npes, sizeof(int), intCompare);
-        src_id = getMyId(pelist, npes, src_pe);
-        myid = getMyId(pelist, npes, CkMyPe());
-        
-        CkAssert(src_id >= 0 && src_id < npes);
+  // where are we inside the list?
+  int myid = -1;
+  for (int i=0; i<npes; ++i) {
+    if (counts[i].pe == CkMyPe()) {
+      myid = i;
+      break;
     }
+  }
 
-    int nextpe = -1;
-    int new_src_pe = src_pe;
-    int mid_pe = getMidPe(pelist, npes, src_pe);
-    
-    if(myid < npes / 2) {
-        getNextPe(pelist, npes/2, CkMyPe(), nextpe);
-
-        //If source is in the other half I have to change to the
-        //middle guy
-        if(src_id >= npes/2)
-            new_src_pe = mid_pe;
-    }
-    else {
-        getNextPe(pelist + (npes/2), npes - npes/2, CkMyPe(), nextpe);
+  // we must be in the list!
+  CkAssert(myid >= 0 && myid < npes);
 
-        //If source is in the other half I have to change to the
-        //middle guy
-        if(src_id < npes/2)
-            new_src_pe = mid_pe;
+  int breaking = npes/2;
+  int srcid = 0;
+  for (int i=0; i<npes; ++i) {
+    if (counts[i].pe == srcpe) {
+      srcid = i;
+      break;
     }
-
-    bool end_flag = isEndOfRing(nextpe, new_src_pe);
-
-    if(!end_flag) {
-        obj->pelist = new int[1];
-        obj->npes = 1;
-        obj->pelist[0] = nextpe;
+  }
+
+  if (srcid < breaking ^ myid < breaking) {
+    // if we are in the two different halves, correct srcid
+    if (srcid < breaking) {
+      srcid += breaking;
+      if (srcid < breaking) srcid = breaking;
+    } else {
+      srcid -= breaking;
+      if (srcid >= breaking) srcid = 0;
     }
-    
-    //CkPrintf("%d: Src = %d Next = %d end = %d Midpe = %d\n", CkMyPe(), src_pe, 
-    //       nextpe, end_flag, mid_pe);    
-    
-    delete [] pelist;
-    return obj;
-}
-
-//We need to end the ring,
-//    if next_pe is the same as the source_pe, or
-//    if next_pe is the first processor in the ring, greater than srouce_pe.
-//Both these comparisons are done in a 'cyclic' way with wraparounds.
-
-int MultiRingMulticast::isEndOfRing(int next_pe, int src_pe){
-    
-    ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
-    
-    if((next_pe != CkMyPe()) && (next_pe != src_pe)) 
-        return 0;
-    
-    return 1;
+  }
+  // now srcid is the starting point of this half ring, which could be the
+  // original sender itself (0 if the sender is not part of the recipients),
+  // or the counterpart in the other ring
+
+  int nextid = myid + 1;
+  // wrap nextpe around the ring
+  if(myid < breaking) {
+    if (nextid >= breaking) nextid = 0;
+  }
+  else {
+    if (nextid >= npes) nextid = breaking;
+  }
+
+  if (nextid != srcid) {
+    obj->pelist = new int[1];
+    obj->npes = 1;
+    obj->pelist[0] = counts[nextid].pe;
+  }
+
+  return;
 }
 
+/*@}*/