fix for migration
[charm.git] / src / arch / mpi / machine.c
index d6e3c1669cca7bd93b6063e973c4a7b4d4d5a09f..e6c19ece5309e8351988bc5646e48ccaf19aa58a 100644 (file)
@@ -198,7 +198,6 @@ static void reportMsgHistogramInfo();
 
 #endif /* end of MPI_POST_RECV defined */
 
 
 #endif /* end of MPI_POST_RECV defined */
 
-CpvDeclare(int, crashedRank);
 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
 #define TAG     1375
 #if MPI_POST_RECV
 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
 #define TAG     1375
 #if MPI_POST_RECV
@@ -302,13 +301,24 @@ void (*signal_int)(int);
 static int _thread_provided = -1; /* Indicating MPI thread level */
 static int idleblock = 0;
 
 static int _thread_provided = -1; /* Indicating MPI thread level */
 static int idleblock = 0;
 
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+typedef struct crashedrank{
+  int rank;
+  struct crashedrank *next;
+} crashedRankList;
+CpvDeclare(crashedRankList *, crashedRankHdr);
+CpvDeclare(crashedRankList *, crashedRankPtr);
+int isRankDie(int rank);
+#endif
 /* A simple list for msgs that have been sent by MPI_Isend */
 typedef struct msg_list {
     char *msg;
     struct msg_list *next;
     int size, destpe, mode;
     MPI_Request req;
 /* A simple list for msgs that have been sent by MPI_Isend */
 typedef struct msg_list {
     char *msg;
     struct msg_list *next;
     int size, destpe, mode;
     MPI_Request req;
-    int dstrank;
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+    int dstrank; //used in fault tolerance protocol, if the destination is the died rank, delete the msg
+#endif
 } SMSG_LIST;
 
 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
 } SMSG_LIST;
 
 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
@@ -450,10 +460,10 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
 
 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
        dstrank = petorank[node];
 
 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
        dstrank = petorank[node];
+        smsg->dstrank = dstrank;
 #else
        dstrank=node;
 #endif
 #else
        dstrank=node;
 #endif
-  smsg->dstrank = dstrank;
     START_TRACE_SENDCOMM(msg)
     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,charmComm,&(smsg->req)))
         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
     START_TRACE_SENDCOMM(msg)
     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,charmComm,&(smsg->req)))
         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
@@ -515,11 +525,13 @@ static size_t CmiAllAsyncMsgsSent(void) {
         done = 0;
         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
         done = 0;
         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
-        if(msg_tmp->dstrank == CpvAccess(crashedRank)){
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+        if(isRankDie(msg_tmp->dstrank)){
           //CmiPrintf("[%d][%d] msg to crashed rank\n",CmiMyPartition(),CmiMyPe());
           //CmiAbort("unexpected send");
           done = 1;
         }
           //CmiPrintf("[%d][%d] msg to crashed rank\n",CmiMyPartition(),CmiMyPe());
           //CmiAbort("unexpected send");
           done = 1;
         }
+#endif
         if (!done)
             return 0;
         msg_tmp = msg_tmp->next;
         if (!done)
             return 0;
         msg_tmp = msg_tmp->next;
@@ -570,9 +582,11 @@ static void CmiReleaseSentMessages(void) {
 #endif
         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
 #endif
         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
-        if (msg_tmp->dstrank == CpvAccess(crashedRank)){
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+        if (isRankDie(msg_tmp->dstrank)){
           done = 1;
         }
           done = 1;
         }
+#endif
         if (done) {
             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
             CpvAccess(MsgQueueLen)--;
         if (done) {
             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
             CpvAccess(MsgQueueLen)--;
@@ -1113,10 +1127,10 @@ void CmiMachineProgressImpl() {
 /* ######Beginning of functions related with exiting programs###### */
 void LrtsDrainResources() {
 #if !CMK_SMP
 /* ######Beginning of functions related with exiting programs###### */
 void LrtsDrainResources() {
 #if !CMK_SMP
-//    while (!CmiAllAsyncMsgsSent()) {
-//        PumpMsgs();
-//        CmiReleaseSentMessages();
-//    }
+    while (!CmiAllAsyncMsgsSent()) {
+        PumpMsgs();
+        CmiReleaseSentMessages();
+    }
 #else
     if(Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV){
         while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
 #else
     if(Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV){
         while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
@@ -1380,12 +1394,12 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
 
     if (*myNodeID >= num_workpes) {    /* is spare processor */
       //if isomalloc_sync call mpi_barrier
 
     if (*myNodeID >= num_workpes) {    /* is spare processor */
       //if isomalloc_sync call mpi_barrier
-               if(CmiGetArgFlag(largv,"+isomalloc_sync")){
+      if(CmiGetArgFlag(largv,"+isomalloc_sync")){
           MPI_Barrier(charmComm);
           MPI_Barrier(charmComm);
           MPI_Barrier(charmComm);
           MPI_Barrier(charmComm);
           MPI_Barrier(charmComm);
           MPI_Barrier(charmComm);
           MPI_Barrier(charmComm);
           MPI_Barrier(charmComm);
-           }
+      }
          MPI_Status sts;
       int vals[2];
       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, charmComm,&sts);
          MPI_Status sts;
       int vals[2];
       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, charmComm,&sts);
@@ -1644,8 +1658,12 @@ void LrtsPreCommonInit(int everReturn) {
     waitIrecvListHead->next = NULL;
 #endif
 #endif
     waitIrecvListHead->next = NULL;
 #endif
 #endif
-    CpvInitialize(int, crashedRank);
-    CpvAccess(crashedRank) = -1;
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+    CpvInitialize(crashedRankList *, crashedRankHdr);
+    CpvInitialize(crashedRankList *, crashedRankPtr);
+    CpvAccess(crashedRankHdr) = NULL;
+    CpvAccess(crashedRankPtr) = NULL;
+#endif
 }
 
 void LrtsPostCommonInit(int everReturn) {
 }
 
 void LrtsPostCommonInit(int everReturn) {
@@ -1940,21 +1958,47 @@ void mpi_end_spare()
 
 int find_spare_mpirank(int pe,int partition)
 {
 
 int find_spare_mpirank(int pe,int partition)
 {
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
     if (nextrank == total_pes) {
       CmiAbort("Charm++> No spare processor available.");
     }
     if (nextrank == total_pes) {
       CmiAbort("Charm++> No spare processor available.");
     }
-       int newpe = CmiGetPeGlobal(pe,partition);
-       CpvAccess(crashedRank) = newpe;
+    int newpe = CmiGetPeGlobal(pe,partition);
+    crashedRankList * crashedRank= (crashedRankList *)(malloc(sizeof(crashedRankList)));
+    crashedRank->rank = newpe;
+    crashedRank->next=NULL;
+    if(CpvAccess(crashedRankHdr)==NULL){
+      CpvAccess(crashedRankHdr) = crashedRank;
+      CpvAccess(crashedRankPtr) = CpvAccess(crashedRankHdr);
+    }else{
+      CpvAccess(crashedRankPtr)->next = crashedRank;
+      CpvAccess(crashedRankPtr) = crashedRank;
+    }
     petorank[newpe] = nextrank;
     nextrank++;
     //CmiPrintf("[%d][%d]spare rank %d for pe %d\n",CmiMyPartition(),CmiMyPe(),nextrank-1,newpe);
     //fflush(stdout);
     return nextrank-1;
     petorank[newpe] = nextrank;
     nextrank++;
     //CmiPrintf("[%d][%d]spare rank %d for pe %d\n",CmiMyPartition(),CmiMyPe(),nextrank-1,newpe);
     //fflush(stdout);
     return nextrank-1;
+#endif
+}
+
+
+int isRankDie(int rank){
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+  crashedRankList * cur = CpvAccess(crashedRankHdr);
+  while(cur!=NULL){
+    if(rank == cur->rank){
+      return 1;
+    }
+    cur = cur->next;
+  }
+  return 0;
+#endif
 }
 
 }
 
+
 void CkDieNow()
 {
 void CkDieNow()
 {
-#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
     CmiPrintf("[%d] die now.\n", CmiMyPe());
     fflush(stdout);
       /* release old messages */
     CmiPrintf("[%d] die now.\n", CmiMyPe());
     fflush(stdout);
       /* release old messages */
@@ -1962,6 +2006,7 @@ void CkDieNow()
         PumpMsgs();
         CmiReleaseSentMessages();
     }
         PumpMsgs();
         CmiReleaseSentMessages();
     }
+    CmiPrintf("[%d] die now before barrier\n", CmiMyPe());
     MPI_Barrier(charmComm);
     MPI_Finalize();
     exit(0);
     MPI_Barrier(charmComm);
     MPI_Finalize();
     exit(0);