Fix the bug regarding Redmine issue 257
author Xiang Ni <xiangni2@illinois.edu>
Wed, 28 Aug 2013 22:14:34 +0000 (17:14 -0500)
committer Xiang Ni <xiangni2@illinois.edu>
Wed, 28 Aug 2013 22:14:34 +0000 (17:14 -0500)
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.h

index 1866c26da0f12b0addc1d99e7f2a6bedd5be954a..2ec6909dc5a912e0fc20b958f89d11a7b8b23f15 100644 (file)
@@ -348,8 +348,17 @@ CkMemCheckPT::CkMemCheckPT(int w)
   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
-       chkpTable[0] = NULL;
-       chkpTable[1] = NULL;
+#if CMK_CHKP_ALL
+  initEntry();
+#endif        
+}
+
+void CkMemCheckPT::initEntry()  // Calls init() on both chkpTable slots; a no-op unless CMK_CHKP_ALL is set.
+{
+#if CMK_CHKP_ALL
+  chkpTable[0].init(where, 0);  // slot 0, using this object's 'where' storage mode
+  chkpTable[1].init(where, 1);  // slot 1, same mode; the second argument is the slot index
+#endif 
 }
 
 CkMemCheckPT::~CkMemCheckPT()
@@ -479,10 +488,9 @@ void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
     // ack
   thisProxy[buddy].gotData();
 #else
-  chkpTable[0] = NULL;
-  chkpTable[1] = NULL;
-  recvArrayCheckpoint(msg);
+  initEntry();
   thisProxy[msg->bud2].gotData();
+  recvArrayCheckpoint(msg);
 #endif
 }
 
@@ -572,9 +580,8 @@ void CkMemCheckPT::startArrayCheckpoint(){
                pupAllElements(p);
        }
        thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
-    if (chkpTable[0]) delete chkpTable[0];
-       chkpTable[0] = msg;
-       recvCount++;
+       chkpTable[0].updateBuffer(msg);
+        recvCount++;
 #endif
 }
 
@@ -586,10 +593,7 @@ void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
                idx = 0;
        }
        int isChkpting = msg->cp_flag;
-    if (isChkpting == 1){
-               if (chkpTable[idx]) delete chkpTable[idx];
-       }
-       chkpTable[idx] = msg;
+       chkpTable[idx].updateBuffer(msg);
        if(isChkpting){
                recvCount++;
                if(recvCount == 2){
@@ -949,14 +953,14 @@ void CkMemCheckPT::recoverBuddies()
   }
 #else
   //send to failed pe
-  if(CkMyPe()!=thisFailedPe&&chkpTable[1]!=NULL&&chkpTable[1]->bud1==thisFailedPe){
+  if(CkMyPe()!=thisFailedPe&&chkpTable[1].bud1==thisFailedPe){
 #if CK_NO_PROC_POOL
       // find a new buddy
       int budPe = BuddyPE(CkMyPe());
 #else
       int budPe = thisFailedPe;
 #endif
-      CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
+      CkArrayCheckPTMessage *msg = chkpTable[1].getCopy();
       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
          msg->cp_flag = 0;            // not checkpointing
       msg->bud1 = budPe;
@@ -1052,7 +1056,7 @@ void CkMemCheckPT::recoverArrayElements()
     count ++;
   }
 #else
-       CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
+       CkArrayCheckPTMessage * msg = chkpTable[0].getCopy();
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
        recoverAll(msg,gmap,imap);
 #else
index 9c8609793a9d7397be1a7f6eada155a514428b19..8d66a61f74961899fa51ab0da98905b4ca929913 100644 (file)
@@ -4,7 +4,6 @@
 #include "CkMemCheckpoint.decl.h"
 
 extern CkGroupID ckCheckPTGroupID;
-
 class CkArrayCheckPTReqMessage: public CMessage_CkArrayCheckPTReqMessage {
 public: 
   CkArrayCheckPTReqMessage()  {}
@@ -21,13 +20,6 @@ public:
        int cp_flag;          // 1: from checkpoint 0: from recover
 };
 
-class CkCheckPTMessage: public CMessage_CkArrayCheckPTMessage {
-public:
-       double *packData;
-       int bud1, bud2;
-       int len;
-       int cp_flag;          // 1: from checkpoint 0: from recover
-};
 
 class CkProcCheckPTMessage: public CMessage_CkProcCheckPTMessage {
 public:
@@ -62,6 +54,84 @@ public:
 #define CkCheckPoint_inMEM   1
 #define CkCheckPoint_inDISK  2
 
+class CkCheckPTEntry{  // One checkpoint slot: keeps a checkpoint message in memory or writes it to a file, selected by 'where'.
+  CkArrayCheckPTMessage *data;  // message held by the slot (owned and replaced in the in-memory mode)
+  char * fname;  // checkpoint file name; only assigned by init() when where == CkCheckPoint_inDISK
+public:
+  int bud1, bud2;  // copied from the most recent message in updateBuffer(); not set by init()
+  int where;  // CkCheckPoint_inMEM or CkCheckPoint_inDISK
+  void init(int _where, int idx)  // reset the slot; in disk mode also choose a temp file name (idx is only used in the mkstemp name)
+  {
+    data = NULL;  // NOTE(review): a previously held message is not deleted here - confirm init() is never called on a filled slot
+    where = _where;
+    if(where == CkCheckPoint_inDISK)
+    {
+#if CMK_USE_MKSTEMP
+      fname = new char[64];  // NOTE(review): no matching delete[] is visible in this class
+#if CMK_CONVERSE_MPI
+      sprintf(fname, "/tmp/ckpt%d-%d-%d-XXXXXX",CmiMyPartition(), CkMyPe(), idx);
+#else
+      sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), idx);
+#endif
+      mkstemp(fname);  // fills in the XXXXXX suffix in place; NOTE(review): the returned descriptor is neither checked nor closed
+#else
+      fname=tmpnam(NULL);  // NOTE(review): tmpnam(NULL) returns a static buffer, so both slots may end up sharing one name - confirm
+#endif
+    }
+  }
+
+  void updateBuffer(CkArrayCheckPTMessage * msg)  // store msg as this slot's checkpoint; msg is consumed in both modes
+  {
+    if(where == CkCheckPoint_inDISK)
+    {
+      envelope *env = UsrToEnv(msg);
+      CkUnpackMessage(&env);
+      data = (CkArrayCheckPTMessage *)EnvToUsr(env);  // NOTE(review): presumably the same object as msg, which is deleted below - confirm data is never dereferenced in disk mode
+      FILE *f = fopen(fname,"wb");  // NOTE(review): fopen result is not checked
+      PUP::toDisk p(f);
+      CkPupMessage(p, (void **)&msg);  // pup the message through the to-disk PUPer
+      // delay sync to the end because otherwise the messages are blocked
+  //    fsync(fileno(f));
+      fclose(f);
+      bud1 = msg->bud1;  // remember the buddies so getCopy() can restore them
+      bud2 = msg->bud2;
+      delete msg;  // the file is now the only copy
+    }else
+    {
+      CmiAssert(where == CkCheckPoint_inMEM);
+      CmiAssert(msg!=NULL);
+      if (data) delete data;  // release the previous checkpoint before replacing it
+      data = msg;  // the slot keeps msg itself, not a copy
+      bud1 = msg->bud1;
+      bud2 = msg->bud2;
+    }
+  }
+  
+  CkArrayCheckPTMessage * getCopy()  // return a new message built from the stored checkpoint; presumably the caller owns it - confirm
+  {
+    if(where == CkCheckPoint_inDISK)
+    {
+      CkArrayCheckPTMessage *msg;  // NOTE(review): uninitialized; presumably allocated by CkPupMessage while unpacking - confirm
+      FILE *f = fopen(fname,"rb");  // NOTE(review): fopen result is not checked
+      PUP::fromDisk p(f);
+      CkPupMessage(p, (void **)&msg);
+      fclose(f);
+      msg->bud1 = bud1;                                // update the buddies
+      msg->bud2 = bud2;
+      return msg;
+    }else
+    {
+      CmiAssert(where == CkCheckPoint_inMEM);
+      if (data == NULL) {  // nothing has been stored in this slot
+        CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
+        CmiAbort("Abort!");
+      }
+      return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&data);  // hand out a copy; 'data' stays in the slot
+    }
+  }
+};
+
+
 class CkMemCheckPT: public CBase_CkMemCheckPT {
 public:
   CkMemCheckPT(int w);
@@ -104,7 +174,7 @@ public:
   static char*  stage;
 private:
   CkVec<CkCheckPTInfo *> ckTable;
-  CkArrayCheckPTMessage * chkpTable[2];
+  CkCheckPTEntry chkpTable[2];
 
   int recvCount, peCount;
   int expectCount, ackCount;
@@ -116,6 +186,7 @@ private:
     /// to use memory or disk checkpointing
   int    where;
 private:
+  void initEntry();
   inline int isMaster(int pe);
 
   void failed(int pe);