add pup support
authorXiang Ni <xiangni2@illinois.edu>
Sat, 15 Dec 2012 20:25:40 +0000 (14:25 -0600)
committerXiang Ni <xiangni2@illinois.edu>
Sat, 15 Dec 2012 20:25:40 +0000 (14:25 -0600)
src/arch/mpi/machine.c
src/ck-core/cklocation.C
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckmemcheckpoint.h
src/ck-ldb/CentralLB.C
src/ck-ldb/LBDBManager.C
src/ck-ldb/LBDBManager.h
src/ck-ldb/LBDatabase.h
src/util/pup.h
src/util/pup_util.C

index 9ac8eee79bb229771b2772196bf98f42151b7e7b..cab2d52388a499b67413a40e5bf467b491615d24 100644 (file)
@@ -1918,7 +1918,7 @@ int find_spare_mpirank(int pe,int partition)
       CmiAbort("Charm++> No spare processor available.");
     }
        int newpe = CmiGetPeGlobal(pe,partition);
-       //CmiPrintf("[%d] set the rank of %d to %d\n",CmiGetPeGlobal(CmiMyPe(),CmiMyPartition()),newpe,nextrank);
+       CmiPrintf("[%d] set the rank of %d to %d\n",CmiGetPeGlobal(CmiMyPe(),CmiMyPartition()),newpe,nextrank);
     petorank[newpe] = nextrank;
     nextrank++;
     return nextrank-1;
index 06ad7c5aeea67ee995316d4f7750ad288aaa264b..0230b9388129581fb2f001227354fdb88d1bdd5b 100644 (file)
@@ -1210,6 +1210,7 @@ void CkMigratable::ckFinishConstruction(void)
        myRec->setMeasure(usesAutoMeasure);
        if (barrierRegistered) return;
        DEBL((AA"Registering barrier client for %s\n"AB,idx2str(thisIndexMax)));
+       CkPrintf("Registering barrier client for %s\n",idx2str(thisIndexMax));
         if (usesAtSync)
          ldBarrierHandle = myRec->getLBDB()->AddLocalBarrierClient(
                (LDBarrierFn)staticResumeFromSync,(void*)(this));
@@ -1229,7 +1230,9 @@ void CkMigratable::AtSync(int waitForMigration)
        myRec->AsyncMigrate(!waitForMigration);
        if (waitForMigration) ReadyMigrate(CmiTrue);
        ckFinishConstruction();
-  DEBL((AA"Element %s going to sync\n"AB,idx2str(thisIndexMax)));
+  //DEBL((AA"Element %s going to sync\n"AB,idx2str(thisIndexMax)));
+  //if(CmiMyPartition()==1)
+       CkPrintf("Element %s going to sync\n",idx2str(thisIndexMax));
   // model-based load balancing, ask user to provide cpu load
   if (usesAutoMeasure == CmiFalse) UserSetLBLoad();
 
@@ -2782,12 +2785,13 @@ void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
        // (A separate loop so ckLocal works even in element pup routines)
        for (m=firstManager;m!=NULL;m=m->next) {
                int elCType;
-               if (!p.isUnpacking())
+               if (!p.isUnpacking()&&!p.isChecking())
                { //Need to find the element's existing type
                        CkMigratable *elt=m->element(localIdx);
                        if (elt) elCType=elt->ckGetChareType();
                        else elCType=-1; //Element hasn't been created
                }
+
                p(elCType);
                if (p.isUnpacking() && elCType!=-1) {
                        //Create the element
@@ -2798,16 +2802,17 @@ void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
                                if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
                }
        }
+
        //Next pup the element data
        for (m=firstManager;m!=NULL;m=m->next) {
                CkMigratable *elt=m->element(localIdx);
                if (elt!=NULL)
-                {
-                        elt->pup(p);
+               {
+                       elt->pup(p);
 #if CMK_ERROR_CHECKING
-                        if (p.isUnpacking()) elt->sanitycheck();
+                       if (p.isUnpacking()) elt->sanitycheck();
 #endif
-                }
+               }
        }
 
 #if CMK_MEM_CHECKPOINT
@@ -3107,7 +3112,8 @@ void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p, CmiBool notify,CmiBoo
                rec = (CkLocRec_local *)recGlobal;
        }
        pupElementsFor(p,rec,CkElementCreation_resume,rebuild,create);
-       callMethod(rec,&CkMigratable::ckJustMigrated);
+       if(!p.isChecking())
+               callMethod(rec,&CkMigratable::ckJustMigrated);
 }
 #endif
 
index 948cfdaaccbfdd1df129a0829984fd039857c2d1..92f6506540b249bff0277f37cf73f397179a77c0 100644 (file)
@@ -75,6 +75,7 @@ CpvDeclare(int, _remoteCrashedNode);
 
 // static, so that it is accessible from Converse part
 int CkMemCheckPT::inRestarting = 0;
+int CkMemCheckPT::inCheckpointing = 0;
 int CkMemCheckPT::replicaAlive = 1;
 int CkMemCheckPT::inLoadbalancing = 0;
 double CkMemCheckPT::startTime;
@@ -363,6 +364,7 @@ CkMemCheckPT::CkMemCheckPT(int w)
     _memChkptOn = 0;
   }
   inRestarting = 0;
+  inCheckpointing = 0;
   recvCount = peCount = 0;
   ackCount = 0;
   expectCount = -1;
@@ -516,6 +518,7 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
   checkpointed = 1;
   cpCallback = cb;
   cpStarter = starter;
+  inCheckpointing = 1;
   if (CkMyPe() == cpStarter) {
     startTime = CmiWallTimer();
     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
@@ -569,7 +572,7 @@ class MemElementPacker : public CkLocIterator{
                }
 };
 
-void CkMemCheckPT::pupAllElements(PUP::er &p){
+void pupAllElements(PUP::er &p){
 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
        int numElements;
        if(!p.isUnpacking()){
@@ -663,12 +666,13 @@ void CkMemCheckPT::startCheckpoint(){
          if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
                thisProxy[CkMyPe()].doneComparison(true);
          }else{
-               thisProxy[CkMyPe()].doneComparison(true);
+                 CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
+               thisProxy[CkMyPe()].doneComparison(false);
          }
   }
        else{
                if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
-                       if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+                       {       
                                int pointer = CpvAccess(curPointer);
                                //send the proc data
                                CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
@@ -677,7 +681,9 @@ void CkMemCheckPT::startCheckpoint(){
                                CkPackMessage(&env);
                                CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
                                CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
-                               CkPrintf("[%d] sendProcdata\n",CkMyPe());
+                               if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+                                       CkPrintf("[%d] sendProcdata\n",CkMyPe());
+                               }
                        }
                        //send the array checkpoint data
                        CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
@@ -695,27 +701,37 @@ void CkMemCheckPT::startCheckpoint(){
 }
 
 void CkMemCheckPT::doneComparison(bool ret){
+       int _ret;
+       if(!ret){
+       CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
+               _ret = 0;
+       }else{
+               _ret = 1;
+       }
+       inCheckpointing = 0;
        CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
-       contribute(sizeof(bool),&ret,CkReduction::logical_and,cb);
+       contribute(sizeof(int),&_ret,CkReduction::logical_and,cb);
 }
 
-void CkMemCheckPT::doneRComparison(bool ret){
+void CkMemCheckPT::doneRComparison(int ret){
        CpvAccess(recvdRemote) = 0;
        CpvAccess(recvdLocal) = 0;
 //     if(CpvAccess(curPointer) == 0){
-       if(ret==true){
+       if(ret==1){
        CpvAccess(curPointer)^=1;
                if(CkMyPe() == 0){
-      CkPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
+               CkPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
                        cpCallback.send();
                }
        }else{
+       CkPrintf("[%d][%d] going to RollBack \n", CmiMyPartition(),CkMyPe());
                RollBack();
        }
 }
 
 void CkMemCheckPT::RollBack(){
        //restore group data
+       checkpointed = 0;
        CkMemCheckPT::inRestarting = 1;
        int pointer = CpvAccess(curPointer)^1;//use the previous one
     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
@@ -900,6 +916,7 @@ void CkMemCheckPT::cpFinish()
 // for debugging, report checkpoint info
 void CkMemCheckPT::report()
 {
+       inCheckpointing = 0;
 #if !CMK_CHKP_ALL
   int objsize = 0;
   int len = ckTable.length();
@@ -1031,7 +1048,7 @@ void CkMemCheckPT::restart(int diePe)
   inRestarting = 1;
                                                                                 
   // disable load balancer's barrier
-  if (CkMyPe() != diePe) resetLB(diePe);
+  //if (CkMyPe() != diePe) resetLB(diePe);
 
   CKLOCMGR_LOOP(mgr->startInserting(););
 
@@ -1055,7 +1072,9 @@ void CkMemCheckPT::removeArrayElements()
   stage = (char*)"removeArrayElements";
   startTime = curTime;
 
-  if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
+//  if (cpCallback.isInvalid()) 
+//       CkPrintf("invalid pe %d\n",CkMyPe());
+//       CkAbort("Didn't set restart callback\n");;
   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
 
   // get rid of all buffering and remote recs
@@ -1145,7 +1164,7 @@ void CkMemCheckPT::recoverBuddies()
 #endif
 
   if (expectCount == 0) {
-    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
+         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
   }
   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
 }
@@ -1296,7 +1315,7 @@ void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<C
                                if(CkMyPe()==thisFailedPe){
                                        mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
                                }
-        else{
+                       else{
                                        mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
                                }
                        }
@@ -1328,18 +1347,20 @@ void CkMemCheckPT::finishUp()
        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
   }
        
+#if CMK_CONVERSE_MPI   
   if(CmiNumPartition()!=1){
        CpvAccess(recvdProcChkp) = 0;
        CpvAccess(recvdArrayChkp) = 0;
        CpvAccess(curPointer)^=1;
-   if (CmiMyPe() == BuddyPE(thisFailedPe)) {
-        CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
-   }
        //notify my replica, restart is done
    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
    CmiSetHandler(msg,replicaRecoverHandlerIdx);
    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
   }
+   if (CmiMyPe() == BuddyPE(thisFailedPe)) {
+        CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+   }
+#endif
 
 #if CK_NO_PROC_POOL
 #if NODE_CHECKPOINT
@@ -1497,7 +1518,33 @@ static void restartBcastHandler(char *msg)
 extern void _initDone();
 
 bool compare(char * buf1, char *buf2){
-       return true;
+       //buf1 my copy, buf2 from another one 
+//     CkPrintf("[%d][%d]compare buffer\n",CmiMyPartition(),CkMyPe());
+       PUP::checker pchecker(buf1,buf2);
+       pchecker.skip();
+       
+       int numElements;
+       pchecker|numElements;
+       CkPrintf("[%d][%d]numElements:%d\n",CmiMyPartition(),CkMyPe(),numElements);
+       for(int i=0;i<numElements;i++){
+       //for(int i=0;i<1;i++){
+               CkGroupID gID;
+               CkArrayIndex idx;
+               
+               pchecker|gID;
+               pchecker|idx;
+               
+               CkPrintf("[%d][%d]resume\n",CmiMyPartition(),CkMyPe());
+               CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+               mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
+               CkPrintf("------[%d][%d]finish element %d\n",CmiMyPartition(),CkMyPe(),i);
+       }
+       if(pchecker.getResult()){
+               CkPrintf("------[%d][%d]pass result\n",CmiMyPartition(),CkMyPe());
+       }else{
+               CkPrintf("------[%d][%d] hasn't passed result\n",CmiMyPartition(),CkMyPe());
+       }
+       return pchecker.getResult();
 }
 
 static void recvRemoteChkpHandler(char *msg){
@@ -1509,12 +1556,12 @@ static void recvRemoteChkpHandler(char *msg){
          int size = CpvAccess(chkpBuf)[pointer]->len;
          CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
          if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
-               
-               checkptMgr[CkMyPe()].doneComparison(true);
-               }else
-               {
+                       checkptMgr[CkMyPe()].doneComparison(true);
+         }else
+         {
+                 CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
                        checkptMgr[CkMyPe()].doneComparison(false);
-               }
+         }
   }else{
          CpvAccess(recvdRemote) = 1;
          if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
@@ -1531,6 +1578,7 @@ static void replicaRecoverHandler(char *msg){
 }
 
 static void replicaDieHandler(char * msg){
+#if CMK_CONVERSE_MPI   
        int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
        CpvAccess(_remoteCrashedNode) = diePe;
        CkMemCheckPT::replicaAlive = 0;
@@ -1539,6 +1587,7 @@ static void replicaDieHandler(char * msg){
       CkPrintf("pe %d in replicad word die\n",diePe);
            CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
     }
+#endif
        //broadcast to my partition
        //CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
        //checkptMgr.notifyReplicaDie(diePe);
@@ -1562,19 +1611,27 @@ static void recoverRemoteProcDataHandler(char *msg){
    //store the checkpoint
        int pointer = procMsg->pointer;
 
-       if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
-       CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
 
-   PUP::fromMem p(procMsg->packData);
-   _handleProcData(p,CmiTrue);
+   if(CkMyPe()==CpvAccess(_crashedNode)){
+          if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+          CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
+          PUP::fromMem p(procMsg->packData);
+          _handleProcData(p,CmiTrue);
+          _initDone();
+   }
+   else{
+          if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+          CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
+          //_handleProcData(p,CmiFalse);
+   }
    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
    CKLOCMGR_LOOP(mgr->startInserting(););
    
-   _initDone();
    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
    CpvAccess(recvdProcChkp) =1;
        if(CpvAccess(recvdArrayChkp)==1){
                _resume_charm_message();
+               _diePE = CpvAccess(_crashedNode);
                char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
                CmiSetHandler(restartmsg, restartBeginHandlerIdx);
                CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
@@ -1595,7 +1652,7 @@ static void recoverRemoteArrayDataHandler(char *msg){
        CpvAccess(chkpBuf)[pointer] = chkpMsg;
    CpvAccess(recvdArrayChkp) =1;
        CkMemCheckPT::inRestarting = 1;
-       if(CmiMyPe()!=CpvAccess(_crashedNode)||CpvAccess(recvdProcChkp) == 1){
+       if(CpvAccess(recvdProcChkp) == 1){
                _resume_charm_message();
                _diePE = CpvAccess(_crashedNode);
                //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
@@ -1608,6 +1665,7 @@ static void recoverRemoteArrayDataHandler(char *msg){
 static void recvPhaseHandler(char * msg)
 {
        CpvAccess(_curRestartPhase)--;
+       CkMemCheckPT::inRestarting = 1;
        //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
@@ -1687,17 +1745,17 @@ void qd_callback(void *m)
 {
    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
    CkFreeMsg(m);
+   if(CmiNumPartition()==1){
 #ifdef CMK_SMP
-   for(int i=0;i<CmiMyNodeSize();i++){
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
-       CmiSetHandler(msg, askProcDataHandlerIdx);
-       int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
-       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-   }
-   return;
+          for(int i=0;i<CmiMyNodeSize();i++){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
+               CmiSetHandler(msg, askProcDataHandlerIdx);
+               int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
+               CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+          }
+          return;
 #endif
-   if(CmiNumPartition()==1){
           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
           *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
           // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
@@ -1728,7 +1786,7 @@ void CkMemRestart(const char *dummy, CkArgMsg *args)
   }*/
    if(CmiNumPartition()==1){
           CkMemCheckPT::startTime = restartT = CmiWallTimer();
-  CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
+               CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
                restartT = CmiWallTimer();
           CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
@@ -1741,20 +1799,6 @@ void CkMemRestart(const char *dummy, CkArgMsg *args)
    else{
                CkCallback cb(qd_callback);
                CkStartQD(cb);
-       //      CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-          //get the restart phase
-          //CmiPrintf("[%d] diePe asks for phase\n",CkMyPe());
-          //char *phaseMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-          //*(int *)(phaseMsg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
-          //CmiSetHandler(phaseMsg, askPhaseHandlerIdx);
-          //int pe = ChkptOnPe(CpvAccess(_crashedNode));
-          //CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)phaseMsg);
-               //notify the other partition
-          //CkPrintf("[%d] notify remote partition %d\n",CkMyPe(),CmiMyPartition());
-          //char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-          //*(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
-          //CmiSetHandler(msg,replicaDieHandlerIdx);
-          //CmiRemoteSyncSendAndFree(CkMyPe(),0,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
    }
 #else
    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
@@ -1790,6 +1834,12 @@ int CkReplicaAlive()
                return 1;*/
 }
 
+extern "C"
+int CkInCheckpointing()
+{
+       return CkMemCheckPT::inCheckpointing;
+}
+
 extern "C"
 void CkSetInLdb(){
 #if CMK_MEM_CHECKPOINT
@@ -1881,23 +1931,25 @@ void notify_crash(int node)
 #endif
 }
 
-#if CMK_CONVERSE_MPI
 extern "C" void (*notify_crash_fn)(int node);
 
 
+#if CMK_CONVERSE_MPI
 void buddyDieHandler(char *msg)
 {
 #if CMK_MEM_CHECKPOINT
    // notify
+       CkMemCheckPT::inRestarting = 1;
    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
    notify_crash(diepe);
    // send message to crash pe to let it restart
    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+   CkPrintf("[%d] finding newrank\n",CkMyPe());
    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
+   CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
    int buddy = obj->BuddyPE(CmiMyPe());
-   if (buddy == diepe)  {
    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
-     CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
+   if (buddy == diepe)  {
      mpi_restart_crashed(diepe, newrank);
    }
 #endif
@@ -1913,7 +1965,8 @@ void pingCheckHandler()
 {
 #if CMK_MEM_CHECKPOINT
   double now = CmiWallTimer();
-  if (lastPingTime > 0 && now - lastPingTime > 1 && !CkInLdb()) {
+  if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
+  //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
     int i, pe, buddy;
     // tell everyone the buddy dies
     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
@@ -1922,7 +1975,7 @@ void pingCheckHandler()
        if (obj->BuddyPE(pe) == CmiMyPe()) break;
     }
     buddy = pe;
-    CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
+    CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
       if (obj->isFailed(pe) || pe == buddy) continue;
       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
@@ -1935,12 +1988,14 @@ void pingCheckHandler()
     CmiSetHandler(msg, buddyDieHandlerIdx);
     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
     //send to everyone in the other world
-    for(int i=0;i<CmiNumPes();i++){
-      char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-      *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
-      CmiSetHandler(rMsg, replicaDieHandlerIdx);
-      CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
-    }
+       if(CmiNumPartition()!=1){
+               for(int i=0;i<CmiNumPes();i++){
+                 char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+                 *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
+                 CmiSetHandler(rMsg, replicaDieHandlerIdx);
+                 CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
+               }
+       }
   }
   else 
     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
index d1eaf9a0ef40d434004ba080dd802fba2ae143ec..ada66a74c6fdc7f165823e7fc86f83009564045b 100644 (file)
@@ -35,7 +35,7 @@ module CkMemCheckpoint {
        entry void recoverEntry(CkArrayCheckPTMessage *msg);
        entry [reductiontarget] void recoverArrayElements();
        entry [reductiontarget] void finishUp();
-       entry [reductiontarget] void doneRComparison(bool);
+       entry [reductiontarget] void doneRComparison(int);
        entry [reductiontarget] void recoverFromSoftFailure();
        entry void notifyReplicaDie(int pe);
        entry void doneComparison(bool);
index 930273441e116126cbc91e436653150a097c86b5..0bdc4e38b6fcd68970e4442d5f03e2427815bb84 100644 (file)
@@ -91,13 +91,13 @@ public:
   void updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe);
   void resetLB(int diepe);
   int  isFailed(int pe);
-  void pupAllElements(PUP::er &p);
+  //void pupAllElements(PUP::er &p);
   void startArrayCheckpoint();
   void recvArrayCheckpoint(CkArrayCheckPTMessage *m);
   void recoverAll(double * msg, CkVec<CkGroupID> * gmap=NULL, CkVec<CkArrayIndex> * imap=NULL);
   void startCheckpoint();
   void doneComparison(bool);
-  void doneRComparison(bool);
+  void doneRComparison(int);
   void RollBack();
   void recoverFromSoftFailure();
   void notifyReplicaDie(int diePe);
@@ -105,6 +105,7 @@ public:
   static CkCallback  cpCallback;
 
   static int inRestarting;
+  static int inCheckpointing;
   static int inLoadbalancing;
   static int replicaAlive;
   static double startTime;
@@ -140,6 +141,7 @@ void CkStartMemCheckpoint(CkCallback &cb);
 
 // true if inside a restarting phase
 extern "C" int CkInRestarting(); 
+extern "C" int CkInCheckpointing(); 
 extern "C" int CkInLdb(); 
 extern "C" void CkSetInLdb(); 
 extern "C" void CkResetInLdb();
index e04c51f7abddffa288b489fd672aad4aa4de540f..86b3d2196317c8106b42f8b07804d0de3d76d428 100644 (file)
@@ -164,6 +164,7 @@ void CentralLB::AtSync()
 {
 #if CMK_LBDB_ON
   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
+  CkPrintf("[%d][%d] CentralLB AtSync step %d!!!!!\n",CmiMyPartition(),CkMyPe(),step());
 #if CMK_MEM_CHECKPOINT 
   CkSetInLdb();
 #endif
index 9c65d729bbe59db325b8ea8971a63f4eafe195ef..f1da1a1ec40981304340a31075b7eeb0bb7c10c3 100644 (file)
@@ -618,6 +618,8 @@ void LocalBarrier::AtBarrier(LDBarrierClient h)
 {
   (clients[h.serial])->refcount++;
   at_count++;
+  //if(CmiMyPartition()==1)
+        CkPrintf("[%d] at barrier\n",CkMyPe());
   CheckBarrier();
 }
 
@@ -627,6 +629,9 @@ void LocalBarrier::CheckBarrier()
 
   // If there are no clients, resume as soon as we're turned on
 
+  //if(CmiMyPartition()==1){
+       CkPrintf("[%d] at_count %d client_count %d\n",CkMyPe(),at_count,client_count);
+  //}
   if (client_count == 0) {
     cur_refcount++;
     CallReceivers();
@@ -650,7 +655,9 @@ void LocalBarrier::CheckBarrier()
 void LocalBarrier::CallReceivers(void)
 {
   CmiBool called_receiver=CmiFalse;
-
+//  if(CmiMyPartition()==1)
+       //CkPrintf("[%d][%d] call receiver\n",CmiMyPartition(),CkMyPe());
+       CkPrintf("[%d] call receiver\n",CkMyPe());
 //  for(int i=0; i < max_receiver; i++)
 //   for (int i=max_receiver-1; i>=0; i--) {
    for (int i=receivers.size()-1; i>=0; i--) {
index eedeba4587e276cd27e72f556f6f9900640df20a..10e16a5b009f4a4c1804ac3c52125755940f8f92 100644 (file)
@@ -201,7 +201,12 @@ public:
   inline void TurnOffBarrierReceiver(LDBarrierReceiver h) 
        { localBarrier.TurnOffReceiver(h); };
   inline void AtLocalBarrier(LDBarrierClient h) 
-       { if (useBarrier) localBarrier.AtBarrier(h); };
+       { 
+                       if(CmiMyPartition()==1)
+                               CkPrintf("[%d] LBDBManager local barrier\n",CkMyPe());
+                  if (useBarrier) 
+                  localBarrier.AtBarrier(h); 
+          };
   inline void ResumeClients() 
        { localBarrier.ResumeClients(); };
   inline void MeasuredObjTime(double wtime, double ctime) {
index b723498ecd8e47825ab751982634dff6a93b9339..b2c0791b80a7104f54a16acda755c521ab2ca16b 100644 (file)
@@ -348,7 +348,9 @@ public:
   };
 
   inline void AtLocalBarrier(LDBarrierClient h) {
-    LDAtLocalBarrier(myLDHandle,h);
+  if(CmiMyPartition()==1)
+       CkPrintf("[%d]local barrier\n",CkMyPe());
+         LDAtLocalBarrier(myLDHandle,h);
   }
   inline void LocalBarrierOn(void) { LDLocalBarrierOn(myLDHandle); };
   inline void LocalBarrierOff(void) { LDLocalBarrierOn(myLDHandle); };
index 9b8d18a34d9c1b091cf8c83d3349c1947760741c..1a16fb7f366e2d3cbe887fbe020c76e3017b80b9 100644 (file)
@@ -151,6 +151,7 @@ class er {
   enum {IS_SIZING   =0x0100,
        IS_PACKING  =0x0200,
         IS_UNPACKING=0x0400,
+       IS_CHECKING=0x0800,  // If set, it is checking the match of 2 checkpoints
         TYPE_MASK   =0xFF00
   };
   unsigned int PUP_er_state;
@@ -165,6 +166,7 @@ class er {
   CmiBool isSizing(void) const {return (PUP_er_state&IS_SIZING)!=0?CmiTrue:CmiFalse;}
   CmiBool isPacking(void) const {return (PUP_er_state&IS_PACKING)!=0?CmiTrue:CmiFalse;}
   CmiBool isUnpacking(void) const {return (PUP_er_state&IS_UNPACKING)!=0?CmiTrue:CmiFalse;}
+  CmiBool isChecking(void) const {return (PUP_er_state&IS_CHECKING)!=0?CmiTrue:CmiFalse;}
   const char *  typeString() const;
   unsigned int getStateFlags(void) const {return PUP_er_state;}
 
@@ -216,7 +218,10 @@ class er {
     {bytes((void *)a,nItems,sizeof(signed char),Tchar);}
 #if CMK_SIGNEDCHAR_DIFF_CHAR
   void operator()(char *a,int nItems)
-    {bytes((void *)a,nItems,sizeof(char),Tchar);}
+    {
+               //printf("pup char data\n");
+               bytes((void *)a,nItems,sizeof(char),Tchar);
+       }
 #endif
   void operator()(short *a,int nItems)
     {bytes((void *)a,nItems,sizeof(short),Tshort);}
@@ -239,7 +244,9 @@ class er {
   void operator()(float *a,int nItems)
     {bytes((void *)a,nItems,sizeof(float),Tfloat);}
   void operator()(double *a,int nItems)
-    {bytes((void *)a,nItems,sizeof(double),Tdouble);}
+    {
+               bytes((void *)a,nItems,sizeof(double),Tdouble);
+       }
 
 #if CMK_LONG_DOUBLE_DEFINED
   void operator()(long double *a,int nItems)
@@ -308,6 +315,11 @@ class er {
 
   virtual int size(void) const { return 0; }
   
+  virtual void setAccuracy(double _accuracy){}
+  virtual void skipNext(){}
+  virtual void skip(){}
+  virtual void resume(){}
+
   //For seeking (pack/unpack in different orders)
   virtual void impl_startSeek(seekBlock &s); /*Begin a seeking block*/
   virtual int impl_tell(seekBlock &s); /*Give the current offset*/
@@ -396,6 +408,37 @@ class mem : public er { //Memory-buffer packers and unpackers
   int size(void) const {return buf-origBuf;}
 };
 
+class checker : public er {
+ protected:
+  virtual void bytes(void *p,int n,size_t itemSize,dataType t);
+  myByte * origBuf;
+  myByte * buf;
+  double accuracy;
+  bool _skip;
+  bool result;
+  bool reset;
+ public:
+       checker(void * Abuf, void * Bbuf):er(IS_CHECKING),origBuf((myByte *)Abuf),buf((myByte *)Bbuf),_skip(false),accuracy(0.0),result(true),reset(false) {}
+       virtual void setAccuracy(double _accuracy) {
+               accuracy = _accuracy;
+               //printf("set accuracy %lf\n",accuracy);
+       }
+       virtual void skipNext() {
+               _skip = true;
+               reset = true;
+               //printf("skip\n");
+       }
+       virtual void skip() {
+               _skip = true;
+               reset = false;
+       }
+       virtual void resume() {
+               _skip = false;
+       }
+
+       bool getResult() {return result;}       
+};
+
 //For packing into a preallocated, presized memory buffer
 class toMem : public mem {
  protected:
@@ -430,6 +473,7 @@ inline void fromMemBuf(T &t,void *buf,int len) {
                "This means your pup routine doesn't match during packing and unpacking");
 }
 
+
 /********** PUP::er -- Binary disk file pack/unpack *********/
 class disk : public er {
  protected:
index 3bdd4bc11a979ce72443d3f7731820abfa61a530..d0fc07e7a3147d0666a84226aff3a294fead4ddf 100644 (file)
@@ -17,6 +17,7 @@ virtual functions are defined here.
 #include <string.h>
 #include <ctype.h>
 #include <errno.h>
+#include <math.h>
 
 #include "converse.h"
 #include "pup.h"
@@ -137,6 +138,76 @@ void PUP::fromMem::bytes(void *p,int n,size_t itemSize,dataType t)
        buf+=n;
 }
 
+void PUP::checker::bytes(void * p,int n,size_t itemSize,dataType t)
+{
+       n*=itemSize;
+       memcpy(p,(const void *)origBuf,n); 
+       if(!_skip){
+               //get the data work for certain type
+               switch(t){
+                       case Tdouble:
+                               {
+                                       double * p1 = new double[n/itemSize];
+                                       double * p2 = new double[n/itemSize];
+                                       p1 = (double*)p;
+                                       memcpy((char *)p2,(const void *)buf,n); 
+                                       for(int i=0;i<n/itemSize;i++){
+                                               if(fabs(p1[i]-p2[i])>accuracy){
+                                                       if(result)
+                                                               printf("found incorrect double %lf %lf\n",p1[i],p2[i]);
+                                                       result = result && false;
+                                               }
+                                       }
+                               }       
+                               break;  
+                       case Tint:
+                               {
+                                       int * p1 = new int[n/itemSize];
+                                       int * p2 = new int[n/itemSize];
+                                       p1 = (int *)p;
+                                       memcpy((char *)p2,(const void *)buf,n); 
+                                       for(int i=0;i<n/itemSize;i++){
+                                               if(fabs(p1[i]-p2[i])>accuracy){
+                                                       if(result)
+                                                               printf("found incorrect int %d %d\n",p1[i],p2[i]);
+                                                       result = result && false;
+                                               }
+                                               //printf("p1 %d\n",p1[i]);
+                                               //printf("p2 %d\n",p2[i]);
+                                       }
+                                       //printf("p %d\n",*(int *)p);
+                               }
+                               break;
+                       case Tchar:
+                               {
+                                       char * p1 = new char[n/itemSize];
+                                       char * p2 = new char[n/itemSize];
+                                       p1 = (char *)p;
+                                       memcpy((char *)p2,(const void *)buf,n); 
+                                       for(int i=0;i<n/itemSize;i++){
+                                               if(fabs(p1[i]-p2[i])>accuracy){
+                                                       if(result)
+                                                               printf("found incorrect char %c %c\n",p1[i],p2[i]);
+                                                       result = result && false;
+                                               }
+                                               //printf("p1 %d\n",p1[i]);
+                                               //printf("p2 %d\n",p2[i]);
+                                       }
+                                       //printf("p %d\n",*(int *)p);
+                               }
+                               break;
+                       default:
+                               break;
+               }
+       }
+       if(reset){
+               _skip = false;
+               reset = false;
+       }
+       origBuf+=n;
+       buf+=n;
+}
+
 // dealing with short write
 size_t CmiFwrite(const void *ptr, size_t size, size_t nmemb, FILE *f)
 {