Pass hard-failure recovery test on BGQ
author Nikhil Jain <nikhil@illinois.edu>
Wed, 12 Dec 2012 22:48:04 +0000 (22:48 +0000)
committer Nikhil Jain <nikhil@illinois.edu>
Wed, 12 Dec 2012 22:48:04 +0000 (22:48 +0000)
src/arch/mpi/machine.c
src/arch/util/machine-common-core.c
src/ck-core/ckmemcheckpoint.C
src/ck-core/cksection.h

index 368e78ea593e2f6f46ee6666c5638b31bc2a48c0..9ac8eee79bb229771b2772196bf98f42151b7e7b 100644 (file)
@@ -1371,6 +1371,7 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, charmComm,&sts);
       int newpe = vals[0];
       CpvAccess(_curRestartPhase) = vals[1];
+      CmiPrintf("Charm++> Spare MPI rank %d is activated for global PE %d phase %d.\n", *myNodeID, newpe,CpvAccess(_curRestartPhase));
 
       if (newpe == -1) {
           MPI_Barrier(charmComm);
@@ -1378,7 +1379,6 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
           exit(0);
       }
 
-      CmiPrintf("Charm++> Spare MPI rank %d is activated for global PE %d.\n", *myNodeID, newpe);
         /* update petorank */
       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,charmComm, &sts);
       nextrank = *myNodeID + 1;
@@ -1918,7 +1918,7 @@ int find_spare_mpirank(int pe,int partition)
       CmiAbort("Charm++> No spare processor available.");
     }
        int newpe = CmiGetPeGlobal(pe,partition);
-       CmiPrintf("[%d] set the rank of %d to %d\n",CmiGetPeGlobal(CmiMyPe(),CmiMyPartition()),newpe,nextrank);
+       //CmiPrintf("[%d] set the rank of %d to %d\n",CmiGetPeGlobal(CmiMyPe(),CmiMyPartition()),newpe,nextrank);
     petorank[newpe] = nextrank;
     nextrank++;
     return nextrank-1;
index 925b4248d2727e11f7ca8eeec3d178dee174004a..9d1aa438918fbc11b124b9b5de334e79daa9ef4e 100644 (file)
@@ -519,8 +519,8 @@ CmiCommHandle CmiRemoteSendNetworkFunc(int destPE, int partition, int size, char
         int rank;
         int destLocalNode = CmiNodeOf(destPE); 
         int destNode = CmiGetNodeGlobal(destLocalNode,partition);
-           if(partition!=CmiMyPartition())
-                       CmiPrintf("send to dest node %d in partition %d\n",destNode,partition); 
+           //if(partition!=CmiMyPartition())
+                       //CmiPrintf("send to dest node %d in partition %d\n",destNode,partition);       
 #if CMK_USE_PXSHM       //not handled yet correctly
         if (CmiValidPxshm(destLocalNode, size)) {
           CmiSendMessagePxshm(msg, size, destLocalNode, &refcount);
@@ -569,8 +569,8 @@ INLINE_KEYWORD void CmiFreeSendFn(int destPE, int size, char *msg) {
 void CmiRemoteFreeSendFn(int destPE, int partition, int size, char *msg) {
     CMI_SET_BROADCAST_ROOT(msg, 0);
     CQdCreate(CpvAccess(cQdState), 1);
-       if(partition!=CmiMyPartition())
-               CmiPrintf("[%d] on partition %d send to %d on partition %d\n",CmiMyPe(),CmiMyPartition(),destPE,partition);
+//     if(partition!=CmiMyPartition())
+//             CmiPrintf("[%d] on partition %d send to %d on partition %d\n",CmiMyPe(),CmiMyPartition(),destPE,partition);
     if (CmiMyPe()==destPE && partition == CmiMyPartition()) {
         CmiSendSelf(msg);
 #if CMK_PERSISTENT_COMM
@@ -713,7 +713,7 @@ void CmiCreatePartitions(char **argv) {
   partitionInfo.myPartition = _Cmi_mynode_global / partitionInfo.partitionSize;
        
 
-  CmiPrintf("in partition %d size %d\n",partitionInfo.myPartition, partitionInfo.partitionSize);
+  //CmiPrintf("in partition %d size %d\n",partitionInfo.myPartition, partitionInfo.partitionSize);
   //reset local variables
   _Cmi_mynode = CmiGetNodeLocal(_Cmi_mynode);
   _Cmi_numnodes = CmiPartitionSize();
index b3185318b3f831a29aa2383bf1a5917515134dc8..948cfdaaccbfdd1df129a0829984fd039857c2d1 100644 (file)
@@ -612,6 +612,8 @@ void CkMemCheckPT::startArrayCheckpoint(){
 
 void CkMemCheckPT::startCheckpoint(){
 #if CMK_CONVERSE_MPI
+       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+    CkPrintf("in start checkpointing!!!!\n");
   int size;
   {
     PUP::sizer p;
@@ -644,11 +646,13 @@ void CkMemCheckPT::startCheckpoint(){
                pupAllElements(p);
        }
        pointer = CpvAccess(curPointer);
+       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+    CkPrintf("start checkpointing!!!!\n");
        if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
-               CpvAccess(chkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
+               CpvAccess(chkpBuf)[pointer] = msg;
        if(CkReplicaAlive()==1){
                CpvAccess(recvdLocal) = 1;
-               envelope * env = (envelope *)(UsrToEnv(msg));
+               envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
                CkPackMessage(&env);
                CmiSetHandler(env,recvRemoteChkpHandlerIdx);
                CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
@@ -661,7 +665,7 @@ void CkMemCheckPT::startCheckpoint(){
          }else{
                thisProxy[CkMyPe()].doneComparison(true);
          }
-       }
+  }
        else{
                if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
                        if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
@@ -676,16 +680,15 @@ void CkMemCheckPT::startCheckpoint(){
                                CkPrintf("[%d] sendProcdata\n",CkMyPe());
                        }
                        //send the array checkpoint data
+                       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
                        msg->pointer = CpvAccess(curPointer);
                        envelope * env = (envelope *)(UsrToEnv(msg));
                        CkPackMessage(&env);
                        CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
                        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
-                       CkPrintf("[%d] sendArrayData\n",CkMyPe());
+                       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+                         CkPrintf("[%d] sendArraydata\n",CkMyPe());
                        //can continue work, no need to wait for my replica
-                       bool ret = true;
-                       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
-                       contribute(sizeof(bool),&ret,CkReduction::logical_and,cb);
                }
        }
 #endif
@@ -703,6 +706,7 @@ void CkMemCheckPT::doneRComparison(bool ret){
        if(ret==true){
        CpvAccess(curPointer)^=1;
                if(CkMyPe() == 0){
+      CkPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
                        cpCallback.send();
                }
        }else{
@@ -997,6 +1001,8 @@ void CkMemCheckPT::resetLB(int diepe)
 #endif
 }
 
+static double restartT;
+
 // in case when failedPe dies, everybody go through its checkpoint table:
 // destory all array elements
 // recover lost buddies
@@ -1006,8 +1012,10 @@ void CkMemCheckPT::restart(int diePe)
 {
 #if CMK_MEM_CHECKPOINT
   double curTime = CmiWallTimer();
-  if (CkMyPe() == diePe)
+  if (CkMyPe() == diePe){
+          restartT = CmiWallTimer();
     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
+  }
   stage = (char*)"resetLB";
   startTime = curTime;
   if (CkMyPe() == diePe)
@@ -1064,7 +1072,8 @@ void CkMemCheckPT::removeArrayElements()
 // flush state in reduction manager
 void CkMemCheckPT::resetReductionMgr()
 {
-  //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
+  if (CkMyPe() == thisFailedPe) 
+    CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
   int numGroups = CkpvAccess(_groupIDTable)->size();
   for(int i=0;i<numGroups;i++) {
     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
@@ -1230,7 +1239,8 @@ void CkMemCheckPT::recoverArrayElements()
 #endif
 #endif
   curTime = CmiWallTimer();
-  if (CkMyPe() == thisFailedPe)
+  //if (CkMyPe() == thisFailedPe)
+  if (CkMyPe() == 0)
        CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
   for (int i=0; i<CkNumPes(); i++) {
@@ -1269,7 +1279,6 @@ void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<C
        PUP::fromMem p(packData);
        int numElements;
        p|numElements;
-       CkPrintf("[%d] recover all %d\n",CkMyPe(),numElements);
        if(p.isUnpacking()){
                for(int i=0;i<numElements;i++){
                        CkGroupID gID;
@@ -1286,7 +1295,8 @@ void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<C
                        else{
                                if(CkMyPe()==thisFailedPe){
                                        mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
-                               }else{
+                               }
+        else{
                                        mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
                                }
                        }
@@ -1300,12 +1310,9 @@ void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<C
 #endif
                }
        }
-       if(CkMyPe()==thisFailedPe)
-       CkPrintf("recover all ends\n"); 
 #endif
 }
 
-static double restartT;
 
 // on every processor
 // turn load balancer back on
@@ -1326,9 +1333,6 @@ void CkMemCheckPT::finishUp()
        CpvAccess(recvdArrayChkp) = 0;
        CpvAccess(curPointer)^=1;
    if (CmiMyPe() == BuddyPE(thisFailedPe)) {
-        if(CmiMyPartition()==1&&CkMyPe()==2){
-               CmiPrintf("start ping check handler\n");
-        }
         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
    }
        //notify my replica, restart is done
@@ -1436,7 +1440,7 @@ static void restartBeginHandler(char *msg)
   CmiAssert(CkMyPe() == _diePE);
   count ++;
   if (count == CkNumPes()) {
-       printf("restar begin on %d\n",CkMyPe());
+         printf("restart begin on %d\n",CkMyPe());
     CkRestartCheckPointCallback(NULL, NULL);
     count = 0;
   }
@@ -1521,14 +1525,20 @@ static void recvRemoteChkpHandler(char *msg){
 static void replicaRecoverHandler(char *msg){
        CpvAccess(_remoteCrashedNode) = -1;
        CkMemCheckPT::replicaAlive = 1;
+    bool ret = true;
+       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+       checkptMgr[CkMyPe()].doneComparison(ret);
 }
 
 static void replicaDieHandler(char * msg){
-       CkPrintf("[%d] !!!!remote receive notify %d!!!\n",CkMyPe(), CmiMyPartition());
        int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
        CpvAccess(_remoteCrashedNode) = diePe;
        CkMemCheckPT::replicaAlive = 0;
        find_spare_mpirank(diePe,CmiMyPartition()^1);
+    if(CkMyPe()==diePe){
+      CkPrintf("pe %d in replicad word die\n",diePe);
+           CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
+    }
        //broadcast to my partition
        //CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
        //checkptMgr.notifyReplicaDie(diePe);
@@ -1538,7 +1548,6 @@ static void replicaDieHandler(char * msg){
 
 
 static void replicaDieBcastHandler(char *msg){
-       CkPrintf("[%d] remote receive notify partition %d!!!\n",CkMyPe(),CmiMyPartition());
        int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
        CpvAccess(_remoteCrashedNode) = diePe;
        CkMemCheckPT::replicaAlive = 0;
@@ -1552,7 +1561,9 @@ static void recoverRemoteProcDataHandler(char *msg){
        
    //store the checkpoint
        int pointer = procMsg->pointer;
-       CpvAccess(localProcChkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&procMsg); 
+
+       if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+       CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
 
    PUP::fromMem p(procMsg->packData);
    _handleProcData(p,CmiTrue);
@@ -1565,14 +1576,14 @@ static void recoverRemoteProcDataHandler(char *msg){
        if(CpvAccess(recvdArrayChkp)==1){
                _resume_charm_message();
                char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-               CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
                CmiSetHandler(restartmsg, restartBeginHandlerIdx);
                CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
        }
 }
 
 static void recoverRemoteArrayDataHandler(char *msg){
-   CmiPrintf("[%d] ----- recoverRemoteArrayDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
+  if(CkMyPe()==CpvAccess(_crashedNode)) 
+  CmiPrintf("[%d] ----- recoverRemoteArrayDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
    envelope *env = (envelope *)msg;
    CkUnpackMessage(&env);
    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
@@ -1580,13 +1591,14 @@ static void recoverRemoteArrayDataHandler(char *msg){
    //store the checkpoint
        int pointer = chkpMsg->pointer;
        CpvAccess(curPointer) = pointer;
-       CpvAccess(chkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&chkpMsg);
+       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
+       CpvAccess(chkpBuf)[pointer] = chkpMsg;
    CpvAccess(recvdArrayChkp) =1;
        CkMemCheckPT::inRestarting = 1;
        if(CmiMyPe()!=CpvAccess(_crashedNode)||CpvAccess(recvdProcChkp) == 1){
                _resume_charm_message();
                _diePE = CpvAccess(_crashedNode);
-               CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
+               //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
                char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
                CmiSetHandler(restartmsg, restartBeginHandlerIdx);
                CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
@@ -1596,16 +1608,16 @@ static void recoverRemoteArrayDataHandler(char *msg){
 static void recvPhaseHandler(char * msg)
 {
        CpvAccess(_curRestartPhase)--;
-       CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
+       //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
   //    if(CmiMyPartition()==1&&CkMyPe()==2){
 //             CmiPrintf("start ping check handler\n");
 //      }
-       // CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+       // CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
   // }
    //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-   //CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+   //CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
 
 }
 // called on crashed processor
@@ -1714,9 +1726,9 @@ void CkMemRestart(const char *dummy, CkArgMsg *args)
     CkStartQD(cb);
     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
   }*/
-  CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
    if(CmiNumPartition()==1){
           CkMemCheckPT::startTime = restartT = CmiWallTimer();
+  CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
                restartT = CmiWallTimer();
           CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
@@ -1768,7 +1780,8 @@ int CkInRestarting()
 extern "C"
 int CkReplicaAlive()
 {
-       CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
+       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+         CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
        return CkMemCheckPT::replicaAlive;
 
        /*if(CkMemCheckPT::replicaDead==1)
@@ -1884,6 +1897,7 @@ void buddyDieHandler(char *msg)
    int buddy = obj->BuddyPE(CmiMyPe());
    if (buddy == diepe)  {
    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
+     CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
      mpi_restart_crashed(diepe, newrank);
    }
 #endif
@@ -1892,9 +1906,6 @@ void buddyDieHandler(char *msg)
 void pingHandler(void *msg)
 {
   lastPingTime = CmiWallTimer();
-  if(CmiMyPartition()==1&&CkMyPe()==2){
-       //CkPrintf("pingHandler lastTime %lf\n",lastPingTime);
-  }
   CmiFree(msg);
 }
 
@@ -1902,9 +1913,6 @@ void pingCheckHandler()
 {
 #if CMK_MEM_CHECKPOINT
   double now = CmiWallTimer();
-  if(CmiMyPartition()==1&&CkMyPe()==2){
-       //CkPrintf("lastTime %lf, now %lf\n",lastPingTime,now);
-  }
   if (lastPingTime > 0 && now - lastPingTime > 1 && !CkInLdb()) {
     int i, pe, buddy;
     // tell everyone the buddy dies
@@ -1927,12 +1935,12 @@ void pingCheckHandler()
     CmiSetHandler(msg, buddyDieHandlerIdx);
     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
     //send to everyone in the other world
-       for(int i=0;i<CmiNumPes();i++){
-               char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-       *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
-       CmiSetHandler(rMsg, replicaDieHandlerIdx);
-               CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
-       }
+    for(int i=0;i<CmiNumPes();i++){
+      char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+      *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
+      CmiSetHandler(rMsg, replicaDieHandlerIdx);
+      CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
+    }
   }
   else 
     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
index bcce44a7b97d66034be0454e22852512a47e2bb7..2d03dfb25b04abe16c737a50fd6c2ea5a44a33b5 100644 (file)
@@ -110,8 +110,19 @@ class CkSectionInfo {
 
     inline char  &get_type() { return info.type; }
     inline int   &get_pe()    { return info.pe; }
-    inline int   &get_redNo() { CmiAssert(info.type==MulticastMsg); return info.sInfo.sCookie.redNo; }
-    inline void  set_redNo(int redNo) { CmiAssert(info.type==MulticastMsg); info.sInfo.sCookie.redNo = redNo; }
+    //inline int   &get_redNo() { CmiAssert(info.type==MulticastMsg); return info.sInfo.sCookie.redNo; }
+    inline int   &get_redNo() { 
+      if(info.type!=MulticastMsg)
+        CkPrintf("[%d] get type %d\n",CkMyPe(),info.type);
+      return info.sInfo.sCookie.redNo; 
+    }
+    inline void  set_redNo(int redNo) 
+    { 
+    //  CmiAssert(info.type==MulticastMsg); 
+      if(info.type!=MulticastMsg)
+        CkPrintf("[%d] set type %d\n",CkMyPe(),info.type);
+      info.sInfo.sCookie.redNo = redNo; 
+    }
     inline void* &get_val()   { CmiAssert(info.type==MulticastMsg); return info.sInfo.sCookie.val; }
     inline CkGroupID   &get_aid()    { return info.aid; }
     inline CkGroupID   get_aid() const   { return info.aid; }