Changes to introduce team-based message logging support.
author Esteban Meneses <emenese2@illinois.edu>
Thu, 11 Mar 2010 19:57:48 +0000 (13:57 -0600)
committer Esteban Meneses <emenese2@illinois.edu>
Thu, 11 Mar 2010 19:57:48 +0000 (13:57 -0600)
src/ck-core/ck.C
src/ck-core/ckcheckpoint.C
src/ck-core/ckcheckpoint.h
src/ck-core/cklocation.C
src/ck-core/cklocation.h
src/ck-core/ckmessagelogging.C
src/ck-core/ckmessagelogging.h
src/ck-core/init.C

index 25b72273592bb2d4ef2f204ab4f4ebeb78a7dbc6..fe041342eadba54db514a29730fc9ab4dbbe91ed 100644 (file)
@@ -81,6 +81,10 @@ Chare::Chare(CkMigrateMessage* m) {
   thishandle.onPE=CkMyPe();
   thishandle.objPtr=this;
 
+#ifdef _FAULT_MLOG_
+        mlogData = NULL;
+#endif
+
 #if CMK_OBJECT_QUEUE_AVAILABLE
   if (_defaultObjectQ)  CkEnableObjQ();
 #endif
@@ -129,10 +133,11 @@ void Chare::pup(PUP::er &p)
   if (chareIdx != -1) thishandle.objPtr=(void*)chareIdx;
 #endif
 #ifdef _FAULT_MLOG_
-  if(p.isUnpacking()){
-    mlogData = new ChareMlogData();
-  }
-  mlogData->pup(p);
+       if(p.isUnpacking()){
+               if(mlogData == NULL || !mlogData->teamRecoveryFlag)
+               mlogData = new ChareMlogData();
+       }
+       mlogData->pup(p);
 #endif
 }
 
index e789ccd8dbc911b590736896570ba6524dcebb83..4988f3e155a14e8e03cbfaf286665496210ac01f 100644 (file)
@@ -123,7 +123,11 @@ void CkCheckpointMgr::Checkpoint(const char *dirname, CkCallback& cb){
        FILE* fGroups = fopen(fileName,"wb");
        if(!fGroups) CkAbort("Failed to create checkpoint file for group table!");
        PUP::toDisk pGroups(fGroups);
-       CkPupGroupData(pGroups);
+#ifdef _FAULT_MLOG_
+    CkPupGroupData(pGroups,CmiTrue);
+#else
+    CkPupGroupData(pGroups);
+#endif
        fclose(fGroups);
 
        // save nodegroups into NodeGroups.dat
@@ -134,7 +138,11 @@ void CkCheckpointMgr::Checkpoint(const char *dirname, CkCallback& cb){
          if(!fNodeGroups) 
            CkAbort("Failed to create checkpoint file for nodegroup table!");
          PUP::toDisk pNodeGroups(fNodeGroups);
-         CkPupNodeGroupData(pNodeGroups);
+#ifdef _FAULT_MLOG_
+      CkPupNodeGroupData(pNodeGroups,CmiTrue);
+#else
+      CkPupNodeGroupData(pNodeGroups);
+#endif
          fclose(fNodeGroups);
        }
 
@@ -262,6 +270,120 @@ void CkPupChareData(PUP::er &p)
 }
 #endif
 
+#ifdef _FAULT_MLOG_
+// Pups the group table and the data of every group on this PE (_FAULT_MLOG_ build).
+// When unpacking, 'create' selects whether each group is first built with its migration
+// constructor (CmiTrue) or the group already in the table is pup'ed in place (CmiFalse).
+// With create false every group is also flagged for team recovery before it is pup'ed.
+void CkPupGroupData(PUP::er &p, CmiBool create)
+{
+       int numGroups, i;
+       if (!p.isUnpacking()) {
+         numGroups = CkpvAccess(_groupIDTable)->size();
+       }
+       p|numGroups;
+       if (p.isUnpacking()) {
+         if(CkMyPe()==0)  
+            CkpvAccess(_numGroups) = numGroups+1; 
+          else 
+           CkpvAccess(_numGroups) = 1;
+       }
+       DEBCHK("[%d] CkPupGroupData %s: numGroups = %d\n", CkMyPe(),p.typeString(),numGroups);
+
+       GroupInfo *tmpInfo = new GroupInfo [numGroups];
+       if (!p.isUnpacking()) {
+         for(i=0;i<numGroups;i++) {
+               tmpInfo[i].gID = (*CkpvAccess(_groupIDTable))[i];
+               TableEntry ent = CkpvAccess(_groupTable)->find(tmpInfo[i].gID);
+               tmpInfo[i].MigCtor = _chareTable[ent.getcIdx()]->migCtor;
+               tmpInfo[i].DefCtor = _chareTable[ent.getcIdx()]->defCtor;
+               strncpy(tmpInfo[i].name,_chareTable[ent.getcIdx()]->name,255);
+
+               if(tmpInfo[i].MigCtor==-1) {
+                       char buf[512];
+                       sprintf(buf,"Group %s needs a migration constructor and PUP'er routine for restart.\n", tmpInfo[i].name);
+                       CkAbort(buf);
+               }
+         }
+       }
+       for (i=0; i<numGroups; i++) p|tmpInfo[i];
+
+       for(i=0;i<numGroups;i++) 
+       {
+         CkGroupID gID = tmpInfo[i].gID;
+         if (p.isUnpacking()) {
+           int eIdx = tmpInfo[i].MigCtor;
+           // error checking
+           if (eIdx == -1) {
+             CkPrintf("[%d] ERROR> Group %s's migration constructor is not defined!\n", CkMyPe(), tmpInfo[i].name); CkAbort("Abort");
+           }
+           if(create) {
+             // allocate the constructor message only here: with create false nothing consumed it and it leaked
+             envelope* env = UsrToEnv((CkMessage *)CkAllocSysMsg());
+             CkCreateLocalGroup(gID, eIdx, env);
+           }
+         }   // end of unPacking
+         IrrGroup *gobj = CkpvAccess(_groupTable)->find(gID).getObj();
+         // flag the existing object so Chare::pup (see ck.C) keeps its mlogData instead of replacing it
+               if(!create)
+                       gobj->mlogData->teamRecoveryFlag = 1;
+          gobj->pup(p);
+       }
+       delete [] tmpInfo;
+}
+
+// Pups the nodegroup table and the data of every nodegroup (_FAULT_MLOG_ build).
+// When unpacking, 'create' selects whether each nodegroup is first built with its migration
+// constructor (CmiTrue) or the object already in the table is pup'ed in place (CmiFalse).
+void CkPupNodeGroupData(PUP::er &p, CmiBool create)
+{
+       int numNodeGroups, i;
+       if (!p.isUnpacking()) {
+         numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
+       }
+       p|numNodeGroups;
+       if (p.isUnpacking()) {
+         if(CkMyPe()==0){ CksvAccess(_numNodeGroups) = numNodeGroups+1; }
+         else { CksvAccess(_numNodeGroups) = 1; }
+       }
+       DEBCHK("[%d] CkPupNodeGroupData %s: numNodeGroups = %d\n",CkMyPe(),p.typeString(),numNodeGroups);
+
+       GroupInfo *tmpInfo = new GroupInfo [numNodeGroups];
+       if (!p.isUnpacking()) {
+         for(i=0;i<numNodeGroups;i++) {
+               tmpInfo[i].gID = CksvAccess(_nodeGroupIDTable)[i];
+               TableEntry ent2 = CksvAccess(_nodeGroupTable)->find(tmpInfo[i].gID);
+               tmpInfo[i].MigCtor = _chareTable[ent2.getcIdx()]->migCtor;
+               if(tmpInfo[i].MigCtor==-1) {
+                       char buf[512];
+                       sprintf(buf,"NodeGroup %s either need a migration constructor and\n\
+                                    declared as [migratable] in .ci to be able to checkpoint.",\
+                                    _chareTable[ent2.getcIdx()]->name);
+                       CkAbort(buf);
+               }
+         }
+       }
+       for (i=0; i<numNodeGroups; i++) p|tmpInfo[i];
+       for (i=0;i<numNodeGroups;i++) {
+               CkGroupID gID = tmpInfo[i].gID;
+               if (p.isUnpacking() && create) {
+                       // the constructor message is allocated only when a nodegroup is really created;
+                       // allocating it with create false leaked one system message per nodegroup
+                       int eIdx = tmpInfo[i].MigCtor;
+                       envelope* env = UsrToEnv((CkMessage *)CkAllocSysMsg());
+                       CkCreateLocalNodeGroup(gID, eIdx, env);
+               }
+               // NOTE(review): unlike CkPupGroupData, teamRecoveryFlag is not set here when create is false - confirm intended
+               TableEntry ent2 = CksvAccess(_nodeGroupTable)->find(gID);
+               IrrGroup *obj = ent2.getObj();
+               obj->pup(p);
+               DEBCHK("Nodegroup PUP'ed: gid = %d, name = %s\n",
+                       obj->ckGetGroupID().idx,
+                       _chareTable[ent2.getcIdx()]->name);
+       }
+       delete [] tmpInfo;
+}
+#else
 // handle GroupTable and data
 void CkPupGroupData(PUP::er &p)
 {
@@ -370,7 +492,7 @@ void CkPupNodeGroupData(PUP::er &p)
        }
        delete [] tmpInfo;
 }
-
+#endif
 
 // handle chare array elements for this processor
 void CkPupArrayElementsData(PUP::er &p, int notifyListeners)
@@ -405,7 +527,7 @@ void CkPupArrayElementsData(PUP::er &p, int notifyListeners)
                 p|idx;
                CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
                if (notifyListeners){
-                 mgr->resume(idx,p);
+                 mgr->resume(idx,p,CmiTrue);
                }
                 else{
                  mgr->restore(idx,p);
@@ -431,6 +553,41 @@ int  CkCountArrayElements(){
 }
 #endif
 
+void CkPupProcessorData(PUP::er &p)
+{
+    // Pups the state of this processor through p, one category after another in the fixed order below.
+    if(CkMyRank()==0) {   // readonly data: pup'ed by rank 0 only
+        CkPupROData(p);
+    }
+
+    // mainchares: pup'ed on PE 0 only, with no CkArgMsg (NULL)
+    if(CkMyPe()==0) {
+      CkPupMainChareData(p, NULL);
+    }
+       
+    // non-migratable chares
+    CkPupChareData(p);
+
+    // groups; the message-logging build passes create = CmiTrue
+#ifdef _FAULT_MLOG_
+    CkPupGroupData(p,CmiTrue);
+#else
+    CkPupGroupData(p);
+#endif
+
+    // nodegroups: pup'ed by rank 0 only; the message-logging build passes create = CmiTrue
+    if(CkMyRank()==0) {
+#ifdef _FAULT_MLOG_
+        CkPupNodeGroupData(p,CmiTrue); 
+#else
+        CkPupNodeGroupData(p);
+#endif
+    }
+
+    // array elements (notifyListeners is left at its declared default)
+    CkPupArrayElementsData(p);
+}
+
 // called only on pe 0
 static void checkpointOne(const char* dirname, CkCallback& cb){
        CmiAssert(CkMyPe()==0);
@@ -560,7 +717,11 @@ void CkRestartMain(const char* dirname, CkArgMsg *args){
        FILE* fGroups = fopen(filename,"rb");
        if(!fGroups) CkAbort("Failed to open checkpoint file for group table!");
        PUP::fromDisk pGroups(fGroups);
-       CkPupGroupData(pGroups);
+#ifdef _FAULT_MLOG_
+    CkPupGroupData(pGroups,CmiTrue);
+#else
+    CkPupGroupData(pGroups);
+#endif
        fclose(fGroups);
 
        // restore nodegroups
@@ -573,7 +734,11 @@ void CkRestartMain(const char* dirname, CkArgMsg *args){
                FILE* fNodeGroups = fopen(filename,"rb");
                if(!fNodeGroups) CkAbort("Failed to open checkpoint file for nodegroup table!");
                PUP::fromDisk pNodeGroups(fNodeGroups);
-               CkPupNodeGroupData(pNodeGroups);
+#ifdef _FAULT_MLOG_
+        CkPupNodeGroupData(pNodeGroups,CmiTrue);
+#else
+        CkPupNodeGroupData(pNodeGroups);
+#endif
                fclose(fNodeGroups);
        }
 
index f290fe9715afafefc4d66a51efffc73b369f1c67..5db9df726fe837bbe8180cb5d4138ff835a46231 100644 (file)
@@ -60,9 +60,15 @@ public:
 void CkPupROData(PUP::er &p);
 void CkPupMainChareData(PUP::er &p, CkArgMsg *args);
 void CkPupChareData(PUP::er &p);
+#ifdef _FAULT_MLOG_
+void CkPupGroupData(PUP::er &p,CmiBool create);
+void CkPupNodeGroupData(PUP::er &p,CmiBool create);
+#else
 void CkPupGroupData(PUP::er &p);
 void CkPupNodeGroupData(PUP::er &p);
+#endif
 void CkPupArrayElementsData(PUP::er &p, int notifyListeners=1);
+void CkPupProcessorData(PUP::er &p);
 void CkRemoveArrayElements();
 //void CkTestArrayElements();
 
index b5317c1d587811246a95d8e017b4757d08eea2a3..cf1a0f5dfa90ade8d4522c8320117f80ca88f444 100644 (file)
@@ -2294,7 +2294,7 @@ void CkLocMgr::iterate(CkLocIterator &dest) {
 /************************** LocMgr: MIGRATION *************************/
 #ifdef _FAULT_MLOG_
 void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
-        CkElementCreation_t type,int dummy)
+        CkElementCreation_t type, CmiBool create, int dummy)
 {
     p.comment("-------- Array Location --------");
     register ManagerRec *m;
@@ -2314,7 +2314,8 @@ void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
             CkMigratable *elt=m->mgr->allocateMigrated(elCType,rec->getIndex(),type);
             int migCtorIdx=_chareTable[elCType]->getMigCtor();
                 if(!dummy){
-                    if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
+                       if(create)
+                               if (!addElementToRec(rec,m,elt,migCtorIdx,NULL)) return;
                                }else{
                     CkMigratable_initInfo &i=CkpvAccess(mig_initInfo);
                     i.locRec=rec;
@@ -2592,11 +2593,23 @@ void CkLocMgr::restore(const CkArrayIndex &idx, PUP::er &p)
 
 /// Insert and unpack this array element from this checkpoint (e.g., from CkLocation::pup)
 #ifdef _FAULT_MLOG_
-void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p,int dummy)
+void CkLocMgr::resume(const CkArrayIndex &idx, PUP::er &p, CmiBool create, int dummy)
 {
-    CkLocRec_local *rec=createLocal(idx,CmiFalse,CmiFalse,CmiTrue && !dummy /* home doesn't know yet */,dummy );
+       CkLocRec_local *rec;
+       CkLocRec *recGlobal;    
+
+       if(create){
+               rec = createLocal(idx,CmiFalse,CmiFalse,CmiTrue && !dummy /* home doesn't know yet */,dummy );
+       }else{
+               recGlobal = elementNrec(idx);
+               if(recGlobal == NULL) 
+                       CmiAbort("Local object not found");
+               if(recGlobal->type() != CkLocRec::local)
+                       CmiAbort("Local object not local, :P");
+               rec = (CkLocRec_local *)recGlobal;
+       }
         
-    pupElementsFor(p,rec,CkElementCreation_resume,dummy);
+    pupElementsFor(p,rec,CkElementCreation_resume,create,dummy);
 
     if(!dummy){
         callMethod(rec,&CkMigratable::ckJustMigrated);
index 81afc720b8ec04403a8960da3aaf8819a2184f8b..00e884c0ba995b6c2b85b4f845b81bbdbe7f1f88 100644 (file)
@@ -622,7 +622,7 @@ public:
        void restore(const CkArrayIndex &idx, PUP::er &p);
        /// Insert and unpack this array element from this checkpoint (e.g., from CkLocation::pup)
 #ifdef _FAULT_MLOG_
-       void resume(const CkArrayIndex &idx, PUP::er &p,int dummy=0);
+       void resume(const CkArrayIndex &idx, PUP::er &p, CmiBool create, int dummy=0);
 #else
        void resume(const CkArrayIndex &idx, PUP::er &p, CmiBool notify=CmiTrue);
 #endif
@@ -659,8 +659,8 @@ private:
        friend class CkLocation; //so it can call pupElementsFor
        friend class ArrayElement;
 #ifdef _FAULT_MLOG_
- void pupElementsFor(PUP::er &p,CkLocRec_local *rec,
-        CkElementCreation_t type,int dummy=0);
      void pupElementsFor(PUP::er &p,CkLocRec_local *rec,
+        CkElementCreation_t type, CmiBool create=CmiTrue, int dummy=0);
 #else
        void pupElementsFor(PUP::er &p,CkLocRec_local *rec,
                CkElementCreation_t type);
index c6bbea29dfc3ca58d9b8b673db512225387fd1d3..1d91fc5ef4916ce635641087313220400c627b36 100644 (file)
 #ifdef _FAULT_MLOG_
 
 //#define DEBUG(x)  if(_restartFlag) {x;}
-#define DEBUG_MEM(x) // x
+#define DEBUG_MEM(x) //x
 #define DEBUG(x)  //x
 #define DEBUGRESTART(x)  //x
 #define DEBUGLB(x) // x
-#define DEBUG_TEAM(x)  x
+#define DEBUG_TEAM(x)  // x
 
 #define BUFFERED_LOCAL
 #define BUFFERED_REMOTE 
@@ -59,6 +59,7 @@ int countClearBufferedLocalCalls=0;
 
 int countUpdateHomeAcks=0;
 
+extern int teamSize;
 extern int chkptPeriod;
 extern bool parallelRestart;
 
@@ -474,7 +475,7 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
        if(isLocal(destPE)){
                ticketLogLocalMessage(mEntry);
        }else{
-               if((TEAM_SIZE_MLOG > 1) && isTeamLocal(destPE)){
+               if((teamSize > 1) && isTeamLocal(destPE)){
 
                        // look to see if this message already has a ticket in the team-table
                        Chare *senderObj = (Chare *)sender.getObject();
@@ -483,7 +484,7 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
                                Ticket ticket = ticketRow->get(env->SN);
                                if(ticket.TN != 0){
                                        ticketNumber = ticket.TN;
-                                       CkPrintf("[%d] Found a team preticketed message\n",CkMyPe());
+                                       DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
                                }
                        }
                }
@@ -513,7 +514,7 @@ inline bool isLocal(int destPE){
 inline bool isTeamLocal(int destPE){
 
        // they belong to the same group
-       if(TEAM_SIZE_MLOG > 1 && destPE/TEAM_SIZE_MLOG == CkMyPe()/TEAM_SIZE_MLOG)
+       if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
                return true;
 
        return false;
@@ -898,8 +899,8 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
                Ticket ticket;
 
                // checking if the message is team local and if it has a ticket already assigned
-               if(TEAM_SIZE_MLOG > 1 && TN != 0){
-                       CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe());
+               if(teamSize > 1 && TN != 0){
+                       DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
                        ticket.TN = TN;
                        recverObj->mlogData->verifyTicket(sender,SN,TN);
                }
@@ -1486,11 +1487,11 @@ void startMlogCheckpoint(void *_dummy,double curWallTime){
        
        CkPupROData(psizer);
        DEBUG_MEM(CmiMemoryCheck());
-       CkPupGroupData(psizer);
+       CkPupGroupData(psizer,CmiTrue);
        DEBUG_MEM(CmiMemoryCheck());
-       CkPupNodeGroupData(psizer);
+       CkPupNodeGroupData(psizer,CmiTrue);
        DEBUG_MEM(CmiMemoryCheck());
-       pupArrayElementsSkip(psizer,NULL);
+       pupArrayElementsSkip(psizer,CmiTrue,NULL);
        DEBUG_MEM(CmiMemoryCheck());
 
        int dataSize = psizer.size();
@@ -1507,9 +1508,9 @@ void startMlogCheckpoint(void *_dummy,double curWallTime){
        pBuf | checkpointCount;
        
        CkPupROData(pBuf);
-       CkPupGroupData(pBuf);
-       CkPupNodeGroupData(pBuf);
-       pupArrayElementsSkip(pBuf,NULL);
+       CkPupGroupData(pBuf,CmiTrue);
+       CkPupNodeGroupData(pBuf,CmiTrue);
+       pupArrayElementsSkip(pBuf,CmiTrue,NULL);
 
        unAckedCheckpoint=1;
        CmiSetHandler(msg,_storeCheckpointHandlerIdx);
@@ -1556,10 +1557,12 @@ public:
     }
 };
 
-
-void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
+/**
+ * Pups all the array elements in this processor.
+ */
+void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
        int numElements,i;
-  int numGroups = CkpvAccess(_groupIDTable)->size();   
+       int numGroups = CkpvAccess(_groupIDTable)->size();      
        if(!p.isUnpacking()){
                numElements = CkCountArrayElements();
        }       
@@ -1577,13 +1580,13 @@ void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
                        }
                }
                
- printf("numElements = %d\n",numElements);
              printf("numElements = %d\n",numElements);
        
-         for (int i=0; i<numElements; i++) {
+               for (int i=0; i<numElements; i++) {
                        CkGroupID gID;
                        CkArrayIndexMax idx;
                        p|gID;
-           p|idx;
+               p|idx;
                        int flag=0;
                        int matchedIdx=0;
                        for(int j=0;j<listsize;j++){
@@ -1602,12 +1605,13 @@ void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
                        }
                                
                        CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
-                       mgr->resume(idx,p,flag);
+                       CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
+                       mgr->resume(idx,p,create,flag);
                        if(flag == 1){
                                int homePE = mgr->homePe(idx);
                                informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
                        }
-         }
+               }
        }
 };
 
@@ -1863,11 +1867,11 @@ void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
        // setting the restart flag
        _restartFlag = 1;
 
-       // if we are using group-based message logging, all members of the group have to be restarted
-       if(TEAM_SIZE_MLOG > 1){
-               for(int i=(CkMyPe()/TEAM_SIZE_MLOG)*TEAM_SIZE_MLOG; i<((CkMyPe()/TEAM_SIZE_MLOG)+1)*TEAM_SIZE_MLOG; i++){
+       // if we are using team-based message logging, all members of the group have to be restarted
+       if(teamSize > 1){
+               for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
                        if(i != CkMyPe() && i < CkNumPes()){
-                               // sending a message to the group member
+                               // sending a message to the team member
                                msg.PE = CkMyPe();
                            CmiSetHandler(&msg,_restartHandlerIdx);
                            CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
@@ -1895,7 +1899,8 @@ void _restartHandler(RestartRequest *restartMsg){
     // setting the restart flag
        _restartFlag = 1;
 
-       // flushing all buffers 
+       // flushing all buffers
+       //TEST END
 /*     CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
        CKLOCMGR_LOOP(mgr->flushAllRecs(););    
        for(int i=0;i<numGroups;i++){
@@ -1982,10 +1987,9 @@ void _recvRestartCheckpointHandler(char *_restartData){
        PUP::fromMem pBuf(buf);
        pBuf | checkpointCount;
        CkPupROData(pBuf);
-       CkPupGroupData(pBuf);
-       CkPupNodeGroupData(pBuf);
-//     pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
-       pupArrayElementsSkip(pBuf,NULL);
+       CkPupGroupData(pBuf,CmiFalse);
+       CkPupNodeGroupData(pBuf,CmiFalse);
+       pupArrayElementsSkip(pBuf,CmiFalse,NULL);
        CkAssert(pBuf.size() == restartData->checkPointSize);
        printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
        
@@ -2272,8 +2276,8 @@ void createObjIDList(void *data,ChareMlogData *mlogData){
        entry.recver = mlogData->objID;
        entry.tProcessed = mlogData->tProcessed;
        list->push_back(entry);
-       char objString[100];
-       DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
+       DEBUG_TEAM(char objString[100]);
+       DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
 }
 
 
@@ -2308,10 +2312,9 @@ void _recvCheckpointHandler(char *_restartData){
        pBuf | checkpointCount;
 
        CkPupROData(pBuf);
-       CkPupGroupData(pBuf);
-       CkPupNodeGroupData(pBuf);
-//     pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
-       pupArrayElementsSkip(pBuf,NULL);
+       CkPupGroupData(pBuf,CmiTrue);
+       CkPupNodeGroupData(pBuf,CmiTrue);
+       pupArrayElementsSkip(pBuf,CmiTrue,NULL);
        CkAssert(pBuf.size() == restartData->checkPointSize);
        printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
        
@@ -2524,6 +2527,7 @@ char name[100];
  * particular metadata information.
  */
 void setTeamRecovery(void *data, ChareMlogData *mlogData){
+       char name[100];
        mlogData->teamRecoveryFlag = 1; 
 }
 
@@ -2816,7 +2820,13 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
        // increases the number of resendReply received
        obj->mlogData->resendReplyRecvd++;
 
-       CkPrintf("[%d] processReceivedTN with %d listSize",CkMyPe(),listSize);  
+       DEBUG(char objName[100]);
+       DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
+       //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
+       //if(obj->mlogData->receivedTNs == NULL)
+       //      CkPrintf("NULL\n");     
+       //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
+
        // includes the tickets into the receivedTN structure
        for(int j=0;j<listSize;j++){
                obj->mlogData->receivedTNs->push_back(listTNs[j]);
@@ -2839,7 +2849,7 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
                        int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
                        
                        // updating tCount with the highest ticket handed out
-                       if(TEAM_SIZE_MLOG > 1){
+                       if(teamSize > 1){
                                if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
                                        obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
                        }else{
@@ -2857,7 +2867,7 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
                                        if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
                                                //the TNs are not consecutive at this point
                                                for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
-                                                       printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
+                                                       DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
                                                        obj->mlogData->ticketHoles[countHoles] = newTN;
                                                        countHoles++;
                                                }       
@@ -2875,6 +2885,7 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
        
                // cleaning up structures and getting ready to continue execution       
                delete obj->mlogData->receivedTNs;
+               DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
                obj->mlogData->receivedTNs = NULL;
                obj->mlogData->restartFlag = 0;
 
@@ -3025,7 +3036,7 @@ void _distributedLocationHandler(char *receivedMsg){
        pmem |idx;
        CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
        donotCountMigration=1;
-       mgr->resume(idx,pmem);
+       mgr->resume(idx,pmem,CmiTrue);
        donotCountMigration=0;
        informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
        printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
@@ -3101,12 +3112,6 @@ void _dummyMigrationHandler(DummyMigrationMsg *msg){
        CmiFree(msg);
 };
 
-
-
-
-
-
-
 /*****************************************************
        Implementation of a method that can be used to call
        any method on the ChareMlogData of all the chares on
@@ -3152,8 +3157,6 @@ void forAllCharesDo(MlogFn fnPointer,void *data){
        }
        int i;
        CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
-       
-       
 };
 
 
@@ -3176,6 +3179,7 @@ void initMlogLBStep(CkGroupID gid){
 
 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
        DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
+       DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
        
        resumeLbFnPtr = _fnPtr;
        centralLb = _centralLb;
@@ -3431,7 +3435,7 @@ void _messageLoggingExit(){
        printf("[%d] _messageLoggingExit \n",CmiMyPe());
 
        //TML: printing some statistics for group approach
-       //if(TEAM_SIZE_MLOG > 1)
+       //if(teamSize > 1)
                CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
 
 }
@@ -4019,8 +4023,8 @@ void ChareMlogData::pup(PUP::er &p){
  */
 int getCheckPointPE(){
        //TML: assigning a team-based buddy
-       if(TEAM_SIZE_MLOG != 1){
-               return (CmiMyPe() + TEAM_SIZE_MLOG) % CmiNumPes();
+       if(teamSize != 1){
+               return (CmiMyPe() + teamSize) % CmiNumPes();
        }
        return (CmiNumPes() -1 - CmiMyPe());
 }
index f5d645c01869179a164fdcaf288fe15e786ec92b..2b468116e6bf56d96255e109326fe11ac79a3845 100644 (file)
@@ -17,7 +17,6 @@ CpvExtern(Chare *,_currentObj);
 #define FORWARDED_TICKET 0x8000
 
 //TML: global variable for the size of the team
-#define TEAM_SIZE_MLOG 1
 #define MLOG_RESTARTED 0
 #define MLOG_CRASHED 1
 #define MEGABYTE 1048576
@@ -41,7 +40,7 @@ public:
                state = 0;
        }
 };
-PUPbytes(Ticket)
+PUPbytes(Ticket);
 class MlogEntry;
 
 /**
@@ -61,7 +60,7 @@ typedef struct{
        int senderPE;
        int recverPE;
 } LocalMessageLog;
-PUPbytes(LocalMessageLog)
+PUPbytes(LocalMessageLog);
 
 class MlogEntry;
 class RestoredLocalMap;
@@ -536,7 +535,7 @@ CpvExtern(StoredCheckpoint *,_storedCheckpointData);
 //methods for checkpointing
 void checkpointAlarm(void *_dummy,double curWallTime);
 void startMlogCheckpoint(void *_dummy,double curWallTime);
-void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listSize=0);
+void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listSize=0);
 
 //handler functions for checkpoint
 void _checkpointRequestHandler(CheckpointRequest *request);
@@ -653,6 +652,6 @@ extern int _receiveLocationHandlerIdx;
 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
 inline void processRemoteMlogMessages(){
        CmiDeliverRemoteMsgHandlerRange(_ticketRequestHandlerIdx,_receiveLocationHandlerIdx);
-}
+}; // NOTE(review): the ';' after the function body is an empty declaration - harmless but redundant
 
 #endif
index 65d6d84ea1908e8e5dfaebed5ebcc2c939648401..4a885944da78ead7ae0c760b6feabf621b798346 100644 (file)
@@ -188,6 +188,7 @@ static CkFtFn  faultFunc = NULL;
 static char* _restartDir;
 
 #ifdef _FAULT_MLOG_
+int teamSize=1;
 int chkptPeriod=1000;
 bool parallelRestart=false;
 extern int BUFFER_TIME; //time spent waiting for buffered messages
@@ -284,6 +285,9 @@ static inline void _parseCommandLineOpts(char **argv)
                _raiseEvac = 1;
        }
 #ifdef _FAULT_MLOG_
+       if(!CmiGetArgIntDesc(argv,"+teamSize",&teamSize,"Set the team size for message logging")){
+        teamSize = 1;
+    }
     if(!CmiGetArgIntDesc(argv,"+chkptPeriod",&chkptPeriod,"Set the checkpoint period for the message logging fault tolerance algorithm in seconds")){
         chkptPeriod = 100;
     }