artificial notify failure
author Xiang Ni <xiangni2@illinois.edu>
Sun, 7 Apr 2013 19:32:43 +0000 (14:32 -0500)
committer Xiang Ni <xiangni2@illinois.edu>
Sun, 7 Apr 2013 19:32:43 +0000 (14:32 -0500)
src/arch/mpi/machine.c
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.h

index 8fdae56915609dffd5e06fcf4710c7ef2643d7e4..eae1cedb0d5ae973bd9c57b1c29996affe948059 100644 (file)
@@ -2004,10 +2004,10 @@ void CkDieNow()
     CmiPrintf("[%d][%d] die now.\n",CmiMyPartition(), CmiMyPe());
     fflush(stdout);
       /* release old messages */
-    while (!CmiAllAsyncMsgsSent()) {
-        PumpMsgs();
-        CmiReleaseSentMessages();
-    }
+    //while (!CmiAllAsyncMsgsSent()) {
+    //    PumpMsgs();
+    //    CmiReleaseSentMessages();
+    //}
     MPI_Barrier(charmComm);
     MPI_Finalize();
     exit(0);
index e9f5ba766228248a15ada0804cbba07bcaa6aefa..3be7319ec988bb918c8936015c2eee6dd106b55b 100644 (file)
@@ -80,6 +80,7 @@ CpvDeclare(int, _remoteCrashedNode);
 // static, so that it is accessible from Converse part
 int CkMemCheckPT::inRestarting = 0;
 int CkMemCheckPT::inCheckpointing = 0;
+int CkMemCheckPT::aboutToDie = 0;
 int CkMemCheckPT::replicaAlive = 1;
 int CkMemCheckPT::inLoadbalancing = 0;
 double CkMemCheckPT::startTime;
@@ -112,6 +113,8 @@ double s_alpha;
 // variable for storing the killing time
 double killTime=0.0;
 extern void killLocal(void *_dummy,double curWallTime);
+extern void sendKillNotify(void *_dummy,double curWallTime);
+extern void verifyDeadth(void *_dummy,double curWallTime);
 extern void injectSoftFailure(void *_dummy,double curWallTime);
 #endif
 
@@ -175,6 +178,9 @@ static int replicaBeginFailureInjectionHandlerIdx;
 static int recoverRemoteProcDataHandlerIdx;
 static int recoverRemoteArrayDataHandlerIdx;
 static int notifyHandlerIdx;
+static int replicaDyingNotifyHandlerIdx;
+static int replicaDeadNotifyHandlerIdx;
+static int replicaDyingBroadcastHandlerIdx;
 // compute the backup processor
 // FIXME: avoid crashed processors
 #if CMK_CONVERSE_MPI
@@ -402,8 +408,8 @@ CkMemCheckPT::CkMemCheckPT(int w)
 #if CMK_CONVERSE_MPI
   void pingBuddy();
   void pingCheckHandler();
-  CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-  CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+  //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+  //CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
   chkpTable[0] = NULL;
   chkpTable[1] = NULL;
@@ -411,6 +417,7 @@ CkMemCheckPT::CkMemCheckPT(int w)
   recvIterCount = 0;
   localDecided = false;
   softFailureInjected = false;
+  chkpCount=0;
   if(killFlag == 2){
     localSeed = failureSeed;
     softLocalSeed = failureSeed*2;
@@ -421,14 +428,53 @@ CkMemCheckPT::CkMemCheckPT(int w)
     if(SMTBF!=-1 && CmiMyPartition()==1 && CkMyPe()==0){
       thisProxy[CkMyPe()].generateSoftFailure();
     }
+  }
+}
 
+void sendKillNotify(void *_dummy,double curWallTime){
+  if(CkInCheckpointing()||CpvAccess(localStarted)==1||CkInRestarting()){
+    //a checkpoint or restart is in progress: delay sending the notification and retry later
+    CcdCallFnAfter(sendKillNotify,NULL,500);
+  }else{
+    CkMemCheckPT::aboutToDie =  1;
+    //send the notification to my replica, so that it won't communicate with me until this phase is over
+    //char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+    //CmiSetHandler(msg, replicaDyingNotifyHandlerIdx);
+    //CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
+    char * msg1 = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+    CmiSetHandler(msg1, replicaDyingNotifyHandlerIdx);
+    CmiRemoteSyncSendAndFree(1,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg1);
+    
+    //now it can die
+    double sec = 0.001;
+    if(CmiWallTimer()<killTime){
+      sec=killTime-CmiWallTimer();  
+    }
+    CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+    checkptMgr[1].killAfter(sec);
+    
+    sec +=1;
+    CcdCallFnAfter(verifyDeadth,NULL,sec*1000); 
   }
 }
 
-void CkMemCheckPT::replicaInjectFailure(){
-  char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-  CmiSetHandler(msg, replicaBeginFailureInjectionHandlerIdx);
-  CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(unsigned int),msg);
+void replicaDyingNotify(char * msg){
+  CkMemCheckPT::aboutToDie =  1;
+  CmiFree(msg);
+  //char * rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+  //CmiSetHandler(rmsg, replicaDyingBroadcastHandlerIdx);
+  //CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)rmsg);
+  //then notify everyone
+}
+
+void replicaDyingBroadcast(char * msg){
+  CkMemCheckPT::aboutToDie =  1;
+  CmiFree(msg);
+}
+
+void replicaDeadNotify(char * msg){
+  CkMemCheckPT::aboutToDie =  0;
+  CmiFree(msg);
 }
 
 void CkMemCheckPT::generateFailure(){
@@ -438,13 +484,14 @@ void CkMemCheckPT::generateFailure(){
     sec = -log(1.0f - ((double)rand3)/(long long int)(RAND_MAX))*MTBF;
   else if(strcmp(failureDist,"W")==0)
     sec = alpha*pow(-log(1.0f - ((double)rand3)/(long long int)(RAND_MAX)),1/beta);
-  thisProxy[1].killAfter(sec);
+  killTime = CmiWallTimer()+sec;
+  printf("[%d][%d] inject hard failure after %.6lf s (MEMCKPT)\n",CmiMyPartition(),CkMyPe(),sec);
+  CcdCallFnAfter(sendKillNotify,NULL,(sec-1)*1000);
 }
 
 void CkMemCheckPT::killAfter(double sec){
   killTime = CmiWallTimer()+sec;
-  printf("[%d][%d] To be killed after %.6lf s (MEMCKPT) %lf\n",CmiMyPartition(),CkMyPe(),sec, killTime);
+  CkPrintf("[%d][%d] To be killed after %.6lf s (MEMCKPT) %lf\n",CmiMyPartition(),CkMyPe(),sec, killTime);
   CcdCallFnAfter(killLocal,NULL,sec*1000);
 }
 
@@ -481,13 +528,16 @@ void CkMemCheckPT::pup(PUP::er& p)
   p|peCount;
   p|localSeed;
   p|softLocalSeed;
+  p|chkpCount;
+  p|lastChkpTime;
+  p|chkpPeriod;
   if (p.isUnpacking()) {
     recvCount = 0;
 #if CMK_CONVERSE_MPI
     void pingBuddy();
     void pingCheckHandler();
-    CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-    CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
//   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
//   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
     maxIter = -1;
     recvIterCount = 0;
@@ -539,7 +589,7 @@ void CkMemCheckPT::startChkp(){
   if(CkInCheckpointing()){
     return;
   }
-  CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
+  CkPrintf("[%d][%d]start checkpoint at %lf in %lf\n",CmiMyPartition(), CkMyPe(),CmiWallTimer(),CmiWallTimer()-startTime);
   CkStartMemCheckpoint(cpCallback);
 }
 
@@ -961,9 +1011,13 @@ void CkMemCheckPT::doneBothComparison(){
   inCheckpointing = 0;
   notifyReplica = 0;
   if(CkMyPe() == 0){
-    CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
+    CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, memory usage %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size, CmiMemoryUsage()/1048576.0);
   }
   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
+  if(chkpCount!=0)
+    chkpPeriod = CmiWallTimer()-lastChkpTime;
+  chkpCount++;
+  lastChkpTime = CmiWallTimer();
 }
 
 void CkMemCheckPT::RollBack(){
@@ -1612,7 +1666,7 @@ void CkMemCheckPT::RollBack(){
       }
       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
         lastPingTime = CmiWallTimer();
-        CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+//        CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
       }
       //inject next failure
       if(killFlag==2){
@@ -1678,6 +1732,9 @@ void CkMemCheckPT::RollBack(){
     void CkStartMemCheckpoint(CkCallback &cb)
     {
 #if CMK_MEM_CHECKPOINT
+      //only the dying partition (partition 0) is prevented from starting a checkpoint
+      if(CkMemCheckPT::aboutToDie&&CmiMyPartition()==0)
+       return;
       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
       /*if (_memChkptOn == 0) {
         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
@@ -1694,9 +1751,9 @@ void CkMemCheckPT::RollBack(){
       // store user callback and user data
       CkMemCheckPT::cpCallback = cb;
 
-
       //send to my replica that checkpoint begins 
       if(CkReplicaAlive()==1){
+        CkPrintf("[%d][%d]send checkpoint start notification to my partition\n",CmiMyPartition(), CkMyPe());
         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
@@ -1705,7 +1762,7 @@ void CkMemCheckPT::RollBack(){
       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
       checkptMgr.chkpLocalStart();
       // broadcast to start check pointing
-      if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
+      if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||(CkReplicaAlive()==0)){
         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
         checkptMgr.doItNow(CkMyPe());
       }
@@ -2125,6 +2182,18 @@ void CkMemCheckPT::RollBack(){
         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
+       if(CkMemCheckPT::aboutToDie&&CpvAccess(localStarted)){
+          CkPrintf("[%d][%d]send checkpoint start notification to my partition\n",CmiMyPartition(), CkMyPe());
+         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
+         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
+       }
+       {
+          CkMemCheckPT::aboutToDie = 0;
+          //char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+          //CmiSetHandler(msg, replicaDeadNotifyHandlerIdx);
+          //CmiRemoteSyncSendAndFree(0,CmiMyPartition(),CmiMsgHeaderSizeBytes,msg);
+        }
       }
     }
     // called on crashed processor
@@ -2408,6 +2477,7 @@ void CkMemCheckPT::RollBack(){
 #if CMK_MEM_CHECKPOINT
       // notify
       CkMemCheckPT::inRestarting = 1;
+      CkMemCheckPT::aboutToDie =  0;
       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
       notify_crash(diepe);
       // send message to crash pe to let it restart
@@ -2491,6 +2561,9 @@ void CkMemCheckPT::RollBack(){
         //for replica
         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
+        replicaDyingNotifyHandlerIdx = CkRegisterHandler((CmiHandler)replicaDyingNotify);
+        replicaDeadNotifyHandlerIdx = CkRegisterHandler((CmiHandler)replicaDeadNotify);
+        replicaDyingBroadcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDyingBroadcast);
         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
@@ -2583,15 +2656,8 @@ void CkMemCheckPT::RollBack(){
           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
         }else{ 
 #if CMK_CONVERSE_MPI
-          if(!CkInCheckpointing()&&!CkInRestarting()){
-            printf("[%d][%d] KillLocal called at %.6lf \n",CmiMyPartition(),CkMyPe(),CmiWallTimer());          
-            CkDieNow();
-          }else{
-            if(killFlag == 2){
-              //delay for 2s 
-              CcdCallFnAfter(killLocal,NULL,2*1000);        
-            }
-          }
+          printf("[%d][%d] KillLocal called at %.6lf \n",CmiMyPartition(),CkMyPe(),CmiWallTimer());          
+         CkDieNow();
 #else 
           kill(getpid(),SIGKILL);                                               
 #endif
@@ -2602,6 +2668,21 @@ void CkMemCheckPT::RollBack(){
         CmiAbort("kill() not supported!");
       }
 #endif
+
+      void verifyDeadth(void * _dummy, double curWallTime){
+        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+        *(int *)(msg+CmiMsgHeaderSizeBytes) = 1;
+        CmiSetHandler(msg, buddyDieHandlerIdx);
+        CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+        //also notify the other partition (the replica), via the PE there with my rank
+        if(CmiNumPartition()!=1){
+          char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(rMsg+CmiMsgHeaderSizeBytes) = 1;
+          CmiSetHandler(rMsg, replicaDieHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
+       }
+      }
+
       void injectSoftFailure(void *_dummy,double curWallTime){
         if(!CkInCheckpointing()&&!CkInRestarting()){
           CkPrintf("soft failure injected\n");
index b2e6346f8211fea404be251990059f0b7ee126a9..3724f0b015421ce5b7d30c34029419e762abb614 100644 (file)
@@ -121,6 +121,7 @@ public:
 
   static int inRestarting;
   static int inCheckpointing;
+  static int aboutToDie;
   static int inLoadbalancing;
   static int replicaAlive;
   static double startTime;
@@ -132,6 +133,9 @@ public:
 
   int notifyReplica;
   bool softFailureInjected;
+  double lastChkpTime;
+  int chkpCount;
+  double chkpPeriod;
 private:
   CkVec<CkCheckPTInfo *> ckTable;
   CkArrayCheckPTMessage * chkpTable[2];