Making use of both message-logging techniques homogeneous in Charm++ core.
[charm.git] / src / ck-core / ckmessagelogging.C
index 9c859099287cf836bf85217489d2e5045e65ef92..f02d31d00cfaa32199c18b25e2ec8f0dc208befe 100644 (file)
@@ -1,11 +1,13 @@
 /**
- * Message Logging Fault Tolerance Protocol
- * It includes the main functions for the basic and team-based schemes.
- */
+  * Simple Causal Message Logging Fault Tolerance Protocol.
+  * Features:
+       * Reduces the latency overhead of the pessimistic approach.
+       * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
+  */
 
 #include "charm.h"
 #include "ck.h"
-#include "ckmessagelogging.h"
+#include "ckcausalmlog.h"
 #include "queueing.h"
 #include <sys/types.h>
 #include <signal.h>
 
 #ifdef _FAULT_MLOG_
 
-//#define DEBUG(x)  if(_restartFlag) {x;}
-#define DEBUG_MEM(x) //x
-#define DEBUG(x)  //x
-#define DEBUGRESTART(x)  //x
-#define DEBUGLB(x) // x
+// Collects some statistics about message logging. Beware of the high cost of accounting for
+// duplicated determinants (for every determinant received, it consists in a linear search 
+// through a potentially big list).
+#define COLLECT_STATS_MSGS 0
+#define COLLECT_STATS_MSGS_TOTAL 0
+#define COLLECT_STATS_MSG_COUNT 0
+#define COLLECT_STATS_DETS 0
+#define COLLECT_STATS_DETS_DUP 0
+#define COLLECT_STATS_MEMORY 0
+#define COLLECT_STATS_TEAM 0
+
+#define RECOVERY_SEND "SEND"
+#define RECOVERY_PROCESS "PROCESS"
+
+#define DEBUG_MEM(x)  //x
+#define DEBUG(x) // x
+#define DEBUG_RESTART(x)  //x
+#define DEBUGLB(x)   // x
 #define DEBUG_TEAM(x)  // x
-
-#define BUFFERED_LOCAL
-#define BUFFERED_REMOTE 
+#define DEBUG_PERF(x) // x
+#define DEBUG_CHECKPOINT 1
+#define DEBUG_NOW(x) x
+#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
+#define DEBUG_PE_NOW(x,y)  if(CkMyPe() == x) y
+#define DEBUG_RECOVERY(x) //x
 
 extern const char *idx2str(const CkArrayIndex &ind);
 extern const char *idx2str(const ArrayElement *el);
-const char *idx2str(const CkArrayIndex &ind){
-       return idx2str((const CkArrayIndex &)ind);
-};
 
 void getGlobalStep(CkGroupID gID);
 
@@ -36,20 +51,15 @@ void sendCheckpointData(int mode);
 void createObjIDList(void *data,ChareMlogData *mlogData);
 inline bool isLocal(int destPE);
 inline bool isTeamLocal(int destPE);
+void printLog(TProcessedLog *log);
 
 int _restartFlag=0;
-//ERASE int restarted=0; // it's not being used anywhere
-
-//TML: variables for measuring savings with teams in message logging
-float MLOGFT_totalLogSize = 0.0;
-float MLOGFT_totalMessages = 0.0;
-float MLOGFT_totalObjects = 0.0;
+int _numRestartResponses=0;
 
 //TODO: remove for perf runs
 int countHashRefs=0; //count the number of gets
 int countHashCollisions=0;
 
-//#define CHECKPOINT_DISK
 char *checkpointDirectory=".";
 int unAckedCheckpoint=0;
 
@@ -61,50 +71,101 @@ int countUpdateHomeAcks=0;
 
 extern int teamSize;
 extern int chkptPeriod;
-extern bool parallelRestart;
+extern bool fastRecovery;
+extern int parallelRecovery;
 
 char *killFile;
+char *faultFile;
 int killFlag=0;
+int faultFlag=0;
 int restartingMlogFlag=0;
 void readKillFile();
 double killTime=0.0;
+double faultMean;
 int checkpointCount=0;
 
-
 CpvDeclare(Chare *,_currentObj);
-CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
-CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
-CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
+CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
 CpvDeclare(Queue, _outOfOrderMessageQueue);
-CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
-//CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
+CpvDeclare(Queue, _delayedRemoteMessageQueue);
 CpvDeclare(char **,_bufferedTicketRequests);
 CpvDeclare(int *,_numBufferedTicketRequests);
-CpvDeclare(char *,_bufferTicketReply);
-
 
+/***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
+/** @note All the determinants generated by a PE are stored in variable _localDets.
+ * As soon as a message is sent, then all the determinants are appended to the message,
+ * but those determinants are not deleted. We must wait until an ACK comes from the receiver
+ * to delete the determinants. In the meantime the same determinants may be appended
+ * to other messages and more determinants can be added to _localDets.
+ * A simple solution to this problem was to have a primitive array and keep adding determinants
+ * at the end. However, to avoid multiple copies of determinants, we will keep a pointer
+ * to the first 'valid' determinant in the array. Alternatively, we can keep a pointer to the latest
+ * determinant and a number of how many valid determinants there are behind it. We do not remove
+ * determinants until a checkpoint is made, since these determinants may have to be added to
+ * messages in case of a recovery.
+ */
+// temporal storage for the outgoing determinants
+CpvDeclare(char *, _localDets);
+// number of buffered determinants
+int _numBufferedDets;
+// current index of first determinant in _localDets
+int _indexBufferedDets;
+// current phase of determinants
+int _phaseBufferedDets;
+
+// stores the determinants from other nodes, to send them in case of a crash
+CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
+// max number of buffered determinants
+int _maxBufferedDets;
+
+// stores the incarnation number from every other processor
+CpvDeclare(char *, _incarnation);
+
+// storage for remove determinants header
+CpvDeclare(RemoveDeterminantsHeader *, _removeDetsHeader);
+// storage for store determinants header
+CpvDeclare(StoreDeterminantsHeader *, _storeDetsHeader);
+// storage for the sizes in vector-send to store determinants
+CpvDeclare(int *, _storeDetsSizes);
+// storage for the pointers in vector-send to store determinants
+CpvDeclare(char **, _storeDetsPtrs);
+
+/***** *****/
+
+/***** VARIABLES FOR PARALLEL RECOVERY *****/
+CpvDeclare(int, _numEmigrantRecObjs);
+CpvDeclare(int, _numImmigrantRecObjs);
+CpvDeclare(CkVec<CkLocation *> *, _immigrantRecObjs);
+/***** *****/
+
+#if COLLECT_STATS_MSGS
+int *numMsgsTarget;
+int *sizeMsgsTarget;
+int totalMsgsTarget;
+float totalMsgsSize;
+#endif
+#if COLLECT_STATS_DETS
+int numPiggyDets;
+int numDets;
+int numDupDets;
+#endif
+#if COLLECT_STATS_MEMORY
+int msgLogSize;
+int bufferedDetsSize;
+int storedDetsSize;
+#endif
+//TML: variables for measuring savings with teams in message logging
+#if COLLECT_STATS_TEAM
+float MLOGFT_totalLogSize = 0.0;
+float MLOGFT_totalMessages = 0.0;
+#endif
 
 static double adjustChkptPeriod=0.0; //in ms
 static double nextCheckpointTime=0.0;//in seconds
+static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);
 
-double lastBufferedLocalMessageCopyTime;
-
-int _maxBufferedMessages;
-int _maxBufferedTicketRequests;
-int BUFFER_TIME=2; // in ms
-
-
-int _ticketRequestHandlerIdx;
-int _ticketHandlerIdx;
-int _localMessageCopyHandlerIdx;
-int _localMessageAckHandlerIdx;
 int _pingHandlerIdx;
-int _bufferedLocalMessageCopyHandlerIdx;
-int _bufferedLocalMessageAckHandlerIdx;
-int _bufferedTicketRequestHandlerIdx;
-int _bufferedTicketHandlerIdx;
-
 
 char objString[100];
 int _checkpointRequestHandlerIdx;
@@ -125,9 +186,14 @@ int        _recvGlobalStepHandlerIdx;
 int _updateHomeRequestHandlerIdx;
 int _updateHomeAckHandlerIdx;
 int _resendMessagesHandlerIdx;
-int _resendReplyHandlerIdx;
+int _sendDetsHandlerIdx;
+int _sendDetsReplyHandlerIdx;
 int _receivedTNDataHandlerIdx;
+int _receivedDetDataHandlerIdx;
 int _distributedLocationHandlerIdx;
+int _sendBackLocationHandlerIdx;
+int _storeDeterminantsHandlerIdx;
+int _removeDeterminantsHandlerIdx;
 
 //TML: integer constants for team-based message logging
 int _restartHandlerIdx;
@@ -143,18 +209,12 @@ int verifyAckedRequests=0;
 
 RestartRequest *storedRequest;
 
-
-
 int _falseRestart =0; /**
                                                                                                        For testing on clusters we might carry out restarts on 
                                                                                                        a porcessor without actually starting it
                                                                                                        1 -> false restart
                                                                                                        0 -> restart after an actual crash
-                                                                                               */                                                                                                                              
-
-//lock for the ticketRequestHandler and ticketLogLocalMessage methods;
-int _lockNewTicket=0;
-
+                                                                                               */
 
 //Load balancing globals
 int onGoingLoadBalancing=0;
@@ -177,32 +237,22 @@ int globalResumeCount=0;
 CkGroupID globalLBID;
 int restartDecisionNumber=-1;
 
-
 double lastCompletedAlarm=0;
 double lastRestart=0;
 
-
 //update location globals
 int _receiveLocationHandlerIdx;
 
 
-
-// initialize message logging datastructures and register handlers
+/** 
+ * @brief Initialize message logging data structures and register handlers
+ */
 void _messageLoggingInit(){
        //current object
        CpvInitialize(Chare *,_currentObj);
        
        //registering handlers for message logging
-       _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
-       _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
-       _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
-       _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
        _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
-       _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
-       _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
-       _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
-       _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
-
                
        //handlers for checkpointing
        _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
@@ -210,16 +260,18 @@ void _messageLoggingInit(){
        _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
        _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
 
-
        //handlers for restart
        _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
        _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
        _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
        _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
        _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
-       _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
+       _sendDetsHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsHandler);
+       _sendDetsReplyHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsReplyHandler);
        _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
+       _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
        _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
+       _sendBackLocationHandlerIdx=CkRegisterHandler((CmiHandler)_sendBackLocationHandler);
        _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
        _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
        _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
@@ -229,6 +281,9 @@ void _messageLoggingInit(){
        _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
        _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
 
+       // handlers for causal message logging
+       _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
+       _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
        
        //handlers for load balancing
        _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
@@ -236,26 +291,19 @@ void _messageLoggingInit(){
        _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
        _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
        _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
-       _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
-       _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
        _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
        _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
-
        
        //handlers for updating locations
        _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
        
        //Cpv variables for message logging
-       CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
-       CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
-       CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
-       CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
-       CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
-       CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
+       CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
+       CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
        CpvInitialize(Queue, _outOfOrderMessageQueue);
+       CpvInitialize(Queue, _delayedRemoteMessageQueue);
        CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
-       CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
-       CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
+       CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
        
        CpvInitialize(char **,_bufferedTicketRequests);
        CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
@@ -264,34 +312,44 @@ void _messageLoggingInit(){
                CpvAccess(_bufferedTicketRequests)[i]=NULL;
                CpvAccess(_numBufferedTicketRequests)[i]=0;
        }
-  CpvInitialize(char *,_bufferTicketReply);
-       CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
-       
-//     CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
-       CcdCallFnAfter(retryTicketRequest,NULL,100);    
-       
-       
+       // Cpv variables for causal protocol
+       _numBufferedDets = 0;
+       _indexBufferedDets = 0;
+       _phaseBufferedDets = 0;
+       _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
+       CpvInitialize(char *, _localDets);
+       CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
+       CpvInitialize(char *, _incarnation);
+       CpvInitialize(RemoveDeterminantsHeader *, _removeDetsHeader);
+       CpvInitialize(StoreDeterminantsHeader *, _storeDetsHeader);
+       CpvInitialize(int *, _storeDetsSizes);
+       CpvInitialize(char **, _storeDetsPtrs);
+       CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
+       CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
+       CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
+       for(int i=0; i<CmiNumPes(); i++){
+               CpvAccess(_incarnation)[i] = 0;
+       }
+       CpvAccess(_removeDetsHeader) = (RemoveDeterminantsHeader *) CmiAlloc(sizeof(RemoveDeterminantsHeader));
+       CpvAccess(_storeDetsHeader) = (StoreDeterminantsHeader *) CmiAlloc(sizeof(StoreDeterminantsHeader));
+       CpvAccess(_storeDetsSizes) = (int *) CmiAlloc(sizeof(int) * 2);
+       CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
+
+       // Cpv variables for parallel recovery
+       CpvInitialize(int, _numEmigrantRecObjs);
+    CpvAccess(_numEmigrantRecObjs) = 0;
+    CpvInitialize(int, _numImmigrantRecObjs);
+    CpvAccess(_numImmigrantRecObjs) = 0;
+
+    CpvInitialize(CkVec<CkLocation *> *, _immigrantRecObjs);
+    CpvAccess(_immigrantRecObjs) = new CkVec<CkLocation *>;
+
        //Cpv variables for checkpoint
        CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
        CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
-       
-//     CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
-//     printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
-//     CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
-       if(CkMyPe() == 0){
-//             CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
-#ifdef         BUFFERED_LOCAL
-               if(CmiMyPe() == 0){
-                       printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
-               }
-#endif
-       }
-#ifdef         BUFFERED_REMOTE
-       if(CmiMyPe() == 0){
-               printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
-       }
-#endif
 
+       // registering user events for projections      
        traceRegisterUserEvent("Remove Logs", 20);
        traceRegisterUserEvent("Ticket Request Handler", 21);
        traceRegisterUserEvent("Ticket Handler", 22);
@@ -303,7 +361,6 @@ void _messageLoggingInit(){
        traceRegisterUserEvent("Checkpoint",28);
        traceRegisterUserEvent("Checkpoint Store",29);
        traceRegisterUserEvent("Checkpoint Ack",30);
-       
        traceRegisterUserEvent("Send Ticket Request",31);
        traceRegisterUserEvent("Generalticketrequest1",32);
        traceRegisterUserEvent("TicketLogLocal",33);
@@ -315,8 +372,32 @@ void _messageLoggingInit(){
        
        lastCompletedAlarm=CmiWallTimer();
        lastRestart = CmiWallTimer();
-//     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
-       CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
+
+#if COLLECT_STATS_MSGS
+#if COLLECT_STATS_MSGS_TOTAL
+       totalMsgsTarget = 0;
+       totalMsgsSize = 0.0;
+#else
+       numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
+       sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
+       for(int i=0; i<CmiNumPes(); i++){
+               numMsgsTarget[i] = 0;
+               sizeMsgsTarget[i] = 0;
+       }
+#endif
+#endif
+#if COLLECT_STATS_DETS
+       numPiggyDets = 0;
+       numDets = 0;
+       numDupDets = 0;
+#endif
+#if COLLECT_STATS_MEMORY
+       msgLogSize = 0;
+       bufferedDetsSize = 0;
+       storedDetsSize = 0;
+#endif
+
+
 }
 
 void killLocal(void *_dummy,double curWallTime);       
@@ -338,6 +419,26 @@ void readKillFile(){
        fclose(fp);
 }
 
+/**
+ * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
+ * We assume an exponential distribution for the mean-time-between-failures.
+ */
+void readFaultFile(){
+        FILE *fp=fopen(faultFile,"r");
+        if(!fp){
+                return;
+        }
+        int proc;
+        double sec;
+        fscanf(fp,"%d %lf",&proc,&sec);
+       faultMean = sec;
+       if(proc == CkMyPe()){
+               printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
+               CcdCallFnAfter(killLocal,NULL,sec*1000);
+       }
+        fclose(fp);
+}
+
 void killLocal(void *_dummy,double curWallTime){
        printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
        if(CmiWallTimer()<killTime-1){
@@ -347,12 +448,50 @@ void killLocal(void *_dummy,double curWallTime){
        }
 }
 
+/*** Auxiliary Functions ***/
 
+/**
+ * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
+ * determinants needs to be extended.
+ */
+inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
+       Determinant *det, *auxDet;
+       char *aux;
+
+       DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
+
+       // checking if we are overflowing the buffered determinants array
+       if(_indexBufferedDets >= _maxBufferedDets){
+               aux = CpvAccess(_localDets);
+               _maxBufferedDets *= 2;
+               CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
+               memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
+               CmiFree(aux);
+       }
+
+       // adding the new determinant at the end
+       det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
+       det->sender = sender;
+       det->receiver = receiver;
+       det->SN = SN;
+       det->TN = TN;
+       _numBufferedDets++;
+       _indexBufferedDets++;
+
+#if COLLECT_STATS_MEMORY
+       bufferedDetsSize++;
+#endif
+#if COLLECT_STATS_DETS
+       numDets++;
+#endif
+}
 
 /************************ Message logging methods ****************/
 
-// send a ticket request to a group on a processor
-void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
+/**
+ * Sends a group message that might be a broadcast.
+ */
+void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
        if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
                DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
                void *origMsg = EnvToUsr(env);
@@ -364,23 +503,28 @@ void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
                                copyEnv->TN=0;
                                copyEnv->sender.type = TypeInvalid;
                                DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
-                               sendTicketGroupRequest(copyEnv,i,_infoIdx);
+                               sendGroupMsg(copyEnv,i,_infoIdx);
                        }
                }
                return;
        }
+
+       // initializing values of envelope
+       env->SN=0;
+       env->TN=0;
+       env->sender.type = TypeInvalid;
+
        CkObjID recver;
        recver.type = TypeGroup;
        recver.data.group.id = env->getGroupNum();
        recver.data.group.onPE = destPE;
-/*     if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
-               CmiPrintStackTrace(0);
-       }*/
-       generateCommonTicketRequest(recver,env,destPE,_infoIdx);
+       sendCommonMsg(recver,env,destPE,_infoIdx);
 }
 
-//send a ticket request to a nodegroup
-void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
+/**
+ * Sends a nodegroup message that might be a broadcast.
+ */
+void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
        if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
                DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
                void *origMsg = EnvToUsr(env);
@@ -391,24 +535,53 @@ void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
                                copyEnv->SN=0;
                                copyEnv->TN=0;
                                copyEnv->sender.type = TypeInvalid;
-                               sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
+                               sendNodeGroupMsg(copyEnv,i,_infoIdx);
                        }
                }
                return;
        }
+
+       // initializing values of envelope
+       env->SN=0;
+       env->TN=0;
+       env->sender.type = TypeInvalid;
+
        CkObjID recver;
        recver.type = TypeNodeGroup;
        recver.data.group.id = env->getGroupNum();
        recver.data.group.onPE = destNode;
-       generateCommonTicketRequest(recver,env,destNode,_infoIdx);
+       sendCommonMsg(recver,env,destNode,_infoIdx);
 }
 
-//send a ticket request to an array element
-void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
+/**
+ * Sends a message to an array element.
+ */
+void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
        CkObjID recver;
        recver.type = TypeArray;
        recver.data.array.id = env->getsetArrayMgr();
-       recver.data.array.idx = *(&env->getsetArrayIndex());
+       recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
+
+       if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
+               char recverString[100],senderString[100];
+               
+               DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
+       }
+
+       // initializing values of envelope
+       env->SN = 0;
+       env->TN = 0;
+
+       sendCommonMsg(recver,env,destPE,_infoIdx);
+};
+
+/**
+ * Sends a message to a singleton chare.
+ */
+void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
+       CkObjID recver;
+       recver.type = TypeChare;
+       recver.data.chare.id = *pCid;
 
        if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
                char recverString[100],senderString[100];
@@ -416,64 +589,82 @@ void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
                DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
        }
 
-       generateCommonTicketRequest(recver,env,destPE,_infoIdx);
+       // initializing values of envelope
+       env->SN = 0;
+       env->TN = 0;
+
+       sendCommonMsg(recver,env,destPE,_infoIdx);
 };
 
 /**
  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
  */
-void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
+void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
        envelope *env = _env;
        MCount ticketNumber = 0;
        int resend=0; //is it a resend
        char recverName[100];
+       char senderString[100];
        double _startTime=CkWallTimer();
        
+       DEBUG_MEM(CmiMemoryCheck());
+
        if(CpvAccess(_currentObj) == NULL){
 //             CkAssert(0);
                DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
                generalCldEnqueue(destPE,env,_infoIdx);
                return;
        }
+
+       // checking if this message should bypass determinants in message-logging
+       if(env->flags & CK_BYPASS_DET_MLOG){
+               env->sender = CpvAccess(_currentObj)->mlogData->objID;
+               env->recver = recver;
+               DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
+               generalCldEnqueue(destPE,env,_infoIdx);
+               return;
+       }
        
+       // setting message logging data in the envelope
+       env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
        if(env->sender.type == TypeInvalid){
                env->sender = CpvAccess(_currentObj)->mlogData->objID;
-               //Set message logging data in the envelope
        }else{
-               envelope *copyEnv = copyEnvelope(env);
-               env = copyEnv;
+//             envelope *copyEnv = copyEnvelope(env);
+//             env = copyEnv;
                env->sender = CpvAccess(_currentObj)->mlogData->objID;
                env->SN = 0;
        }
        
+       DEBUG_MEM(CmiMemoryCheck());
+
        CkObjID &sender = env->sender;
        env->recver = recver;
 
        Chare *obj = (Chare *)env->sender.getObject();
          
        if(env->SN == 0){
+               DEBUG_MEM(CmiMemoryCheck());
                env->SN = obj->mlogData->nextSN(recver);
        }else{
                resend = 1;
        }
        
-       char senderString[100];
 //     if(env->SN != 1){
                DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
        //      CmiPrintStackTrace(0);
 /*     }else{
-               DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
+               DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
        }*/
                
-       MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
 //     CkPackMessage(&(mEntry->env));
 //     traceUserBracketEvent(32,_startTime,CkWallTimer());
        
        _startTime = CkWallTimer();
 
-       // uses the proper ticketing mechanism for local, group and general messages
+       // uses the proper ticketing mechanism for local, team and general messages
        if(isLocal(destPE)){
-               ticketLogLocalMessage(mEntry);
+               sendLocalMsg(env, _infoIdx);
        }else{
                if((teamSize > 1) && isTeamLocal(destPE)){
 
@@ -489,8 +680,9 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
                        }
                }
                
-               // sending the ticket request
-               sendTicketRequest(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
+               // sending the message
+               MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
+               sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
                
        }
 }
@@ -520,257 +712,243 @@ inline bool isTeamLocal(int destPE){
        return false;
 }
 
-
-
 /**
  * Method that does the actual send by creating a ticket request filling it up and sending it.
  */
-void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
-       char recverString[100],senderString[100];
+void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
+       DEBUG_NOW(char recverString[100]);
+       DEBUG_NOW(char senderString[100]);
+
+       int totalSize;
+
        envelope *env = entry->env;
-       DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
-/*     envelope *env = entry->env;
-       printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
+       DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
 
+       // setting all the information
        Chare *obj = (Chare *)entry->env->sender.getObject();
+       entry->env->recver = recver;
+       entry->env->SN = SN;
+       entry->env->TN = TN;
        if(!resend){
                //TML: only stores message if either it goes to this processor or to a processor in a different group
                if(!isTeamLocal(entry->destPE)){
                        obj->mlogData->addLogEntry(entry);
+#if COLLECT_STATS_TEAM
                        MLOGFT_totalMessages += 1.0;
                        MLOGFT_totalLogSize += entry->env->getTotalsize();
+#endif
                }else{
                        // the message has to be deleted after it has been sent
-                       entry->env->freeMsg = true;
+                       entry->env->flags = entry->env->flags | CK_FREE_MSG_MLOG;
                }
        }
 
-#ifdef BUFFERED_REMOTE
-       //buffer the ticket request 
-       if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
-               //first message to this processor, buffer needs to be created
-               int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
-               CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
-               DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
-       }
-       //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
-       //Buffer the ticketrequests
-       TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
-       ticketRequest->sender = sender;
-       ticketRequest->recver = recver;
-       ticketRequest->logEntry = entry;
-       ticketRequest->SN = SN;
-       ticketRequest->TN = TN;
-       ticketRequest->senderPE = CkMyPe();
+       // sending the determinants that causally affect this message
+       if(_numBufferedDets > 0){
 
-       CpvAccess(_numBufferedTicketRequests)[destPE]++;
-       
+               // modifying the actual number of determinants sent in message
+               CpvAccess(_storeDetsHeader)->number = _numBufferedDets;
+               CpvAccess(_storeDetsHeader)->index = _indexBufferedDets;
+               CpvAccess(_storeDetsHeader)->phase = _phaseBufferedDets;
+               CpvAccess(_storeDetsHeader)->PE = CmiMyPe();
        
-       if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
-               sendBufferedTicketRequests(destPE);
-       }else{
-               if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
-                       int *checkPE = new int;
-                       *checkPE = destPE;
-                       CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
-               }
+               // sending the message
+               CpvAccess(_storeDetsSizes)[0] = sizeof(StoreDeterminantsHeader);
+               CpvAccess(_storeDetsSizes)[1] = _numBufferedDets * sizeof(Determinant);
+               CpvAccess(_storeDetsPtrs)[0] = (char *) CpvAccess(_storeDetsHeader);
+               CpvAccess(_storeDetsPtrs)[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
+               DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
+               CmiSetHandler(CpvAccess(_storeDetsHeader), _storeDeterminantsHandlerIdx);
+               CmiSyncVectorSend(destPE, 2, CpvAccess(_storeDetsSizes), CpvAccess(_storeDetsPtrs));
        }
-#else
 
-       TicketRequest ticketRequest;
-       ticketRequest.sender = sender;
-       ticketRequest.recver = recver;
-       ticketRequest.logEntry = entry;
-       ticketRequest.SN = SN;
-       ticketRequest.TN = TN;
-       ticketRequest.senderPE = CkMyPe();
-       
-       CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
-//     CmiBecomeImmediate(&ticketRequest);
-       CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
-#endif
-       DEBUG_MEM(CmiMemoryCheck());
-};
+       // updating its message log entry
+       entry->indexBufDets = _indexBufferedDets;
+       entry->numBufDets = _numBufferedDets;
 
-/**
- * Send the ticket requests buffered for processor PE
- **/
-void sendBufferedTicketRequests(int destPE){
-       DEBUG_MEM(CmiMemoryCheck());
-       int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
-       if(numberRequests == 0){
-               return;
-       }
-       DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
-       int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
-       void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
-       BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
-       header->numberLogs = numberRequests;
-       
-       CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
-       CmiSyncSend(destPE,totalSize,(char *)buf);
-       
-       CpvAccess(_numBufferedTicketRequests)[destPE]=0;
-       DEBUG_MEM(CmiMemoryCheck());
-};
+       // sending the message
+       generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
 
-void checkBufferedTicketRequests(void *_destPE,double curWallTime){
-       int destPE = *(int *)_destPE;
-  if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
-               sendBufferedTicketRequests(destPE);
-//             traceUserEvent(35);
-       }
-       delete (int *)_destPE;
        DEBUG_MEM(CmiMemoryCheck());
+#if COLLECT_STATS_MSGS
+#if COLLECT_STATS_MSGS_TOTAL
+       totalMsgsTarget++;
+       totalMsgsSize += (float)env->getTotalsize();
+#else
+       numMsgsTarget[destPE]++;
+       sizeMsgsTarget[destPE] += env->getTotalsize();
+#endif
+#endif
+#if COLLECT_STATS_DETS
+       numPiggyDets += _numBufferedDets;
+#endif
+#if COLLECT_STATS_MEMORY
+       msgLogSize += env->getTotalsize();
+#endif
 };
 
+
 /**
- * Gets a ticket for a local message and then sends a copy to the buddy.
- * This method is always in the main thread(not interrupt).. so it should 
- * never find itself locked out of a newTicket.
+ * @brief Function to send a local message. It first gets a ticket and
+ * then enqueues the message. If we are recovering, then the message 
+ * is enqueued in a delay queue.
  */
-void ticketLogLocalMessage(MlogEntry *entry){
-       double _startTime=CkWallTimer();
+void sendLocalMsg(envelope *env, int _infoIdx){
+       DEBUG_PERF(double _startTime=CkWallTimer());
        DEBUG_MEM(CmiMemoryCheck());
+       DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
+       DEBUG(char senderString[100]);
+       DEBUG(char recverString[100]);
+       Ticket ticket;
+
+       DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
 
-       Chare *recverObj = (Chare *)entry->env->recver.getObject();
-       DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
+       // getting the receiver local object
+       Chare *recverObj = (Chare *)env->recver.getObject();
+
+       // if receiver object is not NULL, we will ask it for a ticket
        if(recverObj){
-               //Consider the case, after a restart when this message has already been allotted a ticket number
-               // and should get the same one as the old one.
-               Ticket ticket;
-               if(recverObj->mlogData->mapTable.numObjects() > 0){
-                       ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
-               }else{
-                       ticket.TN = 0;
+
+/*HERE         // if we are recovery, then we must put this off for later
+               if(recverObj->mlogData->restartFlag){
+                       CpvAccess(_delayedLocalMsgs)->enq(entry);
+                       return;
                }
-               
-               char senderString[100], recverString[100] ;
-               
+
+               // check if this combination of sender and SN has been already a ticket
+               ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
+                       
+               // assigned a new ticket in case this message is completely new
                if(ticket.TN == 0){
                        ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
-       
+
+                       // if TN == 0 we enqueue this message since we are recovering from a crash
                        if(ticket.TN == 0){
-                               CpvAccess(_delayedLocalTicketRequests)->enq(entry);
+                               CpvAccess(_delayedLocalMsgs)->enq(entry);
                                DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
                                
-       //              _lockNewTicket = 0;
 //                             traceUserBracketEvent(33,_startTime,CkWallTimer());
                                return;
                        }
-               }       
+               }
+
                //TODO: check for the case when an invalid ticket is returned
                //TODO: check for OLD or RECEIVED TICKETS
                entry->env->TN = ticket.TN;
                CkAssert(entry->env->TN > 0);
                DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
        
-               // sends a copy of the metadata to the buddy    
-               sendLocalMessageCopy(entry);
-               
                DEBUG_MEM(CmiMemoryCheck());
 
-               // sets the unackedLocal flag and stores the message in the log
-               entry->unackedLocal = 1;
-               CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
+               // adding the determinant to _localDets
+               addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
+*/
+               // sends the local message
+               _skipCldEnqueue(CmiMyPe(),env,_infoIdx);        
 
                DEBUG_MEM(CmiMemoryCheck());
        }else{
-               CkPrintf("[%d] Local message in team-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
-               DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
+               DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
        }
-       _lockNewTicket=0;
 //     traceUserBracketEvent(33,_startTime,CkWallTimer());
 };
 
+/****
+       The handler functions
+*****/
+
+/*** CAUSAL PROTOCOL HANDLERS ***/
+
 /**
- * Sends the metadata of a local message to its buddy.
+ * @brief Removes the determinants after a particular index in the _localDets array.
  */
-void sendLocalMessageCopy(MlogEntry *entry){
-       LocalMessageLog msgLog;
-       msgLog.sender = entry->env->sender;
-       msgLog.recver = entry->env->recver;
-       msgLog.SN = entry->env->SN;
-       msgLog.TN = entry->env->TN;
-       msgLog.entry = entry;
-       msgLog.senderPE = CkMyPe();
-       
-       char recvString[100];
-       char senderString[100];
-       DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
+void _removeDeterminantsHandler(char *buffer){
+       RemoveDeterminantsHeader *header;
+       int index, phase;
 
-#ifdef BUFFERED_LOCAL
-       countLocal++;
-       CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
-       if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
-               sendBufferedLocalMessageCopy();
-       }else{
-               if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
-                       lastBufferedLocalMessageCopyTime = CkWallTimer();
-                       CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
-                       countClearBufferedLocalCalls++;
-               }       
-       }
-#else  
-       CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
-       
-       CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
-#endif
-       DEBUG_MEM(CmiMemoryCheck());
-};
+       // getting the header from the message
+       header = (RemoveDeterminantsHeader *)buffer;
+       index = header->index;
+       phase = header->phase;
 
+       // fprintf(stderr,"ACK index=%d\n",index);
 
-void sendBufferedLocalMessageCopy(){
-       int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
-       if(numberLogs == 0){
-               return;
+       // updating the number of buffered determinants
+       if(phase == _phaseBufferedDets){
+               if(index > _indexBufferedDets)
+                       CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
+               CmiAssert(index <= _indexBufferedDets);
+               _numBufferedDets = _indexBufferedDets - index;
        }
-       countBuffered++;
-       int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
-       void *buf=CmiAlloc(totalSize);
-       BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
-       header->numberLogs=numberLogs;
 
-       DEBUG_MEM(CmiMemoryCheck());
-       DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
-       
-       char *ptr = (char *)buf;
-       ptr = &ptr[sizeof(BufferedLocalLogHeader)];
-       
-       for(int i=0;i<numberLogs;i++){
-               LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
-               memcpy(ptr,&log,sizeof(LocalMessageLog));
-               ptr = &ptr[sizeof(LocalMessageLog)];
-       }
+       // releasing memory
+       CmiFree(buffer);
 
-       CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
+}
 
-       CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
+/**
+ * @brief Stores the determinants coming from other processor.
+ */
+void _storeDeterminantsHandler(char *buffer){
+       StoreDeterminantsHeader *header;
+       Determinant *detPtr, det;
+       int i, n, index, phase, destPE;
+       CkVec<Determinant> *vec;
+
+       // getting the header from the message and pointing to the first determinant
+       header = (StoreDeterminantsHeader *)buffer;
+       n = header->number;
+       index = header->index;
+       phase = header->phase;
+       destPE = header->PE;
+       detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
+
+       DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
        DEBUG_MEM(CmiMemoryCheck());
-};
 
-void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
-       countClearBufferedLocalCalls--;
-       if(countClearBufferedLocalCalls > 10){
-               CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
-       }
-       DEBUG_MEM(CmiMemoryCheck());
-       DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
-       if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
-               if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
-                       sendBufferedLocalMessageCopy();
-//                     traceUserEvent(36);
+       // going through all determinants and storing them into _remoteDets
+       for(i = 0; i < n; i++){
+               det.sender = detPtr->sender;
+               det.receiver = detPtr->receiver;
+               det.SN = detPtr->SN;
+               det.TN = detPtr->TN;
+
+               // getting the determinant array
+               vec = CpvAccess(_remoteDets)->get(det.receiver);
+               if(vec == NULL){
+                       vec = new CkVec<Determinant>();
+                       CpvAccess(_remoteDets)->put(det.receiver) = vec;
+               }
+#if COLLECT_STATS_DETS
+#if COLLECT_STATS_DETS_DUP
+               for(int j=0; j<vec->size(); j++){
+                       if(isSameDet(&(*vec)[j],&det)){
+                               numDupDets++;
+                               break;
+                       }
                }
+#endif
+#endif
+#if COLLECT_STATS_MEMORY
+               storedDetsSize++;
+#endif
+               vec->push_back(det);
+               detPtr = detPtr++;
        }
+
        DEBUG_MEM(CmiMemoryCheck());
-}
 
-/****
-       The handler functions
-*****/
+       // freeing buffer
+       CmiFree(buffer);
 
+       // sending the ACK back to the sender
+       CpvAccess(_removeDetsHeader)->index = index;
+       CpvAccess(_removeDetsHeader)->phase = phase;
+       CmiSetHandler(CpvAccess(_removeDetsHeader),_removeDeterminantsHandlerIdx);
+       CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)CpvAccess(_removeDetsHeader));
+       
+}
 
-inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
 /**
  *  If there are any delayed requests, process them first before 
  *  processing this request
@@ -778,496 +956,75 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
        DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
        double  _startTime = CkWallTimer();
-       if(CpvAccess(_delayedTicketRequests)->length() > 0){
-               retryTicketRequest(NULL,_startTime);
-       }
-       _processTicketRequest(ticketRequest);
        CmiFree(ticketRequest);
 //     traceUserBracketEvent(21,_startTime,CkWallTimer());                     
 }
-/** Handler used for dealing with a bunch of ticket requests
- * from one processor. The replies are also bunched together
- * Does not use _ticketRequestHandler
- * */
-void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
-       DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
-       DEBUG_MEM(CmiMemoryCheck());
-       double _startTime = CkWallTimer();
-       if(CpvAccess(_delayedTicketRequests)->length() > 0){
-               retryTicketRequest(NULL,_startTime);
-       }
-       DEBUG_MEM(CmiMemoryCheck());
-  int numRequests = recvdHeader->numberLogs;
-       char *msg = (char *)recvdHeader;
-       msg = &msg[sizeof(BufferedTicketRequestHeader)];
-       int senderPE=((TicketRequest *)msg)->senderPE;
 
-       
-       int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
-       void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
-       
-       char *ptr = (char *)buf;
-       BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
-       header->numberLogs = 0;
 
+/**
+ * @brief Gets a ticket for a recently received message.
+ * @pre env->recver has to be on this processor.
+ * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
+ */
+inline bool _getTicket(envelope *env, int *flag){
        DEBUG_MEM(CmiMemoryCheck());
+       DEBUG(char recverName[100]);
+       DEBUG(char senderString[100]);
+       Ticket ticket;
        
-       ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
-       
-       for(int i=0;i<numRequests;i++){
-               TicketRequest *request = (TicketRequest *)msg;
-               msg = &msg[sizeof(TicketRequest)];
-               bool replied = _processTicketRequest(request,(TicketReply *)ptr);
-
-               if(replied){
-                       //the ticket request has been processed and 
-                       //the reply will be stored in the ptr
-                       header->numberLogs++;
-                       ptr = &ptr[sizeof(TicketReply)];
-               }
-       }
-/*     if(header->numberLogs == 0){
-                       printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
-       }*/
-
-       CmiSetHandler(buf,_bufferedTicketHandlerIdx);
-       CmiSyncSend(senderPE,totalSize,(char *)buf);
-       CmiFree(recvdHeader);
-//     traceUserBracketEvent(21,_startTime,CkWallTimer());                     
-       DEBUG_MEM(CmiMemoryCheck());
-};
-
-/**Process the ticket request. 
- * If it is processed and a reply is being sent 
- * by this processor return true
- * else return false.
- * If a reply buffer is specified put the reply into that
- * else send the reply
- * */
-inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
-
-/*     if(_lockNewTicket){
-               printf("ddeded %d\n",CkMyPe());
-               if(CmiIsImmediate(ticketRequest)){
-                       CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
-               }
-               CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
-               
-       }else{
-               _lockNewTicket = 1;
-       }*/
-
-       DEBUG_MEM(CmiMemoryCheck());
-
        // getting information from request
-       CkObjID sender = ticketRequest->sender;
-       CkObjID recver = ticketRequest->recver;
-       MCount SN = ticketRequest->SN;
-       MCount TN = ticketRequest->TN;
+       CkObjID sender = env->sender;
+       CkObjID recver = env->recver;
+       MCount SN = env->SN;
+       MCount TN = env->TN;
        Chare *recverObj = (Chare *)recver.getObject();
        
-       DEBUG(char recverName[100]);
        DEBUG(recver.toString(recverName);)
 
-       if(recverObj == NULL){
-               int estPE = recver.guessPE();
-               if(estPE == CkMyPe() || estPE == -1){           
-                       //try to fulfill the request after some time
-                       char senderString[100];
-                       DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
-                       if(estPE == CkMyPe() && recver.type == TypeArray){
-                               CkArrayID aid(recver.data.array.id);            
-                               CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
-                               DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx)));
-                       }
-                       TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
-                       *delayed = *ticketRequest;
-                       CpvAccess(_delayedTicketRequests)->enq(delayed);
-                       
-               }else{
-                       DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
-                       TicketRequest forward = *ticketRequest;
-                       CmiSetHandler(&forward,_ticketRequestHandlerIdx);
-                       CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);
-               }
-       DEBUG_MEM(CmiMemoryCheck());
-               return false; // if the receverObj does not exist the ticket request cannot have been 
-                             // processed successfully
-       }else{
-               char senderString[100];
-               
-               Ticket ticket;
-
-               // checking if the message is team local and if it has a ticket already assigned
-               if(teamSize > 1 && TN != 0){
-                       DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
-                       ticket.TN = TN;
-                       recverObj->mlogData->verifyTicket(sender,SN,TN);
-               }
-
-               //check if a ticket for this has been already handed out to an object that used to be local but 
-               // is no longer so.. need for parallel restart
-               if(recverObj->mlogData->mapTable.numObjects() > 0){
-                       
-                       ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
-               }
-               
-               if(ticket.TN == 0){
-                       ticket = recverObj->mlogData->next_ticket(sender,SN);
-               }
-               if(ticket.TN > recverObj->mlogData->tProcessed){
-                       ticket.state = NEW_TICKET;
-               }else{
-                       ticket.state = OLD_TICKET;
-               }
-               //TODO: check for the case when an invalid ticket is returned
-               if(ticket.TN == 0){
-                       DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
-                       TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
-                       *delayed = *ticketRequest;
-                       CpvAccess(_delayedTicketRequests)->enq(delayed);
-                       return false;
-               }
-/*             if(ticket.TN < SN){ //error state this really should not happen
-                       recver.toString(recverName);
-                 printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
-               }*/
-//             CkAssert(ticket.TN >= SN);
-               DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
-//             TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
-    if(reply == NULL){ 
-                       //There is no reply buffer and the ticketreply is going to be 
-                       //sent immediately
-                       TicketReply ticketReply;
-                       ticketReply.request = *ticketRequest;
-                       ticketReply.ticket = ticket;
-                       ticketReply.recverPE = CkMyPe();
-                       CmiSetHandler(&ticketReply,_ticketHandlerIdx);
-//             CmiBecomeImmediate(&ticketReply);
-                       CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
-        }else{ // Store ticket reply in the buffer provided
-                reply->request = *ticketRequest;
-                reply->ticket = ticket;
-                reply->recverPE = CkMyPe();
-                CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
-                                                        // in case the ticket needs to be forwarded or something
-        }
-               DEBUG_MEM(CmiMemoryCheck());
-               return true;
-       }
-//     _lockNewTicket=0;
-};
-
-
-/**
- * @brief This function handles the ticket received after a request.
- */
-inline void _ticketHandler(TicketReply *ticketReply){
-
-       double _startTime = CkWallTimer();
-       DEBUG_MEM(CmiMemoryCheck());    
-       
-       char senderString[100];
-       CkObjID sender = ticketReply->request.sender;
-       CkObjID recver = ticketReply->request.recver;
-       
-       if(sender.guessPE() != CkMyPe()){
-               DEBUG(CkAssert(sender.guessPE()>= 0));
-               DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
-       //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
-               ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
-               CmiSetHandler(ticketReply,_ticketHandlerIdx);
-#ifdef BUFFERED_REMOTE
-               //will be freed by the buffered ticket handler most of the time
-               //this might lead to a leak just after migration
-               //when the ticketHandler is directly used without going through the buffered handler
-               CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
-#else
-               CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
-#endif 
-       }else{
-               char recverName[100];
-               DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
-               MlogEntry *logEntry=NULL;
-               if(ticketReply->ticket.state & FORWARDED_TICKET){
-                       // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
-                       DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
-                       Chare *senderObj = (Chare *)sender.getObject();
-                       if(senderObj){
-                               CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
-                               for(int i=0;i<mlog->length();i++){
-                                       MlogEntry *tempEntry = (*mlog)[i];
-                                       if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
-                                               logEntry = tempEntry;
-                                               break;
-                                       }
-                               }
-                               if(logEntry == NULL){
-#ifdef BUFFERED_REMOTE
-#else
-                                       CmiFree(ticketReply);
-#endif                                 
-                                       return;
-                               }
-                       }else{
-                               CmiAbort("This processor thinks it should have the sender\n");
-                       }
-                       ticketReply->ticket.state ^= FORWARDED_TICKET;
-               }else{
-                       logEntry = ticketReply->request.logEntry;
-               }
-               if(logEntry->env->TN <= 0){
-                       //This logEntry has not received a TN earlier
-                       char recverString[100];
-                       logEntry->env->TN = ticketReply->ticket.TN;
-                       logEntry->env->setSrcPe(CkMyPe());
-                       if(ticketReply->ticket.state == NEW_TICKET){
-
-                               // if message is group local, we store its metadata in teamTable
-                               if(isTeamLocal(ticketReply->recverPE)){
-                                       //DEBUG_TEAM(CkPrintf("[%d] Storing meta data for intragroup message %u\n",CkMyPe(),ticketReply->request.SN);)
-                                       Chare *senderObj = (Chare *)sender.getObject();
-                                       SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
-                                       if(ticketRow == NULL){
-                                               ticketRow = new SNToTicket();
-                                               senderObj->mlogData->teamTable.put(recver) = ticketRow; 
-                                       }
-                                       ticketRow->put(ticketReply->request.SN) = ticketReply->ticket;
-                               }
-
-                               DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
-                               if(ticketReply->recverPE != CkMyPe()){
-                                       generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
-                               }else{
-                                       //It is now a local message use the local message protocol
-                                       sendLocalMessageCopy(logEntry);
-                               }       
-                       }
-               }else{
-                       DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
-               }
-               recver.updatePosition(ticketReply->recverPE);
-#ifdef BUFFERED_REMOTE
-#else
-               CmiFree(ticketReply);
-#endif
-       }
-       CmiMemoryCheck();
-
-//     traceUserBracketEvent(22,_startTime,CkWallTimer());     
-};
-
-/**
- * Message to handle the bunch of tickets 
- * that we get from one processor. We send 
- * the tickets to be handled one at a time
- * */
-
-void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
-       double _startTime=CmiWallTimer();
-       int numTickets = recvdHeader->numberLogs;
-       char *msg = (char *)recvdHeader;
-       msg = &msg[sizeof(BufferedTicketRequestHeader)];
-       DEBUG_MEM(CmiMemoryCheck());
-       
-       TicketReply *_reply = (TicketReply *)msg;
-
-       
-       for(int i=0;i<numTickets;i++){
-               TicketReply *reply = (TicketReply *)msg;
-               _ticketHandler(reply);
-               
-               msg = &msg[sizeof(TicketReply)];
-       }
-       
-       CmiFree(recvdHeader);
-//     traceUserBracketEvent(22,_startTime,CkWallTimer());
-       DEBUG_MEM(CmiMemoryCheck());
-};
-
-/**
- * Stores the metadata of a local message from its buddy.
- */
-void _localMessageCopyHandler(LocalMessageLog *msgLog){
-       double _startTime = CkWallTimer();
-       
-       char senderString[100],recverString[100];
-       DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
-/*     if(!fault_aware(msgLog->recver)){
-               CmiAbort("localMessageCopyHandler with non fault aware local message copy");
-       }*/
-       CpvAccess(_localMessageLog)->enq(*msgLog);
-       
-       LocalMessageLogAck ack;
-       ack.entry = msgLog->entry;
-       DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
-       CmiSetHandler(&ack,_localMessageAckHandlerIdx);
-       CmiSyncSend(msgLog->senderPE,sizeof(LocalMessageLogAck),(char *)&ack);
-       
-//     traceUserBracketEvent(23,_startTime,CkWallTimer());
-};
+       // verifying whether we are recovering from a failure or not
+       if(recverObj->mlogData->restartFlag)
+               return false;
 
-void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
-       double _startTime = CkWallTimer();
-       DEBUG_MEM(CmiMemoryCheck());
+       // checking ticket table to see if this message already received a ticket
+       ticket = recverObj->mlogData->getTicket(sender, SN);
+       TN = ticket.TN;
        
-       int numLogs = recvdHeader->numberLogs;
-       char *msg = (char *)recvdHeader;
-
-       //piggy back the logs already stored on this processor
-       int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
-       numPiggyLogs=0; //uncomment to turn off piggy backing of acks
-/*     if(numPiggyLogs > 0){
-               if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
-                       CmiAssert(0);
-               }
-       }*/
-       DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
-       
-       int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
-       void *buf = CmiAlloc(totalSize);
-       char *ptr = (char *)buf;
-       memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
-       
-       msg = &msg[sizeof(BufferedLocalLogHeader)];
-       ptr = &ptr[sizeof(BufferedLocalLogHeader)];
-
-       DEBUG_MEM(CmiMemoryCheck());
-       int PE;
-       for(int i=0;i<numLogs;i++){
-               LocalMessageLog *msgLog = (LocalMessageLog *)msg;
-               CpvAccess(_localMessageLog)->enq(*msgLog);
-               PE = msgLog->senderPE;
-               DEBUG(CmiAssert( PE == getCheckPointPE()));
-
-               LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
-               ack->entry = msgLog->entry;
-               
-               msg = &msg[sizeof(LocalMessageLog)];
-               ptr = &ptr[sizeof(LocalMessageLogAck)];
-       }
-       DEBUG_MEM(CmiMemoryCheck());
-
-       BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
-       piggyHeader->numberLogs = numPiggyLogs;
-       ptr = &ptr[sizeof(BufferedLocalLogHeader)];
-       if(numPiggyLogs > 0){
-               countPiggy++;
+       // checking if the message is team local and if it has a ticket already assigned
+       if(teamSize > 1 && TN != 0){
+               DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
+               recverObj->mlogData->verifyTicket(sender,SN,TN);
        }
 
-       for(int i=0;i<numPiggyLogs;i++){
-               LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
-               memcpy(ptr,&log,sizeof(LocalMessageLog));
-               ptr = &ptr[sizeof(LocalMessageLog)];
-       }
-       DEBUG_MEM(CmiMemoryCheck());
-       
-       CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
-       CmiSyncSendAndFree(PE,totalSize,(char *)buf);
-               
-/*     for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
-                       LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
-                       if(!fault_aware(localLogEntry.recver)){
-                               CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
-                       }
-       }*/
-       if(freeHeader){
-               CmiFree(recvdHeader);
+       //check if a ticket for this has been already handed out to an object that used to be local but 
+       // is no longer so.. need for parallel restart
+       if(TN == 0){
+               ticket = recverObj->mlogData->next_ticket(sender,SN);
+               *flag = NEW_TICKET;
+       } else {
+               *flag = OLD_TICKET;
        }
-       DEBUG_MEM(CmiMemoryCheck());
-//     traceUserBracketEvent(23,_startTime,CkWallTimer());
-}
-
-
-void _localMessageAckHandler(LocalMessageLogAck *ack){
-       
-       double _startTime = CkWallTimer();
-       
-       MlogEntry *entry = ack->entry;
-       if(entry == NULL){
-               CkExit();
+       if(ticket.TN > recverObj->mlogData->tProcessed){
+               ticket.state = NEW_TICKET;
+       } else {
+               ticket.state = OLD_TICKET;
        }
-       entry->unackedLocal = 0;
-       envelope *env = entry->env;
-       char recverName[100];
-       char senderString[100];
-       DEBUG_MEM(CmiMemoryCheck());
-       
-       DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
-       if(env == NULL)
-               return;
-       CkAssert(env->SN > 0);
-       CkAssert(env->TN > 0);
-       env->sender.toString(senderString);
-       DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
-       env->recver.toString(recverName);
-
-       DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
-       
-/*     
-       void *origMsg = EnvToUsr(env);
-       void *copyMsg = CkCopyMsg(&origMsg);
-       envelope *copyEnv = UsrToEnv(copyMsg);
-       entry->env = UsrToEnv(origMsg);*/
-
-//     envelope *copyEnv = copyEnvelope(env);
-
-       envelope *copyEnv = env;
-       copyEnv->localMlogEntry = entry;
-
-       DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
-       if(CmiMyPe() != entry->destPE){
-               DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
+       // @todo: check for the case when an invalid ticket is returned
+       if(ticket.TN == 0){
+               DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
+               return false;
        }
-//     generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
-       _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
-       
-       
-#ifdef BUFFERED_LOCAL
-#else
-       CmiFree(ack);
-//     traceUserBracketEvent(24,_startTime,CkWallTimer());
-#endif
-       
-       DEBUG_MEM(CmiMemoryCheck());
-       DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
-}
-
-
-void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
-
-       double _startTime=CkWallTimer();
-       DEBUG_MEM(CmiMemoryCheck());
-
-       int numLogs = recvdHeader->numberLogs;
-       char *msg = (char *)recvdHeader;
-       msg = &msg[sizeof(BufferedLocalLogHeader)];
-
-       DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
-       
-       for(int i=0;i<numLogs;i++){
-               LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
-               _localMessageAckHandler(ack);
                
-               msg = &msg[sizeof(LocalMessageLogAck)]; 
-       }
+       // setting the ticket number in the envelope
+       env->TN = ticket.TN;
+       DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
+       return true;
 
-       //deal with piggy backed local logs
-       BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
-       //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
-       if(piggyHeader->numberLogs > 0){
-               _bufferedLocalMessageCopyHandler(piggyHeader,0);
-       }
-       
-       CmiFree(recvdHeader);
-       DEBUG_MEM(CmiMemoryCheck());
-//     traceUserBracketEvent(24,_startTime,CkWallTimer());
-}
+};
 
 bool fault_aware(CkObjID &recver){
        switch(recver.type){
                case TypeChare:
-                       return false;
+                       return true;
                case TypeMainChare:
                        return false;
                case TypeGroup:
@@ -1279,44 +1036,94 @@ bool fault_aware(CkObjID &recver){
        }
 };
 
-int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
-       char recverString[100];
-       char senderString[100];
-       
+/* Preprocesses a received message */
+int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
+       DEBUG_NOW(char recverString[100]);
+       DEBUG_NOW(char senderString[100]);
        DEBUG_MEM(CmiMemoryCheck());
+       int flag;
+       bool ticketSuccess;
+
+       // getting the receiver object
        CkObjID recver = env->recver;
-       if(!fault_aware(recver))
-               return 1;
 
+       // checking for determinants bypass in message logging
+       if(env->flags & CK_BYPASS_DET_MLOG){
+               DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
+               return 1;       
+       }
+
+       // checking if receiver is fault aware
+       if(!fault_aware(recver)){
+               CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
+               return 1;
+       }
 
        Chare *obj = (Chare *)recver.getObject();
        *objPointer = obj;
        if(obj == NULL){
                int possiblePE = recver.guessPE();
                if(possiblePE != CkMyPe()){
-                       int totalSize = env->getTotalsize();                    
+                       int totalSize = env->getTotalsize();
                        CmiSyncSend(possiblePE,totalSize,(char *)env);
+                       
+                       DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
+               }else{
+                       // this is the case where a message is received and the object has not been initialized
+                       // we delayed the delivery of the message
+                       CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
+                       
+                       DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
                }
                return 0;
        }
 
+       // checking if message comes from an old incarnation
+       // message must be discarded
+       if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
+               CmiFree(env);
+               return 0;
+       }
+
+       DEBUG_MEM(CmiMemoryCheck());
+       DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
+
+       // getting a ticket for this message
+       ticketSuccess = _getTicket(env,&flag);
+
+       // we might be during recovery when this message arrives
+       if(!ticketSuccess){
+               
+               // adding the message to a delay queue
+               CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
+               DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
+
+               return 0;
+       }
+       
+       //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
+       //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
+       
+       if(flag == NEW_TICKET){
+               // storing determinant of message in data structure
+               addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
+       }
+
+       DEBUG_MEM(CmiMemoryCheck());
 
        double _startTime = CkWallTimer();
 //env->sender.updatePosition(env->getSrcPe());
        if(env->TN == obj->mlogData->tProcessed+1){
                //the message that needs to be processed now
-               DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
+               DEBUG_PE(2,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
                // once we find a message that we can process we put back all the messages in the out of order queue
                // back into the main scheduler queue. 
-               if(env->sender.guessPE() == CkMyPe()){
-                       *logEntryPointer = env->localMlogEntry;
-               }
        DEBUG_MEM(CmiMemoryCheck());
                while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
                        void *qMsgPtr;
                        CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
                        envelope *qEnv = (envelope *)qMsgPtr;
-                       CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
+                       CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
        DEBUG_MEM(CmiMemoryCheck());
                }
 //             traceUserBracketEvent(25,_startTime,CkWallTimer());
@@ -1325,16 +1132,18 @@ int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEn
        DEBUG_MEM(CmiMemoryCheck());
                return 1;
        }
+
+       // checking if message has already been processed
+       // message must be discarded
        if(env->TN <= obj->mlogData->tProcessed){
-               //message already processed
-               DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
-//             traceUserBracketEvent(26,_startTime,CkWallTimer());
-       DEBUG_MEM(CmiMemoryCheck());
+               DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
+               
+               CmiFree(env);
                return 0;
        }
        //message that needs to be processed in the future
 
-//     DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
+       DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
        //the message cant be processed now put it back in the out of order message Q, 
        //It will be transferred to the main queue later
        CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
@@ -1347,7 +1156,7 @@ int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEn
 /**
  * @brief Updates a few variables once a message has been processed.
  */
-void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
+void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
        DEBUG(char senderString[100]);
        if(obj){
                if(sender.guessPE() == CkMyPe()){
@@ -1367,7 +1176,7 @@ void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *
        Helpers for the handlers and message logging methods
 ***/
 
-void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
+void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
 //     double _startTime = CkWallTimer();
        if(env->recver.type != TypeNodeGroup){
        //This repeats a step performed in skipCldEnq for messages sent to
@@ -1388,53 +1197,6 @@ void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
 
 int calledRetryTicketRequest=0;
 
-void retryTicketRequestTimer(void *_dummy,double _time){
-               calledRetryTicketRequest=0;
-               retryTicketRequest(_dummy,_time);
-}
-
-void retryTicketRequest(void *_dummy,double curWallTime){      
-       double start = CkWallTimer();
-       DEBUG_MEM(CmiMemoryCheck());
-       int length = CpvAccess(_delayedTicketRequests)->length();
-       for(int i=0;i<length;i++){
-               TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
-               if(ticketRequest){
-                       char senderString[100],recverString[100];
-                       DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
-                       DEBUG_MEM(CmiMemoryCheck());
-                       _processTicketRequest(ticketRequest);
-                 CmiFree(ticketRequest);
-                       DEBUG_MEM(CmiMemoryCheck());
-               }       
-       }       
-       for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
-               MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
-               ticketLogLocalMessage(entry);
-       }
-       int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
-//     int converse_qLength = CmiGetNonLocalLength();
-       
-//     DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
-
-/*     PingMsg pingMsg;
-       pingMsg.PE = CkMyPe();
-       CmiSetHandler(&pingMsg,_pingHandlerIdx);
-       if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
-               for(int i=0;i<CkNumPes();i++){
-                       if(i != CkMyPe()){
-                               CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
-                       }
-               }
-       }*/     
-       //TODO: change this back to 100
-       if(calledRetryTicketRequest == 0){
-               CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
-               calledRetryTicketRequest =1;
-       }
-       DEBUG_MEM(CmiMemoryCheck());
-}
-
 void _pingHandler(CkPingMsg *msg){
        printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
        CmiFree(msg);
@@ -1468,23 +1230,31 @@ void checkpointAlarm(void *_dummy,double curWallTime){
 };
 
 void _checkpointRequestHandler(CheckpointRequest *request){
-       startMlogCheckpoint(NULL,CmiWallTimer());       
+       startMlogCheckpoint(NULL,CmiWallTimer());
 }
 
-void startMlogCheckpoint(void *_dummy,double curWallTime){
+/**
+ * @brief Starts the checkpoint phase after migration.
+ */
+void startMlogCheckpoint(void *_dummy, double curWallTime){
        double _startTime = CkWallTimer();
+
+       // increasing the checkpoint counter
        checkpointCount++;
-/*     if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
-               kill(getpid(),SIGKILL);
-       }*/
-       if(CmiNumPes() < 256 || CmiMyPe() == 0){
+       
+#if DEBUG_CHECKPOINT
+       if(CmiMyPe() == 0){
                printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
        }
-       PUP::sizer psizer;
+#endif
+
        DEBUG_MEM(CmiMemoryCheck());
 
+       PUP::sizer psizer;
        psizer | checkpointCount;
-       
+       for(int i=0; i<CmiNumPes(); i++){
+               psizer | CpvAccess(_incarnation)[i];
+       }
        CkPupROData(psizer);
        DEBUG_MEM(CmiMemoryCheck());
        CkPupGroupData(psizer,CmiTrue);
@@ -1500,13 +1270,14 @@ void startMlogCheckpoint(void *_dummy,double curWallTime){
        CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
        chkMsg->PE = CkMyPe();
        chkMsg->dataSize = dataSize;
-
        
        char *buf = &msg[sizeof(CheckPointDataMsg)];
-       PUP::toMem pBuf(buf);   
+       PUP::toMem pBuf(buf);
 
        pBuf | checkpointCount;
-       
+       for(int i=0; i<CmiNumPes(); i++){
+               pBuf | CpvAccess(_incarnation)[i];
+       }
        CkPupROData(pBuf);
        CkPupGroupData(pBuf,CmiTrue);
        CkPupNodeGroupData(pBuf,CmiTrue);
@@ -1521,9 +1292,19 @@ void startMlogCheckpoint(void *_dummy,double curWallTime){
        */
        processedTicketLog.removeAll();
        forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
-       if(CmiNumPes() < 256 || CmiMyPe() == 0){
+
+#if DEBUG_CHECKPOINT
+       if(CmiMyPe() == 0){
                printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
        }
+#endif
+
+#if COLLECT_STATS_MEMORY
+       CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
+       msgLogSize = 0;
+       bufferedDetsSize = 0;
+       storedDetsSize = 0;
+#endif
 
        if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
                lastCompletedAlarm = curWallTime;
@@ -1532,13 +1313,18 @@ void startMlogCheckpoint(void *_dummy,double curWallTime){
        traceUserBracketEvent(28,_startTime,CkWallTimer());
 };
 
-void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
-       CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
+/**
+ * @brief A chare adds the latest ticket number processed.
+ */
+void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
+       DEBUG(char objString[100]);
+
+       CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
        TProcessedLog logEntry;
        logEntry.recver = mlogData->objID;
        logEntry.tProcessed = mlogData->tProcessed;
        log->push_back(logEntry);
-       char objString[100];
+
        DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
 }
 
@@ -1547,14 +1333,14 @@ private:
        CkLocMgr *locMgr;
        PUP::er &p;
 public:
-               ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
-               void addLocation(CkLocation &loc) {
-                       CkArrayIndex idx=loc.getIndex();
-                       CkGroupID gID = locMgr->ckGetGroupID();
-                       p|gID;      // store loc mgr's GID as well for easier restore
-                       p|idx;
-                       p|loc;
-    }
+       ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
+       void addLocation(CkLocation &loc) {
+               CkArrayIndexMax idx=loc.getIndex();
+               CkGroupID gID = locMgr->ckGetGroupID();
+               p|gID;      // store loc mgr's GID as well for easier restore
+               p|idx;
+               p|loc;
+       }
 };
 
 /**
@@ -1584,7 +1370,7 @@ void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSki
        
                for (int i=0; i<numElements; i++) {
                        CkGroupID gID;
-                       CkArrayIndex idx;
+                       CkArrayIndexMax idx;
                        p|gID;
                p|idx;
                        int flag=0;
@@ -1629,13 +1415,11 @@ void writeCheckpointToDisk(int size,char *chkpt){
        unlink(fName);
 
        rename(fNameTemp,fName);
-       
 }
 
 //handler that receives the checkpoint from a processor
 //it stores it and acks it
 void _storeCheckpointHandler(char *msg){
-       
        double _startTime=CkWallTimer();
                
        CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
@@ -1656,13 +1440,6 @@ void _storeCheckpointHandler(char *msg){
        CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
        CpvAccess(_storedCheckpointData)->PE = sendingPE;
 
-#ifdef CHECKPOINT_DISK
-       //store the checkpoint on disk
-       writeCheckpointToDisk(chkMsg->dataSize,chkpt);
-       CpvAccess(_storedCheckpointData)->buf = NULL;
-       CmiFree(msg);
-#endif
-
        int count=0;
        for(int j=migratedNoticeList.size()-1;j>=0;j--){
                if(migratedNoticeList[j].fromPE == sendingPE){
@@ -1684,26 +1461,37 @@ void _storeCheckpointHandler(char *msg){
        CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
        CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
        
-       
-       
        traceUserBracketEvent(29,_startTime,CkWallTimer());
 };
 
 
+/**
+ * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.      
+ * @note The remove log request message looks like
+               |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
+ */
 void sendRemoveLogRequests(){
+#if SYNCHRONIZED_CHECKPOINT
+       CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
+#endif
        double _startTime = CkWallTimer();      
-       //send out the messages asking senders to throw away message logs below a certain ticket number
-       /*
-               The remove log request message looks like
-               |RemoveLogRequest||List of TProcessedLog|
-       */
-       int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
+
+       // computing total message size
+       int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
        char *requestMsg = (char *)CmiAlloc(totalSize);
+
+       // filling up the message
        RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
        request->PE = CkMyPe();
        request->numberObjects = processedTicketLog.size();
        char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
        memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
+       char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
+       int *numDeterminants = (int *)listDeterminants;
+       numDeterminants[0] = 0; 
+       listDeterminants = (char *)&numDeterminants[1];
+
+       // setting the handler in the message
        CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
        
        DEBUG_MEM(CmiMemoryCheck());
@@ -1716,6 +1504,7 @@ void sendRemoveLogRequests(){
        //TODO: clear ticketTable
        
        traceUserBracketEvent(30,_startTime,CkWallTimer());
+
        DEBUG_MEM(CmiMemoryCheck());
 }
 
@@ -1723,7 +1512,7 @@ void sendRemoveLogRequests(){
 void _checkpointAckHandler(CheckPointAck *ackMsg){
        DEBUG_MEM(CmiMemoryCheck());
        unAckedCheckpoint=0;
-       DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
+       DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
        DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
        if(onGoingLoadBalancing){
                onGoingLoadBalancing = 0;
@@ -1735,20 +1524,79 @@ void _checkpointAckHandler(CheckPointAck *ackMsg){
        
 };
 
+
+/**
+ * @brief Inserts all the determinants into a hash table.
+ */
+inline void populateDeterminantTable(char *data){
+       DEBUG(char recverString[100]);
+       DEBUG(char senderString[100]);
+       int numDets, *numDetsPtr;
+       Determinant *detList;
+       CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
+       SNToTicket *tickets;
+       Ticket ticket;
+
+       RemoveLogRequest *request = (RemoveLogRequest *)data;
+       TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
+       
+       numDetsPtr = (int *)&list[request->numberObjects];
+       numDets = numDetsPtr[0];
+       detList = (Determinant *)&numDetsPtr[1];
+
+       // inserting determinants into hashtable
+       for(int i=0; i<numDets; i++){
+               table = detTable.get(detList[i].sender);
+               if(table == NULL){
+                       table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
+                       detTable.put(detList[i].sender) = table;
+               }
+               tickets = table->get(detList[i].receiver);
+               if(tickets == NULL){
+                       tickets = new SNToTicket();
+                       table->put(detList[i].receiver) = tickets; 
+               }
+               ticket.TN = detList[i].TN;
+               tickets->put(detList[i].SN) = ticket;
+       }
+
+       DEBUG_MEM(CmiMemoryCheck());    
+
+}
+
 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
+       int total;
+       DEBUG(char nameString[100]);
        DEBUG_MEM(CmiMemoryCheck());
        CmiMemoryCheck();
        char *data = (char *)_data;
        RemoveLogRequest *request = (RemoveLogRequest *)data;
        TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
        CkQ<MlogEntry *> *mlog = mlogData->getMlog();
+       CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
+       SNToTicket *tickets;
+       MCount TN;
 
        int count=0;
-       for(int i=0;i<mlog->length();i++){
+       total = mlog->length();
+       for(int i=0; i<total; i++){
                MlogEntry *logEntry = mlog->deq();
+
+               // looking message in determinant table
+               table = detTable.get(logEntry->env->sender);
+               if(table != NULL){
+                       tickets = table->get(logEntry->env->recver);
+                       if(tickets != NULL){
+                               TN = tickets->get(logEntry->env->SN).TN;
+                               if(TN != 0){
+                                       logEntry->env->TN = TN;
+                               }
+                       }
+               }
+       
                int match=0;
                for(int j=0;j<request->numberObjects;j++){
-                       if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
+                       if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
                                //this log Entry should be removed
                                match = 1;
                                break;
@@ -1764,56 +1612,34 @@ void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
                }
        }
        if(count > 0){
-               char nameString[100];
                DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
        }
        DEBUG_MEM(CmiMemoryCheck());
        CmiMemoryCheck();
 }
 
+/**
+ * @brief Removes messages in the log according to the received ticket numbers
+ */
 void _removeProcessedLogHandler(char *requestMsg){
        double start = CkWallTimer();
+
+       // building map for determinants
+       populateDeterminantTable(requestMsg);
+
+       // removing processed messages from the message log
        forAllCharesDo(removeProcessedLogs,requestMsg);
+
+       // @todo: clean determinant table 
+       
        // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
        RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
        DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
        //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
-       if(request->PE == getCheckPointPE()){
-               TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
-               CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
-               CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
-               int count=0;
-/*             DEBUG(for(int j=0;j<request->numberObjects;j++){)
-               DEBUG(char nameString[100];)
-                       DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
-               DEBUG(})*/
-               for(int i=0;i<localQ->length();i++){
-                       LocalMessageLog localLogEntry = (*localQ)[i];
-                       if(!fault_aware(localLogEntry.recver)){
-                               CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
-                       }
-                       bool keep = true;
-                       for(int j=0;j<request->numberObjects;j++){                              
-                               if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
-                                       keep = false;
-                                       break;
-                               }
-                       }       
-                       if(keep){
-                               tempQ->enq(localLogEntry);
-                       }else{
-                               count++;
-                       }
-               }
-               delete localQ;
-               CpvAccess(_localMessageLog) = tempQ;
-               DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
-       }
-
        /*
                Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
        */
-       CmiMemoryCheck();
+       DEBUG_MEM(CmiMemoryCheck());
        clearUpMigratedRetainedLists(request->PE);
        
        traceUserBracketEvent(20,start,CkWallTimer());
@@ -1866,6 +1692,7 @@ void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
 
        // setting the restart flag
        _restartFlag = 1;
+       _numRestartResponses = 0;
 
        // if we are using team-based message logging, all members of the group have to be restarted
        if(teamSize > 1){
@@ -1898,6 +1725,7 @@ void _restartHandler(RestartRequest *restartMsg){
 
     // setting the restart flag
        _restartFlag = 1;
+       _numRestartResponses = 0;
 
        // flushing all buffers
        //TEST END
@@ -1986,12 +1814,20 @@ void _recvRestartCheckpointHandler(char *_restartData){
        
        PUP::fromMem pBuf(buf);
        pBuf | checkpointCount;
+       for(int i=0; i<CmiNumPes(); i++){
+               pBuf | CpvAccess(_incarnation)[i];
+       }
        CkPupROData(pBuf);
        CkPupGroupData(pBuf,CmiFalse);
        CkPupNodeGroupData(pBuf,CmiFalse);
        pupArrayElementsSkip(pBuf,CmiFalse,NULL);
        CkAssert(pBuf.size() == restartData->checkPointSize);
        printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
+
+       // increasing the incarnation number of all the team members
+       for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
+               CpvAccess(_incarnation)[i]++;
+       }
        
        // turning off the team recovery flag
        forAllCharesDo(unsetTeamRecovery,NULL);
@@ -1999,26 +1835,6 @@ void _recvRestartCheckpointHandler(char *_restartData){
        // initializing a few variables for handling local messages
        forAllCharesDo(initializeRestart,NULL);
        
-       //store the restored local message log in a vector
-       buf = &buf[restartData->checkPointSize];        
-       for(int i=0;i<restartData->numLocalMessages;i++){
-               LocalMessageLog logEntry;
-               memcpy(&logEntry,buf,sizeof(LocalMessageLog));
-               
-               Chare *recverObj = (Chare *)logEntry.recver.getObject();
-               if(recverObj!=NULL){
-                       recverObj->mlogData->addToRestoredLocalQ(&logEntry);
-                       recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
-                       char senderString[100];
-                       char recverString[100];
-                       DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
-               }else{
-//                     DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
-               }
-               buf = &buf[sizeof(LocalMessageLog)];
-       }
-
-       forAllCharesDo(sortRestoredLocalMsgLog,NULL);
        CmiFree(_restartData);  
 
        /*HERE _initDone();
@@ -2056,7 +1872,7 @@ void _recvRestartCheckpointHandler(char *_restartData){
        
 
        /* test for parallel restart migrate away object**/
-//     if(parallelRestart){
+//     if(fastRecovery){
 //             distributeRestartedObjects();
 //             printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
 //     }
@@ -2092,22 +1908,6 @@ void CkMlogRestartLocal(){
     CkMlogRestart(NULL,NULL);
 };
 
-
-void readCheckpointFromDisk(int size,char *buf){
-       char fName[100];
-       sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
-
-       int fd = open(fName,O_RDONLY);
-       int count=0;
-       while(count < size){
-               count += read(fd,&buf[count],size-count);
-       }
-       close(fd);
-       
-};
-
-
-
 /**
  * Gets the stored checkpoint for its buddy processor.
  */
@@ -2194,7 +1994,7 @@ void _verifyAckHandler(VerifyAckMsg *verifyReply){
  */
 void sendCheckpointData(int mode){     
        RestartRequest *restartMsg = storedRequest;
-       StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
+       StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
        int numMigratedAwayElements = migratedNoticeList.size();
        if(migratedNoticeList.size() != 0){
                        printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
@@ -2204,11 +2004,9 @@ void sendCheckpointData(int mode){
        
        int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
        
-       DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
+       DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
        CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
        
-       CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
-       totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
        totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
        
        char *msg = (char *)CmiAlloc(totalSize);
@@ -2237,24 +2035,9 @@ void sendCheckpointData(int mode){
        }
        
 
-#ifdef CHECKPOINT_DISK
-       readCheckpointFromDisk(storedChkpt->bufSize,buf);
-#else  
        memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
-#endif
        buf = &buf[storedChkpt->bufSize];
 
-
-       //store localmessage Log
-       dataMsg->numLocalMessages = localMsgQ->length();
-       for(int i=0;i<localMsgQ->length();i++){
-               if(!fault_aware(((*localMsgQ)[i]).recver )){
-                       CmiAbort("Non fault aware localMsgQ");
-               }
-               memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
-               buf = &buf[sizeof(LocalMessageLog)];
-       }
-       
        if(mode == MLOG_RESTARTED){
                CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
                CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
@@ -2278,6 +2061,7 @@ void createObjIDList(void *data,ChareMlogData *mlogData){
        list->push_back(entry);
        DEBUG_TEAM(char objString[100]);
        DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
+       DEBUG_RECOVERY(printLog(&entry));
 }
 
 
@@ -2310,54 +2094,53 @@ void _recvCheckpointHandler(char *_restartData){
        PUP::fromMem pBuf(buf);
 
        pBuf | checkpointCount;
-
+       for(int i=0; i<CmiNumPes(); i++){
+               pBuf | CpvAccess(_incarnation)[i];
+       }
        CkPupROData(pBuf);
        CkPupGroupData(pBuf,CmiTrue);
        CkPupNodeGroupData(pBuf,CmiTrue);
        pupArrayElementsSkip(pBuf,CmiTrue,NULL);
        CkAssert(pBuf.size() == restartData->checkPointSize);
        printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
+
+       // increases the incarnation number
+       CpvAccess(_incarnation)[CmiMyPe()]++;
        
        forAllCharesDo(initializeRestart,NULL);
        
-       //store the restored local message log in a vector
-       buf = &buf[restartData->checkPointSize];        
-       for(int i=0;i<restartData->numLocalMessages;i++){
-               LocalMessageLog logEntry;
-               memcpy(&logEntry,buf,sizeof(LocalMessageLog));
-               
-               Chare *recverObj = (Chare *)logEntry.recver.getObject();
-               if(recverObj!=NULL){
-                       recverObj->mlogData->addToRestoredLocalQ(&logEntry);
-                       recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
-                       char senderString[100];
-                       char recverString[100];
-                       DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
-               }else{
-//                     DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
-               }
-               buf = &buf[sizeof(LocalMessageLog)];
-       }
-
-       forAllCharesDo(sortRestoredLocalMsgLog,NULL);
-
        CmiFree(_restartData);
        
-       
        _initDone();
 
        getGlobalStep(globalLBID);
+
+       // sending request to send determinants
+       _numRestartResponses = 0;
        
-       countUpdateHomeAcks = 0;
-       RestartRequest updateHomeRequest;
-       updateHomeRequest.PE = CmiMyPe();
-       CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
-       for(int i=0;i<CmiNumPes();i++){
-               if(i != CmiMyPe()){
-                       CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
+       // Send out the request to resend logged determinants to all other processors
+       CkVec<TProcessedLog> objectVec;
+       forAllCharesDo(createObjIDList, (void *)&objectVec);
+       int numberObjects = objectVec.size();
+       
+       //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
+       int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
+       char *resendMsg = (char *)CmiAlloc(totalSize);  
+
+       ResendRequest *resendReq = (ResendRequest *)resendMsg;
+       resendReq->PE =CkMyPe(); 
+       resendReq->numberObjects = numberObjects;
+       char *objList = &resendMsg[sizeof(ResendRequest)];
+       memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
+
+       CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
+       for(int i=0;i<CkNumPes();i++){
+               if(i != CkMyPe()){
+                       CmiSyncSend(i,totalSize,resendMsg);
                }
        }
-
+       CmiFree(resendMsg);
+       
 }
 
 /**
@@ -2388,7 +2171,7 @@ void _updateHomeAckHandler(RestartRequest *updateHomeAck){
        memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
 
        /* test for parallel restart migrate away object**/
-       if(parallelRestart){
+       if(fastRecovery){
                distributeRestartedObjects();
                printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
        }
@@ -2402,16 +2185,17 @@ void _updateHomeAckHandler(RestartRequest *updateHomeAck){
        CpvAccess(_currentObj) = lb;
        lb->ReceiveDummyMigration(restartDecisionNumber);
 
-       sleep(10);
+//HERE sleep(10);
        
        CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
        for(int i=0;i<CkNumPes();i++){
                if(i != CkMyPe()){
                        CmiSyncSend(i,totalSize,resendMsg);
-               }       
+               }
        }
        _resendMessagesHandler(resendMsg);
        CmiFree(resendMsg);
+
 };
 
 /**
@@ -2421,8 +2205,6 @@ void initializeRestart(void *data, ChareMlogData *mlogData){
        mlogData->resendReplyRecvd = 0;
        mlogData->receivedTNs = new CkVec<MCount>;
        mlogData->restartFlag = 1;
-       mlogData->restoredLocalMsgLog.removeAll();
-       mlogData->mapTable.empty();
 };
 
 /**
@@ -2436,13 +2218,13 @@ void updateHomePE(void *data,ChareMlogData *mlogData){
        if(mlogData->objID.type == TypeArray){
                //it is an array element
                CkGroupID myGID = mlogData->objID.data.array.id;
-               CkArrayIndex myIdx =  mlogData->objID.data.array.idx;
+               CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
                CkArrayID aid(mlogData->objID.data.array.id);           
                //check if the restarted processor is the home processor for this object
                CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
                if(locMgr->homePe(myIdx) == PE){
-                       DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
-                       DEBUGRESTART(myIdx.print());
+                       DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
+                       DEBUG_RESTART(myIdx.print());
                        informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
                }
        }
@@ -2482,6 +2264,7 @@ void _updateHomeRequestHandler(RestartRequest *updateRequest){
  * @brief Fills up the ticket vector for each chare.
  */
 void fillTicketForChare(void *data, ChareMlogData *mlogData){
+       DEBUG(char name[100]);
        ResendData *resendData = (ResendData *)data;
        int PE = resendData->PE; //restarted PE
        int count=0;
@@ -2500,15 +2283,11 @@ void fillTicketForChare(void *data, ChareMlogData *mlogData){
                // traversing the resendData structure to add ticket numbers
                for(int j=0;j<resendData->numberObjects;j++){
                        if((*objID) == (resendData->listObjects)[j].recver){
-char name[100];
                                snToTicket = *(SNToTicket **)objp;
 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
                                for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
                                        ticket = snToTicket->get(snIndex);      
-                                       if(ticket.TN > resendData->maxTickets[j]){
-                                               resendData->maxTickets[j] = ticket.TN;
-                                       }
-                                       if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
+                               if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
                                                //store the TNs that have been since the recver last checkpointed
                                                resendData->ticketVecs[j].push_back(ticket.TN);
                                        }
@@ -2538,166 +2317,133 @@ void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
        mlogData->teamRecoveryFlag = 0;
 }
 
-//the data argument is of type ResendData which contains the 
-//array of objects on  the restartedProcessor
-//this method resends the messages stored in this chare's message log 
-//to the restarted processor. It also accumulates the maximum TN
-//for all the objects on the restarted processor
-void resendMessageForChare(void *data,ChareMlogData *mlogData){
-       char nameString[100];
+/**
+ * Prints a processed log.
+ */
+void printLog(TProcessedLog *log){
+       char recverString[100];
+       CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
+}
+
+/**
+ * Prints information about a message.
+ */
+void printMsg(envelope *env, const char* par){
+       char senderString[100];
+       char recverString[100];
+       CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
+}
+
+/**
+ * Prints information about a determinant.
+ */
+void printDet(Determinant *det, const char* par){
+       char senderString[100];
+       char recverString[100];
+       CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
+}
+
+/**
+ * @brief Resends all the logged messages to a particular chare list.
+ * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
+ * @param mlogData a particular chare living in this processor.
+ */
+void resendMessageForChare(void *data, ChareMlogData *mlogData){
+       DEBUG_RESTART(char nameString[100]);
+       DEBUG_RESTART(char recverString[100]);
+       DEBUG_RESTART(char senderString[100]);
+
        ResendData *resendData = (ResendData *)data;
        int PE = resendData->PE; //restarted PE
-       DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
        int count=0;
        int ticketRequests=0;
        CkQ<MlogEntry *> *log = mlogData->getMlog();
-       
+
+       DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
+
+       // traversing the message log to see if we must resend a message        
        for(int i=0;i<log->length();i++){
                MlogEntry *logEntry = (*log)[i];
                
-               // if we sent out the logs of a local message to buddy and he crashed
-               //before acking
+               // if we sent out the logs of a local message to buddy and it crashed
+               //before acknowledging 
                envelope *env = logEntry->env;
                if(env == NULL){
                        continue;
                }
-               if(logEntry->unackedLocal){
-                       char recverString[100];
-                       DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
-                       sendLocalMessageCopy(logEntry);
-               }
-               //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
-               if(env->TN <= 0){
-                       //ticket not yet replied send it out again
-                       sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,0,1);
-               }
-               
+       
+               // resend if type is not invalid        
                if(env->recver.type != TypeInvalid){
-                       int flag = 0;//marks if any of the restarted objects matched this log entry
                        for(int j=0;j<resendData->numberObjects;j++){
                                if(env->recver == (resendData->listObjects)[j].recver){
-                                       flag = 1;
-                                       //message has a valid TN
-                                       if(env->TN > 0){
-                                               //store maxTicket
-                                               if(env->TN > resendData->maxTickets[j]){
-                                                       resendData->maxTickets[j] = env->TN;
+                                       if(PE != CkMyPe()){
+                                               DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
+                                               if(env->recver.type == TypeNodeGroup){
+                                                       CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
+                                               }else{
+                                                       CmiSetHandler(env,CmiGetXHandler(env));
+                                                       CmiSyncSend(PE,env->getTotalsize(),(char *)env);
                                                }
-                                               //if the TN for this entry is more than the TN processed, send the message out
-                                               if(env->TN >= (resendData->listObjects)[j].tProcessed){
-                                                       //store the TNs that have been since the recver last checkpointed
-                                                       resendData->ticketVecs[j].push_back(env->TN);
-                                                       
-                                                       if(PE != CkMyPe()){
-                                                               if(env->recver.type == TypeNodeGroup){
-                                                                       CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
-                                                               }else{
-                                                                       CmiSetHandler(env,CmiGetXHandler(env));
-                                                                       CmiSyncSend(PE,env->getTotalsize(),(char *)env);
-                                                               }
-                                                       }else{
-                                                               envelope *copyEnv = copyEnvelope(env);
-                                                               CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
-                                                       }
-                                                       char senderString[100];
-                                                       DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
-                                                       count++;
-                                               }       
                                        }else{
-/*                                     //the message didnt get a ticket the last time and needs to start with a ticket request
-                                               DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
-                                               //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
-                                               CkAssert(logEntry->destPE != CkMyPe());
-                                               
-                                               sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
-                                               
-                                               ticketRequests++;*/
+                                               envelope *copyEnv = copyEnvelope(env);
+                                               CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
                                        }
+                                       DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
+                                       count++;
                                }
                        }//end of for loop of objects
                        
                }       
        }
-       DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
+       DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
 }
 
 /**
- * Resends the messages since the last checkpoint to the list of objects included in the 
- * request.
+ * Send all remote determinants to a particular failed PE.
+ * It only sends determinants to those objects on the list.
  */
-void _resendMessagesHandler(char *msg){
+void _sendDetsHandler(char *msg){
        ResendData d;
+       CkVec<Determinant> *detVec;
        ResendRequest *resendReq = (ResendRequest *)msg;
 
+       // CkPrintf("[%d] Sending determinants\n",CkMyPe());
+
        // building the reply message
        char *listObjects = &msg[sizeof(ResendRequest)];
        d.numberObjects = resendReq->numberObjects;
        d.PE = resendReq->PE;
        d.listObjects = (TProcessedLog *)listObjects;
-       d.maxTickets = new MCount[d.numberObjects];
        d.ticketVecs = new CkVec<MCount>[d.numberObjects];
-       for(int i=0;i<d.numberObjects;i++){
-               d.maxTickets[i] = 0;
-       }
+       detVec = new CkVec<Determinant>[d.numberObjects];
 
-       //Check if any of the retained objects need to be recreated
-       //If they have not been recreated on the restarted processor
-       //they need to be recreated on this processor
-       int count=0;
-       for(int i=0;i<retainedObjectList.size();i++){
-               if(retainedObjectList[i]->migRecord.toPE == d.PE){
-                       count++;
-                       int recreate=1;
-                       for(int j=0;j<d.numberObjects;j++){
-                               if(d.listObjects[j].recver.type != TypeArray ){
-                                       continue;
-                               }
-                               CkArrayID aid(d.listObjects[j].recver.data.array.id);           
-                               CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
-                               if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
-                                       if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx){
-                                               recreate = 0;
-                                               break;
-                                       }
-                               }
-                       }
-                       CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
-                       if(recreate){
-                               donotCountMigration=1;
-                               _receiveMlogLocationHandler(retainedObjectList[i]->msg);
-                               donotCountMigration=0;
-                               CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
-                               int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
-                               informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
-                               sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
-                               CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
-                               CmiAssert(rec->type() == CkLocRec::local);
-                               CkVec<CkMigratable *> eltList;
-                               locMgr->migratableList((CkLocRec_local *)rec,eltList);
-                               for(int j=0;j<eltList.size();j++){
-                                       if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
-                                               CpvAccess(_currentObj) = eltList[j];
-                                               eltList[j]->ResumeFromSync();
-                                       }
+       // adding the remote determinants to the resendReplyMsg
+       // traversing all the remote determinants
+       CkVec<Determinant> *vec;
+       for(int i=0; i<d.numberObjects; i++){
+               vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
+               if(vec != NULL){
+                       for(int j=0; j<vec->size(); j++){
+
+                               // only relevant determinants are to be sent
+                               if((*vec)[j].TN > d.listObjects[i].tProcessed){
+
+                                       // adding the ticket in the ticket vector
+                                       d.ticketVecs[i].push_back((*vec)[j].TN);
+
+                                       // adding the determinant in the determinant vector
+                                       detVec[i].push_back((*vec)[j]);
+
+                                       DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
                                }
-                               retainedObjectList[i]->msg=NULL;        
                        }
                }
        }
-       
-       if(count > 0){
-//             CmiAbort("retainedObjectList for restarted processor not empty");
-       }
-       
-       DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
-
 
-       //TML: examines the origin processor to determine if it belongs to the same group.
-       // In that case, it only returns the maximum ticket received for each object in the list.
-       if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
-               forAllCharesDo(fillTicketForChare,&d);
-       else
-               forAllCharesDo(resendMessageForChare,&d);
+       int totalDetStored = 0;
+       for(int i=0;i<d.numberObjects;i++){
+               totalDetStored += detVec[i].size();
+       }
 
        //send back the maximum ticket number for a message sent to each object on the 
        //restarted processor
@@ -2708,7 +2454,7 @@ void _resendMessagesHandler(char *msg){
                totalTNStored += d.ticketVecs[i].size();
        }
        
-       int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
+       int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
        char *resendReplyMsg = (char *)CmiAlloc(totalSize);
        
        ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
@@ -2728,25 +2474,60 @@ void _resendMessagesHandler(char *msg){
                ticketList = &ticketList[sizeof(int)];
                memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
                ticketList = &ticketList[sizeof(MCount)*vecsize];
-       }       
+       }
 
-       CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
-       CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
-       
-/*     
-       if(verifyAckRequestsUnacked){
-               CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
-               for(int i=0;i<verifyAckRequestsUnacked;i++){
-                       CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
-                       LDObjHandle h;
-                       lb->Migrated(h,1);
-               }
+       // adding the stored remote determinants to the message
+       for(int i=0;i<d.numberObjects;i++){
+               int vecsize = detVec[i].size();
+               memcpy(ticketList,&vecsize,sizeof(int));
+               ticketList = &ticketList[sizeof(int)];
+               memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
+               ticketList = &ticketList[sizeof(Determinant)*vecsize];
        }
-       
-       verifyAckRequestsUnacked=0;*/
-       
-       delete [] d.maxTickets;
+
+       CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
+       CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
+
+       delete [] detVec;
        delete [] d.ticketVecs;
+
+       DEBUG_MEM(CmiMemoryCheck());
+
+       if(resendReq->PE != CkMyPe()){
+               CmiFree(msg);
+       }       
+//     CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
+       lastRestart = CmiWallTimer();
+
+}
+
+/**
+ * Resends messages since last checkpoint to the list of objects included in the 
+ * request. It also sends stored remote determinants to the particular failed PE.
+ */
+void _resendMessagesHandler(char *msg){
+       ResendData d;
+       ResendRequest *resendReq = (ResendRequest *)msg;
+
+       //CkPrintf("[%d] Resending messages\n",CkMyPe());
+
+       // building the reply message
+       char *listObjects = &msg[sizeof(ResendRequest)];
+       d.numberObjects = resendReq->numberObjects;
+       d.PE = resendReq->PE;
+       d.listObjects = (TProcessedLog *)listObjects;
+       
+       DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
+
+       //TML: examines the origin processor to determine if it belongs to the same group.
+       // In that case, it only returns the maximum ticket received for each object in the list.
+       if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
+               forAllCharesDo(fillTicketForChare,&d);
+       else
+               forAllCharesDo(resendMessageForChare,&d);
+
+       DEBUG_MEM(CmiMemoryCheck());
+
        if(resendReq->PE != CkMyPe()){
                CmiFree(msg);
        }       
@@ -2754,37 +2535,54 @@ void _resendMessagesHandler(char *msg){
        lastRestart = CmiWallTimer();
 }
 
+MCount maxVec(CkVec<MCount> *TNvec);
 void sortVec(CkVec<MCount> *TNvec);
 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
 
 /**
- * @brief Receives the tickets assigned to message to other objects.
+ * @brief Processes the messages in the delayed remote message queue
  */
-void _resendReplyHandler(char *msg){   
-       /**
-               need to rewrite this method to deal with parallel restart
-       */
-       ResendRequest *resendReply = (ResendRequest *)msg;
-       CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
+void processDelayedRemoteMsgQueue(){
+       DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
+       
+       while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
+               void *qMsgPtr;
+               CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
+               envelope *qEnv = (envelope *)qMsgPtr;
+               DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
+               CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
+               DEBUG_MEM(CmiMemoryCheck());
+       }
 
+}
+
+/**
+ * @brief Receives determinants stored on remote nodes.
+ * Message format: |Header|ObjID list|TN list|Determinant list|
+ * TN list = |number of TNs|list of TNs|...|
+ */
+void _sendDetsReplyHandler(char *msg){
+       ResendRequest *resendReply = (ResendRequest *)msg;
+       CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
        char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
        
-//     DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
+//     DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
        DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
-       for(int i =0; i< resendReply->numberObjects;i++){       
+       
+       for(int i =0; i< resendReply->numberObjects;i++){
                Chare *obj = (Chare *)listObjects[i].getObject();
                
                int vecsize;
                memcpy(&vecsize,listTickets,sizeof(int));
                listTickets = &listTickets[sizeof(int)];
-               MCount *listTNs = (MCount *)listTickets;        
+               MCount *listTNs = (MCount *)listTickets;
                listTickets = &listTickets[vecsize*sizeof(MCount)];
-               
+       
                if(obj != NULL){
                        //the object was restarted on the processor on which it existed
                        processReceivedTN(obj,vecsize,listTNs);
                }else{
-               //pack up objID vecsize and listTNs and send it to the correct processor
+                       //pack up objID vecsize and listTNs and send it to the correct processor
                        int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
                        char *TNMsg = (char *)CmiAlloc(totalSize);
                        ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
@@ -2792,27 +2590,142 @@ void _resendReplyHandler(char *msg){
                        receivedTNData->numTNs = vecsize;
                        char *tnList = &TNMsg[sizeof(ReceivedTNData)];
                        memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
+                       CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
+                       CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
+               }
+
+       }
+
+       // traversing all the retrieved determinants
+       for(int i = 0; i < resendReply->numberObjects; i++){
+               Chare *obj = (Chare *)listObjects[i].getObject();
+               
+               int vecsize;
+               memcpy(&vecsize,listTickets,sizeof(int));
+               listTickets = &listTickets[sizeof(int)];
+               Determinant *listDets = (Determinant *)listTickets;     
+               listTickets = &listTickets[vecsize*sizeof(Determinant)];
+       
+               if(obj != NULL){
+                       //the object was restarted on the processor on which it existed
+                       processReceivedDet(obj,vecsize,listDets);
+               } else {
+                       // pack the determinants and ship them to the other processor
+                       // pack up objID vecsize and listDets and send it to the correct processor
+                       int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
+                       char *detMsg = (char *)CmiAlloc(totalSize);
+                       ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
+                       receivedDetData->recver = listObjects[i];
+                       receivedDetData->numDets = vecsize;
+                       char *detList = &detMsg[sizeof(ReceivedDetData)];
+                       memcpy(detList,listDets,sizeof(Determinant)*vecsize);
+                       CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
+                       CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
+               }
+
+       }
+
+       // checking if we have received all replies
+       _numRestartResponses++;
+       if(_numRestartResponses != CkNumPes())
+               return;
+       else 
+               _numRestartResponses = 0;
+
+       // continuing with restart process; send out the request to resend logged messages to all other processors
+       CkVec<TProcessedLog> objectVec;
+       forAllCharesDo(createObjIDList, (void *)&objectVec);
+       int numberObjects = objectVec.size();
+       
+       //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
+       int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
+       char *resendMsg = (char *)CmiAlloc(totalSize);  
+
+       ResendRequest *resendReq = (ResendRequest *)resendMsg;
+       resendReq->PE =CkMyPe(); 
+       resendReq->numberObjects = numberObjects;
+       char *objList = &resendMsg[sizeof(ResendRequest)];
+       memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
+
+       CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
+       CpvAccess(_currentObj) = lb;
+       lb->ReceiveDummyMigration(restartDecisionNumber);
 
-                       CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
-                       CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
-               }       
+//HERE sleep(10);
+//     CkPrintf("[%d] RESUMING RECOVERY with %d \n",CkMyPe(),restartDecisionNumber);
+       
+       CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
+       for(int i=0;i<CkNumPes();i++){
+               if(i != CkMyPe()){
+                       CmiSyncSend(i,totalSize,resendMsg);
+               }
+       }
+       _resendMessagesHandler(resendMsg);
+       CmiFree(resendMsg);
+
+       /* test for parallel restart migrate away object**/
+       if(fastRecovery){
+               distributeRestartedObjects();
+               printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
        }
+
+//     processDelayedRemoteMsgQueue();
+
 };
 
+/**
+ * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
+ */
+void _receivedDetDataHandler(ReceivedDetData *msg){
+       DEBUG_NOW(char objName[100]);
+       Chare *obj = (Chare *) msg->recver.getObject();
+       if(obj){                
+               char *_msg = (char *)msg;
+               DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
+               Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
+               processReceivedDet(obj,msg->numDets,listDets);
+               CmiFree(msg);
+       }else{
+               int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
+               CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
+       }
+}
+
+/**
+ * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
+ */
 void _receivedTNDataHandler(ReceivedTNData *msg){
-       char objName[100];
+       DEBUG_NOW(char objName[100]);
        Chare *obj = (Chare *) msg->recver.getObject();
        if(obj){                
                char *_msg = (char *)msg;
-               DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
+               DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
                MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
                processReceivedTN(obj,msg->numTNs,listTNs);
+               CmiFree(msg);
        }else{
                int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
                CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
        }
 };
 
+/**
+ * @brief Processes the received list of determinants from a particular PE.
+ */
+void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
+       Determinant *det;
+
+       // traversing the whole list of determinants
+       for(int i=0; i<listSize; i++){
+               det = &listDets[i];
+               if(CkMyPe() == 4) printDet(det,"RECOVERY");
+               obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
+               DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
+       }
+       
+       DEBUG_MEM(CmiMemoryCheck());
+}
+       
 /**
  * @brief Processes the received list of tickets from a particular PE.
  */
@@ -2836,8 +2749,10 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
        //that senders know about. Those less than the ticket number processed 
        //by the receiver can be thrown away. The rest need not be consecutive
        // ie there can be holes in the list of ticket numbers seen by senders
-       if(obj->mlogData->resendReplyRecvd == CkNumPes()){
+       if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
                obj->mlogData->resendReplyRecvd = 0;
+
+#if VERIFY_DETS
                //sort the received TNS
                sortVec(obj->mlogData->receivedTNs);
        
@@ -2882,19 +2797,34 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
                                obj->mlogData->currentHoles = numberHoles;
                        }
                }
-       
+#else
+               if(obj->mlogData->receivedTNs->size() > 0){
+                       obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
+               }
+#endif
                // cleaning up structures and getting ready to continue execution       
                delete obj->mlogData->receivedTNs;
                DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
                obj->mlogData->receivedTNs = NULL;
                obj->mlogData->restartFlag = 0;
 
-               DEBUGRESTART(char objString[100]);
-               DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
+               // processDelayedRemoteMsgQueue();
+
+               DEBUG_RESTART(char objString[100]);
+               DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
        }
 
 }
 
+/** @brief Returns the maximum ticket from a vector */
+MCount maxVec(CkVec<MCount> *TNvec){
+       MCount max = 0;
+       for(int i=0; i<TNvec->size(); i++){
+               if((*TNvec)[i] > max)
+                       max = (*TNvec)[i];
+       }
+       return max;
+}
 
 void sortVec(CkVec<MCount> *TNvec){
        //sort it ->its bloddy bubble sort
@@ -2964,74 +2894,139 @@ int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
        Method to do parallel restart. Distribute some of the array elements to other processors.
        The problem is that we cant use to charm entry methods to do migration as it will get
        stuck in the protocol that is going to restart
+       Note: in order to avoid interference between the objects being recovered, the current PE
+    will NOT keep any object. It will be devoted to forward the messages to recovering objects.    Otherwise, the current PE has to do both things, recover objects and forward messages and 
+    objects end up stepping into each other's shoes (interference).
 */
 
 class ElementDistributor: public CkLocIterator{
        CkLocMgr *locMgr;
        int *targetPE;
+
        void pupLocation(CkLocation &loc,PUP::er &p){
-               CkArrayIndex idx=loc.getIndex();
+               CkArrayIndexMax idx=loc.getIndex();
                CkGroupID gID = locMgr->ckGetGroupID();
                p|gID;      // store loc mgr's GID as well for easier restore
                p|idx;
                p|loc;
        };
-       public:
-               ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
-               void addLocation(CkLocation &loc){
-                       if(*targetPE == CkMyPe()){
-                               *targetPE = (*targetPE +1)%CkNumPes();                          
-                               return;
-                       }
-                       
-                       CkArrayIndex idx=loc.getIndex();
-                       CkLocRec_local *rec = loc.getLocalRecord();
+public:
+       ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
+
+       void addLocation(CkLocation &loc){
+
+               // leaving object on this PE
+               if(*targetPE == CkMyPe()){
+                       *targetPE = (*targetPE +1)%CkNumPes();
+                       return;
+               }
                        
-                       CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
-                       idx.print();
+               CkArrayIndexMax idx = loc.getIndex();
+               CkLocRec_local *rec = loc.getLocalRecord();
+               CkLocMgr *locMgr = loc.getManager();
+               CkVec<CkMigratable *> eltList;
                        
+               CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
+               idx.print();
 
-                       //TODO: an element that is being moved should leave some trace behind so that
-                       // the arraybroadcaster can forward messages to it
-                       
-                       //pack up this location and send it across
-                       PUP::sizer psizer;
-                       pupLocation(loc,psizer);
-                       int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
-                       char *msg = (char *)CmiAlloc(totalSize);
-                       char *buf = &msg[CmiMsgHeaderSizeBytes];
-                       PUP::toMem pmem(buf);
-                       pmem.becomeDeleting();
-                       pupLocation(loc,pmem);
+               // incrementing number of emigrant objects
+               CpvAccess(_numEmigrantRecObjs)++;
+       locMgr->migratableList((CkLocRec_local *)rec,eltList);
+               CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
+               
+               // let everybody else know the object is leaving
+               locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
+               reductionMgr->incNumEmigrantRecObjs();
+       
+               //pack up this location and send it across
+               PUP::sizer psizer;
+               pupLocation(loc,psizer);
+               int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
+               char *msg = (char *)CmiAlloc(totalSize);
+               DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
+               distributeMsg->PE = CkMyPe();
+               char *buf = &msg[sizeof(DistributeObjectMsg)];
+               PUP::toMem pmem(buf);
+               pmem.becomeDeleting();
+               pupLocation(loc,pmem);
                        
-                       locMgr->setDuringMigration(CmiTrue);                    
-                       delete rec;
-                       locMgr->setDuringMigration(CmiFalse);                   
-                       locMgr->inform(idx,*targetPE);
+               locMgr->setDuringMigration(CmiTrue);
+               delete rec;
+               locMgr->setDuringMigration(CmiFalse);
+               locMgr->inform(idx,*targetPE);
 
-                       CmiSetHandler(msg,_distributedLocationHandlerIdx);
-                       CmiSyncSendAndFree(*targetPE,totalSize,msg);
+               CmiSetHandler(msg,_distributedLocationHandlerIdx);
+               CmiSyncSendAndFree(*targetPE,totalSize,msg);
 
-                       CmiAssert(locMgr->lastKnown(idx) == *targetPE);
-                       //decide on the target processor for the next object
-                       *targetPE = (*targetPE +1)%CkNumPes();
+               CmiAssert(locMgr->lastKnown(idx) == *targetPE);
+
+               //decide on the target processor for the next object
+               *targetPE = *targetPE + 1;
+               if(*targetPE > (CkMyPe() + parallelRecovery)){
+                       *targetPE = CkMyPe() + 1;
                }
-               
+       }
+
 };
 
+/**
+ * Distributes objects to accelerate recovery after a failure.
+ */
 void distributeRestartedObjects(){
        int numGroups = CkpvAccess(_groupIDTable)->size();      
        int i;
-       int targetPE=CkMyPe();
+       int targetPE=CkMyPe()+1;
        CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
 };
 
+/**
+ * Handler to receive back a location.
+ */
+void _sendBackLocationHandler(char *receivedMsg){
+       printf("Array element received at processor %d after recovery\n",CkMyPe());
+       DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
+       int sourcePE = distributeMsg->PE;
+       char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
+       PUP::fromMem pmem(buf);
+       CkGroupID gID;
+       CkArrayIndexMax idx;
+       pmem |gID;
+       pmem |idx;
+       CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
+       donotCountMigration=1;
+       mgr->resume(idx,pmem,CmiTrue);
+       donotCountMigration=0;
+       informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
+       printf("Array element inserted at processor %d after parallel recovery\n",CkMyPe());
+       idx.print();
+
+       // decrementing number of emigrant objects at reduction manager
+       CkVec<CkMigratable *> eltList;
+       CkLocRec *rec = mgr->elementRec(idx);
+       mgr->migratableList((CkLocRec_local *)rec,eltList);
+       CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
+       reductionMgr->decNumEmigrantRecObjs();
+       reductionMgr->decGCount();
+
+       // checking if it has received all emigrant recovering objects
+       CpvAccess(_numEmigrantRecObjs)--;
+       if(CpvAccess(_numEmigrantRecObjs) == 0){
+               (*resumeLbFnPtr)(centralLb);
+       }
+
+}
+
+/**
+ * Handler to update information about an object just received.
+ */
 void _distributedLocationHandler(char *receivedMsg){
        printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
-       char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
+       DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
+       int sourcePE = distributeMsg->PE;
+       char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
        PUP::fromMem pmem(buf);
        CkGroupID gID;
-       CkArrayIndex idx;
+       CkArrayIndexMax idx;
        pmem |gID;
        pmem |idx;
        CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
@@ -3044,23 +3039,33 @@ void _distributedLocationHandler(char *receivedMsg){
 
        CkLocRec *rec = mgr->elementRec(idx);
        CmiAssert(rec->type() == CkLocRec::local);
+
+       // adding object to the list of immigrant recovery objects
+       CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,(CkLocRec_local *)rec));
+       CpvAccess(_numImmigrantRecObjs)++;
        
        CkVec<CkMigratable *> eltList;
        mgr->migratableList((CkLocRec_local *)rec,eltList);
        for(int i=0;i<eltList.size();i++){
                if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
                        CpvAccess(_currentObj) = eltList[i];
+                       eltList[i]->mlogData->immigrantRecFlag = 1;
+                       eltList[i]->mlogData->immigrantSourcePE = sourcePE;
+
+                       // incrementing immigrant counter at reduction manager
+                       CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
+                       reductionMgr->incNumImmigrantRecObjs();
+                       reductionMgr->decGCount();
+
                        eltList[i]->ResumeFromSync();
                }
        }
-       
-       
 }
 
 
 /** this method is used to send messages to a restarted processor to tell
  * it that a particular expected object is not going to get to it */
-void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndex &idx,int locationPE){
+void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
        DummyMigrationMsg buf;
        buf.flag = MLOG_OBJECT;
        buf.lbID = lbID;
@@ -3096,12 +3101,12 @@ void sendDummyMigrationCounts(int *dummyCounts){
 void _dummyMigrationHandler(DummyMigrationMsg *msg){
        CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
        if(msg->flag == MLOG_OBJECT){
-               DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
+               DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
                LDObjHandle h;
                lb->Migrated(h,1);
        }
        if(msg->flag == MLOG_COUNT){
-               DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
+               DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
                msg->count -= verifyAckedRequests;
                for(int i=0;i<msg->count;i++){
                        LDObjHandle h;
@@ -3144,7 +3149,7 @@ public:
 /**
  * Map function pointed by fnPointer over all the chares living in this processor.
  */
-void forAllCharesDo(MlogFn fnPointer,void *data){
+void forAllCharesDo(MlogFn fnPointer, void *data){
        int numGroups = CkpvAccess(_groupIDTable)->size();
        for(int i=0;i<numGroups;i++){
                Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
@@ -3164,6 +3169,11 @@ void forAllCharesDo(MlogFn fnPointer,void *data){
  Load Balancing
 ******************************************************************/
 
+/**
+ * This is the first time Converse is called after AtSync method has been called by every local object.
+ * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
+ * message logging, we can take advantage of this situation and garbage collect at this point.
+ */
 void initMlogLBStep(CkGroupID gid){
        DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
        countLBMigratedAway = 0;
@@ -3175,18 +3185,120 @@ void initMlogLBStep(CkGroupID gid){
                CmiAssert(globalLBID.idx == gid.idx);
        }
        globalLBID = gid;
+#if SYNCHRONIZED_CHECKPOINT
+       garbageCollectMlog();
+#endif
+}
+
+/**
+ * Pups a location
+ */
+void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
+       CkArrayIndexMax idx = loc->getIndex();
+       CkGroupID gID = locMgr->ckGetGroupID();
+       p|gID;      // store loc mgr's GID as well for easier restore
+       p|idx;
+       p|*loc;
+};
+
+/**
+ * Sends back the immigrant recovering object to their origin PE.
+ */
+void sendBackImmigrantRecObjs(){
+       CkLocation *loc;
+       CkLocMgr *locMgr;
+       CkArrayIndexMax idx;
+       CkLocRec_local *rec;
+       PUP::sizer psizer;
+       int targetPE;
+       CkVec<CkMigratable *> eltList;
+       CkReductionMgr *reductionMgr;
+       // looping through all elements in immigrant recovery objects vector
+       for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
+
+               // getting the components of each location
+               loc = (*CpvAccess(_immigrantRecObjs))[i];
+               idx = loc->getIndex();
+               rec = loc->getLocalRecord();
+               locMgr = loc->getManager();
+       locMgr->migratableList((CkLocRec_local *)rec,eltList);
+               targetPE = eltList[i]->mlogData->immigrantSourcePE;
+
+               // decrement counter at array manager
+               reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
+               reductionMgr->decNumImmigrantRecObjs();
+
+               CkPrintf("[%d] Sending back object to %d: ",CkMyPe(),targetPE);
+               idx.print();
+
+               // let everybody else know the object is leaving
+               locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
+                       
+               //pack up this location and send it across
+               pupLocation(loc,locMgr,psizer);
+               int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
+               char *msg = (char *)CmiAlloc(totalSize);
+               DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
+               distributeMsg->PE = CkMyPe();
+               char *buf = &msg[sizeof(DistributeObjectMsg)];
+               PUP::toMem pmem(buf);
+               pmem.becomeDeleting();
+               pupLocation(loc,locMgr,pmem);
+               
+               locMgr->setDuringMigration(CmiTrue);
+               delete rec;
+               locMgr->setDuringMigration(CmiFalse);
+               locMgr->inform(idx,targetPE);
+
+               // sending the object
+               CmiSetHandler(msg,_sendBackLocationHandlerIdx);
+               CmiSyncSendAndFree(targetPE,totalSize,msg);
+
+               // freeing memory
+               delete loc;
+
+               CmiAssert(locMgr->lastKnown(idx) == targetPE);
+               
+       }
+
+       // cleaning up all data structures
+       CpvAccess(_immigrantRecObjs)->removeAll();
+       CpvAccess(_numImmigrantRecObjs) = 0;
+
+}
+
+/**
+ * Restores objects after parallel recovery, either by sending back the immigrant objects or 
+ * by waiting for all emigrant objects to be back.
+ */
+void restoreParallelRecovery(void (*_fnPtr)(void *),void *_centralLb){
+       resumeLbFnPtr = _fnPtr;
+       centralLb = _centralLb;
+
+       // sending back the immigrant recovering objects
+       if(CpvAccess(_numImmigrantRecObjs) > 0){
+               sendBackImmigrantRecObjs();     
+       }
+
+       // checking whether it needs to wait for emigrant recovery objects
+       if(CpvAccess(_numEmigrantRecObjs) > 0)
+               return;
+
+       // otherwise, load balancing process is finished
+       (*resumeLbFnPtr)(centralLb);
 }
 
 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
        DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
        DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
-       
+
        resumeLbFnPtr = _fnPtr;
        centralLb = _centralLb;
        migrationDoneCalled = 1;
        if(countLBToMigrate == countLBMigratedAway){
                DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
-               startMlogCheckpoint(NULL,CmiWallTimer());       
+               startMlogCheckpoint(NULL,CmiWallTimer());
        }
 };
 
@@ -3202,11 +3314,11 @@ void finishedCheckpointLoadBalancing(){
 };
 
 
-void sendMlogLocation(int targetPE,envelope *env){
+void sendMlogLocation(int targetPE, envelope *env){
+#if !SYNCHRONIZED_CHECKPOINT
        void *_msg = EnvToUsr(env);
        CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
 
-
        int existing = 0;
        //if this object is already in the retainedobjectlust destined for this
        //processor it should not be sent
@@ -3224,7 +3336,6 @@ void sendMlogLocation(int targetPE,envelope *env){
                return;
        }
        
-       
        countLBToMigrate++;
        
        MigrationNotice migMsg;
@@ -3251,7 +3362,7 @@ void sendMlogLocation(int targetPE,envelope *env){
        CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
        
        DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
-
+#endif
 }
 
 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
@@ -3309,6 +3420,9 @@ void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
        }*/
 }
 
+/**
+ * @brief Processor 0 sends a broadcast to every other processor after checkpoint barrier.
+ */
 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
        if(checkpointBarrierCount == CmiNumPes()){
                CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
@@ -3318,6 +3432,9 @@ inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
        }
 }
 
+/**
+ * @brief Processor 0 receives a contribution from every other processor after checkpoint.
+ */ 
 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
        DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
        if(msg->checkpointCount == checkpointCount){
@@ -3332,23 +3449,80 @@ void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
                        CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
                }
        }
+
+       // deleting the received message
        CmiFree(msg);
 }
 
 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
        DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
        DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
+
+#if !SYNCHRONIZED_CHECKPOINT
+       // sending a notice to all senders to remove message logs
        sendRemoveLogRequests();
+#endif
+
+       // resuming LB function pointer
        (*resumeLbFnPtr)(centralLb);
+
+       // deleting message
        CmiFree(msg);
 }
 
+/**
+ * @brief Function to remove all messages in the message log of a particular chare.
+ */
+void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
+       int total;
+       MlogEntry *logEntry;
+       CkQ<MlogEntry *> *mlog = mlogData->getMlog();
+
+       // traversing the whole message log and removing all elements
+       total = mlog->length();
+       for(int i=0; i<total; i++){
+               logEntry = mlog->deq();
+               delete logEntry;
+       }
+
+}
+
+/**
+ * @brief Garbage collects the message log and other data structures.
+ * In case of synchronized checkpoint, we use an optimization to avoid causal message logging protocol
+ * to communicate all determinants to the rest of the processors.
+ */
+void garbageCollectMlog(){
+       CkHashtableIterator *iterator;
+       CkVec<Determinant> *detArray;
+
+       DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
+
+       // cleaning up the buffered determinants, since they belong to a previous checkpoint period
+       _indexBufferedDets = 0;
+       _numBufferedDets = 0;
+       _phaseBufferedDets++;
+
+       // cleaning up remote determinants, since they belong to a previous checkpoint period
+       iterator = CpvAccess(_remoteDets)->iterator();
+       while(iterator->hasNext()){
+               detArray = *(CkVec<Determinant> **)iterator->next();
+               detArray->removeAll();
+       }
+       
+       // deleting the iterator
+       delete iterator;
+
+       // removing all messages in message log for every chare
+       forAllCharesDo(garbageCollectMlogForChare, NULL);
+}
+
 /**
        method that informs an array elements home processor of its current location
        It is a converse method to bypass the charm++ message logging framework
 */
 
-void informLocationHome(CkGroupID locMgrID,CkArrayIndex idx,int homePE,int currentPE){
+void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
        double _startTime = CmiWallTimer();
        CurrentLocationMsg msg;
        msg.mgrID = locMgrID;
@@ -3413,30 +3587,64 @@ void _getGlobalStepHandler(LBStepMsg *msg){
        CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
 };
 
+/**
+ * @brief Receives the global step handler from PE 0
+ */
 void _recvGlobalStepHandler(LBStepMsg *msg){
        
-       restartDecisionNumber=msg->step;
-       RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
-       _updateHomeAckHandler(dummyAck);
+       // updating restart decision number
+       restartDecisionNumber = msg->step;
+       CmiFree(msg);
+
+       CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
+
+       // sending a dummy message to sendDetsReplyHandler
+       ResendRequest *resendReplyMsg = (ResendRequest *)CmiAlloc(sizeof(ResendRequest));
+       resendReplyMsg->PE = CkMyPe();
+       resendReplyMsg->numberObjects = 0;
+       _sendDetsReplyHandler((char *)resendReplyMsg);
 };
 
 /**
  * @brief Function to wrap up performance information.
  */
 void _messageLoggingExit(){
-/*     if(CkMyPe() == 0){
-               if(countBuffered != 0){
-                       printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
-               }
+       
+       // printing the signature for causal message logging
+       if(CkMyPe() == 0)
+               printf("[%d] FastMessageLoggingExit \n",CmiMyPe());
+
+       //TML: printing some statistics for group approach
+#if COLLECT_STATS_TEAM
+       printf("[%d] LOGGED MESSAGES: %.0f\n",CkMyPe(),MLOGFT_totalMessages);
+       printf("[%d] MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalLogSize/(float)MEGABYTE);
+#endif
 
-//             printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
+#if COLLECT_STATS_MSGS
+#if COLLECT_STATS_MSGS_TOTAL
+       printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
+       printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
+#else
+       printf("[%d] TARGETS: ",CmiMyPe());
+       for(int i=0; i<CmiNumPes(); i++){
+#if COLLECT_STATS_MSG_COUNT
+               printf("%d ",numMsgsTarget[i]);
+#else
+               printf("%d ",sizeMsgsTarget[i]);
+#endif
        }
-       printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
-       printf("[%d] _messageLoggingExit \n",CmiMyPe());
+       printf("\n");
+#endif
+#endif
 
-       //TML: printing some statistics for group approach
-       //if(teamSize > 1)
-               CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
+#if COLLECT_STATS_DETS
+       printf("\n");
+       printf("[%d] DETS: %d\n",CmiMyPe(),numDets);
+       printf("[%d] PIGGYBACKED DETS: %d\n",CmiMyPe(),numPiggyDets);
+#if COLLECT_STATS_DETS_DUP
+       printf("[%d] DUPLICATED DETS: %d\n",CmiMyPe(),numDupDets);
+#endif
+#endif
 
 }
 
@@ -3474,7 +3682,7 @@ void* CkObjID::getObject(){
        
                                        if(aid.ckLocalBranch() == NULL){ return NULL;}
        
-                                       CProxyElement_ArrayBase aProxy(aid,data.array.idx);
+                                       CProxyElement_ArrayBase aProxy(aid,data.array.idx.asChild());
        
                                        return aProxy.ckLocal();
                                }
@@ -3498,7 +3706,7 @@ int CkObjID::guessPE(){
                                        if(aid.ckLocalBranch() == NULL){
                                                return -1;
                                        }
-                                       return aid.ckLocalBranch()->lastKnown(data.array.idx);
+                                       return aid.ckLocalBranch()->lastKnown(data.array.idx.asChild());
                                }
                        default:
                                CkAssert(0);
@@ -3522,7 +3730,7 @@ char *CkObjID::toString(char *buf) const {
                        break;
                case TypeArray:
                        {
-                               const CkArrayIndex &idx = data.array.idx;
+                               const CkArrayIndexMax &idx = data.array.idx.asChild();
                                const int *indexData = idx.data();
                                sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
                                break;
@@ -3548,14 +3756,14 @@ void CkObjID::updatePosition(int PE){
                                                char str[100];
                                                CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
 //                                             CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
-                                               CkLocRec *rec = mgr->elementNrec(data.array.idx);
+                                               CkLocRec *rec = mgr->elementNrec(data.array.idx.asChild());
                                                if(rec != NULL){
                                                        if(rec->type() == CkLocRec::local){
                                                                CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
                                                                return;
                                                        }
                                                }
-                                               mgr->inform(data.array.idx,PE);
+                                               mgr->inform(data.array.idx.asChild(),PE);
                                        }       
                                }
 
@@ -3578,7 +3786,6 @@ void CkObjID::updatePosition(int PE){
 void MlogEntry::pup(PUP::er &p){
        p | destPE;
        p | _infoIdx;
-       p | unackedLocal;
        int size;
        if(!p.isUnpacking()){
 /*             CkAssert(env);
@@ -3602,10 +3809,6 @@ void MlogEntry::pup(PUP::er &p){
        }
        if(size > 0){
                p((char *)env,size);
-       
-               if(p.isUnpacking()){
-                       env->localMlogEntry = NULL;
-               }
        }
 };
 
@@ -3619,19 +3822,12 @@ void RestoredLocalMap::pup(PUP::er &p){
        p(TNArray,count);
 };
 
-
-
-
 /**********************************
        * The methods of the message logging
        * data structure stored in each chare
        ********************************/
 
 MCount ChareMlogData::nextSN(const CkObjID &recver){
-/*     MCount SN = snTable.get(recver);
-       snTable.put(recver) = SN+1;
-       return SN+1;*/
-       double _startTime = CmiWallTimer();
        MCount *SN = snTable.getPointer(recver);
        if(SN==NULL){
                snTable.put(recver) = 1;
@@ -3640,10 +3836,11 @@ MCount ChareMlogData::nextSN(const CkObjID &recver){
                (*SN)++;
                return *SN;
        }
-//     traceUserBracketEvent(34,_startTime,CkWallTimer());
 };
-
-
+/**
+ * @brief Gets a new ticket for a particular object.
+ */
 MCount ChareMlogData::newTN(){
        MCount TN;
        if(currentHoles > 0){
@@ -3656,10 +3853,23 @@ MCount ChareMlogData::newTN(){
                }
        }else{
                TN = ++tCount;
-       }       
+       }
        return TN;
 };
 
+/**
+ * @brief Get the ticket associated with a combination of sender and SN, if any.
+ */
+inline Ticket ChareMlogData::getTicket(CkObjID &sender, MCount SN){
+       Ticket ticket;
+
+       SNToTicket *ticketRow = ticketTable.get(sender);
+       if(ticketRow != NULL){
+               return ticketRow->get(SN);
+       }
+       return ticket;
+}
+
 /**
  * Inserts a ticket in the ticketTable if it is not already there.
  */
@@ -3684,10 +3894,10 @@ inline void ChareMlogData::verifyTicket(CkObjID &sender, MCount SN, MCount TN){
 /**
  * Generates the next ticket for a request.
  */
-inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
+inline Ticket ChareMlogData::next_ticket(CkObjID &sender, MCount SN){
        DEBUG(char senderName[100];)
        DEBUG(char recverName[100];)
-       double _startTime =CmiWallTimer();
+       double _startTime = CmiWallTimer();
        Ticket ticket;
 
        // if a ticket is requested during restart, 0 is returned to make the requester to ask for it later.
@@ -3695,17 +3905,7 @@ inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
                ticket.TN = 0;
                return ticket;
        }
-/*     SNToTicket &ticketRow = ticketTable.put(sender);
-       Ticket earlierTicket = ticketRow.get(SN);
-       if(earlierTicket.TN == 0){
-               //This SN has not been ever alloted a ticket
-               ticket.TN = newTN();
-               ticketRow.put(SN)=ticket;
-       }else{
-               ticket.TN = earlierTicket.TN;
-       }*/
        
-
        SNToTicket *ticketRow = ticketTable.get(sender);
        if(ticketRow != NULL){
                Ticket earlierTicket = ticketRow->get(SN);
@@ -3728,12 +3928,8 @@ inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
                ticketTable.put(sender) = newRow;
                DEBUG(printf("[%d] next_ticket new row ticket sender %s recver %s SN %d TN %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN));
        }
-/*TODO: check if the message for this SN has already been received
-       in the table of received SNs 
-       If it was received before the last checkpoint mark it as old
-       other wise received
-       */
        ticket.state = NEW_TICKET;
+
 //     traceUserBracketEvent(34,_startTime,CkWallTimer());
        return ticket;  
 };
@@ -3760,15 +3956,6 @@ double totalSearchRestoredCount=0;
 MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN){
        double start= CkWallTimer();
        MCount TN=0;    
-       if(mapTable.numObjects() > 0){
-               RestoredLocalMap *map = mapTable.get(sender);
-               if(map){
-                       int index = SN - map->minSN;
-                       if(index < map->count){
-                               TN = map->TNArray[index];
-                       }
-               }
-       }
        
        DEBUG(char senderName[100]);
        DEBUG(char recverName[100]);
@@ -3779,52 +3966,6 @@ MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCoun
        return TN;
 }
 
-void ChareMlogData::addToRestoredLocalQ(LocalMessageLog *logEntry){
-       restoredLocalMsgLog.push_back(*logEntry);
-}
-
-void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData){
-       mlogData->sortRestoredLocalMsgLog();
-}
-
-void ChareMlogData::sortRestoredLocalMsgLog(){
-       //sort it ->its bloddy bubble sort
-       
-       for(int i=0;i<restoredLocalMsgLog.size();i++){
-               LocalMessageLog &logEntry = restoredLocalMsgLog[i];
-               RestoredLocalMap *map = mapTable.get(logEntry.sender);
-               if(map == NULL){
-                       map = new RestoredLocalMap;
-                       mapTable.put(logEntry.sender)=map;
-               }
-               map->count++;
-               if(map->minSN == 0){
-                       map->minSN = logEntry.SN;
-               }else{
-                       if(logEntry.SN < map->minSN){
-                               map->minSN = logEntry.SN;
-                       }
-               }
-               if(logEntry.SN > map->maxSN){
-                       map->maxSN = logEntry.SN;
-               }
-
-       }
-       for(int i=0;i< restoredLocalMsgLog.size();i++){
-               LocalMessageLog &logEntry = restoredLocalMsgLog[i];
-               RestoredLocalMap *map = mapTable.get(logEntry.sender);
-               CkAssert(map != NULL);
-               if(map->TNArray == NULL){
-                       map->TNArray = new MCount[map->maxSN-map->minSN+1];                     
-                       CkAssert(map->count == map->maxSN-map->minSN+1);
-                       map->count = 0;
-               }
-               map->TNArray[map->count] = logEntry.TN;
-               map->count++;
-       }
-       restoredLocalMsgLog.free();
-}
-
 /**
  * Pup method for the metadata.
  * We are preventing the whole message log to be stored (as proposed by Sayantan for dealing with multiple failures).
@@ -3882,7 +4023,6 @@ void ChareMlogData::pup(PUP::er &p){
                }
        }
        
-       
        p | currentHoles;
        p | numberHoles;
        if(p.isUnpacking()){
@@ -3898,31 +4038,6 @@ void ChareMlogData::pup(PUP::er &p){
        
        snTable.pup(p);
 
-       // pupping only the unacked local messages in the message log
-       int length = 0;
-       MlogEntry *entry;
-       if(!p.isUnpacking()){
-               for(int i=0; i<mlog.length(); i++){
-                       entry = mlog[i];
-                       if(entry->unackedLocal)
-                               length++;
-               }
-       }
-       p | length;
-       if(p.isUnpacking()){
-               for(int i=0; i<length; i++){
-                       entry = new MlogEntry();
-                       mlog.enq(entry);
-                       entry->pup(p);
-               }
-       }else{
-               for(int i=0; i<mlog.length(); i++){
-                       entry = mlog[i];
-                       if(entry->unackedLocal){
-                               entry->pup(p);
-                       }
-               }
-       }
 
 /*     int length;
        if(!p.isUnpacking()){           
@@ -3942,12 +4057,11 @@ void ChareMlogData::pup(PUP::er &p){
                entry->pup(p);
        }*/
        
-       p | restoredLocalMsgLog;
        p | resendReplyRecvd;
        p | restartFlag;
 
        // pup the mapTable
-       int tableSize;
+/*     int tableSize;
        if(!p.isUnpacking()){
                tableSize = mapTable.numObjects();
        }
@@ -3970,7 +4084,7 @@ void ChareMlogData::pup(PUP::er &p){
                        map->pup(p);
                        mapTable.put(objID) = map;
                }
-       }
+       }*/
 
        //pup the ticketTable
        {
@@ -4020,13 +4134,11 @@ void ChareMlogData::pup(PUP::er &p){
 
 /**
  * Getting the pe number of the current processor's buddy.
+ * In the team-based approach each processor might checkpoint in the next team, but currently
+ * teams are only meant to reduce memory overhead.
  */
 int getCheckPointPE(){
-       //TML: assigning a team-based buddy
-       if(teamSize != 1){
-               return (CmiMyPe() + teamSize) % CmiNumPes();
-       }
-       return (CmiNumPes() -1 - CmiMyPe());
+       return (CmiMyPe() + 1) % CmiNumPes();
 }
 
 //assume it is a packed envelope
@@ -4036,4 +4148,9 @@ envelope *copyEnvelope(envelope *env){
        return newEnv;
 }
 
+/* Checks if two determinants are the same */
+inline int isSameDet(Determinant *first, Determinant *second){
+       return first->sender == second->sender && first->receiver == second->receiver && first->SN == second->SN && first->TN == second->TN;
+}
+
 #endif