Sending back recovering objects for parallel recovery.
author Esteban Meneses <emenese2@illinois.edu>
Sat, 7 Jul 2012 21:01:06 +0000 (16:01 -0500)
committer Esteban Meneses <emenese2@illinois.edu>
Sat, 7 Jul 2012 21:01:06 +0000 (16:01 -0500)
src/ck-core/ckcausalmlog.C
src/ck-core/ckcausalmlog.h
src/ck-ldb/CentralLB.C

index 15fa325ebca90cbbd8ddc87e349839aabee5000d..561758c0fae43e25e70c4c36fda6d48fe7d7f30a 100644 (file)
@@ -136,7 +136,7 @@ CpvDeclare(char **, _storeDetsPtrs);
 /***** VARIABLES FOR PARALLEL RECOVERY *****/
 CpvDeclare(int, _numEmigrantRecObjs);
 CpvDeclare(int, _numImmigrantRecObjs);
-CpvDeclare(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
+CpvDeclare(CkVec<CkLocation *> *, _immigrantRecObjs);
 /***** *****/
 
 #if COLLECT_STATS_MSGS
@@ -191,6 +191,7 @@ int _sendDetsReplyHandlerIdx;
 int _receivedTNDataHandlerIdx;
 int _receivedDetDataHandlerIdx;
 int _distributedLocationHandlerIdx;
+int _sendBackLocationHandlerIdx;
 int _storeDeterminantsHandlerIdx;
 int _removeDeterminantsHandlerIdx;
 
@@ -270,6 +271,7 @@ void _messageLoggingInit(){
        _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
        _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
        _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
+       _sendBackLocationHandlerIdx=CkRegisterHandler((CmiHandler)_sendBackLocationHandler);
        _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
        _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
        _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
@@ -340,8 +342,8 @@ void _messageLoggingInit(){
     CpvInitialize(int, _numImmigrantRecObjs);
     CpvAccess(_numImmigrantRecObjs) = 0;
 
-    CpvInitialize(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
-    CpvAccess(_immigrantRecObjs) = new CkVec<CkLocRec_local *>;
+    CpvInitialize(CkVec<CkLocation *> *, _immigrantRecObjs);
+    CpvAccess(_immigrantRecObjs) = new CkVec<CkLocation *>;
 
        //Cpv variables for checkpoint
        CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
@@ -2969,6 +2971,35 @@ void distributeRestartedObjects(){
        CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
 };
 
+/**
+ * Handler to receive back a location.
+ */
+void _sendBackLocationHandler(char *receivedMsg){
+       printf("Array element received at processor %d after recovery\n",CkMyPe());
+       DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
+       int sourcePE = distributeMsg->PE;
+       char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
+       PUP::fromMem pmem(buf);
+       CkGroupID gID;
+       CkArrayIndexMax idx;
+       pmem |gID;
+       pmem |idx;
+       CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
+       donotCountMigration=1;
+       mgr->resume(idx,pmem,CmiTrue);
+       donotCountMigration=0;
+       informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
+       printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
+       idx.print();
+
+       // checking if it has received all emigrant recovering objects
+       CpvAccess(_numEmigrantRecObjs)--;
+       if(CpvAccess(_numEmigrantRecObjs) == 0){
+               (*resumeLbFnPtr)(centralLb);
+       }
+
+}
+
 /**
  * Handler to update information about an object just received.
  */
@@ -2994,7 +3025,7 @@ void _distributedLocationHandler(char *receivedMsg){
        CmiAssert(rec->type() == CkLocRec::local);
 
        // adding object to the list of immigrant recovery objects
-       CpvAccess(_immigrantRecObjs)->push_back((CkLocRec_local *)rec);
+       CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,(CkLocRec_local *)rec));
        CpvAccess(_numImmigrantRecObjs)++;
        
        CkVec<CkMigratable *> eltList;
@@ -3142,24 +3173,112 @@ void initMlogLBStep(CkGroupID gid){
 #endif
 }
 
+/**
+ * Pups a location: the group ID of its location manager, its array index and
+ * then the location itself, in that order.
+ *
+ * @param loc    location to be pupped
+ * @param locMgr location manager whose group ID is stored ahead of the location
+ * @param p      PUP::er to run the location through
+ */
+void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
+       CkGroupID mgrGroup = locMgr->ckGetGroupID();
+       CkArrayIndexMax elemIdx = loc->getIndex();
+
+       // the manager's group ID goes first so the receiver can find the manager to restore into
+       p|mgrGroup;
+       p|elemIdx;
+       p|*loc;
+}
+
+/**
+ * Sends back the immigrant recovering object to their origin PE.
+ */
+void sendBackImmigrantRecObjs(){
+       CkLocation *loc;
+       CkLocMgr *locMgr;
+       CkArrayIndexMax idx;
+       CkLocRec_local *rec;
+       PUP::sizer psizer;
+       int targetPE;
+       CkVec<CkMigratable *> eltList;
+       CkReductionMgr *reductionMgr;
+       // looping through all elements in immigrant recovery objects vector
+       for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
+
+               // getting the components of each location
+               loc = (*CpvAccess(_immigrantRecObjs))[i];
+               idx = loc->getIndex();
+               rec = loc->getLocalRecord();
+               locMgr = loc->getManager();
+       locMgr->migratableList((CkLocRec_local *)rec,eltList);
+               targetPE = eltList[i]->mlogData->immigrantSourcePE;
+
+               // decrement counter at array manager
+               reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
+               reductionMgr->decNumImmigrantRecObjs();
+
+               CkPrintf("[%d] Sending back object to %d: ",CkMyPe(),targetPE);
+               idx.print();
+                       
+               //pack up this location and send it across
+               pupLocation(loc,locMgr,psizer);
+               int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
+               char *msg = (char *)CmiAlloc(totalSize);
+               DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
+               distributeMsg->PE = CkMyPe();
+               char *buf = &msg[sizeof(DistributeObjectMsg)];
+               PUP::toMem pmem(buf);
+               pmem.becomeDeleting();
+               pupLocation(loc,locMgr,pmem);
+               
+               locMgr->setDuringMigration(CmiTrue);
+               delete rec;
+               locMgr->setDuringMigration(CmiFalse);
+               locMgr->inform(idx,targetPE);
+
+               // sending the object
+               CmiSetHandler(msg,_sendBackLocationHandlerIdx);
+               CmiSyncSendAndFree(targetPE,totalSize,msg);
+
+               // freeing memory
+               delete loc;
+
+               CmiAssert(locMgr->lastKnown(idx) == targetPE);
+               
+       }
+
+       // cleaning up all data structures
+       CpvAccess(_immigrantRecObjs)->removeAll();
+       CpvAccess(_numImmigrantRecObjs) = 0;
+
+}
+
 /**
  * Restores objects after parallel recovery, either by sending back the immigrant objects or 
  * by waiting for all emigrant objects to be back.
  */
-void restoreParallelRecovery(void (*fnPtr)(void *),void *_centralLb){
-       // CODING
+void restoreParallelRecovery(void (*_fnPtr)(void *),void *_centralLb){
+       resumeLbFnPtr = _fnPtr;
+       centralLb = _centralLb;
+
+       // sending back the immigrant recovering objects
+       if(CpvAccess(_numImmigrantRecObjs) > 0){
+               sendBackImmigrantRecObjs();     
+       }
+
+       // checking whether it needs to wait for emigrant recovery objects
+       if(CpvAccess(_numEmigrantRecObjs) > 0)
+               return;
+
+       // otherwise, load balancing process is finished
+       (*resumeLbFnPtr)(centralLb);
 }
 
 /**
  * Starts the message-logging part of a load balancing step.
  *
  * Stores the resume function and the load balancer pointer in resumeLbFnPtr and
  * centralLb, sets migrationDoneCalled, and calls startMlogCheckpoint() right away
  * when countLBToMigrate equals countLBMigratedAway.
  *
  * @param _fnPtr     function stored in resumeLbFnPtr
  * @param _centralLb opaque pointer stored in centralLb
  */
 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
        DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
        DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
-       
+
        resumeLbFnPtr = _fnPtr;
        centralLb = _centralLb;
        migrationDoneCalled = 1;
        if(countLBToMigrate == countLBMigratedAway){
                DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
-               startMlogCheckpoint(NULL,CmiWallTimer());       
+               startMlogCheckpoint(NULL,CmiWallTimer());
        }
 };
 
index 9477207ab168e76167339db1d5aa6eb7c4169d79..0ba1d0416f2b6048e89d2a570412af8018a12585 100644 (file)
@@ -583,6 +583,7 @@ void _sendDetsReplyHandler(char *msg);
 void _receivedTNDataHandler(ReceivedTNData *msg);
 void _receivedDetDataHandler(ReceivedDetData *msg);
 void _distributedLocationHandler(char *receivedMsg);
+void _sendBackLocationHandler(char *receivedMsg);
 void _updateHomeRequestHandler(RestartRequest *updateRequest);
 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
index f7f28850b7704fcc1e918b7b051f626bbffabbb3..8afca6518998973caf2464f9f7bdfc3a63495ed6 100644 (file)
@@ -765,7 +765,7 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
 {
   storedMigrateMsg = m;
 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
-       ProcessReceiveMigration((CkReductionMsg*)NULL);
+       restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
 #else
   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
                   thisProxy);