Implementation of recovery for the team-based message-logging approach.
author Esteban Meneses <emenese2@illinois.edu>
Mon, 14 Dec 2009 15:49:26 +0000 (09:49 -0600)
committer Esteban Meneses <emenese2@illinois.edu>
Mon, 14 Dec 2009 15:49:26 +0000 (09:49 -0600)
src/ck-core/ckmessagelogging.C
src/ck-core/ckmessagelogging.h

index 262f008417119344953efe0cf4af8426c62ec161..c6bbea29dfc3ca58d9b8b673db512225387fd1d3 100644 (file)
 #ifdef _FAULT_MLOG_
 
 //#define DEBUG(x)  if(_restartFlag) {x;}
+#define DEBUG_MEM(x) // x
 #define DEBUG(x)  //x
 #define DEBUGRESTART(x)  //x
 #define DEBUGLB(x) // x
-#define DEBUG_TEAM(x)  //x
+#define DEBUG_TEAM(x)  x
 
 #define BUFFERED_LOCAL
 #define BUFFERED_REMOTE 
@@ -39,7 +40,7 @@ inline bool isTeamLocal(int destPE);
 int _restartFlag=0;
 //ERASE int restarted=0; // it's not being used anywhere
 
-//TML: variables for measuring savings with groups in message logging
+//TML: variables for measuring savings with teams in message logging
 float MLOGFT_totalLogSize = 0.0;
 float MLOGFT_totalMessages = 0.0;
 float MLOGFT_totalObjects = 0.0;
@@ -127,10 +128,12 @@ int _resendReplyHandlerIdx;
 int _receivedTNDataHandlerIdx;
 int _distributedLocationHandlerIdx;
 
-//TML: integer constants for group-based message logging
+//TML: integer constants for team-based message logging
 int _restartHandlerIdx;
 int _getRestartCheckpointHandlerIdx;
 int _recvRestartCheckpointHandlerIdx;
+void setTeamRecovery(void *data, ChareMlogData *mlogData);
+void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
 
 int verifyAckTotal;
 int verifyAckCount;
@@ -196,7 +199,7 @@ void _messageLoggingInit(){
        _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
        _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
        _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
-  _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
+       _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
        _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
 
                
@@ -220,7 +223,7 @@ void _messageLoggingInit(){
        _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
        _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
 
-       //TML: handlers for group-based message logging
+       //TML: handlers for team-based message logging
        _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
        _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
        _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
@@ -420,6 +423,7 @@ void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
  */
 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
        envelope *env = _env;
+       MCount ticketNumber = 0;
        int resend=0; //is it a resend
        char recverName[100];
        double _startTime=CkWallTimer();
@@ -470,7 +474,23 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
        if(isLocal(destPE)){
                ticketLogLocalMessage(mEntry);
        }else{
-               sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
+               if((TEAM_SIZE_MLOG > 1) && isTeamLocal(destPE)){
+
+                       // look to see if this message already has a ticket in the team-table
+                       Chare *senderObj = (Chare *)sender.getObject();
+                       SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
+                       if(ticketRow != NULL){
+                               Ticket ticket = ticketRow->get(env->SN);
+                               if(ticket.TN != 0){
+                                       ticketNumber = ticket.TN;
+                                       CkPrintf("[%d] Found a team preticketed message\n",CkMyPe());
+                               }
+                       }
+               }
+               
+               // sending the ticket request
+               sendTicketRequest(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
+               
        }
 }
 
@@ -504,7 +524,7 @@ inline bool isTeamLocal(int destPE){
 /**
  * Method that does the actual send by creating a ticket request filling it up and sending it.
  */
-void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
+void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
        char recverString[100],senderString[100];
        envelope *env = entry->env;
        DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
@@ -539,6 +559,7 @@ void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *ent
        ticketRequest->recver = recver;
        ticketRequest->logEntry = entry;
        ticketRequest->SN = SN;
+       ticketRequest->TN = TN;
        ticketRequest->senderPE = CkMyPe();
 
        CpvAccess(_numBufferedTicketRequests)[destPE]++;
@@ -560,20 +581,21 @@ void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *ent
        ticketRequest.recver = recver;
        ticketRequest.logEntry = entry;
        ticketRequest.SN = SN;
+       ticketRequest.TN = TN;
        ticketRequest.senderPE = CkMyPe();
        
        CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
 //     CmiBecomeImmediate(&ticketRequest);
        CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
 #endif
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 };
 
 /**
  * Send the ticket requests buffered for processor PE
  **/
 void sendBufferedTicketRequests(int destPE){
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
        if(numberRequests == 0){
                return;
@@ -588,7 +610,7 @@ void sendBufferedTicketRequests(int destPE){
        CmiSyncSend(destPE,totalSize,(char *)buf);
        
        CpvAccess(_numBufferedTicketRequests)[destPE]=0;
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 };
 
 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
@@ -598,7 +620,7 @@ void checkBufferedTicketRequests(void *_destPE,double curWallTime){
 //             traceUserEvent(35);
        }
        delete (int *)_destPE;
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 };
 
 /**
@@ -608,7 +630,7 @@ void checkBufferedTicketRequests(void *_destPE,double curWallTime){
  */
 void ticketLogLocalMessage(MlogEntry *entry){
        double _startTime=CkWallTimer();
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 
        Chare *recverObj = (Chare *)entry->env->recver.getObject();
        DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
@@ -645,15 +667,15 @@ void ticketLogLocalMessage(MlogEntry *entry){
                // sends a copy of the metadata to the buddy    
                sendLocalMessageCopy(entry);
                
-               DEBUG(CmiMemoryCheck());
+               DEBUG_MEM(CmiMemoryCheck());
 
                // sets the unackedLocal flag and stores the message in the log
                entry->unackedLocal = 1;
                CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
 
-               DEBUG(CmiMemoryCheck());
+               DEBUG_MEM(CmiMemoryCheck());
        }else{
-               CkPrintf("[%d] Local message in group-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
+               CkPrintf("[%d] Local message in team-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
                DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
        }
        _lockNewTicket=0;
@@ -693,7 +715,7 @@ void sendLocalMessageCopy(MlogEntry *entry){
        
        CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
 #endif
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 };
 
 
@@ -708,7 +730,7 @@ void sendBufferedLocalMessageCopy(){
        BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
        header->numberLogs=numberLogs;
 
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
        
        char *ptr = (char *)buf;
@@ -723,7 +745,7 @@ void sendBufferedLocalMessageCopy(){
        CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
 
        CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 };
 
 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
@@ -731,7 +753,7 @@ void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
        if(countClearBufferedLocalCalls > 10){
                CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
        }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
        if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
                if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
@@ -739,7 +761,7 @@ void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
 //                     traceUserEvent(36);
                }
        }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 }
 
 /****
@@ -768,12 +790,12 @@ inline void _ticketRequestHandler(TicketRequest *ticketRequest){
  * */
 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
        DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        double _startTime = CkWallTimer();
        if(CpvAccess(_delayedTicketRequests)->length() > 0){
                retryTicketRequest(NULL,_startTime);
        }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
   int numRequests = recvdHeader->numberLogs;
        char *msg = (char *)recvdHeader;
        msg = &msg[sizeof(BufferedTicketRequestHeader)];
@@ -787,7 +809,7 @@ void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
        BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
        header->numberLogs = 0;
 
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        
        ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
        
@@ -811,7 +833,7 @@ void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
        CmiSyncSend(senderPE,totalSize,(char *)buf);
        CmiFree(recvdHeader);
 //     traceUserBracketEvent(21,_startTime,CkWallTimer());                     
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 };
 
 /**Process the ticket request. 
@@ -834,17 +856,18 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
                _lockNewTicket = 1;
        }*/
 
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
+
+       // getting information from request
        CkObjID sender = ticketRequest->sender;
        CkObjID recver = ticketRequest->recver;
        MCount SN = ticketRequest->SN;
-       
-       
+       MCount TN = ticketRequest->TN;
        Chare *recverObj = (Chare *)recver.getObject();
        
-       
-       char recverName[100];
+       DEBUG(char recverName[100]);
        DEBUG(recver.toString(recverName);)
+
        if(recverObj == NULL){
                int estPE = recver.guessPE();
                if(estPE == CkMyPe() || estPE == -1){           
@@ -855,7 +878,6 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
                                CkArrayID aid(recver.data.array.id);            
                                CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
                                DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));
-
                        }
                        TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
                        *delayed = *ticketRequest;
@@ -865,20 +887,27 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
                        DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
                        TicketRequest forward = *ticketRequest;
                        CmiSetHandler(&forward,_ticketRequestHandlerIdx);
-                       CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);                      
-                       
-                       
+                       CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);
                }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
                return false; // if the receverObj does not exist the ticket request cannot have been 
                              // processed successfully
        }else{
                char senderString[100];
+               
                Ticket ticket;
+
+               // checking if the message is team local and if it has a ticket already assigned
+               if(TEAM_SIZE_MLOG > 1 && TN != 0){
+                       CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe());
+                       ticket.TN = TN;
+                       recverObj->mlogData->verifyTicket(sender,SN,TN);
+               }
+
                //check if a ticket for this has been already handed out to an object that used to be local but 
                // is no longer so.. need for parallel restart
-               
                if(recverObj->mlogData->mapTable.numObjects() > 0){
+                       
                        ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
                }
                
@@ -904,7 +933,6 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
                }*/
 //             CkAssert(ticket.TN >= SN);
                DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
-
 //             TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
     if(reply == NULL){ 
                        //There is no reply buffer and the ticketreply is going to be 
@@ -913,7 +941,6 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
                        ticketReply.request = *ticketRequest;
                        ticketReply.ticket = ticket;
                        ticketReply.recverPE = CkMyPe();
-               
                        CmiSetHandler(&ticketReply,_ticketHandlerIdx);
 //             CmiBecomeImmediate(&ticketReply);
                        CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
@@ -923,9 +950,8 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
                 reply->recverPE = CkMyPe();
                 CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
                                                         // in case the ticket needs to be forwarded or something
-
         }
-               DEBUG(CmiMemoryCheck());
+               DEBUG_MEM(CmiMemoryCheck());
                return true;
        }
 //     _lockNewTicket=0;
@@ -938,13 +964,13 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
 inline void _ticketHandler(TicketReply *ticketReply){
 
        double _startTime = CkWallTimer();
-       DEBUG(CmiMemoryCheck());        
+       DEBUG_MEM(CmiMemoryCheck());    
        
        char senderString[100];
        CkObjID sender = ticketReply->request.sender;
        CkObjID recver = ticketReply->request.recver;
        
-       if(sender.guessPE() != CkMyPe()){       
+       if(sender.guessPE() != CkMyPe()){
                DEBUG(CkAssert(sender.guessPE()>= 0));
                DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
        //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
@@ -998,7 +1024,7 @@ inline void _ticketHandler(TicketReply *ticketReply){
 
                                // if message is group local, we store its metadata in teamTable
                                if(isTeamLocal(ticketReply->recverPE)){
-                                       DEBUG_TEAM(CkPrintf("[%d] Storing meta data for intragroup message %u\n",CkMyPe(),ticketReply->request.SN);)
+                                       //DEBUG_TEAM(CkPrintf("[%d] Storing meta data for intragroup message %u\n",CkMyPe(),ticketReply->request.SN);)
                                        Chare *senderObj = (Chare *)sender.getObject();
                                        SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
                                        if(ticketRow == NULL){
@@ -1041,7 +1067,7 @@ void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
        int numTickets = recvdHeader->numberLogs;
        char *msg = (char *)recvdHeader;
        msg = &msg[sizeof(BufferedTicketRequestHeader)];
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        
        TicketReply *_reply = (TicketReply *)msg;
 
@@ -1055,7 +1081,7 @@ void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
        
        CmiFree(recvdHeader);
 //     traceUserBracketEvent(22,_startTime,CkWallTimer());
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 };
 
 /**
@@ -1082,7 +1108,7 @@ void _localMessageCopyHandler(LocalMessageLog *msgLog){
 
 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
        double _startTime = CkWallTimer();
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        
        int numLogs = recvdHeader->numberLogs;
        char *msg = (char *)recvdHeader;
@@ -1105,7 +1131,7 @@ void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int fr
        msg = &msg[sizeof(BufferedLocalLogHeader)];
        ptr = &ptr[sizeof(BufferedLocalLogHeader)];
 
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        int PE;
        for(int i=0;i<numLogs;i++){
                LocalMessageLog *msgLog = (LocalMessageLog *)msg;
@@ -1119,7 +1145,7 @@ void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int fr
                msg = &msg[sizeof(LocalMessageLog)];
                ptr = &ptr[sizeof(LocalMessageLogAck)];
        }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 
        BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
        piggyHeader->numberLogs = numPiggyLogs;
@@ -1133,7 +1159,7 @@ void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int fr
                memcpy(ptr,&log,sizeof(LocalMessageLog));
                ptr = &ptr[sizeof(LocalMessageLog)];
        }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        
        CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
        CmiSyncSendAndFree(PE,totalSize,(char *)buf);
@@ -1147,7 +1173,7 @@ void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int fr
        if(freeHeader){
                CmiFree(recvdHeader);
        }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 //     traceUserBracketEvent(23,_startTime,CkWallTimer());
 }
 
@@ -1164,7 +1190,7 @@ void _localMessageAckHandler(LocalMessageLogAck *ack){
        envelope *env = entry->env;
        char recverName[100];
        char senderString[100];
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        
        DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
        if(env == NULL)
@@ -1202,7 +1228,7 @@ void _localMessageAckHandler(LocalMessageLogAck *ack){
 //     traceUserBracketEvent(24,_startTime,CkWallTimer());
 #endif
        
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
 }
 
@@ -1210,7 +1236,7 @@ void _localMessageAckHandler(LocalMessageLogAck *ack){
 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
 
        double _startTime=CkWallTimer();
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 
        int numLogs = recvdHeader->numberLogs;
        char *msg = (char *)recvdHeader;
@@ -1233,15 +1259,10 @@ void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
        }
        
        CmiFree(recvdHeader);
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 //     traceUserBracketEvent(24,_startTime,CkWallTimer());
 }
 
-
-
-
-
-
 bool fault_aware(CkObjID &recver){
        switch(recver.type){
                case TypeChare:
@@ -1261,7 +1282,7 @@ int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEn
        char recverString[100];
        char senderString[100];
        
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        CkObjID recver = env->recver;
        if(!fault_aware(recver))
                return 1;
@@ -1289,25 +1310,25 @@ int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEn
                if(env->sender.guessPE() == CkMyPe()){
                        *logEntryPointer = env->localMlogEntry;
                }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
                while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
                        void *qMsgPtr;
                        CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
                        envelope *qEnv = (envelope *)qMsgPtr;
                        CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
                }
 //             traceUserBracketEvent(25,_startTime,CkWallTimer());
                //TODO: this might be a problem.. change made for leanMD
 //             CpvAccess(_currentObj) = obj;
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
                return 1;
        }
        if(env->TN <= obj->mlogData->tProcessed){
                //message already processed
                DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
 //             traceUserBracketEvent(26,_startTime,CkWallTimer());
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
                return 0;
        }
        //message that needs to be processed in the future
@@ -1317,13 +1338,16 @@ int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEn
        //It will be transferred to the main queue later
        CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
 //             traceUserBracketEvent(27,_startTime,CkWallTimer());
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        
        return 0;
 }
 
+/**
+ * @brief Updates a few variables once a message has been processed.
+ */
 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
-       char senderString[100];
+       DEBUG(char senderString[100]);
        if(obj){
                if(sender.guessPE() == CkMyPe()){
                        if(entry != NULL){
@@ -1335,7 +1359,7 @@ void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *
                DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
 //             CpvAccess(_currentObj)= NULL;
        }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 }
 
 /***
@@ -1370,17 +1394,17 @@ void retryTicketRequestTimer(void *_dummy,double _time){
 
 void retryTicketRequest(void *_dummy,double curWallTime){      
        double start = CkWallTimer();
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        int length = CpvAccess(_delayedTicketRequests)->length();
        for(int i=0;i<length;i++){
                TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
                if(ticketRequest){
                        char senderString[100],recverString[100];
                        DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
-                       DEBUG(CmiMemoryCheck());
+                       DEBUG_MEM(CmiMemoryCheck());
                        _processTicketRequest(ticketRequest);
                  CmiFree(ticketRequest);
-                       DEBUG(CmiMemoryCheck());
+                       DEBUG_MEM(CmiMemoryCheck());
                }       
        }       
        for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
@@ -1407,7 +1431,7 @@ void retryTicketRequest(void *_dummy,double curWallTime){
                CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
                calledRetryTicketRequest =1;
        }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 }
 
 void _pingHandler(CkPingMsg *msg){
@@ -1456,18 +1480,18 @@ void startMlogCheckpoint(void *_dummy,double curWallTime){
                printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
        }
        PUP::sizer psizer;
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 
        psizer | checkpointCount;
        
        CkPupROData(psizer);
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        CkPupGroupData(psizer);
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        CkPupNodeGroupData(psizer);
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        pupArrayElementsSkip(psizer,NULL);
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 
        int dataSize = psizer.size();
        int totalSize = sizeof(CheckPointDataMsg)+dataSize;
@@ -1678,7 +1702,7 @@ void sendRemoveLogRequests(){
        memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
        CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
        
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        for(int i=0;i<CkNumPes();i++){
                CmiSyncSend(i,totalSize,requestMsg);
        }
@@ -1688,12 +1712,12 @@ void sendRemoveLogRequests(){
        //TODO: clear ticketTable
        
        traceUserBracketEvent(30,_startTime,CkWallTimer());
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
 }
 
 
 void _checkpointAckHandler(CheckPointAck *ackMsg){
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        unAckedCheckpoint=0;
        DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
        DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
@@ -1708,7 +1732,7 @@ void _checkpointAckHandler(CheckPointAck *ackMsg){
 };
 
 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        CmiMemoryCheck();
        char *data = (char *)_data;
        RemoveLogRequest *request = (RemoveLogRequest *)data;
@@ -1739,7 +1763,7 @@ void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
                char nameString[100];
                DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
        }
-       DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
        CmiMemoryCheck();
 }
 
@@ -1846,7 +1870,7 @@ void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
                                // sending a message to the group member
                                msg.PE = CkMyPe();
                            CmiSetHandler(&msg,_restartHandlerIdx);
-                           //HERE CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
+                           CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
                        }
                }
        }
@@ -1859,14 +1883,14 @@ void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
 
 /**
  * Function to restart this processor.
- * The handler is invoked by a member of its same group in message logging.
+ * The handler is invoked by a member of its same team in message logging.
  */
 void _restartHandler(RestartRequest *restartMsg){
        int i;
        int numGroups = CkpvAccess(_groupIDTable)->size();
        RestartRequest msg;
        
-       fprintf(stderr,"[%d] Restart-group started at %.6lf \n",CkMyPe(),CmiWallTimer());
+       fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
 
     // setting the restart flag
        _restartFlag = 1;
@@ -1928,7 +1952,7 @@ void _getRestartCheckpointHandler(RestartRequest *restartMsg){
 }
 
 /**
- * Receives the checkpoint coming from its buddy.
+ * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
  */
 void _recvRestartCheckpointHandler(char *_restartData){
        RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
@@ -1942,7 +1966,7 @@ void _recvRestartCheckpointHandler(char *_restartData){
        if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
 
        
-       printf("[%d] Group Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
+       printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
        char *buf = &_restartData[sizeof(RestartProcessorData)];
        
        if(restartData->numMigratedAwayElements != 0){
@@ -1951,11 +1975,12 @@ void _recvRestartCheckpointHandler(char *_restartData){
                printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
                buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
        }
+
+       // turning on the team recovery flag
+       forAllCharesDo(setTeamRecovery,NULL);
        
        PUP::fromMem pBuf(buf);
-
        pBuf | checkpointCount;
-
        CkPupROData(pBuf);
        CkPupGroupData(pBuf);
        CkPupNodeGroupData(pBuf);
@@ -1964,6 +1989,10 @@ void _recvRestartCheckpointHandler(char *_restartData){
        CkAssert(pBuf.size() == restartData->checkPointSize);
        printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
        
+       // turning off the team recovery flag
+       forAllCharesDo(unsetTeamRecovery,NULL);
+
+       // initializing a few variables for handling local messages
        forAllCharesDo(initializeRestart,NULL);
        
        //store the restored local message log in a vector
@@ -1987,6 +2016,7 @@ void _recvRestartCheckpointHandler(char *_restartData){
 
        forAllCharesDo(sortRestoredLocalMsgLog,NULL);
        CmiFree(_restartData);  
+
        /*HERE _initDone();
 
        getGlobalStep(globalLBID);
@@ -2012,8 +2042,7 @@ void _recvRestartCheckpointHandler(char *_restartData){
                resendMsg layout |ResendRequest|Array of TProcessedLog|
        */
        int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
-       char *resendMsg = (char *)CmiAlloc(totalSize);
-       
+       char *resendMsg = (char *)CmiAlloc(totalSize);  
 
        ResendRequest *resendReq = (ResendRequest *)resendMsg;
        resendReq->PE =CkMyPe(); 
@@ -2023,10 +2052,10 @@ void _recvRestartCheckpointHandler(char *_restartData){
        
 
        /* test for parallel restart migrate away object**/
-       if(parallelRestart){
-               distributeRestartedObjects();
-               printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
-       }
+//     if(parallelRestart){
+//             distributeRestartedObjects();
+//             printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
+//     }
        
        /*      To make restart work for load balancing.. should only
        be used when checkpoint happens along with load balancing
@@ -2047,7 +2076,6 @@ void _recvRestartCheckpointHandler(char *_restartData){
        }
        _resendMessagesHandler(resendMsg);
 
-
 }
 
 
@@ -2155,7 +2183,11 @@ void _verifyAckHandler(VerifyAckMsg *verifyReply){
 }
 
 
-
+/**
+ * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
+ * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
+ * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
+ */
 void sendCheckpointData(int mode){     
        RestartRequest *restartMsg = storedRequest;
        StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
@@ -2467,16 +2499,13 @@ void fillTicketForChare(void *data, ChareMlogData *mlogData){
                        if((*objID) == (resendData->listObjects)[j].recver){
 char name[100];
                                snToTicket = *(SNToTicket **)objp;
-CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
+//CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
                                for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
-CkPrintf("[%d] ---> Zero\n",CkMyPe());
                                        ticket = snToTicket->get(snIndex);      
                                        if(ticket.TN > resendData->maxTickets[j]){
-CkPrintf("[%d] ---> One\n",CkMyPe());
                                                resendData->maxTickets[j] = ticket.TN;
                                        }
                                        if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
-CkPrintf("[%d] ---> Two\n",CkMyPe());
                                                //store the TNs that have been since the recver last checkpointed
                                                resendData->ticketVecs[j].push_back(ticket.TN);
                                        }
@@ -2489,6 +2518,22 @@ CkPrintf("[%d] ---> Two\n",CkMyPe());
        delete iterator;
 }
 
+
+/**
+ * @brief Turns on the team recovery flag of a chare (teamRecoveryFlag = 1). While it is set,
+ * ChareMlogData::pup leaves tCount untouched and discards the unpacked ticket rows. The data argument is unused.
+ */
+void setTeamRecovery(void *data, ChareMlogData *mlogData){
+       mlogData->teamRecoveryFlag = 1; 
+}
+
+/**
+ * @brief Turns off the team recovery flag of a chare (teamRecoveryFlag = 0). The data argument is unused.
+ */
+void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
+       mlogData->teamRecoveryFlag = 0;
+}
+
 //the data argument is of type ResendData which contains the 
 //array of objects on  the restartedProcessor
 //this method resends the messages stored in this chare's message log 
@@ -2520,7 +2565,7 @@ void resendMessageForChare(void *data,ChareMlogData *mlogData){
                //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
                if(env->TN <= 0){
                        //ticket not yet replied send it out again
-                       sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,1);
+                       sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,0,1);
                }
                
                if(env->recver.type != TypeInvalid){
@@ -2720,7 +2765,8 @@ void _resendReplyHandler(char *msg){
 
        char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
        
-       DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
+//     DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
+       DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
        for(int i =0; i< resendReply->numberObjects;i++){       
                Chare *obj = (Chare *)listObjects[i].getObject();
                
@@ -2745,8 +2791,7 @@ void _resendReplyHandler(char *msg){
 
                        CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
                        CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
-               }
-               
+               }       
        }
 };
 
@@ -2770,7 +2815,8 @@ void _receivedTNDataHandler(ReceivedTNData *msg){
 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
        // increases the number of resendReply received
        obj->mlogData->resendReplyRecvd++;
-       
+
+       CkPrintf("[%d] processReceivedTN with %d listSize",CkMyPe(),listSize);  
        // includes the tickets into the receivedTN structure
        for(int j=0;j<listSize;j++){
                obj->mlogData->receivedTNs->push_back(listTNs[j]);
@@ -2791,7 +2837,15 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
                        int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
                        int vecsize = obj->mlogData->receivedTNs->size();
                        int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
-                       obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
+                       
+                       // updating tCount with the highest ticket handed out
+                       if(TEAM_SIZE_MLOG > 1){
+                               if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
+                                       obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
+                       }else{
+                               obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
+                       }
+                       
                        if(numberHoles == 0){
                        }else{
                                char objName[100];                                      
@@ -3082,6 +3136,9 @@ public:
        }
 };
 
+/**
+ * Applies the function pointed to by fnPointer to all the chares living on this processor.
+ */
 void forAllCharesDo(MlogFn fnPointer,void *data){
        int numGroups = CkpvAccess(_groupIDTable)->size();
        for(int i=0;i<numGroups;i++){
@@ -3599,11 +3656,37 @@ MCount ChareMlogData::newTN(){
        return TN;
 };
 
+/**
+ * Records ticket TN for the pair (sender, SN) in the ticketTable if no ticket is stored for it yet; otherwise asserts that the stored ticket equals TN.
+ */
+inline void ChareMlogData::verifyTicket(CkObjID &sender, MCount SN, MCount TN){
+       Ticket ticket;
+
+       SNToTicket *ticketRow = ticketTable.get(sender);
+       if(ticketRow != NULL){
+               Ticket earlierTicket = ticketRow->get(SN);
+               if(earlierTicket.TN != 0){ // a TN of 0 is treated as "no ticket stored for this SN"
+                       CkAssert(earlierTicket.TN == TN);
+                       return;
+               }
+       }else{ // no row for this sender yet: create one and register it in the table
+               ticketRow = new SNToTicket();
+               ticketTable.put(sender) = ticketRow;
+       }
+       ticket.TN = TN;
+       ticketRow->put(SN) = ticket;
+}
+
+/**
+ * Generates the next ticket for a request.
+ */
 inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
-       char senderName[100];
-       char recverName[100];
+       DEBUG(char senderName[100];)
+       DEBUG(char recverName[100];)
        double _startTime =CmiWallTimer();
        Ticket ticket;
+
+       // if a ticket is requested during restart, 0 is returned so that the requester asks for it again later.
        if(restartFlag){
                ticket.TN = 0;
                return ticket;
@@ -3655,16 +3738,21 @@ inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
  * Adds an entry into the message log.
  */
 void ChareMlogData::addLogEntry(MlogEntry *entry){
-       char nameString[100];
+       DEBUG(char nameString[100]);
        DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
-               DEBUG(CmiMemoryCheck());
+       DEBUG_MEM(CmiMemoryCheck());
+
+       // enqueuing the entry in the message log
        mlog.enq(entry);
 };
 
 double totalSearchRestoredTime=0;
 double totalSearchRestoredCount=0;
 
-
+/**
+ * Searches the restored local map to see if the combination of sender and sequence number
+ * shows up in the map. Returns the ticket if found, or 0 otherwise.
+ */
 MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN){
        double start= CkWallTimer();
        MCount TN=0;    
@@ -3676,18 +3764,14 @@ MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCoun
                                TN = map->TNArray[index];
                        }
                }
-       /*      if(i > 0){
-                       printf("i %d restoredLocalMsgLog.size() %d\n",i,restoredLocalMsgLog.size());
-               }*/
        }
        
-       char senderName[100],recverName[100];
-       
-       if(TN != 0){
-               DEBUG(CmiPrintf("[%d] searchRestoredLocalQ found match sender %s recver %s SN %d TN %d\n",CmiMyPe(),sender.toString(senderName),recver.toString(recverName),SN,TN));
-       }
+       DEBUG(char senderName[100]);
+       DEBUG(char recverName[100]);
+       DEBUG(if(TN != 0){ CmiPrintf("[%d] searchRestoredLocalQ found match sender %s recver %s SN %d TN %d\n",CmiMyPe(),sender.toString(senderName),recver.toString(recverName),SN,TN);});
+
        totalSearchRestoredTime += CkWallTimer()-start;
-       totalSearchRestoredCount ++;
+       totalSearchRestoredCount++;
        return TN;
 }
 
@@ -3737,13 +3821,13 @@ void ChareMlogData::sortRestoredLocalMsgLog(){
        restoredLocalMsgLog.free();
 }
 
-
 /**
  * Pup method for the metadata.
  * We are preventing the whole message log to be stored (as proposed by Sayantan for dealing with multiple failures).
  * Then, we only support one failure at a time. Read Sayantan's thesis, sections 4.2 and 4.3 for more details.
  */
 void ChareMlogData::pup(PUP::er &p){
+       int tCountAux;
        int startSize=0;
        char nameStr[100];
        if(p.isSizing()){
@@ -3753,7 +3837,10 @@ void ChareMlogData::pup(PUP::er &p){
        double _startTime = CkWallTimer();
        
        p | objID;
-       p | tCount;
+       if(teamRecoveryFlag)
+               p | tCountAux;
+       else
+               p | tCount;
        p | tProcessed;
        if(p.isUnpacking()){
                DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
@@ -3904,7 +3991,10 @@ void ChareMlogData::pup(PUP::er &p){
                                p | objID;
                                SNToTicket *ticketRow = new SNToTicket;
                                ticketRow->pup(p);
-                               ticketTable.put(objID) = ticketRow;
+                               if(!teamRecoveryFlag)
+                                       ticketTable.put(objID) = ticketRow;
+                               else
+                                       delete ticketRow;
                        }
                }
        }       
index 2fe5ebbec70b143d876365856f22256a64f48533..a0d20531a5341a7cab67e6a6ad32bb940d066699 100644 (file)
@@ -17,7 +17,7 @@ CpvExtern(Chare *,_currentObj);
 #define FORWARDED_TICKET 0x8000
 
 //TML: global variable for the size of the team
-#define TEAM_SIZE_MLOG 4
+#define TEAM_SIZE_MLOG 1
 #define MLOG_RESTARTED 0
 #define MLOG_CRASHED 1
 #define MEGABYTE 1048576
@@ -41,7 +41,7 @@ public:
                state = 0;
        }
 };
-PUPbytes(Ticket)
+PUPbytes(Ticket);
 class MlogEntry;
 
 /**
@@ -61,7 +61,7 @@ typedef struct{
        int senderPE;
        int recverPE;
 } LocalMessageLog;
-PUPbytes(LocalMessageLog)
+PUPbytes(LocalMessageLog);
 
 class MlogEntry;
 class RestoredLocalMap;
@@ -173,7 +173,8 @@ public:
        int maxRestoredLocalTN;
        int resendReplyRecvd;// variable that keeps a count of the processors that have replied to a requests to resend messages. 
        int restartFlag; /*0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time */
-       CkHashtableT<CkHashtableAdaptorT<CkObjID>,RestoredLocalMap *> mapTable; 
+    int teamRecoveryFlag; // 0 -> normal state .. 1 -> recovery of a team member       
+       CkHashtableT<CkHashtableAdaptorT<CkObjID>,RestoredLocalMap *> mapTable;
        //TML: teamTable, stores the SN to TN mapping for messages intra team
        CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> teamTable;
 
@@ -203,6 +204,7 @@ public:
                ticketHoles = NULL;
                currentHoles = 0;
                restartFlag=0;
+               teamRecoveryFlag=0;
                receivedTNs = NULL;
                resendReplyRecvd=0;
                maxRestoredLocalTN=0;
@@ -211,6 +213,7 @@ public:
        };
        inline MCount nextSN(const CkObjID &recver);
        inline Ticket next_ticket(CkObjID &sender,MCount SN);
+       inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
        void addLogEntry(MlogEntry *entry);
        virtual void pup(PUP::er &p);
        CkQ<MlogEntry *> *getMlog(){ return &mlog;};
@@ -274,7 +277,9 @@ public:
 };
 
 /**
- *  @brief 
+ *  @brief Class for storing metadata of local messages.
+ *  It maps sequence numbers to ticket numbers.
+ *  It is used after a restart to maintain the same ticket numbers.
  */
 class RestoredLocalMap {
 public:
@@ -299,6 +304,7 @@ typedef struct {
        CkObjID recver;
        MlogEntry *logEntry;
        MCount SN;
+       MCount TN;
        int senderPE;
 } TicketRequest;
 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
@@ -480,7 +486,7 @@ void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx);
 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx);
 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx);
 void generateCommonTicketRequest(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
-void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend);
+void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
 void ticketLogLocalMessage(MlogEntry *entry);
 void sendLocalMessageCopy(MlogEntry *entry);
 void sendBufferedLocalMessageCopy();
@@ -647,6 +653,6 @@ extern int _receiveLocationHandlerIdx;
 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
 inline void processRemoteMlogMessages(){
        CmiDeliverRemoteMsgHandlerRange(_ticketRequestHandlerIdx,_receiveLocationHandlerIdx);
-}
+};
 
 #endif