Changes in message logging protocol to make it safer. Determinants are all received...
authorEsteban Meneses <emenese2@illinois.edu>
Thu, 26 Apr 2012 20:46:55 +0000 (15:46 -0500)
committerEsteban Meneses <emenese2@illinois.edu>
Thu, 26 Apr 2012 20:46:55 +0000 (15:46 -0500)
src/ck-core/ckcausalmlog.C
src/ck-core/ckcausalmlog.h

index 06afc80b894ccd0a9a23037cd41ccbc60b9c5870..e5b99a7cbc55fa113171b52a241efbecb5539571 100644 (file)
@@ -31,7 +31,7 @@
 
 #define DEBUG_MEM(x)  //x
 #define DEBUG(x) // x
-#define DEBUG_RESTART(x) // x
+#define DEBUG_RESTART(x)  //x
 #define DEBUGLB(x)   // x
 #define DEBUG_TEAM(x)  // x
 #define DEBUG_PERF(x) // x
@@ -170,7 +170,8 @@ int _recvGlobalStepHandlerIdx;
 int _updateHomeRequestHandlerIdx;
 int _updateHomeAckHandlerIdx;
 int _resendMessagesHandlerIdx;
-int _resendReplyHandlerIdx;
+int _sendDetsHandlerIdx;
+int _sendDetsReplyHandlerIdx;
 int _receivedTNDataHandlerIdx;
 int _receivedDetDataHandlerIdx;
 int _distributedLocationHandlerIdx;
@@ -247,7 +248,8 @@ void _messageLoggingInit(){
        _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
        _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
        _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
-       _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
+       _sendDetsHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsHandler);
+       _sendDetsReplyHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsReplyHandler);
        _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
        _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
        _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
@@ -1066,7 +1068,7 @@ int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **log
 //env->sender.updatePosition(env->getSrcPe());
        if(env->TN == obj->mlogData->tProcessed+1){
                //the message that needs to be processed now
-               DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
+               DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
                // once we find a message that we can process we put back all the messages in the out of order queue
                // back into the main scheduler queue. 
        DEBUG_MEM(CmiMemoryCheck());
@@ -1087,14 +1089,14 @@ int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **log
        // checking if message has already been processed
        // message must be discarded
        if(env->TN <= obj->mlogData->tProcessed){
-               DEBUG(printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
+               DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
                
                CmiFree(env);
                return 0;
        }
        //message that needs to be processed in the future
 
-       DEBUG(printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
+       DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
        //the message cant be processed now put it back in the out of order message Q, 
        //It will be transferred to the main queue later
        CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
@@ -2065,17 +2067,33 @@ void _recvCheckpointHandler(char *_restartData){
        _initDone();
 
        getGlobalStep(globalLBID);
+
+       // sending request to send determinants
+       _numRestartResponses = 0;
        
-       countUpdateHomeAcks = 0;
-       RestartRequest updateHomeRequest;
-       updateHomeRequest.PE = CmiMyPe();
-       CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
-       for(int i=0;i<CmiNumPes();i++){
-               if(i != CmiMyPe()){
-                       CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
+       // Send out the request to resend logged determinants to all other processors
+       CkVec<TProcessedLog> objectVec;
+       forAllCharesDo(createObjIDList, (void *)&objectVec);
+       int numberObjects = objectVec.size();
+       
+       //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
+       int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
+       char *resendMsg = (char *)CmiAlloc(totalSize);  
+
+       ResendRequest *resendReq = (ResendRequest *)resendMsg;
+       resendReq->PE =CkMyPe(); 
+       resendReq->numberObjects = numberObjects;
+       char *objList = &resendMsg[sizeof(ResendRequest)];
+       memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
+
+       CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
+       for(int i=0;i<CkNumPes();i++){
+               if(i != CkMyPe()){
+                       CmiSyncSend(i,totalSize,resendMsg);
                }
        }
-
+       CmiFree(resendMsg);
+       
 }
 
 /**
@@ -2334,14 +2352,16 @@ void resendMessageForChare(void *data, ChareMlogData *mlogData){
 }
 
 /**
- * Resends messages since last checkpoint to the list of objects included in the 
- * request. It also sends stored remote determinants to the particular failed PE.
+ * Send all remote determinants to a particular failed PE.
+ * It only sends determinants to those objects on the list.
  */
-void _resendMessagesHandler(char *msg){
+void _sendDetsHandler(char *msg){
        ResendData d;
        CkVec<Determinant> *detVec;
        ResendRequest *resendReq = (ResendRequest *)msg;
 
+       // CkPrintf("[%d] Sending determinants\n",CkMyPe());
+
        // building the reply message
        char *listObjects = &msg[sizeof(ResendRequest)];
        d.numberObjects = resendReq->numberObjects;
@@ -2350,67 +2370,8 @@ void _resendMessagesHandler(char *msg){
        d.ticketVecs = new CkVec<MCount>[d.numberObjects];
        detVec = new CkVec<Determinant>[d.numberObjects];
 
-       //Check if any of the retained objects need to be recreated
-       //If they have not been recreated on the restarted processor
-       //they need to be recreated on this processor
-       int count=0;
-       for(int i=0;i<retainedObjectList.size();i++){
-               if(retainedObjectList[i]->migRecord.toPE == d.PE){
-                       count++;
-                       int recreate=1;
-                       for(int j=0;j<d.numberObjects;j++){
-                               if(d.listObjects[j].recver.type != TypeArray ){
-                                       continue;
-                               }
-                               CkArrayID aid(d.listObjects[j].recver.data.array.id);           
-                               CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
-                               if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
-                                       if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asChild()){
-                                               recreate = 0;
-                                               break;
-                                       }
-                               }
-                       }
-                       CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
-                       if(recreate){
-                               donotCountMigration=1;
-                               _receiveMlogLocationHandler(retainedObjectList[i]->msg);
-                               donotCountMigration=0;
-                               CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
-                               int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
-                               informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
-                               sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
-                               CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
-                               CmiAssert(rec->type() == CkLocRec::local);
-                               CkVec<CkMigratable *> eltList;
-                               locMgr->migratableList((CkLocRec_local *)rec,eltList);
-                               for(int j=0;j<eltList.size();j++){
-                                       if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
-                                               CpvAccess(_currentObj) = eltList[j];
-                                               eltList[j]->ResumeFromSync();
-                                       }
-                               }
-                               retainedObjectList[i]->msg=NULL;        
-                       }
-               }
-       }
-       
-       if(count > 0){
-//             CmiAbort("retainedObjectList for restarted processor not empty");
-       }
-       
-       DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
-
-
-       //TML: examines the origin processor to determine if it belongs to the same group.
-       // In that case, it only returns the maximum ticket received for each object in the list.
-       if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
-               forAllCharesDo(fillTicketForChare,&d);
-       else
-               forAllCharesDo(resendMessageForChare,&d);
-
-       // adding the stored determinants to the resendReplyMsg
-       // traversing all the stored determinants
+       // adding the remote determinants to the resendReplyMsg
+       // traversing all the remote determinants
        CkVec<Determinant> *vec;
        for(int i=0; i<d.numberObjects; i++){
                vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
@@ -2477,23 +2438,46 @@ void _resendMessagesHandler(char *msg){
                ticketList = &ticketList[sizeof(Determinant)*vecsize];
        }
 
-       CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
+       CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
        CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
        
-/*     
-       if(verifyAckRequestsUnacked){
-               CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
-               for(int i=0;i<verifyAckRequestsUnacked;i++){
-                       CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
-                       LDObjHandle h;
-                       lb->Migrated(h,1);
-               }
-       }
-       
-       verifyAckRequestsUnacked=0;*/
-       
-       delete [] d.ticketVecs;
        delete [] detVec;
+       delete [] d.ticketVecs;
+
+       DEBUG_MEM(CmiMemoryCheck());
+
+       if(resendReq->PE != CkMyPe()){
+               CmiFree(msg);
+       }       
+//     CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
+       lastRestart = CmiWallTimer();
+
+}
+
+/**
+ * Resends messages since last checkpoint to the list of objects included in the 
+ * request. It also sends stored remote determinants to the particular failed PE.
+ */
+void _resendMessagesHandler(char *msg){
+       ResendData d;
+       ResendRequest *resendReq = (ResendRequest *)msg;
+
+       //CkPrintf("[%d] Resending messages\n",CkMyPe());
+
+       // building the reply message
+       char *listObjects = &msg[sizeof(ResendRequest)];
+       d.numberObjects = resendReq->numberObjects;
+       d.PE = resendReq->PE;
+       d.listObjects = (TProcessedLog *)listObjects;
+       
+       DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
+
+       //TML: examines the origin processor to determine if it belongs to the same group.
+       // In that case, it only returns the maximum ticket received for each object in the list.
+       if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
+               forAllCharesDo(fillTicketForChare,&d);
+       else
+               forAllCharesDo(resendMessageForChare,&d);
 
        DEBUG_MEM(CmiMemoryCheck());
 
@@ -2529,7 +2513,7 @@ void processDelayedRemoteMsgQueue(){
  * Message format: |Header|ObjID list|TN list|Determinant list|
  * TN list = |number of TNs|list of TNs|...|
  */
-void _resendReplyHandler(char *msg){
+void _sendDetsReplyHandler(char *msg){
        ResendRequest *resendReply = (ResendRequest *)msg;
        CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
        char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
@@ -2592,12 +2576,52 @@ void _resendReplyHandler(char *msg){
 
        }
 
-       // checking if the restart is over
+       // checking if we have received all replies
        _numRestartResponses++;
-       if(_numRestartResponses == CkNumPes()){
+       if(_numRestartResponses != CkNumPes())
+               return;
+       else 
                _numRestartResponses = 0;
-               processDelayedRemoteMsgQueue();
+
+       
+       // continuing with restart process; send out the request to resend logged messages to all other processors
+       CkVec<TProcessedLog> objectVec;
+       forAllCharesDo(createObjIDList, (void *)&objectVec);
+       int numberObjects = objectVec.size();
+       
+       //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
+       int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
+       char *resendMsg = (char *)CmiAlloc(totalSize);  
+
+       ResendRequest *resendReq = (ResendRequest *)resendMsg;
+       resendReq->PE =CkMyPe(); 
+       resendReq->numberObjects = numberObjects;
+       char *objList = &resendMsg[sizeof(ResendRequest)];
+       memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
+
+       CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
+       CpvAccess(_currentObj) = lb;
+       lb->ReceiveDummyMigration(restartDecisionNumber);
+
+//HERE sleep(10);
+//     CkPrintf("[%d] RESUMING RECOVERY with %d \n",CkMyPe(),restartDecisionNumber);
+       
+       CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
+       for(int i=0;i<CkNumPes();i++){
+               if(i != CkMyPe()){
+                       CmiSyncSend(i,totalSize,resendMsg);
+               }
        }
+       _resendMessagesHandler(resendMsg);
+       CmiFree(resendMsg);
+
+       /* test for parallel restart migrate away object**/
+       if(fastRecovery){
+               distributeRestartedObjects();
+               printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
+       }
+
+//     processDelayedRemoteMsgQueue();
 
 };
 
@@ -2646,6 +2670,7 @@ void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
        // traversing the whole list of determinants
        for(int i=0; i<listSize; i++){
                det = &listDets[i];
+               if(CkMyPe() == 4) printDet(det,"RECOVERY");
                obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
                DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
        }
@@ -2676,7 +2701,7 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
        //that senders know about. Those less than the ticket number processed 
        //by the receiver can be thrown away. The rest need not be consecutive
        // ie there can be holes in the list of ticket numbers seen by senders
-       if(obj->mlogData->resendReplyRecvd == CkNumPes()){
+       if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
                obj->mlogData->resendReplyRecvd = 0;
                //sort the received TNS
                sortVec(obj->mlogData->receivedTNs);
@@ -2729,7 +2754,7 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
                obj->mlogData->receivedTNs = NULL;
                obj->mlogData->restartFlag = 0;
 
-               processDelayedRemoteMsgQueue();
+               // processDelayedRemoteMsgQueue();
 
                DEBUG_RESTART(char objString[100]);
                DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
@@ -3339,11 +3364,20 @@ void _getGlobalStepHandler(LBStepMsg *msg){
        CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
 };
 
+/**
+ * @brief Receives the global step handler from PE 0
+ */
 void _recvGlobalStepHandler(LBStepMsg *msg){
        
-       restartDecisionNumber=msg->step;
-       RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
-       _updateHomeAckHandler(dummyAck);
+       // updating restart decision number
+       restartDecisionNumber = msg->step;
+       CmiFree(msg);
+
+       // sending a dummy message to sendDetsReplyHandler
+       ResendRequest *resendReplyMsg = (ResendRequest *)CmiAlloc(sizeof(ResendRequest));
+       resendReplyMsg->PE = CkMyPe();
+       resendReplyMsg->numberObjects = 0;
+       _sendDetsReplyHandler((char *)resendReplyMsg);
 };
 
 /**
index 84a2db1e9cd9f0516f8d082308b608fcae3dde2a..5ec44b56718b849acb5cbf32c4fb64a4ac16bbd2 100644 (file)
@@ -578,7 +578,8 @@ void CkMlogRestartLocal();
 void _getCheckpointHandler(RestartRequest *restartMsg);
 void _recvCheckpointHandler(char *_restartData);
 void _resendMessagesHandler(char *msg);
-void _resendReplyHandler(char *msg);
+void _sendDetsHandler(char *msg);
+void _sendDetsReplyHandler(char *msg);
 void _receivedTNDataHandler(ReceivedTNData *msg);
 void _receivedDetDataHandler(ReceivedDetData *msg);
 void _distributedLocationHandler(char *receivedMsg);
@@ -597,7 +598,8 @@ void _recvRestartCheckpointHandler(char *_restartData);
 extern int _getCheckpointHandlerIdx;
 extern int _recvCheckpointHandlerIdx;
 extern int _resendMessagesHandlerIdx;
-extern int _resendReplyHandlerIdx;
+extern int _sendDetsHandlerIdx;
+extern int _sendDetsReplyHandlerIdx;
 extern int _receivedTNDataHandlerIdx;
 extern int _receivedDetDataHandlerIdx;
 extern int _distributedLocationHandlerIdx;