fix for migration
[charm.git] / src / arch / mpi / machine.c
index 58591905237aea9b818df0798ac949a9ff159b29..e6c19ece5309e8351988bc5646e48ccaf19aa58a 100644 (file)
@@ -301,12 +301,24 @@ void (*signal_int)(int);
 static int _thread_provided = -1; /* Indicating MPI thread level */
 static int idleblock = 0;
 
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+typedef struct crashedrank{
+  int rank;
+  struct crashedrank *next;
+} crashedRankList;
+CpvDeclare(crashedRankList *, crashedRankHdr);
+CpvDeclare(crashedRankList *, crashedRankPtr);
+int isRankDie(int rank);
+#endif
 /* A simple list for msgs that have been sent by MPI_Isend */
 typedef struct msg_list {
     char *msg;
     struct msg_list *next;
     int size, destpe, mode;
     MPI_Request req;
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+    int dstrank; //used in fault tolerance protocol, if the destination is the died rank, delete the msg
+#endif
 } SMSG_LIST;
 
 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
@@ -442,12 +454,13 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
         END_TRACE_SENDCOMM(msg);
     }
 #elif USE_MPI_CTRLMSG_SCHEME
-       sendViaCtrlMsg(node, size, msg, smsg);
+    sendViaCtrlMsg(node, size, msg, smsg);
 #else
 /* branch not using MPI_POST_RECV or USE_MPI_CTRLMSG_SCHEME */
 
 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
        dstrank = petorank[node];
+        smsg->dstrank = dstrank;
 #else
        dstrank=node;
 #endif
@@ -461,8 +474,9 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
     CpvAccess(MsgQueueLen)++;
     if (CpvAccess(sent_msgs)==0)
         CpvAccess(sent_msgs) = smsg;
-    else
+    else {
         CpvAccess(end_sent)->next = smsg;
+    }
     CpvAccess(end_sent) = smsg;
 
 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
@@ -478,15 +492,13 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
     return (CmiCommHandle) &(smsg->req);
 }
 
-CmiCommHandle LrtsSendFunc(int destPE, int size, char *msg, int mode) {
+CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode) {
     /* Ignoring the mode for MPI layer */
 
-    int destNode = CmiNodeOf(destPE);
     CmiState cs = CmiGetState();
     SMSG_LIST *msg_tmp;
-    int  rank;
 
-    CmiAssert(destNode != CmiMyNode());
+    CmiAssert(destNode != CmiMyNodeGlobal());
 #if CMK_SMP
     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
       EnqueueMsg(msg, size, destNode, mode);
@@ -513,6 +525,13 @@ static size_t CmiAllAsyncMsgsSent(void) {
         done = 0;
         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+        if(isRankDie(msg_tmp->dstrank)){
+          //CmiPrintf("[%d][%d] msg to crashed rank\n",CmiMyPartition(),CmiMyPe());
+          //CmiAbort("unexpected send");
+          done = 1;
+        }
+#endif
         if (!done)
             return 0;
         msg_tmp = msg_tmp->next;
@@ -563,6 +582,11 @@ static void CmiReleaseSentMessages(void) {
 #endif
         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+        if (isRankDie(msg_tmp->dstrank)){
+          done = 1;
+        }
+#endif
         if (done) {
             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
             CpvAccess(MsgQueueLen)--;
@@ -624,6 +648,7 @@ static int PumpMsgs(void) {
 #if USE_MPI_CTRLMSG_SCHEME
        doSyncRecv = 0;
        nbytes = recvViaCtrlMsg();
+  recd = 1;
        if(nbytes == -1) break;
 #elif MPI_POST_RECV
                /* First check posted recvs then do  probe unmatched outstanding messages */
@@ -841,7 +866,6 @@ static int PumpMsgs(void) {
     MPI_Status sts;
     while(waitIrecvListHead->next) {
         IRecvList irecvEnt = waitIrecvListHead->next;
-
         START_EVENT();
                 
         /*printf("PE[%d]: check irecv entry=%p\n", CmiMyPe(), irecvEnt);*/
@@ -855,7 +879,7 @@ static int PumpMsgs(void) {
         handleOneRecvedMsg(irecvEnt->size, irecvEnt->msg);
         waitIrecvListHead->next = irecvEnt->next;
         irecvListEntryFree(irecvEnt);
-        recd = 1;        
+        //recd = 1;        
     }
     if(waitIrecvListHead->next == NULL)
         waitIrecvListTail = waitIrecvListHead;
@@ -1121,7 +1145,9 @@ void LrtsDrainResources() {
     }
 #endif
 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
-    if (CmiMyPe() == 0) mpi_end_spare();
+    if (CmiMyPe() == 0&&CmiMyPartition()==0){ 
+      mpi_end_spare();
+    }
 #endif
     MACHSTATE(2, "Machine exit barrier begin {");
     START_EVENT();
@@ -1351,7 +1377,7 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
         }
     }
 
-
+    setbuf(stdout, NULL);
 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
        CmiAssert(num_workpes <= *numNodes);
@@ -1367,11 +1393,19 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
     nextrank = num_workpes;
 
     if (*myNodeID >= num_workpes) {    /* is spare processor */
-      MPI_Status sts;
+      //if isomalloc_sync call mpi_barrier
+      if(CmiGetArgFlag(largv,"+isomalloc_sync")){
+          MPI_Barrier(charmComm);
+          MPI_Barrier(charmComm);
+          MPI_Barrier(charmComm);
+          MPI_Barrier(charmComm);
+      }
+         MPI_Status sts;
       int vals[2];
       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, charmComm,&sts);
       int newpe = vals[0];
       CpvAccess(_curRestartPhase) = vals[1];
+      CmiPrintf("Charm++> Spare MPI rank %d is activated for global PE %d phase %d.\n", *myNodeID, newpe,CpvAccess(_curRestartPhase));
 
       if (newpe == -1) {
           MPI_Barrier(charmComm);
@@ -1379,7 +1413,6 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
           exit(0);
       }
 
-      CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
         /* update petorank */
       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,charmComm, &sts);
       nextrank = *myNodeID + 1;
@@ -1398,6 +1431,10 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
                 i++;
       }
       restart_argv[i] = "+restartaftercrash";
+         if(CmiGetArgFlagDesc(largv,"+isomalloc_sync","synchronize isomalloc region globaly")){
+               i++;
+       restart_argv[i] = "+restartisomalloc";
+         }
       phase_str = (char*)malloc(10);
       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
       restart_argv[i+1]=phase_str;
@@ -1621,6 +1658,12 @@ void LrtsPreCommonInit(int everReturn) {
     waitIrecvListHead->next = NULL;
 #endif
 #endif
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+    CpvInitialize(crashedRankList *, crashedRankHdr);
+    CpvInitialize(crashedRankList *, crashedRankPtr);
+    CpvAccess(crashedRankHdr) = NULL;
+    CpvAccess(crashedRankPtr) = NULL;
+#endif
 }
 
 void LrtsPostCommonInit(int everReturn) {
@@ -1849,7 +1892,6 @@ int CmiBarrier() {
          *  and END_EVENT are disabled here. -Chao Mei
          */
         /*START_EVENT();*/
-
         if (MPI_SUCCESS != MPI_Barrier(charmComm))
             CmiAbort("Timernit: MPI_Barrier failed!\n");
 
@@ -1873,7 +1915,6 @@ int CmiBarrierZero() {
         if (CmiMyNode() == 0)  {
             for (i=0; i<CmiNumNodes()-1; i++) {
                 START_EVENT();
-
                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, charmComm,&sts))
                     CmiPrintf("MPI_Recv failed!\n");
 
@@ -1898,7 +1939,7 @@ int CmiBarrierZero() {
 void mpi_restart_crashed(int pe, int rank)
 {
     int vals[2];
-    vals[0] = pe;
+    vals[0] = CmiGetPeGlobal(pe,CmiMyPartition());
     vals[1] = CpvAccess(_curRestartPhase)+1;
     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,charmComm);
     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,charmComm);
@@ -1910,30 +1951,62 @@ void mpi_end_spare()
     int i;
     for (i=nextrank; i<total_pes; i++) {
         int vals[2] = {-1,-1};
+        CmiPrintf("end spare send to rank %d\n",i);
         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,charmComm);
     }
 }
 
-int find_spare_mpirank(int pe)
+int find_spare_mpirank(int pe,int partition)
 {
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
     if (nextrank == total_pes) {
       CmiAbort("Charm++> No spare processor available.");
     }
-    petorank[pe] = nextrank;
+    int newpe = CmiGetPeGlobal(pe,partition);
+    crashedRankList * crashedRank= (crashedRankList *)(malloc(sizeof(crashedRankList)));
+    crashedRank->rank = newpe;
+    crashedRank->next=NULL;
+    if(CpvAccess(crashedRankHdr)==NULL){
+      CpvAccess(crashedRankHdr) = crashedRank;
+      CpvAccess(crashedRankPtr) = CpvAccess(crashedRankHdr);
+    }else{
+      CpvAccess(crashedRankPtr)->next = crashedRank;
+      CpvAccess(crashedRankPtr) = crashedRank;
+    }
+    petorank[newpe] = nextrank;
     nextrank++;
+    //CmiPrintf("[%d][%d]spare rank %d for pe %d\n",CmiMyPartition(),CmiMyPe(),nextrank-1,newpe);
+    //fflush(stdout);
     return nextrank-1;
+#endif
 }
 
+
+int isRankDie(int rank){
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
+  crashedRankList * cur = CpvAccess(crashedRankHdr);
+  while(cur!=NULL){
+    if(rank == cur->rank){
+      return 1;
+    }
+    cur = cur->next;
+  }
+  return 0;
+#endif
+}
+
+
 void CkDieNow()
 {
-#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
     CmiPrintf("[%d] die now.\n", CmiMyPe());
-
+    fflush(stdout);
       /* release old messages */
     while (!CmiAllAsyncMsgsSent()) {
         PumpMsgs();
         CmiReleaseSentMessages();
     }
+    CmiPrintf("[%d] die now before barrier\n", CmiMyPe());
     MPI_Barrier(charmComm);
     MPI_Finalize();
     exit(0);