Added support for team-based message logging.
authorEsteban Meneses <emenese2@illinois.edu>
Thu, 29 Oct 2009 14:42:31 +0000 (14:42 +0000)
committerEsteban Meneses <emenese2@illinois.edu>
Thu, 29 Oct 2009 14:42:31 +0000 (14:42 +0000)
src/ck-core/ckmessagelogging.C

index 887e8c3fdbb1aa25947bd59f6fa3fc62a821da02..e08fb3ca2651af40d5c8d17cdff0ea46dd2bd489 100644 (file)
@@ -1,3 +1,8 @@
+/**
+ * Message Logging Fault Tolerance Protocol
+ * It includes the main functions for the basic and team-based schemes.
+ */
+
 #include "charm.h"
 #include "ck.h"
 #include "ckmessagelogging.h"
 #define DEBUG(x)  //x
 #define DEBUGRESTART(x)  //x
 #define DEBUGLB(x) // x
+#define DEBUG_TEAM(x)  //x
 
 #define BUFFERED_LOCAL
-#define BUFFERED_REMOTE
+#define BUFFERED_REMOTE 
 
 extern const char *idx2str(const CkArrayIndex &ind);
 extern const char *idx2str(const ArrayElement *el);
@@ -25,11 +31,15 @@ const char *idx2str(const CkArrayIndexMax &ind){
 void getGlobalStep(CkGroupID gID);
 
 bool fault_aware(CkObjID &recver);
+void sendCheckpointData(int mode);
+void createObjIDList(void *data,ChareMlogData *mlogData);
+inline bool isLocal(int destPE);
+inline bool isTeamLocal(int destPE);
 
 int _restartFlag=0;
-int restarted=0;
+//ERASE int restarted=0; // it's not being used anywhere
 
-//GML: variables for measuring savings with groups in message logging
+//TML: variables for measuring savings with groups in message logging
 float MLOGFT_totalLogSize = 0.0;
 float MLOGFT_totalMessages = 0.0;
 float MLOGFT_totalObjects = 0.0;
@@ -38,7 +48,6 @@ float MLOGFT_totalObjects = 0.0;
 int countHashRefs=0; //count the number of gets
 int countHashCollisions=0;
 
-
 //#define CHECKPOINT_DISK
 char *checkpointDirectory=".";
 int unAckedCheckpoint=0;
@@ -118,6 +127,10 @@ int _resendReplyHandlerIdx;
 int _receivedTNDataHandlerIdx;
 int _distributedLocationHandlerIdx;
 
+//TML: integer constants for group-based message logging
+int _restartHandlerIdx;
+int _getRestartCheckpointHandlerIdx;
+int _recvRestartCheckpointHandlerIdx;
 
 int verifyAckTotal;
 int verifyAckCount;
@@ -207,6 +220,11 @@ void _messageLoggingInit(){
        _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
        _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
 
+       //TML: handlers for group-based message logging
+       _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
+       _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
+       _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
+
        
        //handlers for load balancing
        _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
@@ -295,10 +313,6 @@ void _messageLoggingInit(){
        lastRestart = CmiWallTimer();
 //     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
        CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
-//     printf("[%d] killFlag %d\n",CkMyPe(),killFlag);
-//     if(killFlag){
-//             readKillFile();
-//     }
 }
 
 void killLocal(void *_dummy,double curWallTime);       
@@ -401,7 +415,9 @@ void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
        generateCommonTicketRequest(recver,env,destPE,_infoIdx);
 };
 
-//a method to generate the actual ticket requests for groups,nodegroups or arrays
+/**
+ * A method to generate the actual ticket requests for groups, nodegroups or arrays.
+ */
 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
        envelope *env = _env;
        int resend=0; //is it a resend
@@ -425,7 +441,6 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
                env->SN = 0;
        }
        
-       
        CkObjID &sender = env->sender;
        env->recver = recver;
 
@@ -437,7 +452,6 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
                resend = 1;
        }
        
-       
        char senderString[100];
 //     if(env->SN != 1){
                DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
@@ -446,22 +460,50 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
                DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
        }*/
                
-       MlogEntry * mEntry = new MlogEntry(env,destPE,_infoIdx);
+       MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
 //     CkPackMessage(&(mEntry->env));
 //     traceUserBracketEvent(32,_startTime,CkWallTimer());
        
-       
-
        _startTime = CkWallTimer();
-       if(destPE == CkMyPe()){
+
+       // uses the proper ticketing mechanism for local, group and general messages
+       if(isLocal(destPE)){
                ticketLogLocalMessage(mEntry);
        }else{
                sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
-//             traceUserBracketEvent(31,_startTime,CkWallTimer());                     
        }
 }
 
-//method that does the actual send by creating a ticketrequest filling it up and sending it
+/**
+ * Determines if the message is local or not. A message is local if:
+ * 1) Both the destination and origin are the same PE.
+ */
+inline bool isLocal(int destPE){
+       // both the destination and the origin are the same PE
+       if(destPE == CkMyPe())
+               return true;
+
+       return false;
+}
+
+/**
+ * Determines if the message is group local or not. A message is group local if:
+ * 1) They belong to the same group in the group-based message logging.
+ */
+inline bool isTeamLocal(int destPE){
+
+       // they belong to the same group
+       if(TEAM_SIZE_MLOG > 1 && destPE/TEAM_SIZE_MLOG == CkMyPe()/TEAM_SIZE_MLOG)
+               return true;
+
+       return false;
+}
+
+
+
+/**
+ * Method that does the actual send by creating a ticket request filling it up and sending it.
+ */
 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
        char recverString[100],senderString[100];
        envelope *env = entry->env;
@@ -471,14 +513,16 @@ void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *ent
 
        Chare *obj = (Chare *)entry->env->sender.getObject();
        if(!resend){
-               //GML: only stores message if either it goes to this processor or to a processor in a different group
-               if(CkMyPe() != destPE && CkMyPe()/GROUP_SIZE_MLOG != destPE/GROUP_SIZE_MLOG){
+               //TML: only stores message if either it goes to this processor or to a processor in a different group
+               if(!isTeamLocal(entry->destPE)){
                        obj->mlogData->addLogEntry(entry);
                        MLOGFT_totalMessages += 1.0;
                        MLOGFT_totalLogSize += entry->env->getTotalsize();
+               }else{
+                       // the message has to be deleted after it has been sent
+                       entry->env->freeMsg = true;
                }
        }
-       
 
 #ifdef BUFFERED_REMOTE
        //buffer the ticket request 
@@ -557,20 +601,14 @@ void checkBufferedTicketRequests(void *_destPE,double curWallTime){
        DEBUG(CmiMemoryCheck());
 };
 
-
-
-
-//get a ticket for a local message and then send a copy to the buddy
+/**
+ * Gets a ticket for a local message and then sends a copy to the buddy.
+ * This method is always in the main thread(not interrupt).. so it should 
+ * never find itself locked out of a newTicket.
+ */
 void ticketLogLocalMessage(MlogEntry *entry){
-       //this method is always in the main thread(not interrupt).. so it should 
-       //never find itself locked out of a newTicket
-//     CkAssert(_lockNewTicket==0);
        double _startTime=CkWallTimer();
-       
-//     _lockNewTicket=1;
-       
-               DEBUG(CmiMemoryCheck());
-       
+       DEBUG(CmiMemoryCheck());
 
        Chare *recverObj = (Chare *)entry->env->recver.getObject();
        DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
@@ -603,24 +641,28 @@ void ticketLogLocalMessage(MlogEntry *entry){
                entry->env->TN = ticket.TN;
                CkAssert(entry->env->TN > 0);
                DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
-               
-
+       
+               // sends a copy of the metadata to the buddy    
                sendLocalMessageCopy(entry);
                
-               
                DEBUG(CmiMemoryCheck());
+
+               // sets the unackedLocal flag and stores the message in the log
                entry->unackedLocal = 1;
-               DEBUG(CmiMemoryCheck());
                CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
+
                DEBUG(CmiMemoryCheck());
        }else{
+               CkPrintf("[%d] Local message in group-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
                DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
        }
        _lockNewTicket=0;
 //     traceUserBracketEvent(33,_startTime,CkWallTimer());
 };
 
-
+/**
+ * Sends the metadata of a local message to its buddy.
+ */
 void sendLocalMessageCopy(MlogEntry *entry){
        LocalMessageLog msgLog;
        msgLog.sender = entry->env->sender;
@@ -628,7 +670,7 @@ void sendLocalMessageCopy(MlogEntry *entry){
        msgLog.SN = entry->env->SN;
        msgLog.TN = entry->env->TN;
        msgLog.entry = entry;
-       msgLog.PE = CkMyPe();
+       msgLog.senderPE = CkMyPe();
        
        char recvString[100];
        char senderString[100];
@@ -889,17 +931,20 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
 //     _lockNewTicket=0;
 };
 
+
+/**
+ * @brief This function handles the ticket received after a request.
+ */
 inline void _ticketHandler(TicketReply *ticketReply){
 
        double _startTime = CkWallTimer();
-       DEBUG(CmiMemoryCheck());
-       
+       DEBUG(CmiMemoryCheck());        
        
        char senderString[100];
        CkObjID sender = ticketReply->request.sender;
        CkObjID recver = ticketReply->request.recver;
        
-       if(sender.guessPE() != CkMyPe()){               
+       if(sender.guessPE() != CkMyPe()){       
                DEBUG(CkAssert(sender.guessPE()>= 0));
                DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
        //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
@@ -950,13 +995,26 @@ inline void _ticketHandler(TicketReply *ticketReply){
                        logEntry->env->TN = ticketReply->ticket.TN;
                        logEntry->env->setSrcPe(CkMyPe());
                        if(ticketReply->ticket.state == NEW_TICKET){
+
+                               // if message is group local, we store its metadata in teamTable
+                               if(isTeamLocal(ticketReply->recverPE)){
+                                       DEBUG_TEAM(CkPrintf("[%d] Storing meta data for intragroup message %u\n",CkMyPe(),ticketReply->request.SN);)
+                                       Chare *senderObj = (Chare *)sender.getObject();
+                                       SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
+                                       if(ticketRow == NULL){
+                                               ticketRow = new SNToTicket();
+                                               senderObj->mlogData->teamTable.put(recver) = ticketRow; 
+                                       }
+                                       ticketRow->put(ticketReply->request.SN) = ticketReply->ticket;
+                               }
+
                                DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
                                if(ticketReply->recverPE != CkMyPe()){
                                        generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
                                }else{
                                        //It is now a local message use the local message protocol
                                        sendLocalMessageCopy(logEntry);
-                               }               
+                               }       
                        }
                }else{
                        DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
@@ -968,11 +1026,8 @@ inline void _ticketHandler(TicketReply *ticketReply){
 #endif
        }
        CmiMemoryCheck();
-#ifdef BUFFERED_REMOTE
-#else
-//     traceUserBracketEvent(22,_startTime,CkWallTimer());
-#endif
-       
+
+//     traceUserBracketEvent(22,_startTime,CkWallTimer());     
 };
 
 /**
@@ -1003,7 +1058,9 @@ void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
        DEBUG(CmiMemoryCheck());
 };
 
-
+/**
+ * Stores the metadata of a local message from its buddy.
+ */
 void _localMessageCopyHandler(LocalMessageLog *msgLog){
        double _startTime = CkWallTimer();
        
@@ -1018,7 +1075,7 @@ void _localMessageCopyHandler(LocalMessageLog *msgLog){
        ack.entry = msgLog->entry;
        DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
        CmiSetHandler(&ack,_localMessageAckHandlerIdx);
-       CmiSyncSend(msgLog->PE,sizeof(LocalMessageLogAck),(char *)&ack);
+       CmiSyncSend(msgLog->senderPE,sizeof(LocalMessageLogAck),(char *)&ack);
        
 //     traceUserBracketEvent(23,_startTime,CkWallTimer());
 };
@@ -1053,7 +1110,7 @@ void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int fr
        for(int i=0;i<numLogs;i++){
                LocalMessageLog *msgLog = (LocalMessageLog *)msg;
                CpvAccess(_localMessageLog)->enq(*msgLog);
-               PE = msgLog->PE;
+               PE = msgLog->senderPE;
                DEBUG(CmiAssert( PE == getCheckPointPE()));
 
                LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
@@ -1234,10 +1291,8 @@ int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEn
                while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
                        void *qMsgPtr;
                        CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
-                       if(qMsgPtr != NULL){
-                         envelope *qEnv = (envelope *)qMsgPtr;
-                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());               
-                       }
+                       envelope *qEnv = (envelope *)qMsgPtr;
+                       CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
        DEBUG(CmiMemoryCheck());
                }
 //             traceUserBracketEvent(25,_startTime,CkWallTimer());
@@ -1684,7 +1739,7 @@ void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
        }
        DEBUG(CmiMemoryCheck());
        CmiMemoryCheck();
-};
+}
 
 void _removeProcessedLogHandler(char *requestMsg){
        double start = CkWallTimer();
@@ -1770,50 +1825,81 @@ void clearUpMigratedRetainedLists(int PE){
 ***************************************************************/       
 
 /**
- * Function for restarting an object with message logging
+ * Function for restarting the crashed processor.
+ * It sets the restart flag and contacts the buddy
+ * processor to get the latest checkpoint.
  */
 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
-       printf("[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
+       RestartRequest msg;
+
        fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
+
+       // setting the restart flag
        _restartFlag = 1;
-       RestartRequest msg;
+
+       // if we are using group-based message logging, all members of the group have to be restarted
+       if(TEAM_SIZE_MLOG > 1){
+               for(int i=(CkMyPe()/TEAM_SIZE_MLOG)*TEAM_SIZE_MLOG; i<((CkMyPe()/TEAM_SIZE_MLOG)+1)*TEAM_SIZE_MLOG; i++){
+                       if(i != CkMyPe() && i < CkNumPes()){
+                               // sending a message to the group member
+                               msg.PE = CkMyPe();
+                           CmiSetHandler(&msg,_restartHandlerIdx);
+                           //HERE CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
+                       }
+               }
+       }
+
+       // requesting the latest checkpoint from its buddy
        msg.PE = CkMyPe();
        CmiSetHandler(&msg,_getCheckpointHandlerIdx);
        CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
 };
 
-void CkMlogRestartDouble(void *,double){
-       CkMlogRestart(NULL,NULL);
-};
+/**
+ * Function to restart this processor.
+ * The handler is invoked by a member of its same group in message logging.
+ */
+void _restartHandler(RestartRequest *restartMsg){
+       int i;
+       int numGroups = CkpvAccess(_groupIDTable)->size();
+       RestartRequest msg;
+       
+       fprintf(stderr,"[%d] Restart-group started at %.6lf \n",CkMyPe(),CmiWallTimer());
 
-//GML: restarting from local (group) failure
-void CkMlogRestartLocal(){
-    CkMlogRestart(NULL,NULL);
-};
+    // setting the restart flag
+       _restartFlag = 1;
 
+       // flushing all buffers 
+/*     CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
+       CKLOCMGR_LOOP(mgr->flushAllRecs(););    
+       for(int i=0;i<numGroups;i++){
+       CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
+               IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
+               obj->flushStates();
+               obj->ckJustMigrated();
+       }*/
 
-void readCheckpointFromDisk(int size,char *buf){
-       char fName[100];
-       sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
+    // requesting the latest checkpoint from its buddy
+       msg.PE = CkMyPe();
+       CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
+       CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
+}
 
-       int fd = open(fName,O_RDONLY);
-       int count=0;
-       while(count < size){
-               count += read(fd,&buf[count],size-count);
-       }
-       close(fd);
-       
-};
 
+/**
+ * Gets the stored checkpoint but calls another function in the sender.
+ */
+void _getRestartCheckpointHandler(RestartRequest *restartMsg){
 
-void sendCheckpointData();
+       // retrieving the stored checkpoint
+       StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
 
-void _getCheckpointHandler(RestartRequest *restartMsg){
-       StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
+       // making sure it is its buddy who is requesting the checkpoint
        CkAssert(restartMsg->PE == storedChkpt->PE);
+
        storedRequest = restartMsg;
-       
        verifyAckTotal = 0;
+
        for(int i=0;i<migratedNoticeList.size();i++){
                if(migratedNoticeList[i].fromPE == restartMsg->PE){
 //                     if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
@@ -1831,142 +1917,18 @@ void _getCheckpointHandler(RestartRequest *restartMsg){
                        }
                }
        }
-       
+
+       // sending the checkpoint back to its buddy     
        if(verifyAckTotal == 0){
-               sendCheckpointData();
+               sendCheckpointData(MLOG_RESTARTED);
        }
        verifyAckCount = 0;
 }
 
-
-void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
-       CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
-       CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
-       if(rec != NULL && rec->type() == CkLocRec::local){
-                       //this location exists on this processor
-                       //and needs to be removed       
-                       CkLocRec_local *localRec = (CkLocRec_local *) rec;
-                       CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
-                       
-                       int localIdx = localRec->getLocalIndex();
-                       LBDatabase *lbdb = localRec->getLBDB();
-                       LDObjHandle ldHandle = localRec->getLdHandle();
-                               
-                       locMgr->setDuringMigration(true);
-                       
-                       locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
-                       lbdb->UnregisterObj(ldHandle);
-                       
-                       locMgr->setDuringMigration(false);
-                       
-                       verifyAckedRequests++;
-
-       }
-       CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
-       CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
-};
-
-
-void _verifyAckHandler(VerifyAckMsg *verifyReply){
-       int index =     verifyReply->index;
-       migratedNoticeList[index] = verifyReply->migRecord;
-       verifyAckCount++;
-       CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
-       if(verifyAckCount == verifyAckTotal){
-               sendCheckpointData();
-       }
-}
-
-
-
-void sendCheckpointData(){     
-       RestartRequest *restartMsg = storedRequest;
-       StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
-       int numMigratedAwayElements = migratedNoticeList.size();
-       if(migratedNoticeList.size() != 0){
-                       printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
-//                     CkAssert(migratedNoticeList.size() == 0);
-       }
-       
-       
-       int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
-       
-       DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
-       CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
-       
-       CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
-       totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
-       totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
-       
-       char *msg = (char *)CmiAlloc(totalSize);
-       
-       RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
-       dataMsg->PE = CkMyPe();
-       dataMsg->restartWallTime = CmiTimer();
-       dataMsg->checkPointSize = storedChkpt->bufSize;
-       
-       dataMsg->numMigratedAwayElements = numMigratedAwayElements;
-//     dataMsg->numMigratedAwayElements = 0;
-       
-       dataMsg->numMigratedInElements = 0;
-       dataMsg->migratedElementSize = 0;
-       dataMsg->lbGroupID = globalLBID;
-       /*msg layout 
-               |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
-               Local MessageLog|
-       */
-       //store checkpoint data
-       char *buf = &msg[sizeof(RestartProcessorData)];
-
-       if(dataMsg->numMigratedAwayElements != 0){
-               memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
-               buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
-       }
-       
-
-#ifdef CHECKPOINT_DISK
-       readCheckpointFromDisk(storedChkpt->bufSize,buf);
-#else  
-       memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
-#endif
-       buf = &buf[storedChkpt->bufSize];
-
-
-       //store localmessage Log
-       dataMsg->numLocalMessages = localMsgQ->length();
-       for(int i=0;i<localMsgQ->length();i++){
-               if(!fault_aware(((*localMsgQ)[i]).recver )){
-                       CmiAbort("Non fault aware localMsgQ");
-               }
-               memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
-               buf = &buf[sizeof(LocalMessageLog)];
-       }
-       
-       CmiSetHandler(msg,_recvCheckpointHandlerIdx);
-       CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
-       CmiFree(restartMsg);
-};
-
-
-// this list is used to create a vector of the object ids of all
-//the chares on this processor currently and the highest TN processed by them 
-//the first argument is actually a CkVec<TProcessedLog> *
-void createObjIDList(void *data,ChareMlogData *mlogData){
-       CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
-       TProcessedLog entry;
-       entry.recver = mlogData->objID;
-       entry.tProcessed = mlogData->tProcessed;
-       list->push_back(entry);
-       char objString[100];
-       DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
-}
-
-
 /**
- * Receives the checkpoint data from its buddy, restores the state of all the objects
- * and asks everyone else to update its home.
+ * Receives the checkpoint coming from its buddy.
  */
-void _recvCheckpointHandler(char *_restartData){
+void _recvRestartCheckpointHandler(char *_restartData){
        RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
        MigrationRecord *migratedAwayElements;
 
@@ -1978,7 +1940,7 @@ void _recvCheckpointHandler(char *_restartData){
        if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
 
        
-       printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
+       printf("[%d] Group Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
        char *buf = &_restartData[sizeof(RestartProcessorData)];
        
        if(restartData->numMigratedAwayElements != 0){
@@ -2022,11 +1984,8 @@ void _recvCheckpointHandler(char *_restartData){
        }
 
        forAllCharesDo(sortRestoredLocalMsgLog,NULL);
-
-       CmiFree(_restartData);
-       
-       
-       _initDone();
+       CmiFree(_restartData);  
+       /*HERE _initDone();
 
        getGlobalStep(globalLBID);
        
@@ -2039,23 +1998,8 @@ void _recvCheckpointHandler(char *_restartData){
                        CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
                }
        }
+*/
 
-}
-
-/**
- * Receives the updateHome ACKs from all other processors. Once everybody
- * has replied, it sends a request to resed the logged messages.
- */
-void _updateHomeAckHandler(RestartRequest *updateHomeAck){
-
-       CkPrintf("[%d] Updating Home Ack Handler\n",CkMyPe());
-
-       countUpdateHomeAcks++;
-       CmiFree(updateHomeAck);
-       // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
-       if(countUpdateHomeAcks != CmiNumPes()){
-               return;
-       }
 
        // Send out the request to resend logged messages to all other processors
        CkVec<TProcessedLog> objectVec;
@@ -2076,7 +2020,335 @@ void _updateHomeAckHandler(RestartRequest *updateHomeAck){
        memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
        
 
+       /* test for parallel restart migrate away object**/
+       if(parallelRestart){
+               distributeRestartedObjects();
+               printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
+       }
+       
+       /*      To make restart work for load balancing.. should only
+       be used when checkpoint happens along with load balancing
+       **/
+//     forAllCharesDo(resumeFromSyncRestart,NULL);
+
+       CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
+       CpvAccess(_currentObj) = lb;
+       lb->ReceiveDummyMigration(restartDecisionNumber);
+
+       sleep(10);
+       
+       CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
+       for(int i=0;i<CkNumPes();i++){
+               if(i != CkMyPe()){
+                       CmiSyncSend(i,totalSize,resendMsg);
+               }       
+       }
+       _resendMessagesHandler(resendMsg);
+
+
+}
+
+
+void CkMlogRestartDouble(void *,double){
+       CkMlogRestart(NULL,NULL);
+};
+
+//TML: restarting from local (group) failure
+void CkMlogRestartLocal(){
+    CkMlogRestart(NULL,NULL);
+};
+
+
+void readCheckpointFromDisk(int size,char *buf){
+       char fName[100];
+       sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
+
+       int fd = open(fName,O_RDONLY);
+       int count=0;
+       while(count < size){
+               count += read(fd,&buf[count],size-count);
+       }
+       close(fd);
+       
+};
+
+
+
+/**
+ * Gets the stored checkpoint for its buddy processor.
+ */
+void _getCheckpointHandler(RestartRequest *restartMsg){
+
+       // retrieving the stored checkpoint
+       StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
+
+       // making sure it is its buddy who is requesting the checkpoint
+       CkAssert(restartMsg->PE == storedChkpt->PE);
+
+       storedRequest = restartMsg;
+       verifyAckTotal = 0;
+
+       for(int i=0;i<migratedNoticeList.size();i++){
+               if(migratedNoticeList[i].fromPE == restartMsg->PE){
+//                     if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
+                       if(migratedNoticeList[i].ackFrom == 0){
+                               //need to verify if the object actually exists .. it might not
+                               //have been acked but it might exist on it
+                               VerifyAckMsg msg;
+                               msg.migRecord = migratedNoticeList[i];
+                               msg.index = i;
+                               msg.fromPE = CmiMyPe();
+                               CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
+                               CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
+                               CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
+                               verifyAckTotal++;
+                       }
+               }
+       }
+
+       // sending the checkpoint back to its buddy     
+       if(verifyAckTotal == 0){
+               sendCheckpointData(MLOG_CRASHED);
+       }
+       verifyAckCount = 0;
+}
+
+
+void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
+       CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
+       CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
+       if(rec != NULL && rec->type() == CkLocRec::local){
+                       //this location exists on this processor
+                       //and needs to be removed       
+                       CkLocRec_local *localRec = (CkLocRec_local *) rec;
+                       CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
+                       
+                       int localIdx = localRec->getLocalIndex();
+                       LBDatabase *lbdb = localRec->getLBDB();
+                       LDObjHandle ldHandle = localRec->getLdHandle();
+                               
+                       locMgr->setDuringMigration(true);
+                       
+                       locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
+                       lbdb->UnregisterObj(ldHandle);
+                       
+                       locMgr->setDuringMigration(false);
+                       
+                       verifyAckedRequests++;
+
+       }
+       CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
+       CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
+};
+
+
+void _verifyAckHandler(VerifyAckMsg *verifyReply){
+       int index =     verifyReply->index;
+       migratedNoticeList[index] = verifyReply->migRecord;
+       verifyAckCount++;
+       CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
+       if(verifyAckCount == verifyAckTotal){
+               sendCheckpointData(MLOG_CRASHED);
+       }
+}
+
+
+
+void sendCheckpointData(int mode){     
+       RestartRequest *restartMsg = storedRequest;
+       StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
+       int numMigratedAwayElements = migratedNoticeList.size();
+       if(migratedNoticeList.size() != 0){
+                       printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
+//                     CkAssert(migratedNoticeList.size() == 0);
+       }
+       
+       
+       int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
+       
+       DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
+       CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
+       
+       CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
+       totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
+       totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
+       
+       char *msg = (char *)CmiAlloc(totalSize);
+       
+       RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
+       dataMsg->PE = CkMyPe();
+       dataMsg->restartWallTime = CmiTimer();
+       dataMsg->checkPointSize = storedChkpt->bufSize;
+       
+       dataMsg->numMigratedAwayElements = numMigratedAwayElements;
+//     dataMsg->numMigratedAwayElements = 0;
+       
+       dataMsg->numMigratedInElements = 0;
+       dataMsg->migratedElementSize = 0;
+       dataMsg->lbGroupID = globalLBID;
+       /*msg layout 
+               |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
+               Local MessageLog|
+       */
+       //store checkpoint data
+       char *buf = &msg[sizeof(RestartProcessorData)];
+
+       if(dataMsg->numMigratedAwayElements != 0){
+               memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
+               buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
+       }
+       
+
+#ifdef CHECKPOINT_DISK
+       readCheckpointFromDisk(storedChkpt->bufSize,buf);
+#else  
+       memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
+#endif
+       buf = &buf[storedChkpt->bufSize];
+
+
+       //store localmessage Log
+       dataMsg->numLocalMessages = localMsgQ->length();
+       for(int i=0;i<localMsgQ->length();i++){
+               if(!fault_aware(((*localMsgQ)[i]).recver )){
+                       CmiAbort("Non fault aware localMsgQ");
+               }
+               memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
+               buf = &buf[sizeof(LocalMessageLog)];
+       }
+       
+       if(mode == MLOG_RESTARTED){
+               CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
+               CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
+               CmiFree(restartMsg);
+       }else{
+               CmiSetHandler(msg,_recvCheckpointHandlerIdx);
+               CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
+               CmiFree(restartMsg);
+       }
+};
+
+
+// this list is used to create a vector of the object ids of all
+//the chares on this processor currently and the highest TN processed by them 
+//the first argument is actually a CkVec<TProcessedLog> *
+void createObjIDList(void *data,ChareMlogData *mlogData){
+       CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
+       TProcessedLog entry;
+       entry.recver = mlogData->objID;
+       entry.tProcessed = mlogData->tProcessed;
+       list->push_back(entry);
+       char objString[100];
+       DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
+}
+
+
+/**
+ * Receives the checkpoint data from its buddy, restores the state of all the objects
+ * and asks everyone else to update its home.
+ */
+void _recvCheckpointHandler(char *_restartData){
+       RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
+       MigrationRecord *migratedAwayElements;
+
+       globalLBID = restartData->lbGroupID;
+       
+       restartData->restartWallTime *= 1000;
+       adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
+       adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
+       if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
+
+       
+       printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
+       char *buf = &_restartData[sizeof(RestartProcessorData)];
+       
+       if(restartData->numMigratedAwayElements != 0){
+               migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
+               memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
+               printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
+               buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
+       }
        
+       PUP::fromMem pBuf(buf);
+
+       pBuf | checkpointCount;
+
+       CkPupROData(pBuf);
+       CkPupGroupData(pBuf);
+       CkPupNodeGroupData(pBuf);
+//     pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
+       pupArrayElementsSkip(pBuf,NULL);
+       CkAssert(pBuf.size() == restartData->checkPointSize);
+       printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
+       
+       forAllCharesDo(initializeRestart,NULL);
+       
+       //store the restored local message log in a vector
+       buf = &buf[restartData->checkPointSize];        
+       for(int i=0;i<restartData->numLocalMessages;i++){
+               LocalMessageLog logEntry;
+               memcpy(&logEntry,buf,sizeof(LocalMessageLog));
+               
+               Chare *recverObj = (Chare *)logEntry.recver.getObject();
+               if(recverObj!=NULL){
+                       recverObj->mlogData->addToRestoredLocalQ(&logEntry);
+                       recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
+                       char senderString[100];
+                       char recverString[100];
+                       DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
+               }else{
+//                     DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
+               }
+               buf = &buf[sizeof(LocalMessageLog)];
+       }
+
+       forAllCharesDo(sortRestoredLocalMsgLog,NULL);
+
+       CmiFree(_restartData);
+       
+       
+       _initDone();
+
+       getGlobalStep(globalLBID);
+       
+       countUpdateHomeAcks = 0;
+       RestartRequest updateHomeRequest;
+       updateHomeRequest.PE = CmiMyPe();
+       CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
+       for(int i=0;i<CmiNumPes();i++){
+               if(i != CmiMyPe()){
+                       CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
+               }
+       }
+
+}
+
+/**
+ * Receives the updateHome ACKs from all other processors. Once everybody
+ * has replied, it sends a request to resend the logged messages.
+ */
+void _updateHomeAckHandler(RestartRequest *updateHomeAck){
+       countUpdateHomeAcks++;
+       CmiFree(updateHomeAck);
+       // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
+       if(countUpdateHomeAcks != CmiNumPes()){
+               return;
+       }
+
+       // Send out the request to resend logged messages to all other processors
+       CkVec<TProcessedLog> objectVec;
+       forAllCharesDo(createObjIDList, (void *)&objectVec);
+       int numberObjects = objectVec.size();
+       
+       //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
+       int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
+       char *resendMsg = (char *)CmiAlloc(totalSize);  
+
+       ResendRequest *resendReq = (ResendRequest *)resendMsg;
+       resendReq->PE =CkMyPe(); 
+       resendReq->numberObjects = numberObjects;
+       char *objList = &resendMsg[sizeof(ResendRequest)];
+       memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
 
        /* test for parallel restart migrate away object**/
        if(parallelRestart){
@@ -2105,7 +2377,10 @@ void _updateHomeAckHandler(RestartRequest *updateHomeAck){
        CmiFree(resendMsg);
 };
 
-void initializeRestart(void *data,ChareMlogData *mlogData){
+/**
+ * @brief Initializes variables and flags for restarting procedure.
+ */
+void initializeRestart(void *data, ChareMlogData *mlogData){
        mlogData->resendReplyRecvd = 0;
        mlogData->receivedTNs = new CkVec<MCount>;
        mlogData->restartFlag = 1;
@@ -2141,9 +2416,6 @@ void updateHomePE(void *data,ChareMlogData *mlogData){
  * Updates the homePe for all chares in this processor.
  */
 void _updateHomeRequestHandler(RestartRequest *updateRequest){
-
-       CkPrintf("[%d] ---------------->HERE\n",CkMyPe());
-       
        int sender = updateRequest->PE;
        
        forAllCharesDo(updateHomePE,updateRequest);
@@ -2167,8 +2439,53 @@ void _updateHomeRequestHandler(RestartRequest *updateRequest){
                        }
                }
        }
-}
+}
+
+/**
+ * @brief Fills up the ticket vector for each chare.
+ */
+void fillTicketForChare(void *data, ChareMlogData *mlogData){
+       ResendData *resendData = (ResendData *)data;
+       int PE = resendData->PE; //restarted PE
+       int count=0;
+       CkHashtableIterator *iterator;
+       void *objp;
+       void *objkey;
+       CkObjID *objID;
+       SNToTicket *snToTicket;
+       Ticket ticket;
+       
+       // traversing the team table looking up for the maximum TN received     
+       iterator = mlogData->teamTable.iterator();
+       while( (objp = iterator->next(&objkey)) != NULL ){
+               objID = (CkObjID *)objkey;
+       
+               // traversing the resendData structure to add ticket numbers
+               for(int j=0;j<resendData->numberObjects;j++){
+                       if((*objID) == (resendData->listObjects)[j].recver){
+char name[100];
+                               snToTicket = *(SNToTicket **)objp;
+CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
+                               for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
+CkPrintf("[%d] ---> Zero\n",CkMyPe());
+                                       ticket = snToTicket->get(snIndex);      
+                                       if(ticket.TN > resendData->maxTickets[j]){
+CkPrintf("[%d] ---> One\n",CkMyPe());
+                                               resendData->maxTickets[j] = ticket.TN;
+                                       }
+                                       if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
+CkPrintf("[%d] ---> Two\n",CkMyPe());
+                                               //store the TNs that have been since the recver last checkpointed
+                                               resendData->ticketVecs[j].push_back(ticket.TN);
+                                       }
+                               }
+                       }
+               }
+       }
 
+       //releasing the memory for the iterator
+       delete iterator;
+}
 
 //the data argument is of type ResendData which contains the 
 //array of objects on  the restartedProcessor
@@ -2183,9 +2500,6 @@ void resendMessageForChare(void *data,ChareMlogData *mlogData){
        int count=0;
        int ticketRequests=0;
        CkQ<MlogEntry *> *log = mlogData->getMlog();
-
-       
-       
        
        for(int i=0;i<log->length();i++){
                MlogEntry *logEntry = (*log)[i];
@@ -2261,30 +2575,11 @@ void resendMessageForChare(void *data,ChareMlogData *mlogData){
  * request.
  */
 void _resendMessagesHandler(char *msg){
+       ResendData d;
        ResendRequest *resendReq = (ResendRequest *)msg;
 
-       //GML: examines the origin processor to determine if it belongs to the same group
-       if(resendReq->PE != CkMyPe() && resendReq->PE/GROUP_SIZE_MLOG == CkMyPe()/GROUP_SIZE_MLOG){
-               //TODO: change the function call from restart to group-restart to avoid cyclic calls,
-               // for now it will work only with 1 failure
-               if(_restartFlag)
-                       return;
-               CmiMemoryCheck();
-               CkPrintf("[%d] RESTART: same group\n",CkMyPe());
-               //HERE _resetNodeBocInitVec();
-/*      int numGroups = CkpvAccess(_groupIDTable)->size();
-               int i;
-               CKLOCMGR_LOOP(mgr->startInserting(););
-               CKLOCMGR_LOOP(mgr->flushAllRecs(););
-*/
-               // rolls back to the previous checkpoint and sends a broadcast to resend messages to this processor
-               CkMlogRestartLocal();
-               return;
-       }
-
-
+       // building the reply message
        char *listObjects = &msg[sizeof(ResendRequest)];
-       ResendData d;
        d.numberObjects = resendReq->numberObjects;
        d.PE = resendReq->PE;
        d.listObjects = (TProcessedLog *)listObjects;
@@ -2323,9 +2618,7 @@ void _resendMessagesHandler(char *msg){
                                CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
                                int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
                                informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
-
                                sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
-                               
                                CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
                                CmiAssert(rec->type() == CkLocRec::local);
                                CkVec<CkMigratable *> eltList;
@@ -2336,12 +2629,7 @@ void _resendMessagesHandler(char *msg){
                                                eltList[j]->ResumeFromSync();
                                        }
                                }
-
-
-                               
-                               retainedObjectList[i]->msg=NULL;
-                               
-                                       
+                               retainedObjectList[i]->msg=NULL;        
                        }
                }
        }
@@ -2349,11 +2637,16 @@ void _resendMessagesHandler(char *msg){
        if(count > 0){
 //             CmiAbort("retainedObjectList for restarted processor not empty");
        }
-
-
        
        DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
-       forAllCharesDo(resendMessageForChare,&d);
+
+
+       //TML: examines the origin processor to determine if it belongs to the same group.
+       // In that case, it only returns the maximum ticket received for each object in the list.
+       if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
+               forAllCharesDo(fillTicketForChare,&d);
+       else
+               forAllCharesDo(resendMessageForChare,&d);
 
        //send back the maximum ticket number for a message sent to each object on the 
        //restarted processor
@@ -2400,8 +2693,6 @@ void _resendMessagesHandler(char *msg){
        }
        
        verifyAckRequestsUnacked=0;*/
-
-
        
        delete [] d.maxTickets;
        delete [] d.ticketVecs;
@@ -2415,6 +2706,9 @@ void _resendMessagesHandler(char *msg){
 void sortVec(CkVec<MCount> *TNvec);
 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
 
+/**
+ * @brief Receives the tickets assigned to message to other objects.
+ */
 void _resendReplyHandler(char *msg){   
        /**
                need to rewrite this method to deal with parallel restart
@@ -2431,9 +2725,7 @@ void _resendReplyHandler(char *msg){
                int vecsize;
                memcpy(&vecsize,listTickets,sizeof(int));
                listTickets = &listTickets[sizeof(int)];
-               MCount *listTNs = (MCount *)listTickets;
-               
-               
+               MCount *listTNs = (MCount *)listTickets;        
                listTickets = &listTickets[vecsize*sizeof(MCount)];
                
                if(obj != NULL){
@@ -2470,11 +2762,14 @@ void _receivedTNDataHandler(ReceivedTNData *msg){
        }
 };
 
-
-void processReceivedTN(Chare *obj,int listSize,MCount *listTNs){
+/**
+ * @brief Processes the received list of tickets from a particular PE.
+ */
+void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
+       // increases the number of resendReply received
        obj->mlogData->resendReplyRecvd++;
-
        
+       // includes the tickets into the receivedTN structure
        for(int j=0;j<listSize;j++){
                obj->mlogData->receivedTNs->push_back(listTNs[j]);
        }
@@ -2489,8 +2784,7 @@ void processReceivedTN(Chare *obj,int listSize,MCount *listTNs){
                sortVec(obj->mlogData->receivedTNs);
        
                //after all the received tickets are in we need to sort them and then 
-               // calculate the holes
-               
+               // calculate the holes  
                if(obj->mlogData->receivedTNs->size() > 0){
                        int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
                        int vecsize = obj->mlogData->receivedTNs->size();
@@ -2521,12 +2815,14 @@ void processReceivedTN(Chare *obj,int listSize,MCount *listTNs){
                                CkAssert(countHoles == numberHoles);                                    
                                obj->mlogData->currentHoles = numberHoles;
                        }
-               }       
-               
+               }
+       
+               // cleaning up structures and getting ready to continue execution       
                delete obj->mlogData->receivedTNs;
                obj->mlogData->receivedTNs = NULL;
                obj->mlogData->restartFlag = 0;
-               char objString[100];
+
+               DEBUGRESTART(char objString[100]);
                DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
        }
 
@@ -3020,70 +3316,243 @@ void _receiveLocationHandler(CurrentLocationMsg *data){
                        }
                }
        }
-       if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
-               int targetPE = data->fromPE;
-               data->fromPE = CmiMyPe();
-               data->locationPE = CmiMyPe();
-               DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
-               CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
-       }else{
-               mgr->inform(data->idx,data->locationPE);
+       if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
+               int targetPE = data->fromPE;
+               data->fromPE = CmiMyPe();
+               data->locationPE = CmiMyPe();
+               DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
+               CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
+       }else{
+               mgr->inform(data->idx,data->locationPE);
+       }
+       CmiFree(data);
+       traceUserBracketEvent(38,_startTime,CmiWallTimer());
+}
+
+
+
+void getGlobalStep(CkGroupID gID){
+       LBStepMsg msg;
+       int destPE = 0;
+       msg.lbID = gID;
+       msg.fromPE = CmiMyPe();
+       msg.step = -1;
+       CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
+       CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
+};
+
+void _getGlobalStepHandler(LBStepMsg *msg){
+       CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
+       msg->step = lb->step();
+       CmiAssert(msg->fromPE != CmiMyPe());
+       CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
+       CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
+       CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
+};
+
+void _recvGlobalStepHandler(LBStepMsg *msg){
+       
+       restartDecisionNumber=msg->step;
+       RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
+       _updateHomeAckHandler(dummyAck);
+};
+
+/**
+ * @brief Function to wrap up performance information.
+ */
+void _messageLoggingExit(){
+/*     if(CkMyPe() == 0){
+               if(countBuffered != 0){
+                       printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
+               }
+
+//             printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
+       }
+       printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
+       printf("[%d] _messageLoggingExit \n",CmiMyPe());
+
+       //TML: printing some statistics for group approach
+       //if(TEAM_SIZE_MLOG > 1)
+               CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
+
+}
+
+/**
+       The method for returning the actual object pointed to by an id
+       If the object doesnot exist on the processor it returns NULL
+**/
+
+void* CkObjID::getObject(){
+       
+               switch(type){
+                       case TypeChare:
+       
+                               return CkLocalChare(&data.chare.id);
+                       case TypeGroup:
+       
+                               CkAssert(data.group.onPE == CkMyPe());
+                               return CkLocalBranch(data.group.id);
+                       case TypeNodeGroup:
+                               CkAssert(data.group.onPE == CkMyNode());
+                               //CkLocalNodeBranch(data.group.id);
+                               {
+                                       CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
+                                 void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
+                                 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
+       
+                                       return retval;
+                               }       
+                       case TypeArray:
+                               {
+       
+       
+                                       CkArrayID aid(data.array.id);
+       
+                                       if(aid.ckLocalBranch() == NULL){ return NULL;}
+       
+                                       CProxyElement_ArrayBase aProxy(aid,data.array.idx.asMax());
+       
+                                       return aProxy.ckLocal();
+                               }
+                       default:
+                               CkAssert(0);
+               }
+}
+
+
+int CkObjID::guessPE(){
+               switch(type){
+                       case TypeChare:
+                               return data.chare.id.onPE;
+                       case TypeGroup:
+                       case TypeNodeGroup:
+                               return data.group.onPE;
+                       case TypeArray:
+                               {
+                                       CkArrayID aid(data.array.id);
+                                       if(aid.ckLocalBranch() == NULL){
+                                               return -1;
+                                       }
+                                       return aid.ckLocalBranch()->lastKnown(data.array.idx.asMax());
+                               }
+                       default:
+                               CkAssert(0);
+               }
+};
+
+char *CkObjID::toString(char *buf) const {
+       
+       switch(type){
+               case TypeChare:
+                       sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
+                       break;
+               case TypeGroup:
+                       sprintf(buf,"Group %d   PE %d \0",data.group.id.idx,data.group.onPE);
+                       break;
+               case TypeNodeGroup:
+                       sprintf(buf,"NodeGroup %d       Node %d \0",data.group.id.idx,data.group.onPE);
+                       break;
+               case TypeArray:
+                       {
+                               const CkArrayIndexMax &idx = data.array.idx.asMax();
+                               const int *indexData = idx.data();
+                               sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
+                               break;
+                       }
+               default:
+                       CkAssert(0);
+       }
+       
+       return buf;
+};
+
+void CkObjID::updatePosition(int PE){
+       if(guessPE() == PE){
+               return;
+       }
+       switch(type){
+               case TypeArray:
+                       {
+                                       CkArrayID aid(data.array.id);
+                                       if(aid.ckLocalBranch() == NULL){
+                                               
+                                       }else{
+                                               char str[100];
+                                               CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
+//                                             CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
+                                               CkLocRec *rec = mgr->elementNrec(data.array.idx.asMax());
+                                               if(rec != NULL){
+                                                       if(rec->type() == CkLocRec::local){
+                                                               CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
+                                                               return;
+                                                       }
+                                               }
+                                               mgr->inform(data.array.idx.asMax(),PE);
+                                       }       
+                               }
+
+                       break;
+               case TypeChare:
+                       CkAssert(data.chare.id.onPE == PE);
+                       break;
+               case TypeGroup:
+               case TypeNodeGroup:
+                       CkAssert(data.group.onPE == PE);
+                       break;
+               default:
+                       CkAssert(0);
+       }
+}
+
+
+
+void MlogEntry::pup(PUP::er &p){
+       p | destPE;
+       p | _infoIdx;
+       p | unackedLocal;
+       int size;
+       if(!p.isUnpacking()){
+/*             CkAssert(env);
+               if(!env->isPacked()){
+                       CkPackMessage(&env);
+               }*/
+               if(env == NULL){
+                       //message was probably local and has been removed from logs
+                       size = 0;
+               }else{
+                       size = env->getTotalsize();
+               }       
+       }
+       p | size;
+       if(p.isUnpacking()){
+               if(size > 0){
+                       env = (envelope *)_allocEnv(ForChareMsg,size);
+               }else{
+                       env = NULL;
+               }
+       }
+       if(size > 0){
+               p((char *)env,size);
+       
+               if(p.isUnpacking()){
+                       env->localMlogEntry = NULL;
+               }
        }
-       CmiFree(data);
-       traceUserBracketEvent(38,_startTime,CmiWallTimer());
-}
-
-
-
-void getGlobalStep(CkGroupID gID){
-       LBStepMsg msg;
-       int destPE = 0;
-       msg.lbID = gID;
-       msg.fromPE = CmiMyPe();
-       msg.step = -1;
-       CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
-       CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
-};
-
-void _getGlobalStepHandler(LBStepMsg *msg){
-       CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
-       msg->step = lb->step();
-       CmiAssert(msg->fromPE != CmiMyPe());
-       CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
-       CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
-       CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
 };
 
-void _recvGlobalStepHandler(LBStepMsg *msg){
-       
-       restartDecisionNumber=msg->step;
-       RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
-       _updateHomeAckHandler(dummyAck);
+void RestoredLocalMap::pup(PUP::er &p){
+       p | minSN;
+       p | maxSN;
+       p | count;
+       if(p.isUnpacking()){
+               TNArray = new MCount[count];
+       }
+       p(TNArray,count);
 };
 
 
 
 
-
-
-
-
-void _messageLoggingExit(){
-/*     if(CkMyPe() == 0){
-               if(countBuffered != 0){
-                       printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
-               }
-
-//             printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
-       }
-       printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
-       printf("[%d] _messageLoggingExit \n",CmiMyPe());
-
-       //GML: printing some statistics for group approach
-       if(GROUP_SIZE_MLOG > 1)
-               CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
-
-}
 /**********************************
        * The methods of the message logging
        * data structure stored in each chare
@@ -3122,7 +3591,7 @@ MCount ChareMlogData::newTN(){
        return TN;
 };
 
-Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
+inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
        char senderName[100];
        char recverName[100];
        double _startTime =CmiWallTimer();
@@ -3174,6 +3643,9 @@ Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
        return ticket;  
 };
 
+/**
+ * Adds an entry into the message log.
+ */
 void ChareMlogData::addLogEntry(MlogEntry *entry){
        char nameString[100];
        DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
@@ -3248,12 +3720,6 @@ void ChareMlogData::sortRestoredLocalMsgLog(){
                CkAssert(map != NULL);
                if(map->TNArray == NULL){
                        map->TNArray = new MCount[map->maxSN-map->minSN+1];                     
-                       //HERE: erase from here
-                       //printf("map->count:%d\n",map->count);
-                       //printf("map->maxSN:%d\n",map->maxSN);
-                       //printf("map->minSN:%d\n",map->minSN);
-                       //HERE: to here
-
                        CkAssert(map->count == map->maxSN-map->minSN+1);
                        map->count = 0;
                }
@@ -3395,6 +3861,8 @@ void ChareMlogData::pup(PUP::er &p){
                        p | (*objID);
                        (*map)->pup(p);
                }
+               // releasing memory for iterator
+               delete iter;
        }else{
                for(int i=0;i<tableSize;i++){
                        CkObjID objID;
@@ -3420,6 +3888,8 @@ void ChareMlogData::pup(PUP::er &p){
                                p | (*objID);
                                (*ticketRow)->pup(p);
                        }
+                       //releasing memory for iterator
+                       delete iter;
                }else{
                        for(int i=0;i<ticketTableSize;i++){
                                CkObjID objID;
@@ -3429,15 +3899,12 @@ void ChareMlogData::pup(PUP::er &p){
                                ticketTable.put(objID) = ticketRow;
                        }
                }
-       }
-
-
-       
+       }       
        
        if(p.isSizing()){
-               char name[40];
                PUP::sizer *sizep = (PUP::sizer *)&p;
                int pupSize = sizep->size()-startSize;
+               DEBUG(char name[40]);
                DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
        //      CkAssert(pupSize <100000000);
        }
@@ -3446,187 +3913,16 @@ void ChareMlogData::pup(PUP::er &p){
        DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
 };
 
-
-/**
-       The method for returning the actual object pointed to by an id
-       If the object doesnot exist on the processor it returns NULL
-**/
-
-void* CkObjID::getObject(){
-       
-               switch(type){
-                       case TypeChare:
-       
-                               return CkLocalChare(&data.chare.id);
-                       case TypeGroup:
-       
-                               CkAssert(data.group.onPE == CkMyPe());
-                               return CkLocalBranch(data.group.id);
-                       case TypeNodeGroup:
-                               CkAssert(data.group.onPE == CkMyNode());
-                               //CkLocalNodeBranch(data.group.id);
-                               {
-                                       CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
-                                 void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
-                                 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
-       
-                                       return retval;
-                               }       
-                       case TypeArray:
-                               {
-       
-       
-                                       CkArrayID aid(data.array.id);
-       
-                                       if(aid.ckLocalBranch() == NULL){ return NULL;}
-       
-                                       CProxyElement_ArrayBase aProxy(aid,data.array.idx.asMax());
-       
-                                       return aProxy.ckLocal();
-                               }
-                       default:
-                               CkAssert(0);
-               }
-}
-
-
-int CkObjID::guessPE(){
-               switch(type){
-                       case TypeChare:
-                               return data.chare.id.onPE;
-                       case TypeGroup:
-                       case TypeNodeGroup:
-                               return data.group.onPE;
-                       case TypeArray:
-                               {
-                                       CkArrayID aid(data.array.id);
-                                       if(aid.ckLocalBranch() == NULL){
-                                               return -1;
-                                       }
-                                       return aid.ckLocalBranch()->lastKnown(data.array.idx.asMax());
-                               }
-                       default:
-                               CkAssert(0);
-               }
-};
-
-char *CkObjID::toString(char *buf) const {
-       
-       switch(type){
-               case TypeChare:
-                       sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
-                       break;
-               case TypeGroup:
-                       sprintf(buf,"Group %d   PE %d \0",data.group.id.idx,data.group.onPE);
-                       break;
-               case TypeNodeGroup:
-                       sprintf(buf,"NodeGroup %d       Node %d \0",data.group.id.idx,data.group.onPE);
-                       break;
-               case TypeArray:
-                       {
-                               const CkArrayIndexMax &idx = data.array.idx.asMax();
-                               const int *indexData = idx.data();
-                               sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
-                               break;
-                       }
-               default:
-                       CkAssert(0);
-       }
-       
-       return buf;
-};
-
-void CkObjID::updatePosition(int PE){
-       if(guessPE() == PE){
-               return;
-       }
-       switch(type){
-               case TypeArray:
-                       {
-                                       CkArrayID aid(data.array.id);
-                                       if(aid.ckLocalBranch() == NULL){
-                                               
-                                       }else{
-                                               char str[100];
-                                               CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
-//                                             CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
-                                               CkLocRec *rec = mgr->elementNrec(data.array.idx.asMax());
-                                               if(rec != NULL){
-                                                       if(rec->type() == CkLocRec::local){
-                                                               CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
-                                                               return;
-                                                       }
-                                               }
-                                               mgr->inform(data.array.idx.asMax(),PE);
-                                       }       
-                               }
-
-                       break;
-               case TypeChare:
-                       CkAssert(data.chare.id.onPE == PE);
-                       break;
-               case TypeGroup:
-               case TypeNodeGroup:
-                       CkAssert(data.group.onPE == PE);
-                       break;
-               default:
-                       CkAssert(0);
-       }
-}
-
-
-
-void MlogEntry::pup(PUP::er &p){
-       p | destPE;
-       p | _infoIdx;
-       p | unackedLocal;
-       int size;
-       if(!p.isUnpacking()){
-/*             CkAssert(env);
-               if(!env->isPacked()){
-                       CkPackMessage(&env);
-               }*/
-               if(env == NULL){
-                       //message was probably local and has been removed from logs
-                       size = 0;
-               }else{
-                       size = env->getTotalsize();
-               }       
-       }
-       p | size;
-       if(p.isUnpacking()){
-               if(size > 0){
-                       env = (envelope *)_allocEnv(ForChareMsg,size);
-               }else{
-                       env = NULL;
-               }
-       }
-       if(size > 0){
-               p((char *)env,size);
-       
-               if(p.isUnpacking()){
-                       env->localMlogEntry = NULL;
-               }
-       }
-};
-
-void RestoredLocalMap::pup(PUP::er &p){
-       p | minSN;
-       p | maxSN;
-       p | count;
-       if(p.isUnpacking()){
-               TNArray = new MCount[count];
-       }
-       p(TNArray,count);
-};
-
-
 /****************
 *****************/
+
+/**
+ * Getting the pe number of the current processor's buddy.
+ */
 int getCheckPointPE(){
-       //GML: assigning a group-based buddy
-       if(GROUP_SIZE_MLOG != 1){
-               return (CmiMyPe() + GROUP_SIZE_MLOG) % CmiNumPes();
+       //TML: assigning a team-based buddy
+       if(TEAM_SIZE_MLOG != 1){
+               return (CmiMyPe() + TEAM_SIZE_MLOG) % CmiNumPes();
        }
        return (CmiNumPes() -1 - CmiMyPe());
 }