Fixing some comlib bugs that could possibly occur due to the way the src and dest...
authorIsaac Dooley <idooley2@illinois.edu>
Sat, 11 Jul 2009 23:08:09 +0000 (23:08 +0000)
committerIsaac Dooley <idooley2@illinois.edu>
Sat, 11 Jul 2009 23:08:09 +0000 (23:08 +0000)
src/ck-com/ComlibManager.C
src/ck-com/ComlibManager.ci
src/ck-com/ComlibStrategy.C
src/ck-com/ComlibStrategy.h
src/ck-com/EachToManyMulticastStrategy.C
src/ck-com/EachToManyMulticastStrategy.h
src/conv-com/convcomlibmanager.h
src/conv-com/convcomlibstrategy.C
src/conv-com/routerstrategy.C

index 0b988de1fd5fd0ccd07752154c223dc67880d9af..2a45b6b95448e8cea3fbd250f9fd786c7ed75dd0 100644 (file)
@@ -93,8 +93,7 @@ ComlibManager::ComlibManager(){
 
 void ComlibManager::init(){
 
-
-  //  CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, (void*)this, 4000, CkMyPe());
+   CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, (void*)this, 4000, CkMyPe());
 
 
   if(CkNumPes() == 1 ){
@@ -149,9 +148,6 @@ void ComlibManager::barrier(){
                bcount ++;
                if(bcount == CkNumPes()){
                        bcount = 0;
-                       //barrierReached = 1;
-                       //barrier2Reached = 0;
-
                        CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
                        cgproxy.resumeFromSetupBarrier();
                }
@@ -169,15 +165,18 @@ void ComlibManager::barrier(){
    triggering the broadcast of the strategies created in Main::Main. Here the
    Main::Main has for sure executed, otherwise we will not have received
    confirmation by all other processors.
+   
+   
  */
 void ComlibManager::resumeFromSetupBarrier(){
        ComlibPrintf("[%d] resumeFromSetupBarrier Charm group ComlibManager setup finished\n", CkMyPe());
 
        setupComplete = 1;
        ComlibDoneCreating();
+       ComlibPrintf("[%d] resumeFromSetupBarrier calling ComlibDoneCreating to tell converse layer strategies to set themselves up\n", CkMyPe());
        sendBufferedMessagesAllStrategies();
-}
 
+}
 
 /***************************************************************************
  Determine whether the delegated messages should be buffered until the 
@@ -203,22 +202,24 @@ void ComlibManager::sendBufferedMessagesAllStrategies(){
 
 /***************************************************************************
    Send all the buffered messages once startup has completed, and we have 
-   recovered from any errors have recovered.
+   recovered from all errors.
 ***************************************************************************/
 void ComlibManager::sendBufferedMessages(int instid){
   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
-
-  if(! shouldBufferMessagesNow(instid) && delayMessageSendBuffer[instid].size() > 0){
+  
+  if(shouldBufferMessagesNow(instid)){
+    ComlibManagerPrintf("[%d] sendBufferedMessages is not flushing buffered messages for strategy %d because shouldBufferMessagesNow()==true\n", CkMyPe(), instid);  
+  } else if(delayMessageSendBuffer[instid].size() == 0){
+    ComlibManagerPrintf("[%d] sendBufferedMessages: no bufferedmessages to send for strategy %d\n", CkMyPe(), instid);  
+  } else{
     ComlibManagerPrintf("[%d] sendBufferedMessages Sending %d buffered messages for instid=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size(), instid);
 
-
     for (std::set<CharmMessageHolder*>::iterator iter = delayMessageSendBuffer[instid].begin(); iter != delayMessageSendBuffer[instid].end(); ++iter) {
       CharmMessageHolder* cmsg = *iter;
          
       switch(cmsg->type){
            
       case CMH_ARRAYSEND:
-       //          CkAbort("CMH_ARRAYSEND unimplemented");
        CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
        CkpvAccess(conv_com_object).doneInserting(instid);
        break;
@@ -243,28 +244,22 @@ void ComlibManager::sendBufferedMessages(int instid){
     }
 
     delayMessageSendBuffer[instid].clear();
-  } 
-  else {
-    ComlibManagerPrintf("sendBufferedMessages is not flushing buffered messages for strategy %d because myEntry->errorMode=%d && setupComplete = %d && myEntry->discoveryMode = %d && myEntry->bufferOutgoing = %d\n", instid, (int)myEntry->errorMode, (int)setupComplete, (int)myEntry->discoveryMode, (int)myEntry->bufferOutgoing);  
   }
 
-
-
 }
 
 
 
 /***************************************************************************
- * Bracketed section:
- *
- * Routines for bracketed strategies, to call the brackets begin and end, and to
- * restore count errors after some objects migrate.
+ * Routine for bracketed strategies, for detecting count errors after 
+ * objects migrate.
  * 
  * This function must only be called after all messages from the previous 
  * iteration have been delivered. This is the application's responsibility to 
- * ensure. The iteration values provided must increase monotonically.
+ * ensure. The iteration values provided must increase monotonically, 
+ * and must be greater than STARTUP_ITERATION.
+ *
  ***************************************************************************/
-
 /// Called when the array/group element starts sending messages
 void ComlibManager::beginIteration(int instid, int iteration){
        StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
@@ -272,7 +267,10 @@ void ComlibManager::beginIteration(int instid, int iteration){
        ComlibManagerPrintf("[%d] beginIteration iter=%d lastKnownIteration=%d  %s %s %s\n", CkMyPe(), iteration, myEntry->lastKnownIteration, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
 
 
+
        if(iteration > myEntry->lastKnownIteration){
+             ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
+
              // Verify & update errorModeServer:
              if(CkMyPe()==0){
                CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || 
@@ -287,6 +285,21 @@ void ComlibManager::beginIteration(int instid, int iteration){
              // Verify & update errorMode:
              CkAssert(myEntry->errorMode == ERROR_FIXED_MODE || myEntry->errorMode == NORMAL_MODE  );
              myEntry->errorMode = NORMAL_MODE;   
+             
+             ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
+
+             if(myEntry->lastKnownIteration == STARTUP_ITERATION){
+               // At this point, myEntry->numElements == 0
+               // An error will be detected below, so messages will be buffered.
+               // Since this is startup, bracketedStartErrorRecoveryProcess 
+               // will wait until the comlib layer is setup before continuing
+             } else {
+               // switch to using the newly updated source and destination lists (describing which objects are local to a PE)
+               // This must be done here because the list can only be updated once all sends 
+               // have completed from the iteration during which the error was detected.
+               myInfo->useNewSourceAndDestinations();  
+             }
+
 
              myEntry->lastKnownIteration = iteration;
              myEntry->nBeginItr = 1; // we are the first time to be called this iteration
@@ -295,12 +308,10 @@ void ComlibManager::beginIteration(int instid, int iteration){
              myEntry->totalEndCounted = 0;
              myEntry->discoveryMode = NORMAL_DISCOVERY_MODE;
              myEntry->nEndSaved = 0;
-             
-             
-             ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
+                     
        } else {
-               myEntry->nBeginItr++;
                ComlibManagerPrintf("[%d] beginIteration continuing iteration # %d\n", CkMyPe(), iteration);
+               myEntry->nBeginItr++;
        }
        
        
@@ -308,7 +319,7 @@ void ComlibManager::beginIteration(int instid, int iteration){
        // This will ensure that if we are the processor that detects this error, 
        // we won't deliver at least this message until the strategy is fixed
        if (myEntry->nBeginItr > myEntry->numElements) {
-         ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING\n",CkMyPe());                     
+         ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING because nBeginItr=%d > numElements=%d\n",CkMyPe(), myEntry->nBeginItr, myEntry->numElements);
          myEntry->bufferOutgoing = 1;
        }
        
@@ -369,7 +380,7 @@ void ComlibManager::bracketedStartErrorRecoveryProcess(int instid, int step){
   
 
   if(converseManager->isReady(instid)){
-    ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() %s %s %s\n", CkMyPe(), myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
+    ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess(instid=%d step=%d) %s %s %s\n", CkMyPe(), instid, step, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
 
     CkAssert(myEntry->strategy != NULL);
     CkAssert(myEntry->errorMode == NORMAL_MODE || myEntry->errorMode == ERROR_MODE);
@@ -681,64 +692,70 @@ void ComlibManager::bracketedReleaseBufferedMessages(int instid) {
     it is still here. If the array element has migrated away, then the
     bracketedDiscover() method is called on the new PE.
 
+    the new source and destination element arrays will be empty at this point,
+    so myInfo->addNewDestinationList(e); will add a new record that
+    will be used for future iterations.
+
 */
 void ComlibManager::bracketedStartDiscovery(int instid) {
        StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
        CkAssert(myEntry->discoveryMode == NORMAL_DISCOVERY_MODE);
        myEntry->discoveryMode = STARTED_DISCOVERY_MODE;
        ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
-                               
+       const CProxy_ComlibManager myProxy(thisgroup);
+
        ComlibManagerPrintf("[%d] bracketedStartDiscovery\n", CkMyPe());
 
        int countSrc = 0;
        int countDest = 0;
-       CkArrayID aid;
-       CkArrayIndexMax *idx;
-       int nelem;
-       CkArray *a;
-       CProxy_ComlibManager myProxy(thisgroup);
 
        if (myInfo->isSourceArray()) {
-               myInfo->getSourceArray(aid, idx, nelem);
-               myInfo->resetSource();
-               a = (CkArray*)_localBranch(aid);
-               for (int i=0; i<nelem; ++i) {
-                       int pe = a->lastKnown(idx[i]);
-//                     ComlibManagerPrintf("[%d] bracketedStartDiscovery src Index %d was lastKnown at pe %d\n", CkMyPe(), idx[i].data()[0], pe);
-                       if (pe == CkMyPe()) {
-                               countSrc++;
-                               myInfo->addSource(idx[i]);
-                       }
-                       else {
-                               myProxy[pe].bracketedDiscover(instid, aid, idx[i], true);
-                       }
-               }
-               delete[] idx;
+         CkAssert(myInfo->newSourceListSize() == 0);
+
+         const CkVec<CkArrayIndexMax> & srcElements = myInfo->getSourceElements();
+         const int nelem = srcElements.size();
+         const CkArrayID aid = myInfo->getSourceArrayID(); 
+         const CkArray *a = (CkArray*)_localBranch(aid);
+
+         for (int i=0; i<nelem; ++i) {
+           int pe = a->lastKnown(srcElements[i]);
+           if (pe == CkMyPe()) {
+             countSrc++;
+             myInfo->addNewLocalSource(srcElements[i]);
+           }
+           else {
+             myProxy[pe].bracketedDiscover(instid, aid, srcElements[i], true);
+           }
+         }
+
        }
 
        if (myInfo->isDestinationArray()) {
-               myInfo->getDestinationArray(aid, idx, nelem);
-               myInfo->resetDestination();
-               a = (CkArray*)_localBranch(aid);
-               for (int i=0; i<nelem; ++i) {
-                       int pe = a->lastKnown(idx[i]);
-//                     ComlibManagerPrintf("[%d] bracketedStartDiscovery dest Index %d was lastKnown at pe %d\n", CkMyPe(), idx[i].data()[0], pe);
-                       if (pe == CkMyPe()) {
-                               countDest++;
-                               myInfo->addDestination(idx[i]);
-                       }
-                       else {
-                               myProxy[pe].bracketedDiscover(instid, aid, idx[i], false);
-                       }
-               }
-               delete[] idx;
+         CkAssert(myInfo->newDestinationListSize() == 0);
+
+         const CkVec<CkArrayIndexMax> & destElements = myInfo->getDestinationElements();
+         const int nelem = destElements.size();
+         const CkArrayID aid = myInfo->getDestinationArrayID();
+         const CkArray *a = (CkArray*)_localBranch(aid);
+
+         for (int i=0; i<nelem; ++i) {
+           int pe = a->lastKnown(destElements[i]);
+           if (pe == CkMyPe()) {
+             countDest++;
+             myInfo->addNewLocalDestination(destElements[i]);
+           }
+           else {
+             myProxy[pe].bracketedDiscover(instid, aid, destElements[i], false);
+           }
+         }
        }
 
+       // Report the number of elements that are now local to this PE (if >0).
+       // The new owner PEs will report the counts for those objects that are no longer local to this PE
        if (countSrc > 0 || countDest > 0) {
-               // contribute only if we have something
-               myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
+         myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
        }
-
+       
 }
 
 
@@ -779,15 +796,15 @@ void ComlibManager::bracketedDiscover(int instid, CkArrayID aid, CkArrayIndexMax
                        // Add the element as a source element for the strategy
                        ComlibManagerPrintf("[%d] bracketedDiscover addSource\n", CkMyPe());
                        CkAssert((unsigned long)myInfo > 0x1000);
-                       myInfo->addSource(idx);
+                       myInfo->addNewLocalSource(idx);
 
                        ComlibManagerPrintf("[%d] bracketedDiscover updating numElements\n", CkMyPe());
                        myEntry->numElements = myInfo->getLocalSrc();           
                }
                else {
                        // Add the element as a Destination element for the strategy
-                       ComlibManagerPrintf("[%d] bracketedDiscover addDestination\n", CkMyPe());
-                       myInfo->addDestination(idx);
+                       ComlibManagerPrintf("[%d] bracketedDiscover addNewDestination\n", CkMyPe());
+                       myInfo->addNewLocalDestination(idx);
                }
        } else {
                ComlibManagerPrintf("Keep On Forwarding*********************\n");
@@ -1043,9 +1060,7 @@ void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m,
        env->setUsed(0);
        CkPackMessage(&env);
        
-       
-       /// @TODO why are CkSectionInfo needed since it doesn't seem to have
-       /// useful information?
+
        CkSectionInfo minfo;
        minfo.type = COMLIB_MULTICAST_MESSAGE;
        minfo.sInfo.cInfo.instId = ci->getID();
@@ -1115,19 +1130,19 @@ void ComlibManager::multicast(CharmMessageHolder *cmsg, int instid) {
 
 
 void ComlibManager::printDiagnostics(int instid){
-       ComlibManagerPrintf("[%d]     delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
+  CkPrintf("[%d]     delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
 }
 
 
 void ComlibManager::printDiagnostics(){
-  
+
 
   std::map<ComlibInstanceHandle, std::set<CharmMessageHolder*> >::iterator iter;
   for(iter = delayMessageSendBuffer.begin(); iter != delayMessageSendBuffer.end(); ++iter){
     int instid = iter->first;
     int size = iter->second.size();
     
-    if(size>0){
+    if(size>0 || true){
       CkPrintf("[%d] delayMessageSendBuffer[instid=%d] contains %d messages\n", CkMyPe(), instid, size);
       
       if(! shouldBufferMessagesNow(instid)){
@@ -1137,7 +1152,10 @@ void ComlibManager::printDiagnostics(){
       }
       
       StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
-      CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d errorMode=%d errorModeServer=%d discoveryMode=%d bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, (int)myEntry->errorMode, (int)myEntry->errorModeServer, (int)myEntry->discoveryMode, (int)myEntry->bufferOutgoing);
+      CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d %s %s %s bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString(), (int)myEntry->bufferOutgoing);
+
+
+
     }
   }
   
@@ -1322,7 +1340,7 @@ void ComlibNotifyMigrationDoneHandler(void *msg) {
 
 
 static void periodicDebugPrintStatus(void* ptr, double currWallTime){
-  // CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
+  CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
 
   ComlibManager *mgr = (ComlibManager*)ptr;
   mgr->printDiagnostics();
index e33d9bbd87a2cd1ec33b5b2b8018a2e1742fdf83..d0219e974903f5034a7d15478d9e89dc81b34002 100644 (file)
@@ -16,9 +16,9 @@ module comlib {
     entry void barrier(void);
     entry void resumeFromSetupBarrier();
 
-    entry void bracketedReceiveCount(int instid, int pe, int count, int isFirst, int step);
     entry void bracketedStartErrorRecoveryProcess(int instid, int step);        
     entry void bracketedErrorDetected(int instid, int step);
+    entry void bracketedReceiveCount(int instid, int pe, int count, int isFirst, int step);
     entry void bracketedConfirmCount(int instid);
     entry void bracketedCountConfirmed(int instid, int count, int step);
     entry void bracketedReceiveNewCount(int instid);
index fae0e4a8d238813f93b0e4ec9e48f1ac31ac968a..a13ccf51cfdadbdb8b3a6c80cd30cf8bf7d516a6 100644 (file)
@@ -229,150 +229,81 @@ int *ComlibGroupInfo::getCombinedCountList() {
   return result;
 }
 
-/*
-void ComlibGroupInfo::getCombinedPeList(int *&pelist, int &npes) {
-    int count = 0;        
-    pelist = 0;
-    npes = 0;
-
-    pelist = new int[CkNumPes()];
-    if(nsrcpes == 0 || ndestpes == 0) {
-        npes = CkNumPes();        
-        for(count = 0; count < CkNumPes(); count ++) 
-            pelist[count] = count;                         
-    }
-    else {        
-        npes = ndestpes;
-        memcpy(pelist, destpelist, npes * sizeof(int));
-        
-        //Add source processors to the destination processors
-        //already obtained
-        for(int count = 0; count < nsrcpes; count++) {
-            int p = srcpelist[count];
-
-            for(count = 0; count < npes; count ++)
-                if(pelist[count] == p)
-                    break;
-
-            if(count == npes)
-                pelist[npes ++] = p;
-        }                        
-    }
-}
-*/
 
 ComlibArrayInfo::ComlibArrayInfo() {
        
     src_aid.setZero();
-    //nSrcIndices = -1;
-    //src_elements = NULL;
     isAllSrc = 0;
     totalSrc = 0;
     isSrcArray = 0;
 
     dest_aid.setZero();
-    //nDestIndices = -1;
-    //dest_elements = NULL;
     isAllDest = 0;
     totalDest = 0;
     isDestArray = 0;
 };
 
-/*
-ComlibArrayInfo::~ComlibArrayInfo() {
-    //CkPrintf("in comlibarrayinfo destructor\n");
-
-    if(nSrcIndices > 0)
-        delete [] src_elements;
 
-    if(nDestIndices > 0)
-        delete [] dest_elements;
-}
-*/
-
-/// Set the  source array used for this strategy. 
-/// The list of array indices should be the whole portion of the array involved in the strategy.
-/// The non-local array elements will be cleaned up inside purge() at migration of the strategy
-void ComlibArrayInfo::setSourceArray(CkArrayID aid, 
-                                         CkArrayIndexMax *e, int nind){
+void ComlibArrayInfo::setSourceArray(CkArrayID aid, CkArrayIndexMax *e, int nind){
     src_aid = aid;
     isSrcArray = 1;
-    /*
-    nSrcIndices = nind;
-    if(nind > 0) {
-        src_elements = new CkArrayIndexMax[nind];
-        memcpy(src_elements, e, sizeof(CkArrayIndexMax) * nind);
-    }
-    */
+
     src_elements.removeAll();
+    for (int i=0; i<nind; ++i){
+      CkAssert(e[i].nInts == 1);
+      src_elements.push_back(e[i]);
+    }
     
-    for (int i=0; i<nind; ++i) src_elements.push_back(e[i]);
-    
-    if (src_elements.size() == 0) 
+    if (nind == 0) 
        isAllSrc = 1;
     else 
        isAllSrc = 0;
-    
-    totalSrc = src_elements.size();
-    
-}
 
+    totalSrc = nind;
+
+    CkAssert(src_elements.size() == totalSrc);    
 
-void ComlibArrayInfo::getSourceArray(CkArrayID &aid, 
-                                         CkArrayIndexMax *&e, int &nind){
-    aid = src_aid;
-    nind = src_elements.length();//nSrcIndices;
-    e = src_elements.getVec();//src_elements;
 }
 
 
-void ComlibArrayInfo::setDestinationArray(CkArrayID aid, 
-                                          CkArrayIndexMax *e, int nind){
+void ComlibArrayInfo::setDestinationArray(CkArrayID aid, CkArrayIndexMax *e, int nind){
+  ComlibPrintf("[%d] ComlibArrayInfo::setDestinationArray  dest_elements\n", CkMyPe());
     dest_aid = aid;
     isDestArray = 1;
-    /*
-    nDestIndices = nind;
-    if(nind > 0) {
-        dest_elements = new CkArrayIndexMax[nind];
-        memcpy(dest_elements, e, sizeof(CkArrayIndexMax) * nind);
-    }
-    */
+   
     dest_elements.removeAll();
-    for (int i=0; i<nind; ++i) dest_elements.push_back(e[i]);
+    for (int i=0; i<nind; ++i){
+      CkAssert(e[i].nInts > 0);
+      dest_elements.push_back(e[i]);
+    }
 
-    if (dest_elements.size() == 0) 
+    if (nind == 0) 
        isAllDest = 1;
     else 
        isAllDest = 0;
     
-    totalDest = dest_elements.size();
-    
-}
+    totalDest = nind;
+    CkAssert(dest_elements.size() == totalDest);    
 
-
-void ComlibArrayInfo::getDestinationArray(CkArrayID &aid, 
-                                          CkArrayIndexMax *&e, int &nind){
-    aid = dest_aid;
-    nind = dest_elements.length();
-    e = dest_elements.getVec();
 }
 
+
 /// @TODO fix the pup!
 //Each strategy must define his own Pup interface.
 void ComlibArrayInfo::pup(PUP::er &p){ 
     p | src_aid;
-    //p | nSrcIndices;
     p | isSrcArray;
     p | isAllSrc;
     p | totalSrc;
-    p | src_elements;
-    
+    p | src_elements; 
+    p | new_src_elements;
+
     p | dest_aid;
-    //p | nDestIndices;
     p | isDestArray;
     p | isAllDest;
     p | totalDest;
-    p | dest_elements;
+    p | dest_elements; 
+    p | new_dest_elements;
 
     if (p.isPacking() || p.isUnpacking()) {
       // calling purge both during packing (at the end) and during unpacking
@@ -381,77 +312,54 @@ void ComlibArrayInfo::pup(PUP::er &p){
       purge();
     }
 
-    /*    
-    if(p.isUnpacking() && nSrcIndices > 0) 
-        src_elements = new CkArrayIndexMax[nSrcIndices];
     
-    if(p.isUnpacking() && nDestIndices > 0) 
-        dest_elements = new CkArrayIndexMax[nDestIndices];        
-    
-    if(nSrcIndices > 0)
-        p((char *)src_elements, nSrcIndices * sizeof(CkArrayIndexMax));    
-    else
-        src_elements = NULL;
+}
 
-    if(nDestIndices > 0)
-        p((char *)dest_elements, nDestIndices * sizeof(CkArrayIndexMax));    
-    else
-        dest_elements = NULL;
 
-    localDestIndexVec.resize(0);
-    */
-    
+void ComlibArrayInfo::printDestElementList() {
+  char buf[100000];
+  buf[0] = '\0';
+  for(int i=0;i<dest_elements.size();i++){
+    sprintf(buf+strlen(buf), " %d", dest_elements[i].data()[0]);
+  }
+  CkPrintf("[%d] dest_elements = %s\n", CkMyPe(), buf);
 }
 
+
 void ComlibArrayInfo::newElement(CkArrayID &id, const CkArrayIndex &idx) {
-  ComlibPrintf("ComlibArrayInfo::newElement\n");
-  if (isAllSrc && id==src_aid) src_elements.push_back(idx);
-  if (isAllDest && id==dest_aid) dest_elements.push_back(idx);
+  CkAbort("New Comlib implementation does not allow dynamic element insertion yet\n");
+  //  CkPrintf("ComlibArrayInfo::newElement dest_elements\n");
+  //  if (isAllSrc && id==src_aid) src_elements.push_back(idx);
+  //  if (isAllDest && id==dest_aid) dest_elements.push_back(idx);
 }
 
 void ComlibArrayInfo::purge() {
-       int i;
-       CkArray *a;
-//     ComlibPrintf("[%d] ComlibArrayInfo::purge srcArray=%d (%d), destArray=%d (%d)\n",CkMyPe(),isSrcArray,isAllSrc,isDestArray,isAllDest);
-       if (isSrcArray) {
-               a = (CkArray *)_localBranch(src_aid);
-               if (isAllSrc) {
-                       // gather the information of all the elements that are currenly present here
-                       ComlibElementIterator iter(&src_elements);
-                       a->getLocMgr()->iterate(iter);
-
-                       // register to ComlibArrayListener for this array id
-//                     ComlibManagerRegisterArrayListener(src_aid, this);
-               } else {
-                       // delete all the elements of which we are not homePe
-                       for (i=src_elements.size()-1; i>=0; --i) {
-                               if (a->homePe(src_elements[i]) != CkMyPe()) { 
-                                       
-//                                     ComlibPrintf("[%d] removing home=%d src element %d  i=%d\n", CkMyPe(),a->homePe(src_elements[i]), src_elements[i].data()[0], i);
-                                       src_elements.remove(i); 
-                               }
-                       }
-               }
-       }
-       if (isDestArray) {
-               a = (CkArray *)_localBranch(dest_aid);
-               if (isAllDest) {
-                       // gather the information of all the elements that are currenly present here
-                       ComlibElementIterator iter(&dest_elements);
-                       a->getLocMgr()->iterate(iter);
-
-                       // register to ComlibArrayListener for this array id
-//                     ComlibManagerRegisterArrayListener(dest_aid, this);
-               } else {
-                       // delete all the elements of which we are not homePe
-                       for (i=dest_elements.size()-1; i>=0; --i) {
-                               if (a->homePe(dest_elements[i]) != CkMyPe()) {
-//                                     ComlibPrintf("[%d] removing home=%d dest element %d  i=%d\n", CkMyPe(), a->homePe(dest_elements[i]), dest_elements[i].data()[0], i);
-                                       dest_elements.remove(i); 
-                               }
-                       }
-               }
-       }
+  //   ComlibPrintf("[%d] ComlibArrayInfo::purge srcArray=%d (%d), destArray=%d (%d)\n",CkMyPe(),isSrcArray,isAllSrc,isDestArray,isAllDest);
+
+  if (isSrcArray) {
+    CkArray *a = (CkArray *)_localBranch(src_aid);
+
+    // delete all the source elements for which we are not homePe
+    for (int i=src_elements.size()-1; i>=0; --i) {
+      if (a->homePe(src_elements[i]) != CkMyPe()) {                    
+       ComlibPrintf("[%d] ComlibArrayInfo::purge removing home=%d src element %d  i=%d\n", CkMyPe(),a->homePe(src_elements[i]), src_elements[i].data()[0], i);
+       src_elements.remove(i); 
+      }
+    }
+  }
+
+  if (isDestArray) {
+    CkArray *a = (CkArray *)_localBranch(dest_aid);
+       
+    // delete all the destination elements for which we are not homePe
+    for (int i=dest_elements.size()-1; i>=0; --i) {
+      if (a->homePe(dest_elements[i]) != CkMyPe()) {
+       ComlibPrintf("[%d] ComlibArrayInfo::purge removing home=%d dest element %d  i=%d\n", CkMyPe(), a->homePe(dest_elements[i]), dest_elements[i].data()[0], i);
+       dest_elements.remove(i); 
+      }
+    }          
+  }
+
 }
 
 int *ComlibArrayInfo::getCombinedCountList() {
@@ -607,20 +515,18 @@ void ComlibArrayInfo::getCombinedPeList(int *&pelist, int &npes) {
 }
 */
 
-/// Broadcast the message to all local elements
+/**  Broadcast the message to all local elements (as listed in dest_elements) */
 void ComlibArrayInfo::localBroadcast(envelope *env) {
   int count = localMulticast(&dest_elements, env);
-  ComlibPrintf("[%d] ComlibArrayInfo::localBroadcast to %d elements (%d non local)\n",CmiMyPe(),dest_elements.size(),count);
-
-//  char buf[100000];
-//  buf[0] = '\0';
-//  for(int i=0;i<dest_elements.size();i++){
-//       sprintf(buf+strlen(buf), " %d", dest_elements[i].data()[0]);
-//  }
-//  ComlibPrintf("dest_elements = %s\n", buf);
+  if(com_debug){
+    CkPrintf("[%d] ComlibArrayInfo::localBroadcast to %d elements (%d non local)\n",CmiMyPe(),dest_elements.size(),count);
+    printDestElementList();
+  }
 
 }
 
+
+
 /**
   This method multicasts the message to all the indices in vec.  It
   also takes care to check if the entry method is readonly or not. If
@@ -640,7 +546,6 @@ void ComlibArrayInfo::localBroadcast(envelope *env) {
   @todo Replace this method with calls to CharmStrategy::deliverToIndices, possibly making it a function that is not part of any class
 
 */
-
 #include "register.h"
 int ComlibArrayInfo::localMulticast(CkVec<CkArrayIndexMax>*vec,
                                      envelope *env){
@@ -665,7 +570,8 @@ int ComlibArrayInfo::localMulticast(CkVec<CkArrayIndexMax>*vec,
 
     //ComlibPrintf("sending to %d elements\n",nelements);
     for(int i = 0; i < nelements-1; i ++){
-        idx = (*vec)[i];
+      idx = (*vec)[i];
+      CkAssert(idx.nInts == 1);
         //if(com_debug) idx.print();
 
         env->getsetArrayIndex() = idx;
index 8f03ea2c224e63e6992ca6b247526c4f0d899fd7..c95f70ed0438aedd5e6277d018784800d6c1f0a8 100644 (file)
@@ -91,13 +91,13 @@ class CharmMessageHolder : public MessageHolder{
     
     /// Store a local copy of the sec_id, so I can use it later.
     inline void saveCopyOf_sec_id(){
-      ComlibPrintf("[%d] saveCopyOf_sec_id sec_id=%p NULL=%d\n", CkMyPe(), sec_id, NULL);
+      //      ComlibPrintf("[%d] saveCopyOf_sec_id sec_id=%p NULL=%d\n", CkMyPe(), sec_id, NULL);
 
       checkme();
 
       if(sec_id!=NULL){
 
-       ComlibPrintf("Original has values: _nElems=%d, npes=%d\n", sec_id->_nElems, sec_id->npes );
+       //ComlibPrintf("Original has values: _nElems=%d, npes=%d\n", sec_id->_nElems, sec_id->npes );
        CkAssert(sec_id->_nElems>=0);
        CkAssert(sec_id->npes>=0);
 
@@ -111,7 +111,7 @@ class CharmMessageHolder : public MessageHolder{
        copy_of_sec_id->_cookie = sec_id->_cookie;
        copy_of_sec_id->_nElems = sec_id->_nElems;
        copy_of_sec_id->npes = sec_id->npes;
-       ComlibPrintf("Copy has values: _nElems=%d, npes=%d\n", copy_of_sec_id->_nElems, copy_of_sec_id->npes );
+       //      ComlibPrintf("Copy has values: _nElems=%d, npes=%d\n", copy_of_sec_id->_nElems, copy_of_sec_id->npes );
        for(int i=0; i<sec_id->_nElems; i++){
          copy_of_sec_id->_elems[i] = sec_id->_elems[i];
        }
@@ -120,7 +120,7 @@ class CharmMessageHolder : public MessageHolder{
        }
 
        // change local pointer to the new copy of the CkSectionID
-       ComlibPrintf("saving copy of sec_id into %p\n", copy_of_sec_id);
+       //      ComlibPrintf("saving copy of sec_id into %p\n", copy_of_sec_id);
       }
 
       checkme();
@@ -210,46 +210,98 @@ class ComlibMulticastMsg;
    communication library.
 */
 class ComlibArrayInfo {
- protected:
+ private:
+
     CkArrayID src_aid;         ///< Source Array ID
-    CkVec<CkArrayIndexMax> src_elements;     ///< local source array elements
-    int isSrcArray;
-    int isAllSrc; ///< if true then all the array is involved in the operation
-    int totalSrc; ///< The total number of src elements involved in the strategy
+    CkArrayID dest_aid; ///< Destination Array ID
+    
+    /**  Destination indices that are local to this PE 
+        (as determined by the bracketed counting scheme from a previous iteration)  
+    */
+    CkVec<CkArrayIndexMax> src_elements; 
+
+    /**  Destination indices that are currently being updated. 
+        At the beginning of the next iteration these will be 
+        moved into dest_elements by useNewDestinationList()
+    */
+    CkVec<CkArrayIndexMax> new_src_elements;
+
+    /**  Destination indices that are local to this PE 
+        (as determined by the bracketed counting scheme from a previous iteration)  
+    */
+    CkVec<CkArrayIndexMax> dest_elements; 
+
+    /**  Destination indices that are currently being updated. 
+        At the beginning of the next iteration these will be 
+        moved into dest_elements by useNewDestinationList()
+    */
+    CkVec<CkArrayIndexMax> new_dest_elements;
 
-    CkArrayID dest_aid;
-    CkVec<CkArrayIndexMax> dest_elements; ///< destination indices
+    int isSrcArray;
     int isDestArray;
-    int isAllDest; ///< if true then all the array is involved in the operation
+
+    bool isAllSrc; ///< if true then all the array is involved in the operation
+    int totalSrc; ///< The total number of src elements involved in the strategy
+    bool isAllDest; ///< if true then all the array is involved in the operation
     int totalDest; ///< The total number of array elements involved in the strategy
     
  public:
     ComlibArrayInfo();
     //~ComlibArrayInfo();
 
+    /** Set the  source array used for this strategy. 
+       The list of array indices should be the whole portion of the array involved in the strategy.
+       The non-local array elements will be cleaned up inside purge() at migration of the strategy
+    */
     void setSourceArray(CkArrayID aid, CkArrayIndexMax *e=0, int nind=0);
     int isSourceArray(){return isSrcArray;}
-    void getSourceArray(CkArrayID &aid, CkArrayIndexMax *&e, int &nind);
-    /// This operation leaks memory is the index vector is not retrieved before!
-    void resetSource() {new (&src_elements) CkVec<CkArrayIndexMax>();};
-    void addSource(CkArrayIndexMax &e) {
-       src_elements.push_back(e);
-//     ComlibPrintf("[%d] src_elements.push_back(%d)  now contains %d\n", CkMyPe(), e.data()[0], src_elements.size());
-    }
+    CkArrayID getSourceArrayID() {return src_aid;}
+    const CkVec<CkArrayIndexMax> & getSourceElements() {return src_elements;}
+
+    /** Set the destination array used for this strategy. 
+       The list of array indices should be the whole portion of the array involved in the strategy.
+       The non-local array elements will be cleaned up inside purge() at migration of the strategy
+    */
+    void setDestinationArray(CkArrayID aid, CkArrayIndexMax *e=0, int nind=0);
+    int isDestinationArray(){return isDestArray;}
+    CkArrayID getDestinationArrayID() {return dest_aid;}
+    const CkVec<CkArrayIndexMax> & getDestinationElements() {return dest_elements;}
 
     /// Get the number of source array elements
     int getTotalSrc() {return totalSrc;}
     int getLocalSrc() {return src_elements.size();}
 
-    void setDestinationArray(CkArrayID aid, CkArrayIndexMax *e=0, int nind=0);
-    int isDestinationArray(){return isDestArray;}
-    void getDestinationArray(CkArrayID &aid, CkArrayIndexMax *&e, int &nind);
-    /// This operation leaks memory is the index vector is not retrieved before!
-    void resetDestination() {new (&dest_elements) CkVec<CkArrayIndexMax>();};
-    void addDestination(CkArrayIndexMax &e) {
-       dest_elements.push_back(e);
-       ComlibPrintf("[%d] dest_elements.push_back(%d)  now contains %d\n", CkMyPe(), e.data()[0], dest_elements.size());       
+    /** Add a destination object that is local to this PE to list used in future iterations */
+    void addNewLocalDestination(const CkArrayIndexMax &e) {
+        CkAssert(e.nInts > 0);
+       new_dest_elements.push_back(e);
+    }
+
+    /** Add a source object that is local to this PE to list used in future iterations */
+    void addNewLocalSource(const CkArrayIndexMax &e) {
+        CkAssert(e.nInts > 0);
+       new_src_elements.push_back(e);
     }
+  
+    /// Switch to using the new destination and source lists if the previous iteration found an error and constructed the new list
+    void useNewSourceAndDestinations() {
+      if(new_dest_elements.size() > 0) {
+       dest_elements.removeAll();
+       dest_elements = new_dest_elements;
+       CkAssert(dest_elements.size() == new_dest_elements.size());
+       new_dest_elements.removeAll();
+      }
+      if(new_src_elements.size() > 0) {
+       src_elements.removeAll();
+       src_elements = new_src_elements;
+       CkAssert(src_elements.size() == new_src_elements.size());
+       new_src_elements.removeAll();
+      }
+    }
+
+    int newDestinationListSize(){ return new_dest_elements.size(); }
+    int newSourceListSize(){ return new_src_elements.size(); }
+
     int getTotalDest() {return totalDest;}
     int getLocalDest() {return dest_elements.size();}
 
@@ -267,10 +319,10 @@ class ComlibArrayInfo {
     /// This routine returnes an array of size CkNumPes() where each element
     /// follow the convention for bracketed strategies counts.
     int *getCombinedCountList();
-    //void getSourcePeList(int *&pelist, int &npes);
-    //void getDestinationPeList(int *&pelist, int &npes);
-    //void getCombinedPeList(int *&pelist, int &npes);
-    
+
+    void printDestElementList();
+
+
     void pup(PUP::er &p);
 };
 
index c4482d387ffcc0cd83a8c9ffd07f4d875017e217..c71d29e8f223cdf5adcf449603eea02fac2dda29 100644 (file)
@@ -6,64 +6,10 @@
 
 #include "EachToManyMulticastStrategy.h"
 #include "string.h"
-//#include "routerstrategy.h"
 
 #include "AAPLearner.h"
 #include "AAMLearner.h"
 
-//EachToManyMulticastStrategy CODE
-//CkpvExtern(int, RecvdummyHandle);
-//CkpvExtern(CkGroupID, cmgrID);
-
-/*
-void *itrDoneHandler(void *msg){
-
-    EachToManyMulticastStrategy *nm_mgr;
-
-    DummyMsg *dmsg = (DummyMsg *)msg;
-    comID id = dmsg->id;
-    int instid = id.instanceID;
-
-    CmiFree(msg);
-    ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
-
-    StrategyTableEntry *sentry = 
-        CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
-        ->getStrategyTableEntry(instid);
-    int nexpected = sentry->numElements;
-
-    if(nexpected == 0) {             
-        ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", CkMyPe(), instid, nexpected);
-        nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
-        nm_mgr->doneInserting();
-
-       if (!nm_mgr->getOnFinish().isInvalid()) nm_mgr->getOnFinish().send(0);
-
-    }
-
-    return NULL;
-}
- */
-/*
-void *E2MHandler(void *msg){
-    //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
-    EachToManyMulticastStrategy *nm_mgr;    
-
-    CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
-    int instid = conv_header->stratid;
-
-    envelope *env = (envelope *)msg;
-
-    nm_mgr = (EachToManyMulticastStrategy *) 
-        CProxy_ComlibManager(CkpvAccess(cmgrID)).
-        ckLocalBranch()->getStrategy(instid);
-
-    RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
-
-    nm_mgr->localMulticast(msg);
-    return NULL;
-}
- */
 
 //Group Constructor
 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy,
@@ -109,17 +55,6 @@ EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy,
        //ainfo.getSourcePeList(nsrcPe, srcPe);
        //ainfo.getDestinationPeList(ndestPe, destPe);
 
-       /*
-      char dump[1000];
-      char sdump[100];
-      sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
-      for(int count = 0; count < npes; count ++){
-      sprintf(sdump, "%d, ", pelist[count]);
-      strcat(dump, sdump);           
-      }    
-      ComlibPrintf("%s\n", dump);
-        */
-
        commonInit(count);
 }
 
@@ -196,24 +131,6 @@ void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
        RouterStrategy::insertMessage(cmsg);
 }
 
-/*
-void EachToManyMulticastStrategy::doneInserting(){
-
-    StrategyTableEntry *sentry = 
-        CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
-        ->getStrategyTableEntry(getInstance());
-    int nexpected = sentry->numElements;
-
-    if(routerID == USE_DIRECT && nexpected == 0)
-        return;
-
-    if(MyPe < 0)
-        return;
-
-    //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
-    rstrat->doneInserting();
-}
- */
 
 void EachToManyMulticastStrategy::pup(PUP::er &p){
 
@@ -225,81 +142,7 @@ void EachToManyMulticastStrategy::pup(PUP::er &p){
 
 }
 
-/* NOT NEEDED
-void EachToManyMulticastStrategy::finalizeCreation() {
-  ainfo.purge();
-}
- */
-
-// void EachToManyMulticastStrategy::beginProcessing(int numElements){
-
-//     ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
-//     /*
-//     char dump[1000];
-//     char sdump[100];
-//     sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
-//     for(int count = 0; count < npes; count ++){
-//         sprintf(sdump, "%d, ", pelist[count]);
-//         strcat(dump, sdump);           
-//     }    
-//     ComlibPrintf("%s\n", dump);
-//     */
-
-//     int expectedDeposits = 0;
-
-//     rstrat->setInstance(getInstance());
-
-//     if(ainfo.isSourceArray()) 
-//         expectedDeposits = numElements;
-
-//     if(getType() == GROUP_STRATEGY) {
-
-//         CkGroupID gid;
-//         int *srcpelist;
-//         int nsrcpes;
-
-//         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
-
-//         for(int count = 0; count < nsrcpes; count ++)
-//             if(srcpelist[count] == CkMyPe()){
-//                 expectedDeposits = 1;
-//                 break;
-//             }
-
-//         StrategyTableEntry *sentry = 
-//             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
-//             ->getStrategyTableEntry(myInstanceID);
-//         sentry->numElements = expectedDeposits;
-//     }
-//     /*
-//     if(useLearner) 
-//         setLearner(new AAPLearner());    
-//     else 
-//         setLearner(new AAMLearner());                
-//     */
-
-//     if(expectedDeposits > 0)
-//         return;
-
-//     if(expectedDeposits == 0 && MyPe >= 0)
-//         ConvComlibScheduleDoneInserting(myInstanceID);
-// }
-
-/*
-void EachToManyMulticastStrategy::finalizeProcessing() {
-    if(npes > 0)
-        delete [] pelist;
-
-    if(ndestpes > 0)
-        delete [] destpelist;
-
-    if(rstrat)
-        delete rstrat;
-
-    if(useLearner && getLearner() != NULL)
-        delete getLearner();
-}
- */
+
 
 void EachToManyMulticastStrategy::deliver(char *msg, int size) {
        ComlibPrintf("[%d] EachToManyMulticastStrategy::deliver for %s\n",
@@ -319,10 +162,9 @@ void EachToManyMulticastStrategy::deliver(char *msg, int size) {
                        CkSendMsgBranchInline(env->getEpIdx(), EnvToUsr(env), CkMyPe(), env->getGroupNum());
                }
                else if (getType() == ARRAY_STRATEGY) {
-                       //        CkPrintf("[%d] Delivering via ComlibArrayInfo::deliver()\n", CkMyPe());
+                       //        ComlibPrintf("[%d] Delivering via ComlibArrayInfo::deliver()\n", CkMyPe());
                        //      ComlibArrayInfo::deliver(env);
 
-                       // call ainfo's localBroadcast(env);
                        ComlibPrintf("[%d] Delivering via ComlibArrayInfo::localBroadcast()\n", CkMyPe());
                        ainfo.localBroadcast(env);
 
@@ -333,7 +175,7 @@ void EachToManyMulticastStrategy::deliver(char *msg, int size) {
 void EachToManyMulticastStrategy::localMulticast(void *msg){
        register envelope *env = (envelope *)msg;
        CkUnpackMessage(&env);
-       CkPrintf("localMulticast calls ainfo.localBroadcast()\n");
+       ComlibPrintf("localMulticast calls ainfo.localBroadcast()\n");
        ainfo.localBroadcast(env);
 }
 
index 74910b2ee78ebaf750cb28a2e33af505c9bced0e..ffa0807939832d474a5a05b08a4a27fbc9f6697a 100644 (file)
@@ -64,6 +64,8 @@ class EachToManyMulticastStrategy : public RouterStrategy, public CharmStrategy
     virtual void localMulticast(void *msg);
 
     virtual void notifyDone();
+
+    /** Called by handleMessage at the destinations for the broadcast if in DIRECT mode. */
     virtual void deliver(char *, int);
 
     /// this method can be called when the strategy is in DIRECT mode, so the
index 62350d89858e884cb368709b374f7d0fc4a65b7a..95d6f6d9edb7eccaafc2e4f99c5b80cb984a17e9 100644 (file)
@@ -30,6 +30,9 @@
 #define MAX_NUM_STRATS 32
 
 
+#define STARTUP_ITERATION -10000
+
+
 /** Converse interface to handle all the Comlib strategies registered in the
     Converse of Charm++ program. Being in Converse, this is a pure class
     allocated into a global variable. */
index 5cab088436b5626f9c12267c6b26cf1d2b73f35e..5e392351a6156970c12601e0fa713b86f4a1a5eb 100644 (file)
@@ -92,7 +92,7 @@ void StrategyWrapper::pup (PUP::er &p) {
 
 
 StrategyTableEntry::StrategyTableEntry() {
-    lastKnownIteration = -1000;
+    lastKnownIteration = STARTUP_ITERATION;
     strategy = NULL;
     isNew = 0;
     isReady = 0;
index 553854b7c8987db620172e1e7e90e0a83f5ca519..2f7513a34f72a1bf6585ebe246574cc01ed3f28b 100644 (file)
@@ -313,16 +313,11 @@ RouterStrategy::~RouterStrategy() {
 }
 
 /// Receive a message from the upper layer and buffer it in the msgQ until
-/// doneInserting is called. If the strategy is USE_DIRECT then just send it.
+/// doneInserting is called. If the strategy is USE_DIRECT then just send it to the handleMessage method for the Strategy.
 void RouterStrategy::insertMessage(MessageHolder *cmsg){
        ComlibPrintf("[%d] RouterStrategy::insertMessage\n", CkMyPe());
 
-       
-       for(int i = 0; i < ndestPes; i++){
-               int destPe = destPelist[i];
-               ComlibPrintf("[%d] RouterStrategy::insertMessage destPelist[%d]=%d\n", CkMyPe(), i, destPe );
-       }
-       
+       
   //if(myPe < 0)
   //    CmiAbort("insertMessage: mype < 0\n");
 
@@ -350,18 +345,24 @@ void RouterStrategy::insertMessage(MessageHolder *cmsg){
 #else
        if(cmsg->dest_proc == IS_BROADCAST) {
                ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all PEs\n", CkMyPe());
-               for(int destPe = 0; destPe < CkNumPes()-1; destPe++){
-                       ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all, PE %d\n", CkMyPe(), destPe );
-                       CmiSyncSend(destPe, cmsg->size, cmsg->getMessage());
-               }
-               if(CkNumPes()>0){
-                       CmiSyncSendAndFree(CkNumPes()-1, cmsg->size, cmsg->getMessage());
-                       ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all, PE %d\n", CkMyPe(), CkNumPes()-1 );
-               }
+       
+#if 0
+               CmiSyncBroadcastAndFree(cmsg->size, cmsg->getMessage() ); // This ought to be the same as the following alternative
+#else
+               for(int destPe = 0; destPe < CkNumPes()-1; destPe++){
+                       ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all, PE %d\n", CkMyPe(), destPe );
+                       CmiSyncSend(destPe, cmsg->size, cmsg->getMessage());
+               }
+               if(CkNumPes()>0){
+                       CmiSyncSendAndFree(CkNumPes()-1, cmsg->size, cmsg->getMessage());
+                       ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all, PE %d\n", CkMyPe(), CkNumPes()-1 );
+               }
+#endif
+
        }       
-        else
-            CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, 
-                               cmsg->getMessage());
+        else {
+         CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, cmsg->getMessage());
+       }
        delete cmsg;
     
 #endif
@@ -392,15 +393,15 @@ void RouterStrategy::insertMessage(MessageHolder *cmsg){
 }
 
 void RouterStrategy::doneInserting(){
-       ComlibPrintf("[%d] RouterStrategy::doneInserting", CkMyPe());
-       
+  ComlibPrintf("[%d] RouterStrategy::doneInserting msgQ.length()=%d \n", CkMyPe(), msgQ.length());
+  
+  
   if(myPe < 0) return; // nothing to do if I have not objects in my processor
       //CmiAbort("insertMessage: mype < 0\n");
 
     id.instanceID = getInstance();
 
     //ComlibPrintf("Instance ID = %d\n", getInstance());
-    ComlibPrintf("[%d] RouterStrategy::doneInserting %d \n", CkMyPe(), msgQ.length());
     
     if(doneFlag == 0) {
         ComlibPrintf("[%d] Waiting for previous iteration to Finish\n", 
@@ -411,6 +412,7 @@ void RouterStrategy::doneInserting(){
     }
     
     if(routerID == USE_DIRECT) {
+      CkAssert(msgQ.length() == 0);
       //DummyMsg *m = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
       //memset((char *)m, 0, sizeof(DummyMsg)); 
       //m->id.instanceID = getInstance();
@@ -438,26 +440,9 @@ void RouterStrategy::doneInserting(){
         msgQ.push(cmsg);
     }
 
+    ComlibPrintf("Calling router->EachToManyMulticastQ??????????????????????????\n");
     router->EachToManyMulticastQ(id, msgQ);
 
-    /* These were needed in the case of more than one iteration executing at the same time.
-       For the moment this is not the case.
-    while(!recvQ.isEmpty()) {
-        char *msg = recvQ.deq();
-        RecvManyMsg(msg);
-    }
-
-    while(!procQ.isEmpty()) {
-        char *msg = procQ.deq();
-        ProcManyMsg(msg);
-    }
-
-    while(!dummyQ.isEmpty() > 0) {
-        DummyMsg *m = dummyQ.deq();
-        router->DummyEP(m->id, m->magic);
-        CmiFree(m);
-    }
-    */
 }
 
 void RouterStrategy::deliver(char *msg, int size) {