Fixing some comlib bugs that could possibly occur due to the way the src and dest...
[charm.git] / src / ck-com / ComlibManager.C
index 0b988de1fd5fd0ccd07752154c223dc67880d9af..2a45b6b95448e8cea3fbd250f9fd786c7ed75dd0 100644 (file)
@@ -93,8 +93,7 @@ ComlibManager::ComlibManager(){
 
 void ComlibManager::init(){
 
-
-  //  CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, (void*)this, 4000, CkMyPe());
+   CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, (void*)this, 4000, CkMyPe());
 
 
   if(CkNumPes() == 1 ){
@@ -149,9 +148,6 @@ void ComlibManager::barrier(){
                bcount ++;
                if(bcount == CkNumPes()){
                        bcount = 0;
-                       //barrierReached = 1;
-                       //barrier2Reached = 0;
-
                        CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
                        cgproxy.resumeFromSetupBarrier();
                }
@@ -169,15 +165,18 @@ void ComlibManager::barrier(){
    triggering the broadcast of the strategies created in Main::Main. Here the
    Main::Main has for sure executed, otherwise we will not have received
    confirmation by all other processors.
+   
+   
  */
 void ComlibManager::resumeFromSetupBarrier(){
        ComlibPrintf("[%d] resumeFromSetupBarrier Charm group ComlibManager setup finished\n", CkMyPe());
 
        setupComplete = 1;
        ComlibDoneCreating();
+       ComlibPrintf("[%d] resumeFromSetupBarrier calling ComlibDoneCreating to tell converse layer strategies to set themselves up\n", CkMyPe());
        sendBufferedMessagesAllStrategies();
-}
 
+}
 
 /***************************************************************************
  Determine whether the delegated messages should be buffered until the 
@@ -203,22 +202,24 @@ void ComlibManager::sendBufferedMessagesAllStrategies(){
 
 /***************************************************************************
    Send all the buffered messages once startup has completed, and we have 
-   recovered from any errors have recovered.
+   recovered from all errors.
 ***************************************************************************/
 void ComlibManager::sendBufferedMessages(int instid){
   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
-
-  if(! shouldBufferMessagesNow(instid) && delayMessageSendBuffer[instid].size() > 0){
+  
+  if(shouldBufferMessagesNow(instid)){
+    ComlibManagerPrintf("[%d] sendBufferedMessages is not flushing buffered messages for strategy %d because shouldBufferMessagesNow()==true\n", CkMyPe(), instid);  
+  } else if(delayMessageSendBuffer[instid].size() == 0){
+    ComlibManagerPrintf("[%d] sendBufferedMessages: no bufferedmessages to send for strategy %d\n", CkMyPe(), instid);  
+  } else{
     ComlibManagerPrintf("[%d] sendBufferedMessages Sending %d buffered messages for instid=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size(), instid);
 
-
     for (std::set<CharmMessageHolder*>::iterator iter = delayMessageSendBuffer[instid].begin(); iter != delayMessageSendBuffer[instid].end(); ++iter) {
       CharmMessageHolder* cmsg = *iter;
          
       switch(cmsg->type){
            
       case CMH_ARRAYSEND:
-       //          CkAbort("CMH_ARRAYSEND unimplemented");
        CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
        CkpvAccess(conv_com_object).doneInserting(instid);
        break;
@@ -243,28 +244,22 @@ void ComlibManager::sendBufferedMessages(int instid){
     }
 
     delayMessageSendBuffer[instid].clear();
-  } 
-  else {
-    ComlibManagerPrintf("sendBufferedMessages is not flushing buffered messages for strategy %d because myEntry->errorMode=%d && setupComplete = %d && myEntry->discoveryMode = %d && myEntry->bufferOutgoing = %d\n", instid, (int)myEntry->errorMode, (int)setupComplete, (int)myEntry->discoveryMode, (int)myEntry->bufferOutgoing);  
   }
 
-
-
 }
 
 
 
 /***************************************************************************
- * Bracketed section:
- *
- * Routines for bracketed strategies, to call the brackets begin and end, and to
- * restore count errors after some objects migrate.
+ * Routine for bracketed strategies, for detecting count errors after 
+ * objects migrate.
  * 
  * This function must only be called after all messages from the previous 
  * iteration have been delivered. This is the application's responsibility to 
- * ensure. The iteration values provided must increase monotonically.
+ * ensure. The iteration values provided must increase monotonically, 
+ * and must be greater than STARTUP_ITERATION.
+ *
  ***************************************************************************/
-
 /// Called when the array/group element starts sending messages
 void ComlibManager::beginIteration(int instid, int iteration){
        StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
@@ -272,7 +267,10 @@ void ComlibManager::beginIteration(int instid, int iteration){
        ComlibManagerPrintf("[%d] beginIteration iter=%d lastKnownIteration=%d  %s %s %s\n", CkMyPe(), iteration, myEntry->lastKnownIteration, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
 
 
+
        if(iteration > myEntry->lastKnownIteration){
+             ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
+
              // Verify & update errorModeServer:
              if(CkMyPe()==0){
                CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || 
@@ -287,6 +285,21 @@ void ComlibManager::beginIteration(int instid, int iteration){
              // Verify & update errorMode:
              CkAssert(myEntry->errorMode == ERROR_FIXED_MODE || myEntry->errorMode == NORMAL_MODE  );
              myEntry->errorMode = NORMAL_MODE;   
+             
+             ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
+
+             if(myEntry->lastKnownIteration == STARTUP_ITERATION){
+               // At this point, myEntry->numElements == 0
+               // An error will be detected below, so messages will be buffered.
+               // Since this is startup, bracketedStartErrorRecoveryProcess 
+               // will wait until the comlib layer is setup before continuing
+             } else {
+               // switch to using the newly updated source and destination lists (describing which objects are local to a PE)
+               // This must be done here because the list can only be updated once all sends 
+               // have completed from the iteration during which the error was detected.
+               myInfo->useNewSourceAndDestinations();  
+             }
+
 
              myEntry->lastKnownIteration = iteration;
              myEntry->nBeginItr = 1; // we are the first time to be called this iteration
@@ -295,12 +308,10 @@ void ComlibManager::beginIteration(int instid, int iteration){
              myEntry->totalEndCounted = 0;
              myEntry->discoveryMode = NORMAL_DISCOVERY_MODE;
              myEntry->nEndSaved = 0;
-             
-             
-             ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
+                     
        } else {
-               myEntry->nBeginItr++;
                ComlibManagerPrintf("[%d] beginIteration continuing iteration # %d\n", CkMyPe(), iteration);
+               myEntry->nBeginItr++;
        }
        
        
@@ -308,7 +319,7 @@ void ComlibManager::beginIteration(int instid, int iteration){
        // This will ensure that if we are the processor that detects this error, 
        // we won't deliver at least this message until the strategy is fixed
        if (myEntry->nBeginItr > myEntry->numElements) {
-         ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING\n",CkMyPe());                     
+         ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING because nBeginItr=%d > numElements=%d\n",CkMyPe(), myEntry->nBeginItr, myEntry->numElements);
          myEntry->bufferOutgoing = 1;
        }
        
@@ -369,7 +380,7 @@ void ComlibManager::bracketedStartErrorRecoveryProcess(int instid, int step){
   
 
   if(converseManager->isReady(instid)){
-    ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() %s %s %s\n", CkMyPe(), myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
+    ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess(instid=%d step=%d) %s %s %s\n", CkMyPe(), instid, step, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
 
     CkAssert(myEntry->strategy != NULL);
     CkAssert(myEntry->errorMode == NORMAL_MODE || myEntry->errorMode == ERROR_MODE);
@@ -681,64 +692,70 @@ void ComlibManager::bracketedReleaseBufferedMessages(int instid) {
     it is still here. If the array element has migrated away, then the
     bracketedDiscover() method is called on the new PE.
 
+    the new source and destination element arrays will be empty at this point,
+    so myInfo->addNewDestinationList(e); will add a new record that
+    will be used for future iterations.
+
 */
 void ComlibManager::bracketedStartDiscovery(int instid) {
        StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
        CkAssert(myEntry->discoveryMode == NORMAL_DISCOVERY_MODE);
        myEntry->discoveryMode = STARTED_DISCOVERY_MODE;
        ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
-                               
+       const CProxy_ComlibManager myProxy(thisgroup);
+
        ComlibManagerPrintf("[%d] bracketedStartDiscovery\n", CkMyPe());
 
        int countSrc = 0;
        int countDest = 0;
-       CkArrayID aid;
-       CkArrayIndexMax *idx;
-       int nelem;
-       CkArray *a;
-       CProxy_ComlibManager myProxy(thisgroup);
 
        if (myInfo->isSourceArray()) {
-               myInfo->getSourceArray(aid, idx, nelem);
-               myInfo->resetSource();
-               a = (CkArray*)_localBranch(aid);
-               for (int i=0; i<nelem; ++i) {
-                       int pe = a->lastKnown(idx[i]);
-//                     ComlibManagerPrintf("[%d] bracketedStartDiscovery src Index %d was lastKnown at pe %d\n", CkMyPe(), idx[i].data()[0], pe);
-                       if (pe == CkMyPe()) {
-                               countSrc++;
-                               myInfo->addSource(idx[i]);
-                       }
-                       else {
-                               myProxy[pe].bracketedDiscover(instid, aid, idx[i], true);
-                       }
-               }
-               delete[] idx;
+         CkAssert(myInfo->newSourceListSize() == 0);
+
+         const CkVec<CkArrayIndexMax> & srcElements = myInfo->getSourceElements();
+         const int nelem = srcElements.size();
+         const CkArrayID aid = myInfo->getSourceArrayID(); 
+         const CkArray *a = (CkArray*)_localBranch(aid);
+
+         for (int i=0; i<nelem; ++i) {
+           int pe = a->lastKnown(srcElements[i]);
+           if (pe == CkMyPe()) {
+             countSrc++;
+             myInfo->addNewLocalSource(srcElements[i]);
+           }
+           else {
+             myProxy[pe].bracketedDiscover(instid, aid, srcElements[i], true);
+           }
+         }
+
        }
 
        if (myInfo->isDestinationArray()) {
-               myInfo->getDestinationArray(aid, idx, nelem);
-               myInfo->resetDestination();
-               a = (CkArray*)_localBranch(aid);
-               for (int i=0; i<nelem; ++i) {
-                       int pe = a->lastKnown(idx[i]);
-//                     ComlibManagerPrintf("[%d] bracketedStartDiscovery dest Index %d was lastKnown at pe %d\n", CkMyPe(), idx[i].data()[0], pe);
-                       if (pe == CkMyPe()) {
-                               countDest++;
-                               myInfo->addDestination(idx[i]);
-                       }
-                       else {
-                               myProxy[pe].bracketedDiscover(instid, aid, idx[i], false);
-                       }
-               }
-               delete[] idx;
+         CkAssert(myInfo->newDestinationListSize() == 0);
+
+         const CkVec<CkArrayIndexMax> & destElements = myInfo->getDestinationElements();
+         const int nelem = destElements.size();
+         const CkArrayID aid = myInfo->getDestinationArrayID();
+         const CkArray *a = (CkArray*)_localBranch(aid);
+
+         for (int i=0; i<nelem; ++i) {
+           int pe = a->lastKnown(destElements[i]);
+           if (pe == CkMyPe()) {
+             countDest++;
+             myInfo->addNewLocalDestination(destElements[i]);
+           }
+           else {
+             myProxy[pe].bracketedDiscover(instid, aid, destElements[i], false);
+           }
+         }
        }
 
+       // Report the number of elements that are now local to this PE (if >0).
+       // The new owner PEs will report the counts for those objects that are no longer local to this PE
        if (countSrc > 0 || countDest > 0) {
-               // contribute only if we have something
-               myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
+         myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
        }
-
+       
 }
 
 
@@ -779,15 +796,15 @@ void ComlibManager::bracketedDiscover(int instid, CkArrayID aid, CkArrayIndexMax
                        // Add the element as a source element for the strategy
                        ComlibManagerPrintf("[%d] bracketedDiscover addSource\n", CkMyPe());
                        CkAssert((unsigned long)myInfo > 0x1000);
-                       myInfo->addSource(idx);
+                       myInfo->addNewLocalSource(idx);
 
                        ComlibManagerPrintf("[%d] bracketedDiscover updating numElements\n", CkMyPe());
                        myEntry->numElements = myInfo->getLocalSrc();           
                }
                else {
                        // Add the element as a Destination element for the strategy
-                       ComlibManagerPrintf("[%d] bracketedDiscover addDestination\n", CkMyPe());
-                       myInfo->addDestination(idx);
+                       ComlibManagerPrintf("[%d] bracketedDiscover addNewDestination\n", CkMyPe());
+                       myInfo->addNewLocalDestination(idx);
                }
        } else {
                ComlibManagerPrintf("Keep On Forwarding*********************\n");
@@ -1043,9 +1060,7 @@ void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m,
        env->setUsed(0);
        CkPackMessage(&env);
        
-       
-       /// @TODO why are CkSectionInfo needed since it doesn't seem to have
-       /// useful information?
+
        CkSectionInfo minfo;
        minfo.type = COMLIB_MULTICAST_MESSAGE;
        minfo.sInfo.cInfo.instId = ci->getID();
@@ -1115,19 +1130,19 @@ void ComlibManager::multicast(CharmMessageHolder *cmsg, int instid) {
 
 
 void ComlibManager::printDiagnostics(int instid){
-       ComlibManagerPrintf("[%d]     delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
+  CkPrintf("[%d]     delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
 }
 
 
 void ComlibManager::printDiagnostics(){
-  
+
 
   std::map<ComlibInstanceHandle, std::set<CharmMessageHolder*> >::iterator iter;
   for(iter = delayMessageSendBuffer.begin(); iter != delayMessageSendBuffer.end(); ++iter){
     int instid = iter->first;
     int size = iter->second.size();
     
-    if(size>0){
+    if(size>0 || true){
       CkPrintf("[%d] delayMessageSendBuffer[instid=%d] contains %d messages\n", CkMyPe(), instid, size);
       
       if(! shouldBufferMessagesNow(instid)){
@@ -1137,7 +1152,10 @@ void ComlibManager::printDiagnostics(){
       }
       
       StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
-      CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d errorMode=%d errorModeServer=%d discoveryMode=%d bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, (int)myEntry->errorMode, (int)myEntry->errorModeServer, (int)myEntry->discoveryMode, (int)myEntry->bufferOutgoing);
+      CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d %s %s %s bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString(), (int)myEntry->bufferOutgoing);
+
+
+
     }
   }
   
@@ -1322,7 +1340,7 @@ void ComlibNotifyMigrationDoneHandler(void *msg) {
 
 
 static void periodicDebugPrintStatus(void* ptr, double currWallTime){
-  // CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
+  CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
 
   ComlibManager *mgr = (ComlibManager*)ptr;
   mgr->printDiagnostics();