Fixing some comlib bugs that could possibly occur due to the way the src and dest...
[charm.git] / src / ck-com / ComlibStrategy.C
index fae0e4a8d238813f93b0e4ec9e48f1ac31ac968a..a13ccf51cfdadbdb8b3a6c80cd30cf8bf7d516a6 100644 (file)
@@ -229,150 +229,81 @@ int *ComlibGroupInfo::getCombinedCountList() {
   return result;
 }
 
-/*
-void ComlibGroupInfo::getCombinedPeList(int *&pelist, int &npes) {
-    int count = 0;        
-    pelist = 0;
-    npes = 0;
-
-    pelist = new int[CkNumPes()];
-    if(nsrcpes == 0 || ndestpes == 0) {
-        npes = CkNumPes();        
-        for(count = 0; count < CkNumPes(); count ++) 
-            pelist[count] = count;                         
-    }
-    else {        
-        npes = ndestpes;
-        memcpy(pelist, destpelist, npes * sizeof(int));
-        
-        //Add source processors to the destination processors
-        //already obtained
-        for(int count = 0; count < nsrcpes; count++) {
-            int p = srcpelist[count];
-
-            for(count = 0; count < npes; count ++)
-                if(pelist[count] == p)
-                    break;
-
-            if(count == npes)
-                pelist[npes ++] = p;
-        }                        
-    }
-}
-*/
 
 ComlibArrayInfo::ComlibArrayInfo() {
        
     src_aid.setZero();
-    //nSrcIndices = -1;
-    //src_elements = NULL;
     isAllSrc = 0;
     totalSrc = 0;
     isSrcArray = 0;
 
     dest_aid.setZero();
-    //nDestIndices = -1;
-    //dest_elements = NULL;
     isAllDest = 0;
     totalDest = 0;
     isDestArray = 0;
 };
 
-/*
-ComlibArrayInfo::~ComlibArrayInfo() {
-    //CkPrintf("in comlibarrayinfo destructor\n");
-
-    if(nSrcIndices > 0)
-        delete [] src_elements;
 
-    if(nDestIndices > 0)
-        delete [] dest_elements;
-}
-*/
-
-/// Set the  source array used for this strategy. 
-/// The list of array indices should be the whole portion of the array involved in the strategy.
-/// The non-local array elements will be cleaned up inside purge() at migration of the strategy
-void ComlibArrayInfo::setSourceArray(CkArrayID aid, 
-                                         CkArrayIndexMax *e, int nind){
+void ComlibArrayInfo::setSourceArray(CkArrayID aid, CkArrayIndexMax *e, int nind){
     src_aid = aid;
     isSrcArray = 1;
-    /*
-    nSrcIndices = nind;
-    if(nind > 0) {
-        src_elements = new CkArrayIndexMax[nind];
-        memcpy(src_elements, e, sizeof(CkArrayIndexMax) * nind);
-    }
-    */
+
     src_elements.removeAll();
+    for (int i=0; i<nind; ++i){
+      CkAssert(e[i].nInts == 1);
+      src_elements.push_back(e[i]);
+    }
     
-    for (int i=0; i<nind; ++i) src_elements.push_back(e[i]);
-    
-    if (src_elements.size() == 0) 
+    if (nind == 0) 
        isAllSrc = 1;
     else 
        isAllSrc = 0;
-    
-    totalSrc = src_elements.size();
-    
-}
 
+    totalSrc = nind;
+
+    CkAssert(src_elements.size() == totalSrc);    
 
-void ComlibArrayInfo::getSourceArray(CkArrayID &aid, 
-                                         CkArrayIndexMax *&e, int &nind){
-    aid = src_aid;
-    nind = src_elements.length();//nSrcIndices;
-    e = src_elements.getVec();//src_elements;
 }
 
 
-void ComlibArrayInfo::setDestinationArray(CkArrayID aid, 
-                                          CkArrayIndexMax *e, int nind){
+void ComlibArrayInfo::setDestinationArray(CkArrayID aid, CkArrayIndexMax *e, int nind){
+  ComlibPrintf("[%d] ComlibArrayInfo::setDestinationArray  dest_elements\n", CkMyPe());
     dest_aid = aid;
     isDestArray = 1;
-    /*
-    nDestIndices = nind;
-    if(nind > 0) {
-        dest_elements = new CkArrayIndexMax[nind];
-        memcpy(dest_elements, e, sizeof(CkArrayIndexMax) * nind);
-    }
-    */
+   
     dest_elements.removeAll();
-    for (int i=0; i<nind; ++i) dest_elements.push_back(e[i]);
+    for (int i=0; i<nind; ++i){
+      CkAssert(e[i].nInts > 0);
+      dest_elements.push_back(e[i]);
+    }
 
-    if (dest_elements.size() == 0) 
+    if (nind == 0) 
        isAllDest = 1;
     else 
        isAllDest = 0;
     
-    totalDest = dest_elements.size();
-    
-}
+    totalDest = nind;
+    CkAssert(dest_elements.size() == totalDest);    
 
-
-void ComlibArrayInfo::getDestinationArray(CkArrayID &aid, 
-                                          CkArrayIndexMax *&e, int &nind){
-    aid = dest_aid;
-    nind = dest_elements.length();
-    e = dest_elements.getVec();
 }
 
+
 /// @TODO fix the pup!
 //Each strategy must define his own Pup interface.
 void ComlibArrayInfo::pup(PUP::er &p){ 
     p | src_aid;
-    //p | nSrcIndices;
     p | isSrcArray;
     p | isAllSrc;
     p | totalSrc;
-    p | src_elements;
-    
+    p | src_elements; 
+    p | new_src_elements;
+
     p | dest_aid;
-    //p | nDestIndices;
     p | isDestArray;
     p | isAllDest;
     p | totalDest;
-    p | dest_elements;
+    p | dest_elements; 
+    p | new_dest_elements;
 
     if (p.isPacking() || p.isUnpacking()) {
       // calling purge both during packing (at the end) and during unpacking
@@ -381,77 +312,54 @@ void ComlibArrayInfo::pup(PUP::er &p){
       purge();
     }
 
-    /*    
-    if(p.isUnpacking() && nSrcIndices > 0) 
-        src_elements = new CkArrayIndexMax[nSrcIndices];
     
-    if(p.isUnpacking() && nDestIndices > 0) 
-        dest_elements = new CkArrayIndexMax[nDestIndices];        
-    
-    if(nSrcIndices > 0)
-        p((char *)src_elements, nSrcIndices * sizeof(CkArrayIndexMax));    
-    else
-        src_elements = NULL;
+}
 
-    if(nDestIndices > 0)
-        p((char *)dest_elements, nDestIndices * sizeof(CkArrayIndexMax));    
-    else
-        dest_elements = NULL;
 
-    localDestIndexVec.resize(0);
-    */
-    
+void ComlibArrayInfo::printDestElementList() {
+  char buf[100000];
+  buf[0] = '\0';
+  for(int i=0;i<dest_elements.size();i++){
+    sprintf(buf+strlen(buf), " %d", dest_elements[i].data()[0]);
+  }
+  CkPrintf("[%d] dest_elements = %s\n", CkMyPe(), buf);
 }
 
+
 void ComlibArrayInfo::newElement(CkArrayID &id, const CkArrayIndex &idx) {
-  ComlibPrintf("ComlibArrayInfo::newElement\n");
-  if (isAllSrc && id==src_aid) src_elements.push_back(idx);
-  if (isAllDest && id==dest_aid) dest_elements.push_back(idx);
+  CkAbort("New Comlib implementation does not allow dynamic element insertion yet\n");
+  //  CkPrintf("ComlibArrayInfo::newElement dest_elements\n");
+  //  if (isAllSrc && id==src_aid) src_elements.push_back(idx);
+  //  if (isAllDest && id==dest_aid) dest_elements.push_back(idx);
 }
 
 void ComlibArrayInfo::purge() {
-       int i;
-       CkArray *a;
-//     ComlibPrintf("[%d] ComlibArrayInfo::purge srcArray=%d (%d), destArray=%d (%d)\n",CkMyPe(),isSrcArray,isAllSrc,isDestArray,isAllDest);
-       if (isSrcArray) {
-               a = (CkArray *)_localBranch(src_aid);
-               if (isAllSrc) {
-                       // gather the information of all the elements that are currenly present here
-                       ComlibElementIterator iter(&src_elements);
-                       a->getLocMgr()->iterate(iter);
-
-                       // register to ComlibArrayListener for this array id
-//                     ComlibManagerRegisterArrayListener(src_aid, this);
-               } else {
-                       // delete all the elements of which we are not homePe
-                       for (i=src_elements.size()-1; i>=0; --i) {
-                               if (a->homePe(src_elements[i]) != CkMyPe()) { 
-                                       
-//                                     ComlibPrintf("[%d] removing home=%d src element %d  i=%d\n", CkMyPe(),a->homePe(src_elements[i]), src_elements[i].data()[0], i);
-                                       src_elements.remove(i); 
-                               }
-                       }
-               }
-       }
-       if (isDestArray) {
-               a = (CkArray *)_localBranch(dest_aid);
-               if (isAllDest) {
-                       // gather the information of all the elements that are currenly present here
-                       ComlibElementIterator iter(&dest_elements);
-                       a->getLocMgr()->iterate(iter);
-
-                       // register to ComlibArrayListener for this array id
-//                     ComlibManagerRegisterArrayListener(dest_aid, this);
-               } else {
-                       // delete all the elements of which we are not homePe
-                       for (i=dest_elements.size()-1; i>=0; --i) {
-                               if (a->homePe(dest_elements[i]) != CkMyPe()) {
-//                                     ComlibPrintf("[%d] removing home=%d dest element %d  i=%d\n", CkMyPe(), a->homePe(dest_elements[i]), dest_elements[i].data()[0], i);
-                                       dest_elements.remove(i); 
-                               }
-                       }
-               }
-       }
+  //   ComlibPrintf("[%d] ComlibArrayInfo::purge srcArray=%d (%d), destArray=%d (%d)\n",CkMyPe(),isSrcArray,isAllSrc,isDestArray,isAllDest);
+
+  if (isSrcArray) {
+    CkArray *a = (CkArray *)_localBranch(src_aid);
+
+    // delete all the source elements for which we are not homePe
+    for (int i=src_elements.size()-1; i>=0; --i) {
+      if (a->homePe(src_elements[i]) != CkMyPe()) {                    
+       ComlibPrintf("[%d] ComlibArrayInfo::purge removing home=%d src element %d  i=%d\n", CkMyPe(),a->homePe(src_elements[i]), src_elements[i].data()[0], i);
+       src_elements.remove(i); 
+      }
+    }
+  }
+
+  if (isDestArray) {
+    CkArray *a = (CkArray *)_localBranch(dest_aid);
+       
+    // delete all the destination elements for which we are not homePe
+    for (int i=dest_elements.size()-1; i>=0; --i) {
+      if (a->homePe(dest_elements[i]) != CkMyPe()) {
+       ComlibPrintf("[%d] ComlibArrayInfo::purge removing home=%d dest element %d  i=%d\n", CkMyPe(), a->homePe(dest_elements[i]), dest_elements[i].data()[0], i);
+       dest_elements.remove(i); 
+      }
+    }          
+  }
+
 }
 
 int *ComlibArrayInfo::getCombinedCountList() {
@@ -607,20 +515,18 @@ void ComlibArrayInfo::getCombinedPeList(int *&pelist, int &npes) {
 }
 */
 
-/// Broadcast the message to all local elements
+/**  Broadcast the message to all local elements (as listed in dest_elements) */
 void ComlibArrayInfo::localBroadcast(envelope *env) {
   int count = localMulticast(&dest_elements, env);
-  ComlibPrintf("[%d] ComlibArrayInfo::localBroadcast to %d elements (%d non local)\n",CmiMyPe(),dest_elements.size(),count);
-
-//  char buf[100000];
-//  buf[0] = '\0';
-//  for(int i=0;i<dest_elements.size();i++){
-//       sprintf(buf+strlen(buf), " %d", dest_elements[i].data()[0]);
-//  }
-//  ComlibPrintf("dest_elements = %s\n", buf);
+  if(com_debug){
+    CkPrintf("[%d] ComlibArrayInfo::localBroadcast to %d elements (%d non local)\n",CmiMyPe(),dest_elements.size(),count);
+    printDestElementList();
+  }
 
 }
 
+
+
 /**
   This method multicasts the message to all the indices in vec.  It
   also takes care to check if the entry method is readonly or not. If
@@ -640,7 +546,6 @@ void ComlibArrayInfo::localBroadcast(envelope *env) {
   @todo Replace this method with calls to CharmStrategy::deliverToIndices, possibly making it a function that is not part of any class
 
 */
-
 #include "register.h"
 int ComlibArrayInfo::localMulticast(CkVec<CkArrayIndexMax>*vec,
                                      envelope *env){
@@ -665,7 +570,8 @@ int ComlibArrayInfo::localMulticast(CkVec<CkArrayIndexMax>*vec,
 
     //ComlibPrintf("sending to %d elements\n",nelements);
     for(int i = 0; i < nelements-1; i ++){
-        idx = (*vec)[i];
+      idx = (*vec)[i];
+      CkAssert(idx.nInts == 1);
         //if(com_debug) idx.print();
 
         env->getsetArrayIndex() = idx;