init forward path
author Xiang Ni <xiangni2@illinois.edu>
Tue, 4 Dec 2012 18:46:26 +0000 (12:46 -0600)
committer Xiang Ni <xiangni2@illinois.edu>
Tue, 4 Dec 2012 18:46:26 +0000 (12:46 -0600)
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckmemcheckpoint.h
src/conv-core/converse.h

index 2397fc0f144e5ec75f60b76f882de45e3ab234b7..c390fa45a6d3b938844259f534463aa9be3d2d95 100644 (file)
@@ -114,7 +114,26 @@ double killTime=0.0;
 
 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
-
+CpvDeclare(CkCheckPTMessage**, chkpBuf);
+// checkpoint received from the buddy, kept so it can be compared with the local one
+// TODO: the whole message is not needed; a checksum would be enough
+CpvDeclare(CkCheckPTMessage*, buddyBuf);
+// index (0 or 1) of the chkpBuf slot that the next checkpoint is written to
+CpvDeclare(int, curPointer);
+CpvDeclare(int, recvdRemote);
+CpvDeclare(int, recvdLocal);
+
+bool compare(char * buf1, char * buf2);
+static inline void _handleProcData(PUP::er &p);
+// Converse function handles
+static int askPhaseHandlerIdx;
+static int recvPhaseHandlerIdx;
+static int askProcDataHandlerIdx;
+static int restartBcastHandlerIdx;
+static int recoverProcDataHandlerIdx;
+static int restartBeginHandlerIdx;
+static int recvRemoteChkpHandlerIdx;
+static int notifyHandlerIdx;
 // compute the backup processor
 // FIXME: avoid crashed processors
 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
@@ -332,6 +351,8 @@ CkMemCheckPT::CkMemCheckPT(int w)
   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
+  chkpTable[0] = NULL;
+  chkpTable[1] = NULL;
 }
 
 CkMemCheckPT::~CkMemCheckPT()
@@ -458,6 +479,8 @@ void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
     // ack
   thisProxy[buddy].gotData();
 #else
+  chkpTable[0] = NULL;
+  chkpTable[1] = NULL;
   recvArrayCheckpoint(msg);
   thisProxy[msg->bud2].gotData();
 #endif
@@ -490,10 +513,11 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
     // if my table is empty, then I am done
   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
 #else
-  startArrayCheckpoint();
+  startCheckpoint();
+  //startArrayCheckpoint();
 #endif
   // pack and send proc level data
-  sendProcData();
+  //sendProcData();
 }
 
 class MemElementPacker : public CkLocIterator{
@@ -548,11 +572,73 @@ void CkMemCheckPT::startArrayCheckpoint(){
                pupAllElements(p);
        }
        thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
+       if(chkpTable[0]) delete chkpTable[0];
        chkpTable[0] = msg;
+       //send the checkpoint to my 
        recvCount++;
 #endif
 }
 
+void CkMemCheckPT::startCheckpoint(){
+#if CMK_CHKP_ALL
+       // Pack all local elements and the processor-level data into one message,
+       // keep a copy in chkpBuf[curPointer], send the original to the other
+       // partition, and compare at once if that partition's copy is already here.
+       int size;
+       {
+               PUP::sizer psizer;
+               pupAllElements(psizer);
+               _handleProcData(psizer);
+               size = psizer.size();
+       }
+       int packSize = size/sizeof(double)+1;
+       CkCheckPTMessage * msg = new (packSize,0) CkCheckPTMessage;
+       msg->len = size;
+       msg->cp_flag = 1;
+       {
+               PUP::toMem p(msg->packData);
+               pupAllElements(p);
+               _handleProcData(p);
+       }
+       int pointer = CpvAccess(curPointer);
+       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
+       CpvAccess(chkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
+       CpvAccess(recvdLocal) = 1;
+       // hand msg to recvRemoteChkpHandler, addressed with partition id CmiMyPartition()^1
+       envelope * env = (envelope *)(UsrToEnv(msg));
+       CkPackMessage(&env);
+       CmiSetHandler(env,recvRemoteChkpHandlerIdx);
+       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+
+       if(CpvAccess(recvdRemote)==1){
+               // the other partition's checkpoint arrived first, so compare it now
+               int localLen = CpvAccess(chkpBuf)[pointer]->len;
+               if(CpvAccess(buddyBuf)->len == localLen && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
+                       thisProxy[CkMyPe()].doneComparison(true);
+               }else{
+                       // a length or content mismatch must be reported as a failure
+                       thisProxy[CkMyPe()].doneComparison(false);
+               }
+       }
+#endif
+}
+
+void CkMemCheckPT::doneComparison(bool ret){
+       // reduce to the whole group so that every PE resets its own per-PE checkpoint state
+       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
+       contribute(sizeof(bool),&ret,CkReduction::logical_and,cb); // NOTE(review): confirm logical_and accepts bool-sized data
+}
+
+void CkMemCheckPT::doneRComparison(bool ret){
+       // NOTE(review): a false result is still dropped silently; no reset or recovery happens on that path
+       if(ret==true){
+               CpvAccess(recvdRemote) = 0;
+               CpvAccess(recvdLocal) = 0;
+               CpvAccess(curPointer)^=1; // switch to the other chkpBuf slot
+               if(CkMyPe()==0) cpCallback.send(); // still sent once, from PE 0 only
+       }
+}
+
 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
 {
 #if CMK_CHKP_ALL
@@ -561,6 +647,9 @@ void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
                idx = 0;
        }
        int isChkpting = msg->cp_flag;
+       if(isChkpting == 1){
+               if(chkpTable[idx]) delete chkpTable[idx];
+       }
        chkpTable[idx] = msg;
        if(isChkpting){
                recvCount++;
@@ -1186,14 +1275,6 @@ CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkM
   CkRestartCheckPoint(_diePE);
 }
 
-// Converse function handles
-static int askPhaseHandlerIdx;
-static int recvPhaseHandlerIdx;
-static int askProcDataHandlerIdx;
-static int restartBcastHandlerIdx;
-static int recoverProcDataHandlerIdx;
-static int restartBeginHandlerIdx;
-static int notifyHandlerIdx;
 
 // called on crashed PE
 static void restartBeginHandler(char *msg)
@@ -1270,6 +1351,32 @@ static void restartBcastHandler(char *msg)
 
 extern void _initDone();
 
+bool compare(char * buf1, char *buf2){
+       return true; // placeholder: the buffers are not inspected yet, every pair is reported equal
+}
+
+static void recvRemoteChkpHandler(char *msg){
+   // Converse handler for the checkpoint message that startCheckpoint sends from the other partition
+   envelope *env = (envelope *)msg;
+   CkUnpackMessage(&env);
+   CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+   if(CpvAccess(recvdLocal)==1){
+      // the local checkpoint is already in chkpBuf: compare now and report the verdict
+      int pointer = CpvAccess(curPointer);
+      int size = CpvAccess(chkpBuf)[pointer]->len;
+      CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+      bool same = chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData));
+      checkptMgr[CkMyPe()].doneComparison(same);
+      // the received copy is not stored on this path, so free it instead of leaking it
+      delete chkpMsg;
+   }else{
+      // the local checkpoint is not ready: keep the received copy in buddyBuf for startCheckpoint
+      CpvAccess(recvdRemote) = 1;
+      if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
+      CpvAccess(buddyBuf) = chkpMsg;
+   }
+}
+
 // called on crashed processor
 static void recoverProcDataHandler(char *msg)
 {
@@ -1585,6 +1692,8 @@ void CkRegisterRestartHandler( )
   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
+  // receives the checkpoint message that startCheckpoint sends from the other partition
+  recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
 
 #if CMK_CONVERSE_MPI
   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
@@ -1593,7 +1702,17 @@ void CkRegisterRestartHandler( )
 #endif
 
   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
+  CpvInitialize(CkCheckPTMessage **, chkpBuf);
+  CpvInitialize(CkCheckPTMessage *, buddyBuf);
+  CpvInitialize(int,curPointer);
+  CpvInitialize(int,recvdRemote);  // initialized here because both flags are assigned below via CpvAccess
+  CpvInitialize(int,recvdLocal);
   CpvAccess(procChkptBuf) = NULL;
+  CpvAccess(buddyBuf) = NULL;
+  CpvAccess(chkpBuf) = new CkCheckPTMessage *[2]();  // () value-initializes both slots to NULL
+  CpvAccess(curPointer) = 0;
+  CpvAccess(recvdLocal) = 0;
+  CpvAccess(recvdRemote) = 0;
 
   notify_crash_fn = notify_crash;
 
index a0f062255789a8c61909649602b08bf2c934cb31..6b083e3181e8f4c32b260ee77fabce971af249c0 100644 (file)
@@ -32,6 +32,8 @@ module CkMemCheckpoint {
        entry void recoverEntry(CkArrayCheckPTMessage *msg);
        entry [reductiontarget] void recoverArrayElements();
        entry [reductiontarget] void finishUp();
+       entry [reductiontarget] void doneRComparison(bool);
+       entry void doneComparison(bool);
        entry void gotReply();
        entry void quiescence(CkCallback&);
         entry void inmem_restore(CkArrayCheckPTMessage *m);
index 5be4b9b27d146a214213226b407edb684bfdb8db..7774f77bcd28bb90b7d8b52393c3ae88bb2920a7 100644 (file)
@@ -94,6 +94,9 @@ public:
   void startArrayCheckpoint();
   void recvArrayCheckpoint(CkArrayCheckPTMessage *m);
   void recoverAll(CkArrayCheckPTMessage * msg, CkVec<CkGroupID> * gmap=NULL, CkVec<CkArrayIndex> * imap=NULL);
+  void startCheckpoint();
+  void doneComparison(bool);
+  void doneRComparison(bool);
 public:
   static CkCallback  cpCallback;
 
index fc9ec1935c65ac6ebe43afa5d19f12b71c521486..46f9645fcfa449fcf07a2a9420491473700f6830 100644 (file)
@@ -139,7 +139,7 @@ extern int _Cmi_mype_global;
 extern int _Cmi_numpes_global;
 extern int _Cmi_mynode_global;
 extern int _Cmi_numnodes_global;
-
+extern PartitionInfo partitionInfo;
 #define CmiMyPartition()         partitionInfo.myPartition
 #define CmiPartitionSize()       partitionInfo.partitionSize
 #define CmiNumPartition()        partitionInfo.numPartitions