only one replica send checkpoints
authorNikhil Jain <nikhil@illinois.edu>
Fri, 11 Jan 2013 21:41:26 +0000 (15:41 -0600)
committerNikhil Jain <nikhil@illinois.edu>
Fri, 11 Jan 2013 21:41:26 +0000 (15:41 -0600)
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckmemcheckpoint.h

index d4798bec50ae555f4ddd3032fe7683a95ff9f840..160c6f3af016d414a89aaaee4ecbc5afd19ff581 100644 (file)
@@ -131,6 +131,8 @@ CpvDeclare(int, recvdRemote);
 CpvDeclare(int, recvdLocal);
 CpvDeclare(int, localChkpDone);
 CpvDeclare(int, remoteChkpDone);
+CpvDeclare(int, remoteStarted);
+CpvDeclare(int, localStarted);
 CpvDeclare(int, recvdArrayChkp);
 CpvDeclare(int, recvdProcChkp);
 CpvDeclare(int, localChecksum);
@@ -148,6 +150,7 @@ static int recoverProcDataHandlerIdx;
 static int restartBeginHandlerIdx;
 static int recvRemoteChkpHandlerIdx;
 static int replicaDieHandlerIdx;
+static int replicaChkpStartHandlerIdx;
 static int replicaDieBcastHandlerIdx;
 static int replicaRecoverHandlerIdx;
 static int replicaChkpDoneHandlerIdx;
@@ -565,10 +568,11 @@ void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
 
 // loop through my checkpoint table and ask checkpointed array elements
 // to send me checkpoint data.
-void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
+//void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
+void CkMemCheckPT::doItNow(int starter)
 {
   checkpointed = 1;
-  cpCallback = cb;
+  //cpCallback = cb;
   cpStarter = starter;
   inCheckpointing = 1;
   if (CkMyPe() == cpStarter) {
@@ -729,16 +733,26 @@ void CkMemCheckPT::startCheckpoint(){
       CpvAccess(localChecksum) = checksum;
       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
-      CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
-      CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
+      //only one reaplica will send
+      if(CmiMyPartition()==0){
+        CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
+      }
     }else{
       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
       CkPackMessage(&env);
-      CmiSetHandler(env,recvRemoteChkpHandlerIdx);
-      CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+      if(CmiMyPartition()==0){
+        CmiSetHandler(env,recvRemoteChkpHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+      }
+    }
+    if(CmiMyPartition()==0){
+      notifyReplica = 1;
+      thisProxy[CkMyPe()].doneComparison(true);
     }
   }
   if(CpvAccess(recvdRemote)==1){
+    //only partition 1 will do it
     //compare the checkpoint 
     int size = CpvAccess(chkpBuf)[pointer]->len;
     if(CpvAccess(use_checksum)){
@@ -818,9 +832,10 @@ void CkMemCheckPT::doneRComparison(int ret){
     }
     if(notifyReplica == 0){
       //notify the replica am done
-      char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+      char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+      *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
-      CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
+      CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
       notifyReplica = 1;
     }
  // }
@@ -836,6 +851,8 @@ void CkMemCheckPT::doneBothComparison(){
   CpvAccess(recvdLocal) = 0;
   CpvAccess(localChkpDone) = 0;
   CpvAccess(remoteChkpDone) = 0;
+  CpvAccess(remoteStarted) = 0;
+  CpvAccess(localStarted) = 0;
   CpvAccess(_remoteCrashedNode) = -1;
   CkMemCheckPT::replicaAlive = 1;
   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
@@ -1536,9 +1553,19 @@ void CkMemCheckPT::RollBack(){
       // store user callback and user data
       CkMemCheckPT::cpCallback = cb;
 
+
+      //send to my replica that checkpoint begins 
+      if(CkReplicaAlive()==1){
+        char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(msg, replicaChkpStartHandlerIdx);
+        CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
+      }
+      CpvAccess(localStarted) = 1;
       // broadcast to start check pointing
-      CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-      checkptMgr.doItNow(CkMyPe(), cb);
+      if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
+        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+        checkptMgr.doItNow(CkMyPe());
+      }
 #else
       // when mem checkpoint is disabled, invike cb immediately
       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
@@ -1686,7 +1713,7 @@ void CkMemCheckPT::RollBack(){
     }
 
     static void recvRemoteChkpHandler(char *msg){
-//#if CMK_USE_CHECKSUM
+      CpvAccess(remoteChkpDone) = 1;
       if(CpvAccess(use_checksum)){
         if(CkMyPe()==0)
           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
@@ -1701,7 +1728,6 @@ void CkMemCheckPT::RollBack(){
           }
         }
       }else{
-//#else
         envelope *env = (envelope *)msg;
         CkUnpackMessage(&env);
         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
@@ -1723,7 +1749,6 @@ void CkMemCheckPT::RollBack(){
           CpvAccess(buddyBuf) = chkpMsg;
         }
       }
-//#endif
     }
 
     static void replicaRecoverHandler(char *msg){
@@ -1737,8 +1762,9 @@ void CkMemCheckPT::RollBack(){
 
     static void replicaChkpDoneHandler(char *msg){
       CpvAccess(remoteChkpDone) = 1;
+      int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
       if(CpvAccess(localChkpDone) == 1)
-        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
       CmiFree(msg);
     }
 
@@ -1750,6 +1776,14 @@ void CkMemCheckPT::RollBack(){
 #endif
     }
 
+    static void replicaChkpStartHandler(char * msg){
+      CpvAccess(remoteStarted) =1;
+      if(CpvAccess(localStarted)==1){    
+        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+        checkptMgr.doItNow(0);
+      }
+    }
+
 
     static void replicaDieBcastHandler(char *msg){
       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
@@ -2181,6 +2215,7 @@ void CkMemCheckPT::RollBack(){
         //for replica
         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
+        replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
@@ -2203,6 +2238,8 @@ void CkMemCheckPT::RollBack(){
         CpvInitialize(int,recvdLocal);
         CpvInitialize(int,localChkpDone);
         CpvInitialize(int,remoteChkpDone);
+        CpvInitialize(int,remoteStarted);
+        CpvInitialize(int,localStarted);
         CpvInitialize(int,recvdRemote);
         CpvInitialize(int,recvdProcChkp);
         CpvInitialize(int,localChecksum);
@@ -2222,6 +2259,8 @@ void CkMemCheckPT::RollBack(){
         CpvAccess(recvdLocal) = 0;
         CpvAccess(localChkpDone) = 0;
         CpvAccess(remoteChkpDone) = 0;
+        CpvAccess(remoteStarted) = 0;
+        CpvAccess(localStarted) = 0;
         CpvAccess(recvdRemote) = 0;
         CpvAccess(recvdProcChkp) = 0;
         CpvAccess(localChecksum) = 0;
index 8e1ca759a88787673a65c08ca0b1308907f6b567..031dd9aaf0b23cf48533336625037a894985d862 100644 (file)
@@ -19,7 +19,8 @@ module CkMemCheckpoint {
         entry CkMemCheckPT(int w);
        entry void createEntry(CkArrayID, CkGroupID, CkArrayIndex, int);
        // checkpointing
-        entry [expedited] void doItNow(int spe, CkCallback &);  //checkpointing
+       // entry [expedited] void doItNow(int spe, CkCallback &);  //checkpointing
+        entry [expedited] void doItNow(int spe);  //checkpointing
        entry void recvData(CkArrayCheckPTMessage *);
        entry void recvArrayCheckpoint(CkArrayCheckPTMessage *);
        entry void gotData();
index c34d7a54898e57df0857a117b233c82c860e2a3e..695ab9294dac5f3c35b588fb404a9d6ea07a2882 100644 (file)
@@ -70,7 +70,8 @@ public:
   virtual ~CkMemCheckPT();
   void pup(PUP::er& p);
   inline int BuddyPE(int pe);
-  void doItNow(int sp, CkCallback &);
+  //void doItNow(int sp, CkCallback &);
+  void doItNow(int sp);
   void restart(int diePe);
   void removeArrayElements();
   void createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy);