Removing hole detection in determinants during recovery.
authorEsteban Meneses <emenese2@illinois.edu>
Thu, 3 May 2012 14:48:29 +0000 (09:48 -0500)
committerEsteban Meneses <emenese2@illinois.edu>
Thu, 3 May 2012 14:48:29 +0000 (09:48 -0500)
src/ck-core/ckcausalmlog.C
src/ck-core/ckcausalmlog.h

index e5b99a7cbc55fa113171b52a241efbecb5539571..5994339443bfac5cf36cab6b92c59d5e05e0dcf1 100644 (file)
@@ -123,6 +123,11 @@ CpvDeclare(char *, _incarnation);
 
 /***** *****/
 
+/***** VARIABLES FOR PARALLEL RECOVERY *****/
+CpvDeclare(CkVec<LocationID *> *, _emigrantRecObjs);
+CpvDeclare(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
+/***** *****/
+
 #if COLLECT_STATS_MSGS
 int *numMsgsTarget;
 int *sizeMsgsTarget;
@@ -197,7 +202,7 @@ int _falseRestart =0; /**
                                                                                                        a porcessor without actually starting it
                                                                                                        1 -> false restart
                                                                                                        0 -> restart after an actual crash
-                                                                                               */              
+                                                                                               */
 
 //Load balancing globals
 int onGoingLoadBalancing=0;
@@ -308,7 +313,13 @@ void _messageLoggingInit(){
        for(int i=0; i<CmiNumPes(); i++){
                CpvAccess(_incarnation)[i] = 0;
        }
-       
+
+       // Cpv variables for parallel recovery
+       CpvInitialize(CkVec<LocationID *> *, _emigrantRecObjs);
+       CpvAccess(_emigrantRecObjs) = new CkVec<LocationID *>;
+       CpvInitialize(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
+       CpvAccess(_immigrantRecObjs) = new CkVec<CkLocRec_local *>;
+
        //Cpv variables for checkpoint
        CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
        CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
@@ -2440,7 +2451,7 @@ void _sendDetsHandler(char *msg){
 
        CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
        CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
-       
+
        delete [] detVec;
        delete [] d.ticketVecs;
 
@@ -2488,6 +2499,7 @@ void _resendMessagesHandler(char *msg){
        lastRestart = CmiWallTimer();
 }
 
+MCount maxVec(CkVec<MCount> *TNvec);
 void sortVec(CkVec<MCount> *TNvec);
 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
 
@@ -2520,6 +2532,7 @@ void _sendDetsReplyHandler(char *msg){
        
 //     DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
        DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
+       
        for(int i =0; i< resendReply->numberObjects;i++){
                Chare *obj = (Chare *)listObjects[i].getObject();
                
@@ -2583,7 +2596,6 @@ void _sendDetsReplyHandler(char *msg){
        else 
                _numRestartResponses = 0;
 
-       
        // continuing with restart process; send out the request to resend logged messages to all other processors
        CkVec<TProcessedLog> objectVec;
        forAllCharesDo(createObjIDList, (void *)&objectVec);
@@ -2703,6 +2715,8 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
        // ie there can be holes in the list of ticket numbers seen by senders
        if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
                obj->mlogData->resendReplyRecvd = 0;
+
+#if VERIFY_DETS
                //sort the received TNS
                sortVec(obj->mlogData->receivedTNs);
        
@@ -2747,7 +2761,11 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
                                obj->mlogData->currentHoles = numberHoles;
                        }
                }
-       
+#else
+               if(obj->mlogData->receivedTNs->size() > 0){
+                       obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
+               }
+#endif
                // cleaning up structures and getting ready to continue execution       
                delete obj->mlogData->receivedTNs;
                DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
@@ -2762,6 +2780,15 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
 
 }
 
+/** @brief Returns the maximum ticket from a vector */
+MCount maxVec(CkVec<MCount> *TNvec){
+       MCount max = 0;
+       for(int i=0; i<TNvec->size(); i++){
+               if((*TNvec)[i] > max)
+                       max = (*TNvec)[i];
+       }
+       return max;
+}
 
 void sortVec(CkVec<MCount> *TNvec){
        //sort it ->its bloddy bubble sort
@@ -2863,9 +2890,14 @@ public:
                        
                CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
                idx.print();
-                       
-               //TODO: an element that is being moved should leave some trace behind so that
-               // the arraybroadcaster can forward messages to it
+
+               // inserting the emigrant object to the list of emigrant recovery objects
+               // every broadcast message will be sent to these objects
+               LocationID *location = new LocationID();
+               location->idx = idx;
+               location->gid = loc.getManager()->getGroupID();
+               location->PE = *targetPE;
+               CpvAccess(_emigrantRecObjs)->push_back(location);       
                        
                //pack up this location and send it across
                PUP::sizer psizer;
@@ -2927,12 +2959,16 @@ void _distributedLocationHandler(char *receivedMsg){
 
        CkLocRec *rec = mgr->elementRec(idx);
        CmiAssert(rec->type() == CkLocRec::local);
+
+       // adding object to the list of immigrant recovery objects
+       CpvAccess(_immigrantRecObjs)->push_back((CkLocRec_local *)rec);
        
        CkVec<CkMigratable *> eltList;
        mgr->migratableList((CkLocRec_local *)rec,eltList);
        for(int i=0;i<eltList.size();i++){
                if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
                        CpvAccess(_currentObj) = eltList[i];
+                       eltList[i]->mlogData->immigrantRecFlag = 1;
                        eltList[i]->ResumeFromSync();
                }
        }
@@ -3373,6 +3409,8 @@ void _recvGlobalStepHandler(LBStepMsg *msg){
        restartDecisionNumber = msg->step;
        CmiFree(msg);
 
+       CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
+
        // sending a dummy message to sendDetsReplyHandler
        ResendRequest *resendReplyMsg = (ResendRequest *)CmiAlloc(sizeof(ResendRequest));
        resendReplyMsg->PE = CkMyPe();
@@ -3597,22 +3635,13 @@ void RestoredLocalMap::pup(PUP::er &p){
        p(TNArray,count);
 };
 
-
-
-
 /**********************************
        * The methods of the message logging
        * data structure stored in each chare
        ********************************/
 
 MCount ChareMlogData::nextSN(const CkObjID &recver){
-/*     MCount SN = snTable.get(recver);
-       snTable.put(recver) = SN+1;
-       return SN+1;*/
-       DEBUG_MEM(CmiMemoryCheck());
-       double _startTime = CmiWallTimer();
        MCount *SN = snTable.getPointer(recver);
-       DEBUG_MEM(CmiMemoryCheck());
        if(SN==NULL){
                snTable.put(recver) = 1;
                return 1;
@@ -3620,7 +3649,6 @@ MCount ChareMlogData::nextSN(const CkObjID &recver){
                (*SN)++;
                return *SN;
        }
-//     traceUserBracketEvent(34,_startTime,CkWallTimer());
 };
  
 /**
index 5ec44b56718b849acb5cbf32c4fb64a4ac16bbd2..d9fb72538ac15060576ae39008fa670705b6fb0f 100644 (file)
@@ -61,7 +61,6 @@ typedef struct {
        int index;
 } RemoveDeterminantsHeader;
 
-
 /**
  * @brief Struct for the header of the storeDeterminants handler
  */
@@ -208,6 +207,7 @@ public:
 
        int toResumeOrNot;
        int resumeCount;
+       int immigrantRecFlag;
 
 private:
 
@@ -237,6 +237,7 @@ public:
                resendReplyRecvd=0;
                toResumeOrNot=0;
                resumeCount=0;
+               immigrantRecFlag = 0;
        };
        inline MCount nextSN(const CkObjID &recver);
        inline Ticket next_ticket(CkObjID &sender,MCount SN);
@@ -282,12 +283,13 @@ public:
 };
 
 /**
- * @brief 
+ * @brief Class that represents the location of an array element.
  */
 class LocationID{
 public:
        CkArrayIndexMax idx;
        CkGroupID gid;
+       int PE;
 };
 
 /**