Changes due to the messagelogging machine.
authorEsteban Meneses <emenese2@illinois.edu>
Mon, 6 Apr 2009 18:06:48 +0000 (18:06 +0000)
committerEsteban Meneses <emenese2@illinois.edu>
Mon, 6 Apr 2009 18:06:48 +0000 (18:06 +0000)
15 files changed:
src/ck-core/ck.C
src/ck-core/ckarray.C
src/ck-core/ckarray.ci
src/ck-core/ckarray.h
src/ck-core/cklocation.C
src/ck-core/cklocation.h
src/ck-core/ckmessagelogging.C
src/ck-core/ckmessagelogging.h
src/ck-core/ckreduction.C
src/ck-core/ckreduction.ci
src/ck-core/ckreduction.h
src/ck-core/init.C
src/ck-ldb/BaseLB.h
src/ck-ldb/CentralLB.C
src/ck-ldb/LBDBManager.h

index 6f6b1b742a1b38aef3741c8d4d403360eb8abc2e..ded36959baf07e5e3759d87262eb010f67a697e1 100644 (file)
@@ -1178,7 +1178,9 @@ static void _skipCldHandler(void *converseMsg)
 }
 
 
-static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
+//static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
+// Made non-static to be used by ckmessagelogging
+void _skipCldEnqueue(int pe,envelope *env, int infoFn)
 {
   if(pe == CkMyPe() ){
     if(!CmiNodeAlive(CkMyPe())){
@@ -1260,7 +1262,9 @@ static void _noCldEnqueue(int pe, envelope *env)
   else CmiSyncSendAndFree(pe, len, (char *)env);
 }
 
-static void _noCldNodeEnqueue(int node, envelope *env)
+//static void _noCldNodeEnqueue(int node, envelope *env)
+//Made non-static to be used by ckmessagelogging
+void _noCldNodeEnqueue(int node, envelope *env)
 {
 /*
   if (node == CkMyNode()) {
index 7ef13ddc95abad64655db46b44b66f017e89c85f..4a6edd1701ab2e28567d6230063ff182e84379d8 100644 (file)
@@ -91,6 +91,7 @@ static const char *idx2str(const ArrayElement *el)
 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
 #   define AA "ArrayBOC on %d: "
 #   define AB ,CkMyPe()
+#   define DEBUG(x) x
 #else
 #   define DEB(X) /*CkPrintf x*/
 #   define DEBI(X) /*CkPrintf x*/
@@ -101,6 +102,7 @@ static const char *idx2str(const ArrayElement *el)
 #   define DEBK(x) /*CkPrintf x*/
 #   define DEBB(x) /*CkPrintf x*/
 #   define str(x) /**/
+#   define DEBUG(x)
 #endif
 
 inline CkArrayIndexMax &CkArrayMessage::array_index(void)
@@ -329,11 +331,13 @@ void ArrayElement::CkAbort(const char *str) const
 
 #ifdef _FAULT_MLOG_
 void ArrayElement::recvBroadcast(CkMessage *m){
-        CkArrayMessage *bcast = (CkArrayMessage *)m;
-        envelope *env = UsrToEnv(m);
-  int epIdx= env->piggyBcastIdx;
-        ckInvokeEntry(epIdx,bcast,CmiTrue);
+       CkArrayMessage *bcast = (CkArrayMessage *)m;
+    envelope *env = UsrToEnv(m);
+       int epIdx= env->piggyBcastIdx;
+    ckInvokeEntry(epIdx,bcast,CmiTrue);
 };
+#else
+void ArrayElement::recvBroadcast(CkMessage *m){}
 #endif
 
 /*********************** Spring Cleaning *****************
@@ -984,6 +988,39 @@ void CkArray::sendExpeditedBroadcast(CkMessage *msg)
        thisProxy.recvExpeditedBroadcast(msg);
 }
 
+#ifdef _FAULT_MLOG_
+int _tempBroadcastCount=0;
+
+void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
+    if(homePe(*index)==CmiMyPe()){
+        CkArrayMessage *bcast = (CkArrayMessage *)data;
+    int epIdx=bcast->array_ep_bcast();
+        DEBUG(CmiPrintf("[%d] gid %d broadcastHomeElements to index %s entry name %s\n",CmiMyPe(),thisgroup.idx,idx2str(*index),_entryTable[bcast->array_ep_bcast()]->name));
+        CkArrayMessage *copy = (CkArrayMessage *)   CkCopyMsg((void **)&bcast);
+        envelope *env = UsrToEnv(copy);
+        env->sender.data.group.onPE = CkMyPe();
+        env->TN  = env->SN=0;
+        env->piggyBcastIdx = epIdx;
+        env->setEpIdx(CkIndex_ArrayElement::recvBroadcast(0));
+        env->getsetArrayMgr() = thisgroup;
+        env->getsetArrayIndex() = *index;
+    env->getsetArrayEp() = CkIndex_ArrayElement::recvBroadcast(0);
+        env->setSrcPe(CkMyPe());
+        rec->deliver(copy,CkDeliver_queue);
+        _tempBroadcastCount++;
+    }else{
+        if(locMgr->homeElementCount != -1){
+            DEBUG(CmiPrintf("[%d] gid %d skipping broadcast to index %s \n",CmiMyPe(),thisgroup.idx,idx2str(*index)));
+        }
+    }
+}
+
+void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
+    arr->broadcastHomeElements(data,rec,index);
+}
+#endif
+
+
 /// Increment broadcast count; deliver to all local elements
 void CkArray::recvBroadcast(CkMessage *m)
 {
index c8fe2c794215cd2ef64f1d3a69f45ad07beff910..5a3e8655e143b8ca9685a65b0b85f7d28db170b3 100644 (file)
@@ -30,6 +30,7 @@ module CkArray {
     entry void ckDestroy(void);
     // CMK_MEM_CHECKPOINT
     entry void inmem_checkpoint(CkArrayCheckPTReqMessage *);
+       entry void recvBroadcast(CkMessage *);
   };
 
 
index d574dbe5175a09fbf72fa85e389a060b20cc54fa..c9f5c4737385604a930d88bc8463c43384387303 100644 (file)
@@ -515,10 +515,9 @@ private:
   void init_checkpt();
 #endif
 public:
-  void inmem_checkpoint(CkArrayCheckPTReqMessage *m);
-#ifdef _FAULT_MLOG_
-void recvBroadcast(CkMessage *);
-#endif
+       void inmem_checkpoint(CkArrayCheckPTReqMessage *m);
+       void recvBroadcast(CkMessage *);
+
 #if CMK_GRID_QUEUE_AVAILABLE
 public:
   int grid_queue_interval;
@@ -765,7 +764,10 @@ private:
 public:
   void flushStates() { CkReductionMgr::flushStates(); CK_ARRAYLISTENER_LOOP(listeners, l->flushState()); }
 #ifdef _FAULT_MLOG_
-    virtual int numberReductionMessages(){CkAssert(CkMyPe() == 0);return numInitial;}
+       // the mlogft only support 1D arrays, then returning the number of elements in the first dimension
+       virtual int numberReductionMessages(){CkAssert(CkMyPe() == 0);return numInitial.data()[0];}
+       void broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index);
+       static void staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index);
 #endif
 
 };
index d1dc15960be1bc76c180c664b03a2605f0ae19cb..8bfb5bb51ebbb5e28ab2ac78830fa49070419b89 100644 (file)
@@ -55,6 +55,7 @@ static const char *idx2str(const CkArrayMessage *m)
 #   define DEBB(x) CkPrintf x  //Broadcast debug messages
 #   define AA "LocMgr on %d: "
 #   define AB ,CkMyPe()
+#   define DEBUG(x) x
 #else
 #   define DEB(X) /*CkPrintf x*/
 #   define DEBI(X) /*CkPrintf x*/
@@ -65,6 +66,7 @@ static const char *idx2str(const CkArrayMessage *m)
 #   define DEBK(x) /*CkPrintf x*/
 #   define DEBB(x) /*CkPrintf x*/
 #   define str(x) /**/
+#   define DEBUG(x)
 #endif
 
 
@@ -1685,6 +1687,20 @@ void CkLocMgr::flushAllRecs(void)
   CmiImmediateUnlock(hashImmLock);
 }
 
+#ifdef _FAULT_MLOG_
+void CkLocMgr::callForAllRecords(CkLocFn fnPointer,CkArray *arr,void *data){
+    void *objp;
+    void *keyp;
+
+    CkHashtableIterator *it = hash.iterator();
+  while (NULL!=(objp=it->next(&keyp))) {
+    CkLocRec *rec=*(CkLocRec **)objp;
+    CkArrayIndex &idx=*(CkArrayIndex *)keyp;
+        fnPointer(arr,data,rec,&idx);
+    }
+}
+#endif
+
 /*************************** LocMgr: CREATION *****************************/
 CkLocMgr::CkLocMgr(CkGroupID mapID_,CkGroupID lbdbID_,CkArrayIndexMax& numInitial)
        :thisProxy(thisgroup),thislocalproxy(thisgroup,CkMyPe()),
index daaf1945c03833b3b446799ff6310ccc7a6cecaf..dbce0fbf23212a30c23d035fa791caf88fd616a9 100644 (file)
@@ -502,6 +502,12 @@ public:
                int onPe,int ctor,CkDeliver_t type) =0;
 };
 
+
+#ifdef _FAULT_MLOG_
+typedef void (*CkLocFn)(CkArray *,void *,CkLocRec *,CkArrayIndex *);
+#endif
+
+
 /**
  * A group which manages the location of an indexed set of
  * migratable objects.  Knows about insertions, deletions,
@@ -708,6 +714,13 @@ private:
        LDOMHandle myLBHandle;
 #endif
        void initLB(CkGroupID lbdbID);
+
+#ifdef _FAULT_MLOG_
+public:
+       void callForAllRecords(CkLocFn,CkArray *,void *);
+       int homeElementCount;
+#endif
+
 };
 
 
index 5883200b7d134e97c3d899af2ab69b8bec8da6b8..addcdca2311157b30f9c8fe0d50598653a9af259 100644 (file)
@@ -1,5 +1,3 @@
-
-//ERASE this line
 #ifdef _FAULT_MLOG_
 
 #include "ck.h"
@@ -1758,7 +1756,7 @@ void clearUpMigratedRetainedLists(int PE){
 /**
  * Function for restarting an object with message logging
  */
-void CkMlogRestart(const char * dummy){
+void CkMlogRestart(const char * dummy, CkArgMsg *dummyMsg){
        printf("[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
        fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
        _restartFlag = 1;
@@ -3542,5 +3540,4 @@ envelope *copyEnvelope(envelope *env){
        return newEnv;
 }
 
-//ERASE this line
 #endif
index 1fa9fbe317041733a49864761a0603d254635d8c..3de32666c85bc251925f7ffab8c2b8c65b9ccfa9 100644 (file)
@@ -497,7 +497,7 @@ extern int _removeProcessedLogHandlerIdx;
 
 
 //methods for restart
-void CkMlogRestart(const char * dummy);
+void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
 void CkMlogRestartDouble(void *,double);
 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
 void initializeRestart(void *data,ChareMlogData *mlogData);
index f645e4a23c034029f49878a03fe66a6d920b2f84..5b88d3c270fe203753636118b67daa1ac98b4b32 100644 (file)
@@ -391,6 +391,42 @@ void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
 #endif
 }
 
+#ifdef _FAULT_MLOG_
+void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
+    CmiAssert(CmiMyPe() == 0);
+    DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
+    if(redNo == 0){
+        if(perProcessorCounts[m->fromPE] == -1){
+            processorCount++;
+            perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
+            DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
+        }else{
+            if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
+                DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
+                perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
+            }
+        }
+        if(processorCount == CmiNumPes()){
+            totalCount = 0;
+            for(int i=0;i<CmiNumPes();i++){
+                CmiAssert(perProcessorCounts[i] != -1);
+                totalCount += perProcessorCounts[i];
+            }
+            DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
+        }
+        if(m->sourceProcessorCount == 0){
+            if(processorCount == CmiNumPes()){
+                finishReduction();
+            }
+            return;
+        }
+    }
+    addContribution(m);
+}
+#else
+void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
+#endif
+
 //////////// Reduction Manager Remote Entry Points /////////////
 //Sent down the reduction tree (used by barren PEs)
 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
index 3cd94e2b4766fd22a53bfda1810b2afc3564b444..2e72daefe006251e34b5f663b060c3c9cf930756 100644 (file)
@@ -26,6 +26,7 @@ module CkReduction {
 
        //call back for using NodeGroup Reductions added by sayantan
        entry void ArrayReductionHandler(CkReductionMsg *m);
+       entry void contributeViaMessage(CkReductionMsg *m);
   };
 
   nodegroup CkNodeReductionMgr : IrrGroup {
index ab8f170d5d0ec03c946e8adadbe50de9bb3de6d4..5a63efb5660eff94c0956003d4619663acb48d7c 100644 (file)
@@ -71,6 +71,10 @@ public:
   CkReductionNumberMsg(int n) {num=n;}
 };
 
+#ifdef _FAULT_MLOG_
+#define MAX_INT 5000000
+#endif
+
 
 /**
  * One CkReductionMgr runs a non-overlapping set of reductions.
@@ -223,6 +227,7 @@ public:
 #endif
        virtual void pup(PUP::er &p);
        static int isIrreducible(){ return 0;}
+       void contributeViaMessage(CkReductionMsg *m);
 };
 
 
index eb5753d345bebaebd5519ca1eb7da2578e82530f..88f3a2ad1f36bdef68af610a0bd7226c2386b1e7 100644 (file)
@@ -131,6 +131,12 @@ typedef void (*CkFtFn)(const char *, CkArgMsg *);
 static CkFtFn  faultFunc = NULL;
 static char* _restartDir;
 
+#ifdef _FAULT_MLOG_
+int chkptPeriod=1000;
+bool parallelRestart=false;
+extern int BUFFER_TIME; //time spent waiting for buffered messages
+#endif
+
 // flag for killing processes 
 extern int killFlag;
 // file specifying the processes to be killed
index 2477727e9ceeca768dcca1a35b8c4525e179292e..4a4ae60c540bdafec0d7fc8d7890c66404f409e9 100644 (file)
@@ -154,6 +154,9 @@ public:
   double * expectedLoad;       // expected load for future
 
 public:
+#ifdef _FAULT_MLOG_
+       int lbDecisionCount;
+#endif
   LBMigrateMsg(): level(0), n_moves(0), next_lb(0) {}
 };
 
index e6616b1fe3c6a5c3906ba1ea7806e090faa921e4..02d07f8ebb36ad4c8003e97c0d1eb5508b83d713 100644 (file)
@@ -17,6 +17,7 @@
 #include "LBSimulation.h"
 
 #define  DEBUGF(x)        //CmiPrintf x;
+#define  DEBUG(x)      // x;
 
 #if CMK_MEM_CHECKPOINT
    /* can not handle reduction in inmem FT */
@@ -722,6 +723,12 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
   }
   migrates_expected = 0;
   future_migrates_expected = 0;
+#ifdef _FAULT_MLOG_
+       int sending=0;
+    int dummy=0;
+       int *dummyCounts;
+       LBDB *_myLBDB = theLbdb->getLBDB();
+#endif
   for(i=0; i < m->n_moves; i++) {
     MigrateInfo& move = m->moves[i];
     const int me = CkMyPe();
index 26e24336295a092f534847ffd91a614ce9db6b35..6533ca2d72db67d86ce3d39ccc47b5a4fb6537ff 100644 (file)
@@ -277,6 +277,19 @@ private:
   int            startLBFn_count;
 public:
   int useMem();
+#ifdef _FAULT_MLOG_
+    int validObjHandle(LDObjHandle h ){
+            if(objCount == 0)
+                return 0;
+            if(h.handle > objCount)
+                return 0;
+            if(objs[h.handle] == NULL)
+                return 0;
+
+            return 1;
+    }
+#endif
+
 
   int getObjCount() {return objCount;}
   const ObjList& getObjs() {return objs;}