A few bug fixes related to the mlogft machine.
authorEsteban Meneses <emenese2@illinois.edu>
Thu, 23 Apr 2009 21:46:32 +0000 (21:46 +0000)
committerEsteban Meneses <emenese2@illinois.edu>
Thu, 23 Apr 2009 21:46:32 +0000 (21:46 +0000)
17 files changed:
src/arch/net/charmrun/charmrun.c
src/arch/net/machine-recover.c
src/arch/net/machine.c
src/ck-core/ckcheckpoint.C
src/ck-core/ckcheckpoint.h
src/ck-core/cklocation.C
src/ck-core/cklocation.h
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmessagelogging.C
src/ck-core/ckmessagelogging.h
src/ck-core/ckreduction.C
src/ck-core/init.C
src/ck-ldb/BaseLB.h
src/ck-ldb/CentralLB.C
src/ck-ldb/CentralLB.h
src/ck-ldb/LBDatabase.C
src/util/ckhashtable.h

index 53941633f7729489380346e587b4279779abda54..c6005f8292b0c29f2502d59756fb66c632db2d31 100644 (file)
@@ -619,6 +619,10 @@ int   arg_help;            /* print help message */
 int   arg_ppn;         /* pes per node */
 int   arg_usehostname;
 
+#ifdef _FAULT_MLOG_
+int     arg_read_pes=0;
+#endif
+
 #if CMK_USE_RSH
 int   arg_maxrsh;
 char *arg_shell;
@@ -660,6 +664,9 @@ void arg_init(int argc, char **argv)
   pparam_flag(&arg_verbose,      0, "verbose",       "Print diagnostic messages");
   pparam_str(&arg_nodelist,      0, "nodelist",      "file containing list of nodes");
   pparam_str(&arg_nodegroup,"main", "nodegroup",     "which group of nodes to use");
+#ifdef _FAULT_MLOG_
+       pparam_int(&arg_read_pes, 0, "readpe",             "number of host names to read into the host table");
+#endif
 
 #if CMK_CCS_AVAILABLE
   pparam_flag(&arg_server,       0, "server",        "Enable client-server (CCS) mode");
@@ -914,6 +921,10 @@ int           nodetab_size;
 int          *nodetab_rank0_table;
 int           nodetab_rank0_size;
 
+#ifdef _FAULT_MLOG_
+int                     loaded_max_pe;
+#endif
+
 void nodetab_reset(nodetab_host *h)
 {
   h->name="SET_H->NAME";
@@ -1062,17 +1073,32 @@ void nodetab_init()
     fprintf(stderr,"ERROR> Cannot read %s: %s\n",nodesfile,strerror(errno));
     exit(1);
   }
-  
-  nodetab_table=(nodetab_host**)malloc(arg_requested_pes*sizeof(nodetab_host*));
-  nodetab_rank0_table=(int*)malloc(arg_requested_pes*sizeof(int));
-  nodetab_max=arg_requested_pes;
+#ifdef _FAULT_MLOG_
+       if(arg_read_pes == 0){
+        arg_read_pes = arg_requested_pes;
+    }
+       nodetab_table=(nodetab_host**)malloc(arg_read_pes*sizeof(nodetab_host*));
+       nodetab_rank0_table=(int*)malloc(arg_read_pes*sizeof(int));
+       nodetab_max=arg_read_pes;
+    fprintf(stderr,"arg_read_pes %d arg_requested_pes %d\n",arg_read_pes,arg_requested_pes);
+#else
+       nodetab_table=(nodetab_host**)malloc(arg_requested_pes*sizeof(nodetab_host*));
+       nodetab_rank0_table=(int*)malloc(arg_requested_pes*sizeof(int));
+       nodetab_max=arg_requested_pes;
+#endif
   
   nodetab_reset(&global);
   group=global;
   rightgroup = (strcmp(arg_nodegroup,"main")==0);
   
   while(fgets(input_line,sizeof(input_line)-1,f)!=0) {
+#ifdef _FAULT_MLOG_
+       if (nodetab_size == arg_read_pes) break;
+#else
     if (nodetab_size == arg_requested_pes) break;
+#endif
     if (input_line[0]=='#') continue;
     zap_newline(input_line);
        if (!nodetab_args(input_line,&global)) {
@@ -1121,6 +1147,11 @@ fin:
     if (nodetab_table[i]->cpus > remain)
       nodetab_table[i]->cpus = remain;
   }
+
+#ifdef _FAULT_MLOG_
+       loaded_max_pe = arg_requested_pes-1;
+#endif
+
 }
 
 /* Given a processor number, look up the nodetab info: */
@@ -1606,8 +1637,12 @@ int req_handle_ending(ChMessage *msg,SOCKET fd)
 {  
   int i;
   req_ending++;
-    
-  if (req_ending == nodetab_size)
+
+#ifndef _FAULT_MLOG_    
+       if (req_ending == nodetab_size)
+#else
+       if(req_ending == arg_requested_pes)
+#endif
   {
     for (i=0;i<req_nClients;i++)
       skt_close(req_clients[i]);
@@ -1649,6 +1684,11 @@ void anounce_crash(int socket_index,int crashed_node);
 static int _last_crash = 0;                    /* last crashed pe number */
 static int _crash_socket_index = 0;            /* last restart socket */
 
+#ifdef _FAULT_MLOG_
+static int numCrashes=0;  /*number of crashes*/
+static SOCKET last_crashed_fd=-1;
+#endif
+
 int req_handle_crashack(ChMessage *msg,SOCKET fd)
 {
   static int count = 0;
@@ -1660,6 +1700,9 @@ int req_handle_crashack(ChMessage *msg,SOCKET fd)
     req_handle_initnodetab(NULL,req_clients[_crash_socket_index]);
     _last_crash = 0;
     count = 0;
+#ifdef _FAULT_MLOG_
+       last_crashed_fd=-1;
+#endif
   }
 }
 #endif
@@ -1676,7 +1719,9 @@ void error_in_req_serve_client(SOCKET fd){
                }
        }
        fflush(stdout);
+#ifndef _FAULT_MLOG_
        skt_close(fd);
+#endif
        crashed_pe = i;
        node_index = i-nodetab_rank(crashed_pe);
        for(i=0;i<nodetab_rank0_size;i++){
@@ -1701,6 +1746,9 @@ void error_in_req_serve_client(SOCKET fd){
        }       
        socket_index = i;
        reconnect_crashed_client(socket_index,crashed_node);
+#ifdef _FAULT_MLOG_
+       skt_close(fd);
+#endif
 }
 #endif
 
@@ -1715,8 +1763,18 @@ int req_handler_dispatch(ChMessage *msg,SOCKET replyFd)
 
   /* grab request data */
   recv_status = ChMessageData_recv(replyFd,msg);
-#ifdef __FAULT__       
+#ifdef __FAULT__
+#ifdef _FAULT_MLOG_
+ if(recv_status < 0){
+        if(replyFd == last_crashed_fd){
+            return REQ_OK;
+        }
+        DEBUGF(("recv_status %d on socket %d \n",recv_status,replyFd));
+        error_in_req_serve_client(replyFd);
+    }
+#else  
   if(recv_status < 0)  error_in_req_serve_client(replyFd);
+#endif
 #endif
 
        if (strcmp(cmd,"ping")==0)       return REQ_OK;
@@ -3294,8 +3352,24 @@ void refill_nodetab_entry(int crashed_node){
        int pe =  nodetab_rank0_table[crashed_node];
        nodetab_host *h = nodetab_table[pe];
        *h = *(replacement_host(pe));
+#ifdef _FAULT_MLOG_
+fprintf(stderr,"Charmrun>>> New pe %d is on host %s \n",pe,nodetab_name(pe));
+#endif
 }
 
+#ifdef _FAULT_MLOG_
+nodetab_host *replacement_host(int pe){
+    int x=loaded_max_pe+1;
+
+    x = x%arg_read_pes;
+    loaded_max_pe +=1;
+/*  while(x == pe){
+ *       x = rand()%nodetab_size;    
+ *           }*/
+    fprintf(stderr,"Charmrun>>> replacing pe %d with %d host %s with %s \n",pe,x,nodetab_name(pe),nodetab_name(x));
+    return nodetab_table[x];
+}
+#else
 nodetab_host *replacement_host(int pe){
        int x=pe;
        while(x == pe){
@@ -3303,6 +3377,7 @@ nodetab_host *replacement_host(int pe){
        }
        return nodetab_table[x];
 }
+#endif
 
 void reconnect_crashed_client(int socket_index,int crashed_node){
        int i;
@@ -3325,6 +3400,7 @@ void reconnect_crashed_client(int socket_index,int crashed_node){
                fprintf(stderr,"Charmrun: Bad initnode data length. Aborting\n");
                fprintf(stderr,"Charmrun: possibly because: %s.\n", msg.data);
                }
+fprintf(stdout,"socket_index %d crashed_node %d reconnected fd %d  \n",socket_index,crashed_node,req_clients[socket_index]);
                /** update the nodetab entry corresponding to
                this node, skip the restarted one */
                in = (ChSingleNodeinfo *)msg.data;
index c350743abf37271d779ab67e2c032bd7a96c2e88..352952d01e5d11fae0e2aba58fd184b62cdb1c89 100644 (file)
@@ -4,6 +4,6 @@ static void crash_node_handle(ChMessage *m){
        crashed_node = ChMessageInt(d[0]);
        /* tell charmrun we knew */
        ctrl_sendone_nolock("crash_ack",NULL,0,NULL,0);
-       fprintf(stdout,"[%d] got crash mesg for %d \n",CmiMyPe(),crashed_node);
+       // fprintf(stdout,"[%d] got crash mesg for %d \n",CmiMyPe(),crashed_node);
 }
 
index 060efc9ae729d76c80d8837e7803558e4d332fa7..5327bc064917f68e114c6ce3cb48a3c8231070f4 100644 (file)
@@ -1211,7 +1211,7 @@ static void ctrl_getone(void)
        /** A processor crashed and got recreated. So charmrun sent 
          across the whole nodetable data to update this processor*/
        node_addresses_store(&msg);
-       fprintf(stdout,"nodetable added %d\n",CmiMyPe());
+       // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
   }
 #endif
   else {
index c8f27c9cd89a3c1e0ea1fc277a112b57ef7fc4a2..b05bb32b2ce2e9311f2f4c9e3c3e0b314d77cd57 100644 (file)
@@ -350,6 +350,17 @@ void CkPupArrayElementsData(PUP::er &p, int notifyListeners)
        }
 }
 
+#ifdef _FAULT_MLOG_
+int  CkCountArrayElements(){
+    int numGroups = CkpvAccess(_groupIDTable)->size();
+    int i;
+    ElementCounter  counter;
+    CKLOCMGR_LOOP(mgr->iterate(counter););
+  int numElements = counter.getCount();
+    return numElements;
+}
+#endif
+
 void CkPupProcessorData(PUP::er &p)
 {
     // save readonlys, and callback BTW
index a3beff96c77905b2377e9db663bf35f82d75a13f..704a3272db5bdc09badedbc403d43da0b52e37d1 100644 (file)
@@ -68,6 +68,9 @@ void CkRemoveArrayElements();
 
 void CkStartCheckpoint(char* dirname,const CkCallback& cb);
 void CkRestartMain(const char* dirname, CkArgMsg *args);
+#ifdef _FAULT_MLOG_
+int  CkCountArrayElements();
+#endif
 
 
 #endif //_CKCHECKPOINT_H
index 2cc12ef80604cb2b4aec38da0e27955c6211363d..ac3128f55a04452d75d9784e38f957c875c13d82 100644 (file)
@@ -25,7 +25,7 @@ CpvExtern(void *, CkGridObject);
 
 /************************** Debugging Utilities **************/
 //For debugging: convert given index to a string
-static const char *idx2str(const CkArrayIndex &ind)
+const char *idx2str(const CkArrayIndex &ind)
 {
        static char retBuf[80];
        retBuf[0]=0;
@@ -37,7 +37,7 @@ static const char *idx2str(const CkArrayIndex &ind)
        return retBuf;
 }
 
-static const char *idx2str(const CkArrayMessage *m)
+const char *idx2str(const CkArrayMessage *m)
 {
        return idx2str(((CkArrayMessage *)m)->array_index());
 }
@@ -1905,6 +1905,21 @@ void CkLocMgr::informHome(const CkArrayIndex &idx,int nowOnPe)
        }
 }
 
+#ifdef _FAULT_MLOG_
+CkLocRec_local *CkLocMgr::createLocal(const CkArrayIndex &idx,
+        CmiBool forMigration, CmiBool ignoreArrival,
+        CmiBool notifyHome,int dummy)
+{
+    int localIdx=nextFree();
+    DEBC((AA"Adding new record for element %s at local index %d\n"AB,idx2str(idx),localIdx));
+    CkLocRec_local *rec=new CkLocRec_local(this,forMigration,ignoreArrival,idx,localIdx);
+    if(!dummy){
+        insertRec(rec,idx); //Add to global hashtable
+    }   
+    if (notifyHome) informHome(idx,CkMyPe());
+    return rec; 
+}
+#else
 CkLocRec_local *CkLocMgr::createLocal(const CkArrayIndex &idx, 
                CmiBool forMigration, CmiBool ignoreArrival,
                CmiBool notifyHome)
@@ -1918,6 +1933,7 @@ CkLocRec_local *CkLocMgr::createLocal(const CkArrayIndex &idx,
        if (notifyHome) informHome(idx,CkMyPe());
        return rec;
 }
+#endif
 
 //Add a new local array element, calling element's constructor
 CmiBool CkLocMgr::addElement(CkArrayID id,const CkArrayIndex &idx,
@@ -2272,7 +2288,62 @@ void CkLocMgr::iterate(CkLocIterator &dest) {
 
 
 /************************** LocMgr: MIGRATION *************************/
-
+#ifdef _FAULT_MLOG_
+void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
+        CkElementCreation_t type,int dummy)
+{
+    p.comment("-------- Array Location --------");
+    register ManagerRec *m;
+    int localIdx=rec->getLocalIndex();
+    CkVec<CkMigratable *> dummyElts;
+
+    for (m=firstManager;m!=NULL;m=m->next) {
+        int elCType;
+        if (!p.isUnpacking())
+        { //Need to find the element's existing type
+            CkMigratable *elt=m->element(localIdx);
+            if (elt) elCType=elt->ckGetChareType();
+            else elCType=-1; //Element hasn't been created
+        }
+        p(elCType);
+        if (p.isUnpacking() && elCType!=-1) {
+            CkMigratable *elt=m->mgr->allocateMigrated(elCType,rec->getIndex(),type);
+            int migCtorIdx=_chareTable[elCType]->getMigCtor();
+                if(!dummy){
+                    if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
+                               }else{
+                    CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
+                    i.locRec=rec;
+                    i.chareType=_entryTable[migCtorIdx]->chareIdx;
+                    dummyElts.push_back(elt);
+                    if (!rec->invokeEntry(elt,NULL,migCtorIdx,CmiTrue)) return ;
+                }
+        }
+    }
+    if(!dummy){
+        for (m=firstManager;m!=NULL;m=m->next) {
+            CkMigratable *elt=m->element(localIdx);
+            if (elt!=NULL)
+                {
+                       elt->pup(p);
+                }
+        }
+    }else{
+            for(int i=0;i<dummyElts.size();i++){
+                CkMigratable *elt = dummyElts[i];
+                if (elt!=NULL){
+            elt->pup(p);
+                       }
+                delete elt;
+            }
+                       for (ManagerRec *m=firstManager;m!=NULL;m=m->next) {
+                m->elts.empty(localIdx);
+            }
+        freeList[localIdx]=firstFree;
+        firstFree=localIdx;
+    }
+}
+#else
 void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
                CkElementCreation_t type)
 {
@@ -2308,6 +2379,7 @@ void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
                 }
        }
 }
+#endif
 
 /// Call this member function on each element of this location:
 void CkLocMgr::callMethod(CkLocRec_local *rec,CkMigratable_voidfn_t fn)
@@ -2516,6 +2588,18 @@ void CkLocMgr::restore(const CkArrayIndex &idx, PUP::er &p)
 
 
 /// Insert and unpack this array element from this checkpoint (e.g., from CkLocation::pup)
+#ifdef _FAULT_MLOG_
+void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p,int dummy)
+{
+    CkLocRec_local *rec=createLocal(idx,CmiFalse,CmiFalse,CmiTrue && !dummy /* home doesn't know yet */,dummy );
+        
+    pupElementsFor(p,rec,CkElementCreation_resume,dummy);
+
+    if(!dummy){
+        callMethod(rec,&CkMigratable::ckJustMigrated);
+    }
+}
+#else
 void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p)
 {
        CkLocRec_local *rec=createLocal(idx,CmiFalse,CmiFalse,CmiTrue /* home doesn't know yet */ );
@@ -2525,6 +2609,7 @@ void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p)
 
        callMethod(rec,&CkMigratable::ckJustMigrated);
 }
+#endif
 
 /********************* LocMgr: UTILITY ****************/
 void CkMagicNumber_impl::badMagicNumber(
@@ -2585,6 +2670,13 @@ static const char *rec2str[]={
     "dead"//Deleted element (for debugging)
 };
 
+#ifdef _FAULT_MLOG_
+void CkLocMgr::setDuringMigration(CmiBool _duringMigration){
+    duringMigration = _duringMigration;
+}
+#endif
+
+
 //Add given element array record at idx, replacing the existing record
 void CkLocMgr::insertRec(CkLocRec *rec,const CkArrayIndex &idx) {
        CkLocRec *old=elementNrec(idx);
index 1d76eed1f17ed8f42c6d6e585139ff20487b774c..4d0860496b07ae5cc33c9620667e5480632832bf 100644 (file)
@@ -605,13 +605,22 @@ public:
        /// Return true if this array element lives on another processor
        bool isRemote(const CkArrayIndex &idx,int *onPe) const;
 
+#ifdef _FAULT_MLOG_
+       //mark the duringMigration variable .. used for parallel restart
+       void setDuringMigration(CmiBool _duringMigration);
+#endif
+
        /// Pass each of our locations (each separate array index) to this destination.
        void iterate(CkLocIterator &dest);
 
        /// Insert and unpack this array element from this checkpoint (e.g., from CkLocation::pup), skip listeners
        void restore(const CkArrayIndex &idx, PUP::er &p);
        /// Insert and unpack this array element from this checkpoint (e.g., from CkLocation::pup)
+#ifdef _FAULT_MLOG_
+       void resume(const CkArrayIndex &idx, PUP::er &p,int dummy=0);
+#else
        void resume(const CkArrayIndex &idx, PUP::er &p);
+#endif
 
 //Communication:
        void immigrate(CkArrayElementMigrateMessage *msg);
@@ -625,6 +634,11 @@ public:
        void flushAllRecs(void);
        void pup(PUP::er &p);
        
+       //Look up array element in hash table.  Index out-of-bounds if not found.
+       CkLocRec *elementRec(const CkArrayIndex &idx);
+       //Look up array element in hash table.  Return NULL if not there.
+       CkLocRec *elementNrec(const CkArrayIndex &idx);
+
 private:
 //Internal interface:
        //Add given element array record at idx, replacing the existing record
@@ -634,17 +648,18 @@ private:
        //Insert a remote record at the given index
        CkLocRec_remote *insertRemote(const CkArrayIndex &idx,int nowOnPe);
 
-       //Look up array element in hash table.  Index out-of-bounds if not found.
-       CkLocRec *elementRec(const CkArrayIndex &idx);
-       //Look up array element in hash table.  Return NULL if not there.
-       CkLocRec *elementNrec(const CkArrayIndex &idx);
        //Remove this entry from the table (does not delete record)
        void removeFromTable(const CkArrayIndex &idx);
 
        friend class CkLocation; //so it can call pupElementsFor
        friend class ArrayElement;
+#ifdef _FAULT_MLOG_
+ void pupElementsFor(PUP::er &p,CkLocRec_local *rec,
+        CkElementCreation_t type,int dummy=0);
+#else
        void pupElementsFor(PUP::er &p,CkLocRec_local *rec,
                CkElementCreation_t type);
+#endif
 
        /// Call this member function on each element of this location:
        typedef void (CkMigratable::* CkMigratable_voidfn_t)(void);
@@ -653,9 +668,15 @@ private:
        CmiBool deliverUnknown(CkArrayMessage *msg,CkDeliver_t type,int opts);
 
        /// Create a new local record at this array index.
+#ifdef _FAULT_MLOG_
+CkLocRec_local *createLocal(const CkArrayIndex &idx,
+        CmiBool forMigration, CmiBool ignoreArrival,
+        CmiBool notifyHome,int dummy=0);
+#else
        CkLocRec_local *createLocal(const CkArrayIndex &idx, 
                CmiBool forMigration, CmiBool ignoreArrival,
                CmiBool notifyHome);
+#endif
 
 //Data Members:
        //Map array ID to manager and elements
index 9878d97bdc8c1820625e3e97b8eb182ce5ec0a63..177613ef228cb43a719c36bfe2c5addf13835302 100644 (file)
@@ -67,12 +67,14 @@ CkGroupID ckCheckPTGroupID;         // readonly
 /// @todo the following declarations should be moved into a separate file for all 
 // fault tolerant strategies
 
+#ifdef CMK_MEM_CHECKPOINT
 // name of the kill file that contains processes to be killed 
 char *killFile;                                               
 // flag for the kill file         
 int killFlag=0;
 // variable for storing the killing time
 double killTime=0.0;
+#endif
 
 /// checkpoint buffer for processor system data
 CpvStaticDeclare(CkProcCheckPTMessage*, procChkptBuf);
@@ -984,6 +986,7 @@ void CkRegisterRestartHandler( )
 /**
  *  * @brief: function for killing a process                                             
  *   */
+#ifdef CMK_MEM_CHECKPOINT
 #if CMK_HAS_GETPID
 void killLocal(void *_dummy,double curWallTime){
         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
@@ -998,8 +1001,9 @@ void killLocal(void *_dummy,double curWallTime){
   CmiAbort("kill() not supported!");
 }
 #endif
+#endif
 
-
+#ifdef CMK_MEM_CHECKPOINT
 /**
  * @brief: reads the file with the kill information
  */
@@ -1013,13 +1017,13 @@ void readKillFile(){
         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
                 if(proc == CkMyPe()){
                         killTime = CmiWallTimer()+sec;
-                        printf("[%d] To be killed after %.6lf s \n",CkMyPe(),sec);
+                        printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
                         CcdCallFnAfter(killLocal,NULL,sec*1000);
                 }
         }
         fclose(fp);
 }
-
+#endif
 
 #include "CkMemCheckpoint.def.h"
 
index addcdca2311157b30f9c8fe0d50598653a9af259..777da25053b195e1668eece113e0a70df03d3178 100644 (file)
@@ -1,5 +1,4 @@
-#ifdef _FAULT_MLOG_
-
+#include "charm.h"
 #include "ck.h"
 #include "ckmessagelogging.h"
 #include "queueing.h"
@@ -7,10 +6,12 @@
 #include <signal.h>
 #include "CentralLB.h"
 
+#ifdef _FAULT_MLOG_
+
 //#define DEBUG(x)  if(_restartFlag) {x;}
 #define DEBUG(x)  //x
 #define DEBUGRESTART(x)  //x
-#define DEBUGLB(x) //x
+#define DEBUGLB(x) // x
 
 #define BUFFERED_LOCAL
 #define BUFFERED_REMOTE
@@ -290,9 +291,9 @@ void _messageLoggingInit(){
 //     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
        CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
 //     printf("[%d] killFlag %d\n",CkMyPe(),killFlag);
-       if(killFlag){
-               readKillFile();
-       }
+//     if(killFlag){
+//             readKillFile();
+//     }
 }
 
 void killLocal(void *_dummy,double curWallTime);       
@@ -307,7 +308,7 @@ void readKillFile(){
        while(fscanf(fp,"%d %lf",&proc,&sec)==2){
                if(proc == CkMyPe()){
                        killTime = CmiWallTimer()+sec;
-                       printf("[%d] To be killed after %.6lf s \n",CkMyPe(),sec);
+                       printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
                        CcdCallFnAfter(killLocal,NULL,sec*1000);        
                }
        }
@@ -1340,7 +1341,7 @@ void retryTicketRequest(void *_dummy,double curWallTime){
        DEBUG(CmiMemoryCheck());
 }
 
-void _pingHandler(PingMsg *msg){
+void _pingHandler(CkPingMsg *msg){
        printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
        CmiFree(msg);
 }
@@ -1482,7 +1483,9 @@ void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
                                listToSkip[j].idx.print();
                        }
                }
-                       
+               
+ printf("numElements = %d\n",numElements);
+       
          for (int i=0; i<numElements; i++) {
                        CkGroupID gID;
                        CkArrayIndexMax idx;
@@ -1623,7 +1626,8 @@ void sendRemoveLogRequests(){
 void _checkpointAckHandler(CheckPointAck *ackMsg){
        DEBUG(CmiMemoryCheck());
        unAckedCheckpoint=0;
-       DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));    
+       DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
+       DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
        if(onGoingLoadBalancing){
                onGoingLoadBalancing = 0;
                finishedCheckpointLoadBalancing();
@@ -1756,7 +1760,7 @@ void clearUpMigratedRetainedLists(int PE){
 /**
  * Function for restarting an object with message logging
  */
-void CkMlogRestart(const char * dummy, CkArgMsg *dummyMsg){
+void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
        printf("[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
        fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
        _restartFlag = 1;
@@ -1767,7 +1771,7 @@ void CkMlogRestart(const char * dummy, CkArgMsg *dummyMsg){
 };
 
 void CkMlogRestartDouble(void *,double){
-       CkMlogRestart(NULL);
+       CkMlogRestart(NULL,NULL);
 };
 
 
@@ -2744,6 +2748,7 @@ void forAllCharesDo(MlogFn fnPointer,void *data){
 ******************************************************************/
 
 void initMlogLBStep(CkGroupID gid){
+       DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
        countLBMigratedAway = 0;
        countLBToMigrate=0;
        onGoingLoadBalancing=1;
@@ -2914,6 +2919,7 @@ void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
 
 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
        DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
+       DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
        sendRemoveLogRequests();
        (*resumeLbFnPtr)(centralLb);
        CmiFree(msg);
index 3de32666c85bc251925f7ffab8c2b8c65b9ccfa9..4671c098552560da086d116ac63bfbb65a28d17c 100644 (file)
@@ -309,7 +309,7 @@ typedef struct{
        int PE;
 } RestartRequest;
 
-typedef RestartRequest PingMsg;
+typedef RestartRequest CkPingMsg;
 typedef RestartRequest CheckpointRequest;
 
 typedef struct{
@@ -455,7 +455,7 @@ void _ticketRequestHandler(TicketRequest *);
 void _ticketHandler(TicketReply *);
 void _localMessageCopyHandler(LocalMessageLog *);
 void _localMessageAckHandler(LocalMessageLogAck *);
-void _pingHandler(PingMsg *msg);
+void _pingHandler(CkPingMsg *msg);
 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
index 5b88d3c270fe203753636118b67daa1ac98b4b32..1fe9756bf7cf032711999313140291f9306a30cc 100644 (file)
@@ -68,7 +68,10 @@ waits for the migrant contributions to straggle in.
 #else
 //No debugging info-- empty defines
 #define DEBUGRED 0
-#define DEBR(x) //CkPrintf x
+#define DEBR(x) // CkPrintf x
+#define DEBRMLOG(x) CkPrintf x
+#define AA
+#define AB
 #define DEBN(x) //CkPrintf x
 #define RED_DEB(x) //CkPrintf x
 #define DEBREVAC(x) //CkPrintf x
@@ -656,7 +659,6 @@ void CkReductionMgr::finishReduction(void)
             return;
         }
     }
-
     if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
         DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
          return;//Need more local messages
@@ -667,13 +669,13 @@ void CkReductionMgr::finishReduction(void)
 
 #endif
 
-
   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
   CkReductionMsg *result=reduceMessages();
   result->redNo=redNo;
 #ifndef _FAULT_MLOG_
 
   result->gcount+=gcount+adj(redNo).gcount;
+
   result->secondaryCallback = result->callback;
   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
        DEBR((AA"Reduced mesg gcount %d localgcount %d\n"AB,result->gcount,gcount));
@@ -717,12 +719,14 @@ void CkReductionMgr::finishReduction(void)
 #else
     {
 #endif
-       int i;
+//     int i;
        completedRedNo++;
-       for (i=1;i<adjVec.length();i++)
+       for (i=1;i<(int)(adjVec.length());i++){
           adjVec[i-1]=adjVec[i];
+       }
        adjVec.length()--;  
   }
+
   inProgress=CmiFalse;
   startRequested=CmiFalse;
   nRemote=nContrib=0;
@@ -754,7 +758,7 @@ countAdjustment &CkReductionMgr::adj(int number)
   number--;
   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
   //Pad the adjustment vector with zeros until it's at least number long
-  while (adjVec.length()<=number)
+  while ((int)(adjVec.length())<=number)
     adjVec.push_back(countAdjustment());
   return adjVec[number];
 }
@@ -1049,7 +1053,7 @@ void CkReductionMgr :: endArrayReduction(){
 #if DEBUGRED
        CkPrintf("[%d,%d]------------END OF GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),completedRedNo,thisgroup.idx,CkWallTimer());
 #endif
-       for (i=1;i<adjVec.length();i++)
+       for (i=1;i<(int)(adjVec.length());i++)
                adjVec[i-1]=adjVec[i];
        adjVec.length()--;
        endArrayReduction();
@@ -2106,6 +2110,7 @@ void CkNodeReductionMgr::pup(PUP::er &p)
   p | blocked;
   p | maxModificationRedNo;
 
+#ifndef _FAULT_MLOG_
   int isnull = (storedCallback == NULL);
   p | isnull;
   if (!isnull) {
@@ -2114,6 +2119,8 @@ void CkNodeReductionMgr::pup(PUP::er &p)
     }
     p|*storedCallback;
   }
+#endif
+
 }
 
 /*
@@ -2314,7 +2321,7 @@ void CkNodeReductionMgr::updateTree(){
                readyDeletion = true;
                additionalGCount = newAdditionalGCount;
                DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
-               for(int i=0;i<newKids.size();i++){
+               for(int i=0;i<(int)(newKids.size());i++){
                        DEBREVAC(("%d ",newKids[i]));
                }
                DEBREVAC(("\n"));
@@ -2377,14 +2384,14 @@ void CkNodeReductionMgr::DeleteChild(int deletedChild){
 }
 
 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
-       for(int i=0;i<newKids.length();i++){
+       for(int i=0;i<(int)(newKids.length());i++){
                if(newKids[i] == deletedChild){
                        newKids.remove(i);
                        break;
                }
        }
        DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
-       for(int i=0;i<newKids.size();i++){
+       for(int i=0;i<(int)(newKids.size());i++){
                DEBREVAC(("%d ",newKids[i]));
        }
        DEBREVAC(("\n"));
index 88f3a2ad1f36bdef68af610a0bd7226c2386b1e7..b644756dc7ebe610182838b78b836ed6768135c3 100644 (file)
@@ -234,14 +234,6 @@ static inline void _parseCommandLineOpts(char **argv)
     if(CmiGetArgFlagDesc(argv,"+Parallelrestart", "Parallel Restart with message logging protocol")){
         parallelRestart = true;
     }
-    if(CmiGetArgStringDesc(argv,"+killFile", &killFile,"Generates SIGKILL on specified processors")){
-        if(faultFunc == NULL){
-            killFlag = 1;
-            if(CmiMyPe() == 0){
-                printf("[%d] killFlag set to 1 for file %s\n",CkMyPe(),killFile);
-            }
-        }
-    }
     if(!CmiGetArgIntDesc(argv,"+mlog_local_buffer",&_maxBufferedMessages,"# of local messages buffered in the message logging protoocl")){
         _maxBufferedMessages = 2;
     }
index 4a4ae60c540bdafec0d7fc8d7890c66404f409e9..e604de17d9c678afd253ee6ec9bfaada9e3396c7 100644 (file)
@@ -155,6 +155,7 @@ public:
 
 public:
 #ifdef _FAULT_MLOG_
+       int step;
        int lbDecisionCount;
 #endif
   LBMigrateMsg(): level(0), n_moves(0), next_lb(0) {}
index 891b40793ddf53de59f665b5ff5e8347cdfaaa71..4aa7b761d65886c08550efecd0e73a77595d495c 100644 (file)
@@ -16,8 +16,8 @@
 #include "LBDBManager.h"
 #include "LBSimulation.h"
 
-#define  DEBUGF(x)        //CmiPrintf x;
-#define  DEBUG(x)      // x;
+#define  DEBUGF(x)      //  CmiPrintf x;
+#define  DEBUG(x)       // x;
 
 #if CMK_MEM_CHECKPOINT
    /* can not handle reduction in inmem FT */
@@ -201,6 +201,10 @@ void CentralLB::ProcessAtSync()
     start_lb_time = CkWallTimer();
   }
 
+#ifdef _FAULT_MLOG_
+       initMlogLBStep(thisgroup);
+#endif
+
   // build message
   BuildStatsMsg();
 
@@ -251,6 +255,9 @@ void CentralLB::BuildStatsMsg()
   int npes = CkNumPes();
   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
   msg->from_pe = CkMyPe();
+#ifdef _FAULT_MLOG_
+       msg->step = step();
+#endif
   //msg->serial = CrnRand();
 
 /*
@@ -297,6 +304,7 @@ void CentralLB::SendStats()
   }
   else
 #endif
+       DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
   thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
 
   statsMsg = NULL;
@@ -433,7 +441,7 @@ void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
   {
     CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage(num);
     const int pe = m->from_pe;
-    DEBUGF(("Stats msg received, %d %d %d %p\n", pe,stats_msg_count,m->n_objs,m));
+       DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
 #ifdef _FAULT_MLOG_     
 /*      
  *              if(m->step < step()){
@@ -483,6 +491,7 @@ void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
   const int clients = CkNumValidPes();
  
   if (stats_msg_count == clients) {
+       DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
     statsData->count = stats_msg_count;
     thisProxy[CkMyPe()].LoadBalance();
   }
@@ -558,6 +567,9 @@ void CentralLB::LoadBalance()
 
   double strat_start_time = CkWallTimer();
   LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
+#ifdef _FAULT_MLOG_
+       migrateMsg->step = step();
+#endif
   if (_lb_args.debug()) {
     CkPrintf("Strategy took %f seconds.\n", CkWallTimer()-strat_start_time);
     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
@@ -593,6 +605,7 @@ void CentralLB::LoadBalance()
     lbDecisionCount++;
     migrateMsg->lbDecisionCount = lbDecisionCount;
 #endif
+ CkPrintf("ReceiveMigration called at %.6lf \n",CmiWallTimer());
   thisProxy.ReceiveMigration(migrateMsg);
 
   // Zero out data structures for next cycle
@@ -713,7 +726,22 @@ extern int restarted;
 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
 {
 #if CMK_LBDB_ON
-  int i;
+       int i;
+
+#ifdef _FAULT_MLOG_
+       int *dummyCounts;
+
+       DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
+       // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
+       if(step() > m->step){
+               char str[100];
+               envelope *env = UsrToEnv(m);
+               CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
+               return;
+       }
+       lbDecisionCount = m->lbDecisionCount;
+#endif
+
   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
 /*FAULT_EVAC*/
@@ -726,8 +754,11 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
 #ifdef _FAULT_MLOG_
        int sending=0;
     int dummy=0;
-       int *dummyCounts;
        LBDB *_myLBDB = theLbdb->getLBDB();
+       if(_restartFlag){
+        dummyCounts = new int[CmiNumPes()];
+        bzero(dummyCounts,sizeof(int)*CmiNumPes());
+    }
 #endif
   for(i=0; i < m->n_moves; i++) {
     MigrateInfo& move = m->moves[i];
@@ -763,6 +794,16 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
   }
   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
+
+#ifdef _FAULT_MLOG_
+       if(_restartFlag){
+               sendDummyMigrationCounts(dummyCounts);
+               _restartFlag  =0;
+       delete []dummyCounts;
+       }
+#endif
+
+
 #if 0
   if (m->n_moves ==0) {
     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
@@ -785,6 +826,16 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
 #endif
 }
 
+#ifdef _FAULT_MLOG_
+void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
+    DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
+    //TODO: this is gonna be important when a crash happens during checkpoint
+    //the globalDecisionCount would have to be saved and compared against
+    //a future recvMigration
+                
+       thisProxy[CkMyPe()].ResumeClients(1);
+}
+#endif
 
 void CentralLB::MigrationDone(int balancing)
 {
@@ -846,6 +897,7 @@ void CentralLB::endMigrationDone(int balancing){
 
 #ifdef _FAULT_MLOG_
 void resumeCentralLbAfterChkpt(void *_lb){
+       DEBUGF(("[%d] HERE\n"));
     CentralLB *lb= (CentralLB *)_lb;
     CpvAccess(_currentObj)=lb;
     lb->endMigrationDone(lb->savedBalancing);
index acf166a25001c7132cc13bf91bc3bd136a89b176..15b2d9ededbd53d6902d5d58b33aa12cff581c1d 100644 (file)
@@ -104,6 +104,9 @@ public:
                                                // to be resumed via message
   void ResumeClients(CkReductionMsg *);
   void ReceiveMigration(LBMigrateMsg *);       // Receive migration data
+#ifdef _FAULT_MLOG_
+       void ReceiveDummyMigration(int _step);
+#endif
   void MissMigrate(int waitForBarrier);
 
   // manual predictor start/stop
@@ -280,6 +283,10 @@ public:
 
   char * avail_vector;
   int next_lb;
+#ifdef _FAULT_MLOG_
+       int step;
+#endif
+
 public:
   CLBStatsMsg(int osz, int csz);
   CLBStatsMsg(): from_pe(0), pe_speed(0), 
index 2f3d0ee3444bd7f3c2f94aa180c7a6e291f9f62d..52bf15f8bc45e14e2dfc0af91f84b06da6ec0457 100644 (file)
@@ -327,7 +327,6 @@ void LBDatabase::initnodeFn()
 void LBDatabase::init(void) 
 {
   myLDHandle = LDCreate();
-
   mystep = 0;
   nloadbalancers = 0;
   new_ld_balancer = 0;
@@ -438,6 +437,7 @@ void LBDatabase::pup(PUP::er& p)
   int np;
   if (!p.isUnpacking()) np = CkNumPes();
   p|np;
+       CmiAssert(avail_vector);
   // in case number of processors changes
   if (p.isUnpacking() && np > CkNumPes()) {
     CmiLock(avail_vector_lock);
@@ -447,6 +447,7 @@ void LBDatabase::pup(PUP::er& p)
     CmiUnlock(avail_vector_lock);
   }
   p(avail_vector, np);
+       p|mystep;
 }
 
 
index 9f2fc93562b7749c4289d26ca65371480821861c..9b215246b529e71cdc5b7b5bc3296c5a7bd359aa 100644 (file)
@@ -116,6 +116,11 @@ inline int CkHashCompare_int(const void *k1,const void *k2,size_t /*len*/)
 inline int CkHashCompare_pointer(const void *k1,const void *k2,size_t /*len*/)
        {return *(char **)k1 == *(char **)k2;}
 
+#ifdef _FAULT_MLOG_
+extern int countHashRefs;
+extern int countHashCollisions;
+#endif
+
 ///////////////////////// Hashtable //////////////////////
 
 class CkHashtableIterator;
@@ -367,6 +372,27 @@ public:
                        this->inc(i);
                };
        }
+
+#ifdef _FAULT_MLOG_
+       OBJ *getPointer(const KEY &key) {
+        countHashRefs++;
+        int i=key.hash()%this->len;
+        while(1) {//Assumes key or empty slot will be found
+            char *cur=this->entry(i);
+                       //An empty slot indicates the key is not here
+            if (this->layout.isEmpty(cur)){
+                return NULL;
+            }
+                       //Is this the key?
+            if (key.compare(*(KEY *)this->layout.getKey(cur)))
+                return (OBJ *)this->layout.getObject(cur);
+            this->inc(i);
+            countHashCollisions++;
+        };
+        return NULL;
+    }
+#endif
+
        //Use this version when you're sure the entry exists--
        // avoids the test for an empty entry
        OBJ &getRef(const KEY &key) {