Introducing support for message logging fault tolerance
author Esteban Meneses <emenese2@illinois.edu>
Fri, 3 Apr 2009 19:13:30 +0000 (19:13 +0000)
committer Esteban Meneses <emenese2@illinois.edu>
Fri, 3 Apr 2009 19:13:30 +0000 (19:13 +0000)
19 files changed:
src/arch/util/immediate.c
src/ck-core/charm++.h
src/ck-core/ck.C
src/ck-core/ckarray.C
src/ck-core/ckarray.h
src/ck-core/ckarrayreductionmgr.C
src/ck-core/cklocation.C
src/ck-core/cklocation.h
src/ck-core/ckreduction.C
src/ck-core/ckreduction.h
src/ck-core/envelope.h
src/ck-core/init.C
src/ck-ldb/CentralLB.C
src/ck-ldb/CentralLB.h
src/conv-core/msgmgr.c
src/libs/ck-libs/ampi/ampi.C
src/libs/ck-libs/tcharm/tcharm.C
src/scripts/Make.depends
src/scripts/Makefile

index c34f9da144f4266b584af0347d2626361b0fd67a..69f9d192ffd447204ca10dbc4ad424dbd5291a42 100644 (file)
@@ -71,6 +71,9 @@ SMP: This routine must be called holding immRecvLock
  */
 void CmiHandleImmediateMessage(void *msg) {
 /*  int handlerNo=CmiGetXHandler(msg); */
+#ifdef _FAULT_MLOG_
+        CmiAssert(0);
+#endif
   int handlerNo=CmiImmediateHandler(msg);
   MACHSTATE2(4,"immediate message handler %d %d", CmiGetHandler(msg), handlerNo)
 /*  CmiHandlerInfo *h=&CpvAccessOther(CmiHandlerTable,0)[handlerNo]; */
index 8656a4fe274f39895a48cec593f9d3af718733cd..a623d0b113fc6391787bde9942dc4e3869ac84d8 100644 (file)
@@ -399,6 +399,12 @@ PUPmarshall(CkArrayID)
 // for object message queue
 #include "ckobjQ.h"
 
+
+#ifdef _FAULT_MLOG_
+class ChareMlogData;
+#endif
+
+
 /**
   The base class of all parallel objects in Charm++,
   including Array Elements, Groups, and NodeGroups.
@@ -410,6 +416,9 @@ class Chare {
     CkObjectMsgQ objQ;                // object message queue
 #endif
   public:
+#ifdef _FAULT_MLOG_
+                ChareMlogData *mlogData;
+#endif
     Chare(CkMigrateMessage *m);
     Chare();
     virtual ~Chare(); //<- needed for *any* child to have a virtual destructor
@@ -988,7 +997,9 @@ if(CpvAccess(networkProgressCount) >=  p)  \
 #endif
 
 
-
+#ifdef _FAULT_MLOG_
+#include "ckmessagelogging.h"
+#endif
 #include "ckmemcheckpoint.h"
 #include "readonly.h"
 #include "ckarray.h"
index 2db2b61cafe11d680600046c112751ed21b45ced..6f6b1b742a1b38aef3741c8d4d403360eb8abc2e 100644 (file)
@@ -121,6 +121,11 @@ extern int _defaultObjectQ;
 Chare::Chare(void) {
   thishandle.onPE=CkMyPe();
   thishandle.objPtr=this;
+#ifdef _FAULT_MLOG_
+        mlogData = new ChareMlogData();
+        mlogData->objID.type = TypeChare;
+        mlogData->objID.data.chare.id = thishandle;
+#endif
 #if CMK_OBJECT_QUEUE_AVAILABLE
   if (_defaultObjectQ)  CkEnableObjQ();
 #endif
@@ -129,6 +134,7 @@ Chare::Chare(void) {
 Chare::Chare(CkMigrateMessage* m) {
   thishandle.onPE=CkMyPe();
   thishandle.objPtr=this;
+
 #if CMK_OBJECT_QUEUE_AVAILABLE
   if (_defaultObjectQ)  CkEnableObjQ();
 #endif
@@ -147,6 +153,12 @@ void Chare::pup(PUP::er &p)
 {
   p(thishandle.onPE);
   thishandle.objPtr=(void *)this;
+#ifdef _FAULT_MLOG_
+        if(p.isUnpacking()){
+                mlogData = new ChareMlogData();
+        }
+        mlogData->pup(p);
+#endif
 }
 
 int Chare::ckGetChareType() const {
@@ -183,6 +195,11 @@ void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
 
 IrrGroup::IrrGroup(void) { // default constructor: record the group ID and, in message-logging builds, the logging identity
   thisgroup = CkpvAccess(_currentGroup); // group ID is read from the _currentGroup Ckpv variable
+#ifdef _FAULT_MLOG_
+        mlogData->objID.type = TypeGroup; // NOTE(review): assumes mlogData was already allocated by a base-class constructor -- confirm
+        mlogData->objID.data.group.id = thisgroup;
+        mlogData->objID.data.group.onPE = CkMyPe(); // logging identity = (group ID, this PE)
+#endif
 }
 
 IrrGroup::~IrrGroup() {
@@ -484,6 +501,10 @@ extern "C" int CkGetArgc(void) {
 /******************** Basic support *****************/
 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
 {
+#ifdef _FAULT_MLOG_
+        CpvAccess(_currentObj) = (Chare *)obj;
+//      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
+#endif
   //BIGSIM_OOC DEBUGGING
   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
   //fflush(stdout);
@@ -506,6 +527,9 @@ extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
   //fflush(stdout);
 
   void *deliverMsg;
+#ifdef _FAULT_MLOG_
+        CpvAccess(_currentObj) = (Chare *)obj;
+#endif
   if (_entryTable[epIdx]->noKeep)
   { /* Deliver a read-only copy of the message */
     deliverMsg=(void *)msg;
@@ -974,7 +998,21 @@ void _processHandler(void *converseMsg,CkCoreState *ck)
     if (!ck->watcher->processMessage(env,ck)) return;
   }
 //#endif
-
+#ifdef _FAULT_MLOG_
+        Chare *obj=NULL;
+        CkObjID sender;
+        MCount SN;
+        MlogEntry *entry=NULL;
+        if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg ||
+        env->getMsgtype() == ForArrayEltMsg){
+                sender = env->sender;
+                SN = env->SN;
+                int result = preProcessReceivedMessage(env,&obj,&entry);
+                if(result == 0){
+                        return;
+                }
+        }
+#endif
 
 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
   bracketStartCriticalPathMethod(env);
@@ -1053,6 +1091,12 @@ void _processHandler(void *converseMsg,CkCoreState *ck)
     default:
       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
   }
+#ifdef _FAULT_MLOG_
+        if(obj != NULL){
+                postProcessReceivedMessage(obj,sender,SN,entry);
+        }
+#endif
+
 
 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
   bracketEndCriticalPathMethod(env);
@@ -1163,10 +1207,29 @@ static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
     CmiSetHandler(env,index_skipCldHandler);
 #endif
     CmiSetInfo(env,infoFn);
-    if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
-    else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
+    if (pe==CLD_BROADCAST) {
+#ifdef _FAULT_MLOG_             
+                        CmiSyncBroadcast(len, (char *)env);
+#else
+                       CmiSyncBroadcastAndFree(len, (char *)env); 
+#endif
+
+}
+    else if (pe==CLD_BROADCAST_ALL) { 
+#ifdef _FAULT_MLOG_             
+                        CmiSyncBroadcastAll(len, (char *)env);
+#else
+                        CmiSyncBroadcastAllAndFree(len, (char *)env);
+#endif
+
+}
     else{
-                       CmiSyncSendAndFree(pe, len, (char *)env);
+#ifdef _FAULT_MLOG_             
+                        CmiSyncSend(pe, len, (char *)env);
+#else
+                        CmiSyncSendAndFree(pe, len, (char *)env);
+#endif
+
                }
   }
 }
@@ -1206,9 +1269,28 @@ static void _noCldNodeEnqueue(int node, envelope *env)
 */
   CkPackMessage(&env);
   int len=env->getTotalsize();
-  if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, (char *)env); }
-  else if (node==CLD_BROADCAST_ALL) { CmiSyncNodeBroadcastAllAndFree(len, (char *)env); }
-  else CmiSyncNodeSendAndFree(node, len, (char *)env);
+  if (node==CLD_BROADCAST) { 
+#ifdef _FAULT_MLOG_
+        CmiSyncNodeBroadcast(len, (char *)env);
+#else
+       CmiSyncNodeBroadcastAndFree(len, (char *)env); 
+#endif
+}
+  else if (node==CLD_BROADCAST_ALL) { 
+#ifdef _FAULT_MLOG_
+                CmiSyncNodeBroadcastAll(len, (char *)env);
+#else
+               CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
+#endif
+
+}
+  else {
+#ifdef _FAULT_MLOG_
+        CmiSyncNodeSend(node, len, (char *)env);
+#else
+       CmiSyncNodeSendAndFree(node, len, (char *)env);
+#endif
+  }
 }
 
 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
@@ -1340,6 +1422,9 @@ static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
 {
   int numPes;
   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
+#ifdef _FAULT_MLOG_
+        sendTicketGroupRequest(env,pe,_infoIdx);
+#else
   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
   _TRACE_CREATION_N(env, numPes);
   if (opts & CK_MSG_SKIP_OR_IMM)
@@ -1347,6 +1432,7 @@ static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
   else
     _skipCldEnqueue(pe, env, _infoIdx);
   _TRACE_CREATION_DONE(1);
+#endif
 }
 
 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
@@ -1465,6 +1551,9 @@ static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
 {
   int numPes;
   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
+#ifdef _FAULT_MLOG_
+        sendTicketNodeGroupRequest(env,node,_infoIdx);
+#else
   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
   _TRACE_CREATION_N(env, numPes);
   if (opts & CK_MSG_SKIP_OR_IMM) {
@@ -1476,6 +1565,7 @@ static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
   else
     CldNodeEnqueue(node, env, _infoIdx);
   _TRACE_CREATION_DONE(1);
+#endif
 }
 
 extern "C"
@@ -1600,12 +1690,16 @@ extern "C"
 void CkArrayManagerDeliver(int pe,void *msg, int opts) { // prepare msg's envelope as an array-element message and send it toward pe
   register envelope *env = UsrToEnv(msg);
   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
+#ifdef _FAULT_MLOG_
+        sendTicketArrayRequest(env,pe,_infoIdx); // message-logging build: the send goes through sendTicketArrayRequest; opts is not consulted on this path
+#else
   if (opts & CK_MSG_IMMEDIATE)
     CmiBecomeImmediate(env);
   if (opts & CK_MSG_SKIP_OR_IMM)
     _noCldEnqueue(pe, env);
   else
     _skipCldEnqueue(pe, env, _infoIdx);
+#endif
 }
 
 class ElementDestroyer : public CkLocIterator {
index d76b768ef492b64cec7413416e835b796435f605..7ef13ddc95abad64655db46b44b66f017e89c85f 100644 (file)
@@ -211,6 +211,10 @@ void ArrayElement::initBasics(void)
     CK_ARRAYLISTENER_LOOP(thisArray->listeners,
                          l->ckElementCreating(this));
   }
+#ifdef _FAULT_MLOG_
+        mlogData->objID.type = TypeArray;
+        mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
+#endif
 }
 
 ArrayElement::ArrayElement(void) 
@@ -323,6 +327,15 @@ void ArrayElement::CkAbort(const char *str) const
        CkMigratable::CkAbort(str);
 }
 
+#ifdef _FAULT_MLOG_
+void ArrayElement::recvBroadcast(CkMessage *m){ // invoke on this element the entry method whose index is stored in m's envelope
+        CkArrayMessage *bcast = (CkArrayMessage *)m; // same message, viewed as an array message
+        envelope *env = UsrToEnv(m);
+        int epIdx = env->piggyBcastIdx; // entry-method index is read from the envelope's piggyBcastIdx field
+        ckInvokeEntry(epIdx,bcast,CmiTrue);
+} // no ';' here: a semicolon after a function body is an empty declaration, rejected by pedantic C++98/03 compilers
+#endif
+
 /*********************** Spring Cleaning *****************
 Periodically (every minute or so) remove expired broadcasts
 from the queue.
@@ -854,7 +867,13 @@ CmiBool CkArrayBroadcaster::deliver(CkArrayMessage *bcast,ArrayElement *el)
   elBcastNo++;
   DEBB((AA"Delivering broadcast %d to element %s\n"AB,elBcastNo,idx2str(el)));
   int epIdx=bcast->array_ep_bcast();
+
+#ifdef _FAULT_MLOG_
+        DEBUG(printf("[%d] elBcastNo %d bcastNo %d \n",CmiMyPe(),elBcastNo,bcastNo)); // one argument per %d: PE, element's broadcast count, array's broadcast count
+        return true; // message-logging build: return here without invoking the entry method on the element
+#else
   return el->ckInvokeEntry(epIdx,bcast,CmiFalse);
+#endif
 }
 
 /// Deliver all needed broadcasts to the given local element
@@ -922,6 +941,12 @@ void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
          _TRACE_CREATION_DETAILED(UsrToEnv(msg), ep);
          int skipsched = opts & CK_MSG_EXPEDITED; 
          //int serializer=0;//1623802937%CkNumPes();
+#ifdef _FAULT_MLOG_
+                CProxy_CkArray ap(_aid);
+                ap[CpvAccess(serializer)].sendBroadcast(msg);
+                CkGroupID _id = _aid;
+//              printf("[%d] At ckBroadcast in CProxy_ArrayBase id %d epidx %d \n",CkMyPe(),_id.idx,ep);
+#else
          if (CkMyPe()==CpvAccess(serializer))
          {
                DEBB((AA"Sending array broadcast\n"AB));
@@ -937,6 +962,7 @@ void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
                else
                        ap[CpvAccess(serializer)].sendBroadcast(msg);
          }
+#endif
        }
 }
 
@@ -964,6 +990,11 @@ void CkArray::recvBroadcast(CkMessage *m)
        CK_MAGICNUMBER_CHECK
        CkArrayMessage *msg=(CkArrayMessage *)m;
        broadcaster->incoming(msg);
+#ifdef _FAULT_MLOG_
+        _tempBroadcastCount=0;
+        locMgr->callForAllRecords(CkArray::staticBroadcastHomeElements,this,(void *)msg);
+#else
+
        //Run through the list of local elements
        int idx=0;
        ArrayElement *el;
@@ -973,6 +1004,8 @@ void CkArray::recvBroadcast(CkMessage *m)
 #endif
                broadcaster->deliver(msg,el);
        }
+#endif
+
 #if CMK_BLUEGENE_CHARM
                 BgEntrySplit("end-broadcast");
 #endif
index 16a16cdbdcaafc1cecd8079a917a56404f22d2af..d574dbe5175a09fbf72fa85e389a060b20cc54fa 100644 (file)
@@ -516,6 +516,9 @@ private:
 #endif
 public:
   void inmem_checkpoint(CkArrayCheckPTReqMessage *m);
+#ifdef _FAULT_MLOG_
+void recvBroadcast(CkMessage *);
+#endif
 #if CMK_GRID_QUEUE_AVAILABLE
 public:
   int grid_queue_interval;
@@ -535,7 +538,11 @@ template <class T>
 class ArrayElementT : public ArrayElement
 {
 public:
-  ArrayElementT(void): thisIndex(*(const T *)thisIndexMax.data()) {}
+  ArrayElementT(void): thisIndex(*(const T *)thisIndexMax.data()) {
+#ifdef _FAULT_MLOG_     
+        mlogData->objID.data.array.idx.asMax()=thisIndexMax;
+#endif
+}
   ArrayElementT(CkMigrateMessage *msg)
        :ArrayElement(msg),
        thisIndex(*(const T *)thisIndexMax.data()) {}
@@ -757,6 +764,10 @@ private:
   CkArrayBroadcaster *broadcaster; //Read-only copy of default broadcaster
 public:
   void flushStates() { CkReductionMgr::flushStates(); CK_ARRAYLISTENER_LOOP(listeners, l->flushState()); }
+#ifdef _FAULT_MLOG_
+    virtual int numberReductionMessages(){CkAssert(CkMyPe() == 0);return numInitial;} // asserts it runs on PE 0 and returns numInitial; NOTE(review): only dispatched virtually if the base-class declaration is virtual too -- confirm
+#endif
+
 };
 /*@}*/
 
index c89683e21006edc57ddb8db80677c8b33e116ed5..d3c4fc54865db18356d8f89a54d1092969449fe1 100644 (file)
@@ -176,6 +176,10 @@ void CkArrayReductionMgr::setAttachedGroup(CkGroupID groupID){
 
 
 void CkArrayReductionMgr::startNodeGroupReduction(int number,CkGroupID groupID){
+#ifdef _FAULT_MLOG_
+    Chare *oldObj =CpvAccess(_currentObj);
+    CpvAccess(_currentObj) = this;
+#endif
        ARPRINT("[%d] startNodeGroupReductions for red No %d my group %d attached group %d on %p \n",CkMyNode(),number,thisgroup.idx, attachedGroup.idx,this);
        if(attachedGroup.isZero()){
                setAttachedGroup(groupID);
@@ -184,6 +188,9 @@ void CkArrayReductionMgr::startNodeGroupReduction(int number,CkGroupID groupID){
        CkReductionNumberMsg *msg = new CkReductionNumberMsg(number);
        envelope::setSrcPe((char *)UsrToEnv(msg),CkMyNode());
        ((CkNodeReductionMgr *)this)->ReductionStarting(msg);
+#ifdef _FAULT_MLOG_
+    CpvAccess(_currentObj) = oldObj;
+#endif
 }
 
 int CkArrayReductionMgr::startLocalGroupReductions(int number){        
index 3eeb2ab33e18d375c7932e27d5bdde318b7631ba..d1dc15960be1bc76c180c664b03a2605f0ae19cb 100644 (file)
@@ -980,6 +980,9 @@ void CkMigratable::AtSync(int waitForMigration)
 {
        if (!usesAtSync)
                CkAbort("You must set usesAtSync=CmiTrue in your array element constructor to use AtSync!\n");
+#ifdef _FAULT_MLOG_
+    mlogData->toResumeOrNot=1;
+#endif
        myRec->AsyncMigrate(!waitForMigration);
        if (waitForMigration) ReadyMigrate(CmiTrue);
        ckFinishConstruction();
@@ -992,11 +995,27 @@ void CkMigratable::ReadyMigrate(CmiBool ready)
 {
        myRec->ReadyMigrate(ready);
 }
+
+#ifdef _FAULT_MLOG_
+    extern int globalResumeCount;
+#endif
+
 void CkMigratable::staticResumeFromSync(void* data) // data is cast to the CkMigratable whose ResumeFromSync() is to be called
 {
        CkMigratable *el=(CkMigratable *)data;
+#ifdef _FAULT_MLOG_
+    if(el->mlogData->toResumeOrNot ==0 || el->mlogData->resumeCount >= globalResumeCount){ // skip when toResumeOrNot is zero, or when this element's resumeCount has already reached globalResumeCount
+        return;
+    }
+#endif
        DEBL((AA"Element %s resuming from sync\n"AB,idx2str(el->thisIndexMax)));
+#ifdef _FAULT_MLOG_
+    CpvAccess(_currentObj) = el; // make el the current object for the call below; NOTE(review): the previous value is not restored afterwards -- confirm that is intended
+#endif
        el->ResumeFromSync();
+#ifdef _FAULT_MLOG_
+    el->mlogData->resumeCount++; // count this resume so the guard above can skip a repeat
+#endif
 }
 void CkMigratable::setMigratable(int migratable) 
 {
@@ -1397,6 +1416,11 @@ void CkLocRec_local::setMigratable(int migratable)
        else
          the_lbdb->NonMigratable(ldHandle);
 }
+#ifdef _FAULT_MLOG_
+void CkLocRec_local::Migrated(){ // message-logging build only: forward to the load-balancing database
+    the_lbdb->Migrated(ldHandle, CmiTrue); // passes this record's ldHandle; meaning of the CmiTrue flag is defined by LBDatabase::Migrated (not visible here)
+}
+#endif
 #endif
 
 /**
@@ -1536,6 +1560,10 @@ public:
                if (opts & CK_MSG_KEEP)
                        msg = (CkArrayMessage *)CkCopyMsg((void **)&msg);
                buffer.enq(msg);
+#ifdef _FAULT_MLOG_
+        envelope *env = UsrToEnv(msg);
+        env->sender = CpvAccess(_currentObj)->mlogData->objID;
+#endif
                return CmiTrue;
        }
  
@@ -1546,8 +1574,18 @@ public:
                DEBS((AA" Delivering queued messages:\n"AB));
                CkArrayMessage *m;
                while (NULL!=(m=buffer.deq())) {
+#ifdef _FAULT_MLOG_         
+            DEBUG(CmiPrintf("[%d] buffered message being sent\n",CmiMyPe()));
+            envelope *env = UsrToEnv(m);
+            Chare *oldObj = CpvAccess(_currentObj);
+            CpvAccess(_currentObj) =(Chare *) env->sender.getObject();
+            env->sender.type = TypeInvalid;
+#endif
                        DEBS((AA"Sending buffered message to %s\n"AB,idx2str(m->array_index())));
                        myLocMgr->deliverViaQueue(m);
+#ifdef _FAULT_MLOG_         
+            CpvAccess(_currentObj) = oldObj;
+#endif
                }
        }
   
@@ -1705,10 +1743,72 @@ void CkLocMgr::pup(PUP::er &p){
                mapHandle=map->registerArray(emptyIndex,thisgroup);
                // _lbdb is the fixed global groupID
                initLB(lbdbID);
+
+#ifdef _FAULT_MLOG_     
+        int count;
+        p | count;
+        DEBUG(CmiPrintf("[%d] Unpacking Locmgr %d has %d home elements\n",CmiMyPe(),thisgroup.idx,count));
+        homeElementCount = count;
+
+        for(int i=0;i<count;i++){
+            CkArrayIndexMax idx;
+            int pe;
+            idx.pup(p);
+            p | pe;
+            DEBUG(CmiPrintf("[%d] idx %s is a home element exisiting on pe %d\n",CmiMyPe(),idx2str(idx),pe));
+            inform(idx,pe);
+            CkLocRec *rec = elementNrec(idx);
+            CmiAssert(rec!=NULL);
+            CmiAssert(lastKnown(idx) == pe);
+        }
+#endif
                // delay doneInserting when it is unpacking during restart.
                // to prevent load balancing kicking in
                if (!CkInRestarting()) 
                        doneInserting();
+       }else{
+ /**
+ * pack the indexes of elements which have their homes on this processor
+ * but dont exist on it.. needed for broadcast after a restart
+ * indexes of local elements dont need to be packed
+ * since they will be recreated later anyway
+ */
+#ifdef _FAULT_MLOG_     
+        int count=0,count1=0;
+        void *objp;
+        void *keyp;
+        CkHashtableIterator *it = hash.iterator();
+      while (NULL!=(objp=it->next(&keyp))) {
+      CkLocRec *rec=*(CkLocRec **)objp;
+        CkArrayIndex &idx=*(CkArrayIndex *)keyp;
+            if(rec->type() != CkLocRec::local){
+                if(homePe(idx) == CmiMyPe()){
+                    count++;
+                }
+            }
+        }
+        p | count;
+        DEBUG(CmiPrintf("[%d] Packing Locmgr %d has %d home elements\n",CmiMyPe(),thisgroup.idx,count));
+
+        it = hash.iterator();
+      while (NULL!=(objp=it->next(&keyp))) {
+      CkLocRec *rec=*(CkLocRec **)objp;
+        CkArrayIndex &idx=*(CkArrayIndex *)keyp;
+            CkArrayIndexMax max = idx;
+            if(rec->type() != CkLocRec::local){
+                if(homePe(idx) == CmiMyPe()){
+                    int pe;
+                    max.pup(p);
+                    pe = rec->lookupProcessor();
+                    p | pe;
+                    count1++;
+                }
+            }
+        }
+        CmiAssert(count == count1);
+
+#endif
+
        }
 }
 
@@ -1770,8 +1870,12 @@ void CkLocMgr::inform(const CkArrayIndex &idx,int nowOnPe)
        if (nowOnPe==CkMyPe())
                return; //Never insert a "remote" record pointing here
        CkLocRec *rec=elementNrec(idx);
-       if (rec!=NULL && rec->type()==CkLocRec::local)
+       if (rec!=NULL && rec->type()==CkLocRec::local){
+#ifdef _FAULT_MLOG_
+        CmiPrintf("[%d]WARNING!!! Element %d:%s is local but is being told it exists on %d\n",CkMyPe(),idx.nInts,idx2str(idx), nowOnPe);
+#endif
                return; //Never replace a local element's record!
+       }
        insertRemote(idx,nowOnPe);
 }
 
@@ -1782,7 +1886,11 @@ void CkLocMgr::informHome(const CkArrayIndex &idx,int nowOnPe)
        if (home!=CkMyPe() && home!=nowOnPe) {
                //Let this element's home Pe know it lives here now
                DEBC((AA"  Telling %s's home %d that it lives on %d.\n"AB,idx2str(idx),home,nowOnPe));
+#ifdef _FAULT_MLOG_
+        informLocationHome(thisgroup,idx,home,CkMyPe());
+#else
                thisProxy[home].updateLocation(idx,nowOnPe);
+#endif
        }
 }
 
@@ -1927,6 +2035,7 @@ void CkLocMgr::deliver(CkMessage *m,CkDeliver_t type,int opts) {
        }else{
                DEBS((AA"deliver %s rec is null\n"AB,idx2str(idx)));
        }
+#ifndef _FAULT_MLOG_
 #if CMK_LBDB_ON
        if (type==CkDeliver_queue) {
                if (!(opts & CK_MSG_LB_NOTRACE)) {
@@ -1935,6 +2044,7 @@ void CkLocMgr::deliver(CkMessage *m,CkDeliver_t type,int opts) {
                }
        }
 #endif
+#endif
 #if CMK_GRID_QUEUE_AVAILABLE
        int gridSrcPE;
        int gridSrcCluster;
@@ -2254,6 +2364,9 @@ void CkLocMgr::emigrate(CkLocRec_local *rec,int toPe)
                new (doubleSize, 0) CkArrayElementMigrateMessage;
        msg->idx=idx;
        msg->length=bufSize;
+#ifdef _FAULT_MLOG_ 
+    msg->gid = ckGetGroupID();
+#endif
 #if CMK_LBDB_ON
        msg->ignoreArrival = rec->isAsyncMigrate()?1:0;
 #endif
@@ -2276,8 +2389,13 @@ void CkLocMgr::emigrate(CkLocRec_local *rec,int toPe)
 
        DEBM((AA"Migrated index size %s to %d \n"AB,idx2str(idx),toPe));        
 
-//Send off message and delete old copy
+#ifdef _FAULT_MLOG_
+    sendMlogLocation(toPe,UsrToEnv(msg));
+#else
+       //Send off message and delete old copy
        thisProxy[toPe].immigrate(msg);
+#endif
+
        duringMigration=CmiTrue;
        delete rec; //Removes elements, hashtable entries, local index
        
@@ -2285,7 +2403,9 @@ void CkLocMgr::emigrate(CkLocRec_local *rec,int toPe)
        duringMigration=CmiFalse;
        //The element now lives on another processor-- tell ourselves and its home
        inform(idx,toPe);
+#ifndef _FAULT_MLOG_    
        informHome(idx,toPe);
+#endif
        CK_MAGICNUMBER_CHECK
 }
 
@@ -2311,7 +2431,11 @@ void CkLocMgr::immigrate(CkArrayElementMigrateMessage *msg)
        }
 
        //Create a record for this element
+#ifndef _FAULT_MLOG_    
        CkLocRec_local *rec=createLocal(idx,CmiTrue,msg->ignoreArrival,CmiFalse /* home told on departure */ );
+#else
+    CkLocRec_local *rec=createLocal(idx,CmiTrue,CmiTrue,CmiFalse /* home told on departure */ );
+#endif
        
        //Create the new elements as we unpack the message
        pupElementsFor(p,rec,CkElementCreation_migrate);
index 20d341faa120b3e2ce25fb8cf67d775209f0c715..daaf1945c03833b3b446799ff6310ccc7a6cecaf 100644 (file)
@@ -60,6 +60,9 @@ public:
        CkArrayIndexMax idx; // Array index that is migrating
        int ignoreArrival;   // if to inform LB of arrival
        int length;//Size in bytes of the packed data
+#ifdef _FAULT_MLOG_
+        CkGroupID gid; //gid of location manager
+#endif
        CmiBool bounced;
        double* packData;
 };
@@ -218,6 +221,9 @@ public:
   int  isReadyMigrate()        { return readyMigrate; }
   CmiBool checkBufferedMigration();    // check and execute pending migration
   int   MigrateToPe();
+#ifdef _FAULT_MLOG_
+        void Migrated();
+#endif
   inline void setMeasure(CmiBool status) { enable_measure = status; }
 private:
   LBDatabase *the_lbdb;
index 3db266e48d04625ec9e8186b0fc493a02c7de70f..f645e4a23c034029f49878a03fe66a6d920b2f84 100644 (file)
@@ -180,6 +180,18 @@ CkReductionMgr::CkReductionMgr()//Constructor
   gcount=lcount=0;
   nContrib=nRemote=0;
   maxStartRequest=0;
+#ifdef _FAULT_MLOG_
+    if(CkMyPe() != 0){
+        perProcessorCounts = NULL;
+    }else{
+        perProcessorCounts = new int[CmiNumPes()];
+        for(int i=0;i<CmiNumPes();i++){
+            perProcessorCounts[i] = -1;
+        }
+    }
+    totalCount = 0;
+    processorCount = 0;
+#endif
   DEBR((AA"In reductionMgr constructor at %d \n"AB,this));
 }
 
@@ -362,7 +374,21 @@ void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
   m->redNo=ci->redNo++;
   m->sourceFlag=-1;//A single contribution
   m->gcount=0;
+
+#ifdef _FAULT_MLOG_
+    if(lcount == 0){
+        m->sourceProcessorCount = 1;
+    }else{
+        m->sourceProcessorCount = lcount;
+    }
+    m->fromPE = CmiMyPe();
+    Chare *oldObj =CpvAccess(_currentObj);
+    char currentObjName[100];
+    DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
+    thisProxy[0].contributeViaMessage(m);
+#else
   addContribution(m);
+#endif
 }
 
 //////////// Reduction Manager Remote Entry Points /////////////
@@ -376,6 +402,7 @@ void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
  }
  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
  int srcPE = (UsrToEnv(m))->getSrcPe();
+#ifndef _FAULT_MLOG_ 
   if (isPresent(m->num) && !inProgress)
   {
     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
@@ -393,6 +420,18 @@ void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
   else //is Past
     DEBR((AA"Ignoring parent's late request to start #%d\n"AB,m->num));
   delete m;
+#else
+    if(redNo == 0){
+        if(lcount == 0){
+            DEBR(("[%d] Group %d Sending dummy contribute to get totalCount\n",CmiMyPe(),thisgroup.idx));
+            CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
+            dummy->fromPE = CmiMyPe();
+            dummy->sourceProcessorCount = 0;
+            dummy->redNo = 0;
+            thisProxy[0].contributeViaMessage(dummy);
+        }
+    }
+#endif
 }
 
 //Sent to root of reduction tree with reduction contribution
@@ -448,8 +487,30 @@ void CkReductionMgr::startReduction(int number,int srcPE)
   if(!CmiNodeAlive(CkMyPe())){
        return;
   }
+
+#ifdef _FAULT_MLOG_ 
+  if(CmiMyPe() == 0 && redNo == 0){
+            for(int j=0;j<CkNumPes();j++){
+                if(j != CkMyPe() && j != srcPE){
+                        thisProxy[j].ReductionStarting(new CkReductionNumberMsg(number));
+                }
+            }
+            if(lcount == 0){
+                CkReductionMsg *dummy = CkReductionMsg::buildNew(0,NULL);
+                dummy->fromPE = CmiMyPe();
+                dummy->sourceProcessorCount = 0;
+                dummy->redNo = 0;
+                thisProxy[0].contributeViaMessage(dummy);
+            }
+    }   else{
+        thisProxy[0].ReductionStarting(new CkReductionNumberMsg(number));
+    }
+
+
+#else
+       nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
+#endif
        
-  nodeProxy[CkMyNode()].ckLocalBranch()->startNodeGroupReduction(number,thisgroup);
   int temp;
        /*
   //making it a broadcast done only by PE 0
@@ -478,6 +539,9 @@ void CkReductionMgr::startReduction(int number,int srcPE)
          // kick-start your parent too ...
                        if(treeParent() != srcPE){
                                if(CmiNodeAlive(treeParent())||allowMessagesOnly !=-1){
+#ifdef _FAULT_MLOG_
+    CpvAccess(_currentObj) = oldObj;
+#endif
                                thisProxy[treeParent()].ReductionStarting(new CkReductionNumberMsg(i));
                                }       
                        }       
@@ -499,11 +563,16 @@ void CkReductionMgr::startReduction(int number,int srcPE)
 void CkReductionMgr::addContribution(CkReductionMsg *m)
 {
   if (isPast(m->redNo))
-  {//We've moved on-- forward late contribution straight to root
+  {
+#ifdef _FAULT_MLOG_
+        CmiAbort("this version should not have late migrations");
+#else
+       //We've moved on-- forward late contribution straight to root
     DEBR((AA"Migrant %p gives late contribution for #%d!\n"AB,m->ci,m->redNo));
-   // if (!hasParent()) //Root moved on too soon-- should never happen
-   //   CkAbort("Late reduction contribution received at root!\n");
+       // if (!hasParent()) //Root moved on too soon-- should never happen
+       //   CkAbort("Late reduction contribution received at root!\n");
     thisProxy[0].LateMigrantMsg(m);
+#endif
   }
   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
     DEBR((AA"Contributor %p gives early contribution-- for #%d\n"AB,m->ci,m->redNo));
@@ -530,13 +599,44 @@ void CkReductionMgr::finishReduction(void)
        return;
   }
   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
+#ifndef _FAULT_MLOG_   
+
   if (nContrib<(lcount+adj(redNo).lcount)){
        DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
         return;//Need more local messages
   }
+#else
+
+    if(CkMyPe() != 0){
+        if(redNo != 0){
+            return;
+        }else{
+            CmiAssert(lcount == 0);
+            CkReductionMsg *dummy = reduceMessages();
+            dummy->fromPE = CmiMyPe();
+            dummy->sourceProcessorCount = 0;
+            thisProxy[0].contributeViaMessage(dummy);
+            return;
+        }
+    }
+
+    if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
+        DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
+         return;//Need more local messages
+  }else{
+        DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
+    }
+
+
+#endif
+
+
   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
   CkReductionMsg *result=reduceMessages();
   result->redNo=redNo;
+#ifndef _FAULT_MLOG_
+
   result->gcount+=gcount+adj(redNo).gcount;
   result->secondaryCallback = result->callback;
   result->callback = CkCallback(CkIndex_CkReductionMgr::ArrayReductionHandler(NULL),0,thisProxy);
@@ -551,13 +651,36 @@ void CkReductionMgr::finishReduction(void)
   // Find our node reduction manager, and pass reduction to him:
   CkArrayReductionMgr *nodeMgr=nodeProxy[CkMyNode()].ckLocalBranch();
   nodeMgr->contributeArrayReduction(result);
+#else
+#if DEBUGRED
+    CkPrintf("~~~~~~~~~~~~~~~~~ About to call callback from end of SIMPLIFIED GROUP REDUCTION %d at %.6f\n",redNo,CmiWallTimer());
+#endif
+    if (!result->callback.isInvalid())
+        result->callback.send(result);
+    else if (!storedCallback.isInvalid())
+        storedCallback.send(result);
+    else{
+#if DEBUGRED
+        CkPrintf("No reduction client for group %d \n",thisgroup.idx);
+#endif
+        CkAbort("No reduction client!\n"
+            "You must register a client with either SetReductionClient or during contribute.\n");
+    }
+#if DEBUGRED
+       CkPrintf("[%d,%d]------------END OF SIMPLIFIED GROUP REDUCTION %d for group %d at %.6f\n",CkMyNode(),CkMyPe(),redNo,thisgroup.idx,CkWallTimer());
+#endif
+
+#endif
 
   //House Keeping Operations will have to check later what needs to be changed
   redNo++;
   //Shift the count adjustment vector down one slot (to match new redNo)
   int i;
-
-  if(CkMyPe()!=0){
+#ifndef _FAULT_MLOG_
+       if(CkMyPe()!=0){
+#else
+    {
+#endif
        int i;
        completedRedNo++;
        for (i=1;i<adjVec.length();i++)
@@ -576,10 +699,14 @@ void CkReductionMgr::finishReduction(void)
     if (m!=NULL) //One of these addContributions may have finished us.
       addContribution(m);//<- if *still* early, puts it back in the queue
   }
+#ifndef _FAULT_MLOG_   
+
   if(maxStartRequest >= redNo){
          startReduction(redNo,CkMyPe());
          finishReduction();
   }
+#endif
 }
 
 //////////// Reduction Manager Utilities /////////////
@@ -729,7 +856,11 @@ void CkReductionMgr::pup(PUP::er &p)
   // we can not pup because inserting array elems will add the counters again
 //  p|lcount;
 //  p|gcount;
-
+#ifdef _FAULT_MLOG_ 
+//  p|lcount;
+//  //  p|gcount;
+//  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
+#endif
   if(p.isUnpacking()){
     thisProxy = thisgroup;
     maxStartRequest=0;
@@ -1238,7 +1369,10 @@ CkReduction::reducerFn CkReduction::reducerTable[CkReduction::MAXREDUCERS]={
 /**nodegroup reduction manager . Most of it is similar to the guy above***/
 NodeGroup::NodeGroup(void) {
   __nodelock=CmiCreateLock();
-
+#ifdef _FAULT_MLOG_ /* message logging: tag this object's ID as a node group */
+    mlogData->objID.type = TypeNodeGroup; // NOTE(review): mlogData must already be allocated here, presumably by the Chare base constructor - confirm
+    mlogData->objID.data.group.onPE = CkMyNode(); // for node groups the "PE" field holds the node number
+#endif
 
 }
 NodeGroup::~NodeGroup() {
@@ -1345,6 +1479,10 @@ void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
 
 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
 {
+#ifdef _FAULT_MLOG_
+    Chare *oldObj =CpvAccess(_currentObj);
+    CpvAccess(_currentObj) = this;
+#endif
 
   m->ci=ci;
   m->redNo=ci->redNo++;
@@ -1355,11 +1493,18 @@ void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
 #endif
   addContribution(m);
 
+#ifdef _FAULT_MLOG_
+    CpvAccess(_currentObj) = oldObj;
+#endif
 }
 
 
 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
 {
+#ifdef _FAULT_MLOG_
+    Chare *oldObj =CpvAccess(_currentObj);
+    CpvAccess(_currentObj) = this;
+#endif
   m->ci=ci;
   m->redNo=ci->redNo++;
   m->gcount=count;
@@ -1371,6 +1516,9 @@ void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMs
   CkPrintf("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer());
 #endif
 
+#ifdef _FAULT_MLOG_
+    CpvAccess(_currentObj) = oldObj;
+#endif
 }
 
 
index 8ae33170291a60b8a70ab7ae55fa0c6bdc80e98c..ab8f170d5d0ec03c946e8adadbe50de9bb3de6d4 100644 (file)
@@ -209,6 +209,18 @@ private:
 
 //Checkpointing utilities
 public:
+#ifdef _FAULT_MLOG_
+    int *perProcessorCounts;
+    int processorCount;
+    int totalCount;
+    int numberReductionMessages(){ // returns totalCount, or MAX_INT while totalCount is still 0
+            if(totalCount != 0){
+                return totalCount;
+            }else{
+                return MAX_INT; // NOTE(review): presumably means "count not known yet"; MAX_INT is defined outside this hunk - confirm
+            }
+    }
+#endif
        virtual void pup(PUP::er &p);
        static int isIrreducible(){ return 0;}
 };
@@ -275,6 +287,10 @@ private:
                >0 indicates this is a reduced contribution.
        */
        int nSources(void) {return sourceFlag<0?-sourceFlag:sourceFlag;}
+#ifdef _FAULT_MLOG_ 
+    int sourceProcessorCount;
+    int fromPE;
+#endif
 private:
        CkReduction::reducerType reducer;
        contributorInfo *ci;//Source contributor, or NULL if none
index d773c05c3e913598c5872d43ba93199d1280ccd1..bb85e21ec5f89f07fe7807a3e4170d644e6a0236 100644 (file)
@@ -275,6 +275,9 @@ typedef unsigned short UShort;
 typedef unsigned char  UChar;
 
 #include "charm.h" // for CkGroupID
+#ifdef _FAULT_MLOG_
+#include "ckobjid.h" //for the ckobjId
+#endif
 
 /**
 The "envelope" sits at the start of every Charm++
@@ -364,6 +367,13 @@ public:
        UChar isPacked:1; ///< If true, message must be unpacked before use
        UChar isUsed:1; ///< Marker bit to prevent message re-send.
     };
+#ifdef _FAULT_MLOG_
+        CkObjID sender;
+        CkObjID recver;
+        MCount SN;
+        MCount TN;
+        MlogEntry *localMlogEntry;
+#endif
 private:
     u_type type; ///< Depends on message type (attribs.mtype)
     UShort ref; ///< Used by futures
@@ -378,6 +388,9 @@ private:
     UInt   totalsize; ///< Byte count from envelope start to end of priobits
     
   public:
+#ifdef _FAULT_MLOG_
+        UInt piggyBcastIdx;
+#endif
     void pup(PUP::er &p);
     UInt   getEvent(void) const { return event; }
     void   setEvent(const UInt e) { event = e; }
@@ -421,6 +434,13 @@ private:
       _SET_USED(env, 0);
       //for record-replay
       env->setEvent(0);
+#ifdef _FAULT_MLOG_
+            env->sender.type = TypeInvalid;
+            env->recver.type = TypeInvalid;
+            env->SN = 0;
+            env->TN = 0;
+            env->localMlogEntry = NULL;
+#endif
       return env;
     }
     UShort getEpIdx(void) const { return epIdx; }
@@ -565,6 +585,13 @@ private:
     }
 public:
     MsgPool():SafePool<void*>(_alloc, CkFreeMsg) {}
+#ifdef _FAULT_MLOG_
+        void *get(void){ // under _FAULT_MLOG_ every request goes straight to allocfn()
+            return allocfn();
+        }
+        void put(void *m){ // intentionally empty: m is neither stored nor freed by this method
+        } // NOTE(review): confirm that the message logging code owns and releases m, otherwise this leaks
 };
 
 CkpvExtern(MsgPool*, _msgPool);
index 5a7ceb97beea7f543f85dd3c450761bb04d7647d..eb5753d345bebaebd5519ca1eb7da2578e82530f 100644 (file)
@@ -188,6 +188,9 @@ static inline void _parseCommandLineOpts(char **argv)
 # if CMK_MEM_CHECKPOINT
       faultFunc = CkMemRestart;
 # endif
+#ifdef _FAULT_MLOG_
+            faultFunc = CkMlogRestart;
+#endif
       CmiPrintf("[%d] Restarting after crash \n",CmiMyPe());
   }
   // reading the killFile
@@ -218,7 +221,32 @@ static inline void _parseCommandLineOpts(char **argv)
        if(CmiGetArgStringDesc(argv,"+raiseevac", &_raiseEvacFile,"Generates processor evacuation on random processors")){
                _raiseEvac = 1;
        }
-       
+#ifdef _FAULT_MLOG_ /* command-line options of the message logging fault tolerance protocol */
+    if(!CmiGetArgIntDesc(argv,"+chkptPeriod",&chkptPeriod,"Set the checkpoint period for the message logging fault tolerance algorithm in seconds")){
+        chkptPeriod = 100; // fallback when +chkptPeriod is not supplied
+    }
+    if(CmiGetArgFlagDesc(argv,"+Parallelrestart", "Parallel Restart with message logging protocol")){
+        parallelRestart = true;
+    }
+    if(CmiGetArgStringDesc(argv,"+killFile", &killFile,"Generates SIGKILL on specified processors")){
+        if(faultFunc == NULL){ // only set the kill flag when no restart function has been selected
+            killFlag = 1;
+            if(CmiMyPe() == 0){ // print the notice from PE 0 only
+                printf("[%d] killFlag set to 1 for file %s\n",CkMyPe(),killFile);
+            }
+        }
+    }
+    if(!CmiGetArgIntDesc(argv,"+mlog_local_buffer",&_maxBufferedMessages,"# of local messages buffered in the message logging protocol")){
+        _maxBufferedMessages = 2; // fallback when +mlog_local_buffer is not supplied
+    }
+    if(!CmiGetArgIntDesc(argv,"+mlog_remote_buffer",&_maxBufferedTicketRequests,"# of remote ticket requests buffered in the message logging protocol")){
+        _maxBufferedTicketRequests = 2; // fallback when +mlog_remote_buffer is not supplied
+    }
+    if(!CmiGetArgIntDesc(argv,"+mlog_buffer_time",&BUFFER_TIME,"# Time spent waiting for messages to be buffered in the message logging protocol")){
+        BUFFER_TIME = 2; // fallback when +mlog_buffer_time is not supplied
+    }
+
+#endif 
        /* Anytime migration flag */
        isAnytimeMigration = CmiTrue;
        if (CmiGetArgFlagDesc(argv,"+noAnytimeMigration","The program does not require support for anytime migration")) {
@@ -299,6 +327,10 @@ static inline void _sendStats(void)
   CmiSyncSendAndFree(0, env->getTotalsize(), (char *)env);
 }
 
+#ifdef _FAULT_MLOG_
+extern void _messageLoggingExit();
+#endif
+
 static void _exitHandler(envelope *env)
 {
   DEBUGF(("exitHandler called on %d msgtype: %d\n", CkMyPe(), env->getMsgtype()));
@@ -330,6 +362,9 @@ static void _exitHandler(envelope *env)
       }        
       break;
     case ReqStatMsg:
+#ifdef _FAULT_MLOG_
+        _messageLoggingExit();
+#endif
       DEBUGF(("ReqStatMsg on %d\n", CkMyPe()));
       CkNumberHandler(_charmHandlerIdx,(CmiHandler)_discardHandler);
       CkNumberHandler(_bocHandlerIdx, (CmiHandler)_discardHandler);
@@ -867,6 +902,11 @@ void _initCharm(int unused_argc, char **argv)
        if (!inCommThread) {
          _TRACE_BEGIN_COMPUTATION();
        }
+
+#ifdef _FAULT_MLOG_
+    _messageLoggingInit();
+#endif
+
 #ifndef __BLUEGENE__
        /*
                FAULT_EVAC
@@ -883,6 +923,10 @@ void _initCharm(int unused_argc, char **argv)
 
        evacuate = 0;
        CcdCallOnCondition(CcdSIGUSR1,(CcdVoidFn)CkDecideEvacPe,0);
+#ifdef _FAULT_MLOG_ 
+    CcdCallOnCondition(CcdSIGUSR2,(CcdVoidFn)CkMlogRestart,0);
+#endif
+
        if(_raiseEvac){
                processRaiseEvacFile(_raiseEvacFile);
                /*
@@ -927,6 +971,9 @@ void _initCharm(int unused_argc, char **argv)
                        msg->argc = CmiGetArgc(argv);
                        msg->argv = argv;
                        _entryTable[_mainTable[i]->entryIdx]->call(msg, obj);
+#ifdef _FAULT_MLOG_
+            CpvAccess(_currentObj) = (Chare *)obj;
+#endif
                }
                 _mainDone = 1;
 
index 8a4aa259f65815dd9e8fac7b4910220f0d610607..e6616b1fe3c6a5c3906ba1ea7806e090faa921e4 100644 (file)
 #define USE_LDB_SPANNING_TREE 1
 #endif
 
+#ifdef _FAULT_MLOG_
+/* reductions and the LB spanning tree are switched off when message logging FT is enabled */
+#undef USE_REDUCTION
+#undef USE_LDB_SPANNING_TREE
+#define USE_REDUCTION         0
+#define USE_LDB_SPANNING_TREE 0
+#else
+#define USE_REDUCTION         1
+#define USE_LDB_SPANNING_TREE 1
+#endif
+
+#ifdef _FAULT_MLOG_
+extern int _restartFlag;
+extern void getGlobalStep(CkGroupID );
+extern void initMlogLBStep(CkGroupID );
+extern int globalResumeCount;
+extern void sendDummyMigrationCounts(int *);
+#endif
+
 #if CMK_GRID_QUEUE_AVAILABLE
 CpvExtern(void *, CkGridObject);
 #endif
@@ -157,6 +176,9 @@ void CentralLB::AtSync()
 #if CMK_LBDB_ON
   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
 
+#ifdef _FAULT_MLOG_
+#endif
+
   // if num of processor is only 1, nothing should happen
   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
     MigrationDone(0);
@@ -291,8 +313,18 @@ void CentralLB::SendStats()
 #endif
 }
 
+#ifdef _FAULT_MLOG_
+extern int donotCountMigration;
+#endif
+
 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
 {
+#ifdef _FAULT_MLOG_
+    if(donotCountMigration){
+        return ;
+    }
+#endif
+
 #if CMK_LBDB_ON
   if (waitBarrier) {
            migrates_completed++;
@@ -401,6 +433,14 @@ void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
     CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage(num);
     const int pe = m->from_pe;
     DEBUGF(("Stats msg received, %d %d %d %p\n", pe,stats_msg_count,m->n_objs,m));
+#ifdef _FAULT_MLOG_     
+/*      
+ *              if(m->step < step()){
+ *                          //TODO: if a processor is redoing an old load balance step..
+ *                                      //tell it that the step is done and that it should not perform any migrations
+ *                                                  thisProxy[pe].ReceiveDummyMigration();
+ *                                                          }*/
+#endif
        
     if(!CmiNodeAlive(pe)){
        DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
@@ -548,6 +588,10 @@ void CentralLB::LoadBalance()
   }
 
   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
+#ifdef _FAULT_MLOG_ 
+    lbDecisionCount++;
+    migrateMsg->lbDecisionCount = lbDecisionCount;
+#endif
   thisProxy.ReceiveMigration(migrateMsg);
 
   // Zero out data structures for next cycle
@@ -661,6 +705,10 @@ void CentralLB::removeNonMigratable(LDStats* stats, int count)
 }
 
 
+#ifdef _FAULT_MLOG_
+extern int restarted;
+#endif
+
 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
 {
 #if CMK_LBDB_ON
@@ -680,8 +728,26 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
     if (move.from_pe == me && move.to_pe != me) {
       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
       // migrate object, in case it is already gone, inform toPe
+#ifndef _FAULT_MLOG_
       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
+#else
+            if(_restartFlag == 0){
+                DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
+                theLbdb->Migrate(move.obj,move.to_pe);
+                sending++;
+            }else{
+                if(_myLBDB->validObjHandle(move.obj)){
+                    DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
+                    theLbdb->Migrate(move.obj,move.to_pe);
+                    sending++;
+                }else{
+                    DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
+                    dummyCounts[move.to_pe]++;
+                    dummy++;
+                }
+            }
+#endif
     } else if (move.from_pe != me && move.to_pe == me) {
        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
       if (!move.async_arrival) migrates_expected++;
@@ -705,6 +771,10 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
   delete m;
 
 //     CkEvacuatedElement();
+#ifdef _FAULT_MLOG_
+//  migrates_expected = 0;
+//  //  ResumeClients(1);
+#endif
 #endif
 }
 
@@ -721,8 +791,13 @@ void CentralLB::MigrationDone(int balancing)
        DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
   // if sync resume, invoke a barrier
 
-  LoadbalanceDone(balancing);        // callback
+#ifdef _FAULT_MLOG_
+    savedBalancing = balancing;
+    startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
+#endif
 
+  LoadbalanceDone(balancing);        // callback
+#ifndef _FAULT_MLOG_
   // if sync resume invoke a barrier
   if (balancing && _lb_args.syncResume()) {
     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
@@ -738,8 +813,38 @@ void CentralLB::MigrationDone(int balancing)
   CmiGridQueueDeregisterAll ();
   CpvAccess(CkGridObject) = NULL;
 #endif
+#endif 
+#endif
+}
+
+#ifdef _FAULT_MLOG_
+void CentralLB::endMigrationDone(int balancing){
+    DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
+  // Message logging variant of the resume step: invoked through resumeCentralLbAfterChkpt()
+  // with the saved balancing flag; resumes clients via a barrier reduction or directly.
+  if (balancing && _lb_args.syncResume()) {
+    CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
+                  thisProxy);
+    contribute(0, NULL, CkReduction::sum_int, cb); // empty contribution: the reduction only acts as a barrier before ResumeClients
+  }
+  else{
+    if(CmiNodeAlive(CkMyPe())){ // skip the local resume when this PE is not reported alive
+    DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
+    thisProxy [CkMyPe()].ResumeClients(balancing);
+    }
+  }
+
+}
 #endif
+
+#ifdef _FAULT_MLOG_
+void resumeCentralLbAfterChkpt(void *_lb){ // callback handed to startLoadBalancingMlog() together with the CentralLB pointer
+    CentralLB *lb= (CentralLB *)_lb; // _lb is the CentralLB instance that registered the callback
+    CpvAccess(_currentObj)=lb; // make the load balancer the current object before resuming it
+    lb->endMigrationDone(lb->savedBalancing); // continue with the balancing flag saved in MigrationDone()
 }
+#endif
+
 
 void CentralLB::ResumeClients(CkReductionMsg *msg)
 {
@@ -750,6 +855,10 @@ void CentralLB::ResumeClients(CkReductionMsg *msg)
 void CentralLB::ResumeClients(int balancing)
 {
 #if CMK_LBDB_ON
+#ifdef _FAULT_MLOG_
+    resumeCount++;
+    globalResumeCount = resumeCount;
+#endif
   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
     double end_lb_time = CkWallTimer();
@@ -757,7 +866,9 @@ void CentralLB::ResumeClients(int balancing)
              lbName(), step()-1,end_lb_time, end_lb_time - start_lb_time, CmiMemoryUsage()/1024.0/1024.0);
   }
 
+#ifndef _FAULT_MLOG_
   if (balancing) ComlibNotifyMigrationDone();  
+#endif
 
   theLbdb->ResumeClients();
   if (balancing)  {
index 92199e8fd4ceaa92e18d0e98ac6ad1a186e8cbff..acf166a25001c7132cc13bf91bc3bd136a89b176 100644 (file)
@@ -75,7 +75,11 @@ private:
 public:
   CkMarshalledCLBStatsMessage bufMsg;
   SpanningTree st;
-  CentralLB(const CkLBOptions & opt):BaseLB(opt) { initLB(opt); } 
+  CentralLB(const CkLBOptions & opt):BaseLB(opt) { initLB(opt); 
+#ifdef _FAULT_MLOG_
+        lbDecisionCount= resumeCount=0; // message logging counters start at zero for a freshly constructed balancer
+#endif
+} 
   CentralLB(CkMigrateMessage *m):BaseLB(m) {}
   virtual ~CentralLB();
 
@@ -246,8 +250,17 @@ private:
 
 public:
   int useMem();
+#ifdef _FAULT_MLOG_
+    int savedBalancing;
+    void endMigrationDone(int balancing);
+    int lbDecisionCount ,resumeCount;
+#endif
 };
 
+#ifdef _FAULT_MLOG_ 
+    void resumeCentralLbAfterChkpt(void *lb);
+#endif
+
 // CLBStatsMsg is not directly sent in the entry function
 // CkMarshalledCLBStatsMessage is used instead to use the pup defined here.
 //class CLBStatsMsg: public CMessage_CLBStatsMsg {
index c9efec3e93889b5c1f1f220c527003b6cf87acab..2398923c691e8464e69cdd203709d1b2519358c8 100644 (file)
@@ -37,7 +37,9 @@ void CmmFree(t)
 CmmTable t;
 {
   if (t==NULL) return;
+#ifndef _FAULT_MLOG_    
   if (t->first!=NULL) CmiAbort("Cannot free a non-empty message table!");
+#endif
   CmiFree(t);
 }
 
index 495cd3d35c603847bc6cd38462267942f5a2cb44..e1eb4d145a18419c8c15c0b4bc45890ccee88522 100644 (file)
@@ -1123,6 +1123,16 @@ ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
   
   seqEntries=parent->numElements;
   oorder.init (seqEntries);
+#ifdef _FAULT_MLOG_
+    if(thisIndex == 0){
+/*      CkAssert(CkMyPe() == 0);
+ *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
+ *                      CkAssert(numElements);
+ *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
+ *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
+ *                                              _myManager->setNumInitial(numElements);*/
+    }
+#endif
 }
 
 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
@@ -1881,6 +1891,10 @@ ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
 
   resumeOnRecv=true;
   ampi *dis = getAmpiInstance(disComm);
+#ifdef _FAULT_MLOG_
+//  dis->yield();
+//  processRemoteMlogMessages();
+#endif
   while(1) {
       //This is done to take into account the case in which an ampi 
       // thread has migrated while waiting for a message
@@ -1890,6 +1904,12 @@ ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
     dis->thread->suspend();
     dis = getAmpiInstance(disComm);
   }
+
+#ifdef _FAULT_MLOG_
+        CpvAccess(_currentObj) = dis;
+        MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
+#endif
+
   dis->resumeOnRecv=false;
 
   if(sts)
@@ -2187,6 +2207,12 @@ CDECL void AMPI_Migrate(void)
 #endif
 #endif
   TCHARM_Migrate();
+
+#ifdef _FAULT_MLOG_
+    ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
+    CpvAccess(_currentObj) = currentAmpi;
+#endif
+
 #if CMK_BLUEGENE_CHARM
 //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
@@ -2420,6 +2446,10 @@ int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
 #if AMPI_COUNTER
   getAmpiParent()->counters.send++;
+#endif
+#ifdef _FAULT_MLOG_
+//  ptr->yield();
+//  //  processRemoteMlogMessages();
 #endif
   return 0;
 }
@@ -2556,6 +2586,10 @@ int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
     (*(pptr->toPUPer))|(pptr->pupBytes);
     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
   }
+#endif
+#ifdef _FAULT_MLOG_
+//  ptr->yield();
+//  //  processRemoteMlogMessages();
 #endif
 
   return 0;
@@ -3179,10 +3213,15 @@ int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
 #endif
     
 #if 1
+#ifndef _FAULT_MLOG_
+            //for fault evacuation
       if(oldPe != CkMyPe()){
+#endif
                        reqs = getReqs();
                        reqvec  = vecIndex(count,request);
-      }
+#ifndef _FAULT_MLOG_
+            }
+#endif
 #endif
     }
   }
index 5a648dab5756ea35bae8d20217d99bd9d611c27f..2f6cbe4bd8a86c606a7b52b173238ecfa239807a 100644 (file)
@@ -213,6 +213,12 @@ void TCharm::pup(PUP::er &p) {
   //}
 
   checkPupMismatch(p,5134,"before TCHARM");
+#ifdef _FAULT_MLOG_
+    if(!isStopped){
+//      resumeAfterMigration = true;
+    }
+    isStopped = true;
+#endif
   p(isStopped); p(resumeAfterMigration); p(exitWhenDone); p(isSelfDone); p(skipResume);
   p(threadInfo.thisElement);
   p(threadInfo.numElements);
@@ -361,6 +367,9 @@ void TCharm::migrateDelayed(int destPE) {
 }
 void TCharm::ckJustMigrated(void) {
        ArrayElement::ckJustMigrated();
+#ifdef _FAULT_MLOG_
+//  resumeAfterMigration = true;
+#endif
        if (resumeAfterMigration) {
                resumeAfterMigration=false;
                resume(); //Start the thread running
@@ -443,6 +452,10 @@ void TCharm::stop(void)
     we're resuming from migration!  (OSL 2003/9/23)
    */
   TCharm *dis=TCharm::get();
+#ifdef _FAULT_MLOG_ 
+/*  CpvAccess(_currentObj) = dis;
+ *      printf("[%d] _currentObject set to TCharm index %d %p\n",CkMyPe(),dis->thisIndex,dis);*/
+#endif
   dis->isStopped=false;
   dis->startTiming();
   //CkPrintf("[%d] Thread resumed  for tid %p\n",dis->thisIndex,dis->tid);
@@ -456,7 +469,12 @@ void TCharm::start(void)
   DBG("thread resuming soon");
   //CkPrintf("TCharm[%d]::start()\n", thisIndex);
   //CmiPrintStackTrace(0);
+#ifdef _FAULT_MLOG_
+//CthAwakenPrio(tid, CQS_QUEUEING_BFIFO, 1, &prio);
   CthAwaken(tid);
+#else
+  CthAwaken(tid);
+#endif
 }
 
 //Block our thread, schedule, and come back:
index 742d07bd14beb9498768d4fbe00832b873a1a2e0..057d416822694313107b17b7bf3eda32c4234721 100644 (file)
@@ -914,6 +914,25 @@ ckevacuation.o: ckevacuation.C charm++.h charm.h converse.h conv-config.h \
   stats.h ckfutures.h ckIgetControl.h
        $(CHARMC) -c -I. ckevacuation.C
 
+ckmessagelogging.o: ckmessagelogging.C ck.h charm++.h charm.h converse.h \
+  conv-config.h conv-common.h conv-mach.h conv-autoconfig.h \
+  conv-mach-opt.h pup_c.h conv-cpm.h conv-cpath.h conv-qd.h conv-random.h \
+  conv-lists.h conv-trace.h persistent.h debug-conv.h pup.h middle.h \
+  middle-conv.h cklists.h ckbitvector.h ckstream.h init.h ckhashtable.h \
+  debug-charm.h CkMarshall.decl.h cksection.h ckcallback.h conv-ccs.h \
+  sockRoutines.h ccs-server.h ckobjQ.h ckreduction.h CkReduction.decl.h \
+  cknodegroupreduction.h CkArrayReductionMgr.decl.h ckmemcheckpoint.h \
+  CkMemCheckpoint.decl.h readonly.h ckarray.h cklocation.h LBDatabase.h \
+  lbdb.h LBDBManager.h LBObj.h LBOM.h LBComm.h LBMachineUtil.h \
+  LBDatabase.decl.h NullLB.decl.h BaseLB.decl.h CkLocation.decl.h \
+  CkArray.decl.h ComlibArrayListener.h ComlibStrategy.h \
+  convcomlibstrategy.h ComlibLearner.h envelope.h CkFutures.decl.h \
+  charisma.h charisma.decl.h tempo.h tempo.decl.h waitqd.h waitqd.decl.h \
+  sdag.h ckcheckpoint.h CkCheckpoint.decl.h ckevacuation.h \
+  ckarrayreductionmgr.h trace.h trace-bluegene.h qd.h register.h stats.h \
+  ckfutures.h ckmessagelogging.h ckobjid.h
+       $(CHARMC) -c -I. ckmessagelogging.C
+
 LBDBManager.o: LBDBManager.C ./charm++.h ./charm.h ./converse.h \
   ./conv-config.h ./conv-autoconfig.h ./conv-common.h ./conv-mach.h \
   ./conv-mach-opt.h ./pup_c.h ./conv-cpm.h ./conv-cpath.h ./conv-qd.h \
index 1f4269e9818d063eb6d3e5009f62fef8ad612bfe..1f9ab6b09eb77af31df4f4b303442850feb43e6e 100644 (file)
@@ -193,6 +193,7 @@ CKHEADERS=ck.h ckstream.h envelope.h init.h qd.h charm.h charm++.h \
          ckcallback.h CkCallback.decl.h ckcallback-ccs.h       \
          ckarrayreductionmgr.h cknodegroupreduction.h cksection.h \
          ckarray.h cklocation.h ckreduction.h ckcheckpoint.h ckmemcheckpoint.h ckevacuation.h\
+               ckmessagelogging.h ckobjid.h\
          ckobjQ.h readonly.h charisma.h ComlibArrayListener.h ComlibStrategy.h \
          ComlibLearner.h $(UTILHEADERS) \
          tempo.h waitqd.h LBDatabase.h lbdb.h lbdb++.h LBProfit.h \
@@ -626,7 +627,7 @@ LIBCK_CORE=trace-common.o tracef.o init.o register.o qd.o ck.o main.o  \
           msgalloc.o ckfutures.o ckIgetControl.o debug-message.o debug-charm.o ckcallback.o \
           cklocation.o ckarray.o ckreduction.o ckarrayreductionmgr.o\
            tempo.o waitqd.o LBDatabase.o lbdb.o lbdbf.o charisma.o ckobjQ.o  \
-          LBAgent.o LBProfit.o ckcheckpoint.o ckmemcheckpoint.o ckevacuation.o\
+          LBAgent.o LBProfit.o ckcheckpoint.o ckmemcheckpoint.o ckevacuation.o ckmessagelogging.o\
            LBDBManager.o LBComm.o LBObj.o LBMachineUtil.o CentralPredictor.o \
           BaseLB.o CentralLB.o HybridBaseLB.o NborBaseLB.o WSLB.o \
            ObjGraph.o graph.o LButil.o Refiner.o RefinerApprox.o  \