Adding support for causal message logging.
authorEsteban Meneses <emenese2@illinois.edu>
Thu, 29 Mar 2012 22:08:32 +0000 (17:08 -0500)
committerEsteban Meneses <emenese2@illinois.edu>
Thu, 29 Mar 2012 22:08:32 +0000 (17:08 -0500)
15 files changed:
src/arch/mpi/conv-mach-causalft.h [new file with mode: 0644]
src/ck-core/charm++.h
src/ck-core/ck.C
src/ck-core/ckarray.C
src/ck-core/ckarray.ci
src/ck-core/ckarray.h
src/ck-core/ckcausalmlog.C
src/ck-core/ckcausalmlog.h
src/ck-core/cklocation.C
src/ck-core/ckreduction.C
src/ck-core/ckreduction.h
src/ck-core/envelope.h
src/ck-core/init.C
src/scripts/Makefile
tests/Makefile

diff --git a/src/arch/mpi/conv-mach-causalft.h b/src/arch/mpi/conv-mach-causalft.h
new file mode 100644 (file)
index 0000000..ca28aeb
--- /dev/null
@@ -0,0 +1,3 @@
+#define __FAULT__       1 /* NOTE(review): presumably the generic fault-tolerance switch; its uses are not visible here -- confirm */
+#define _FAULT_CAUSAL_      1 /* enables the code guarded by defined(_FAULT_CAUSAL_), i.e. the causal message logging protocol */
+#define CMK_CHARE_USE_PTR   1 /* NOTE(review): purpose is not visible in this patch -- confirm why causal logging needs it */
index f3ff6d784015f69bc8734a706097e80dfbb5a6a1..05499e48f0176300ec36f16a9cde9ae8b3cd7bc6 100644 (file)
@@ -1006,9 +1006,13 @@ if(CpvAccess(networkProgressCount) >=  p)  \
 #endif
 
 
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if defined(_FAULT_MLOG_) 
 #include "ckmessagelogging.h"
 #endif
+#if defined(_FAULT_CAUSAL_)
+#include "ckcausalmlog.h"
+#endif
+
 #include "ckmemcheckpoint.h"
 #include "readonly.h"
 #include "ckarray.h"
index d9e89fd18bf22e1c2a47ddf614f46e930eaf9400..a03fee8a3b2e2afb09756bf4dd89df33bf55bf8e 100644 (file)
@@ -1664,8 +1664,10 @@ static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
 {
   int numPes;
   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if defined(_FAULT_MLOG_) 
   sendTicketGroupRequest(env,pe,_infoIdx);
+#elif defined(_FAULT_CAUSAL_)
+       sendGroupMsg(env,pe,_infoIdx);
 #else
   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
   _TRACE_CREATION_N(env, numPes);
@@ -1812,8 +1814,10 @@ static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
 {
   int numPes;
   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if defined(_FAULT_MLOG_)
         sendTicketNodeGroupRequest(env,node,_infoIdx);
+#elif defined(_FAULT_CAUSAL_)
+       sendNodeGroupMsg(env,node,_infoIdx);
 #else
   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
   _TRACE_CREATION_N(env, numPes);
@@ -1991,8 +1995,10 @@ extern "C"
 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
   register envelope *env = UsrToEnv(msg);
   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if defined(_FAULT_MLOG_)
    sendTicketArrayRequest(env,pe,_infoIdx);
+#elif defined(_FAULT_CAUSAL_)
+       sendArrayMsg(env,pe,_infoIdx);
 #else
   if (opts & CK_MSG_IMMEDIATE)
     CmiBecomeImmediate(env);
index 92e20f4fe78646e812c1d39cfff0f7f6b95df223..65dd8a1eb64c85b9cdeb1d21e712db1882088d04 100644 (file)
@@ -753,6 +753,40 @@ CkArray::CkArray(CkArrayOptions &opts,
   //nodeProxy = new CProxy_CkArrayReductionMgr (nodereductionID);
 #endif
 
+#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+       // creating the spanning tree to be used for broadcast
+       children = (int *) CmiAlloc(sizeof(int) * _MLOG_BCAST_BFACTOR_);
+       numChildren = 0;
+       
+       // computing the level of the tree this pe is in
+       // we should use the geometric series formula, but now a quick and dirty code should suffice
+       // PE 0 is at level 0, PEs 1.._MLOG_BCAST_BFACTOR_ are at level 1 and so on
+       int level = 0;
+       int aux = CmiMyPe();
+       int max = CmiNumPes();
+       int factor = _MLOG_BCAST_BFACTOR_;
+       int startLevel = 0;
+       int startNextLevel = 1;
+       while(aux >= 0){
+               level++;
+               startLevel = startNextLevel;
+               startNextLevel += factor;
+               aux -= factor;
+               factor *= _MLOG_BCAST_BFACTOR_;
+       }
+
+       // adding children to the tree
+       int first = startNextLevel + (CmiMyPe() - startLevel) * _MLOG_BCAST_BFACTOR_;
+       for(int i=0; i<_MLOG_BCAST_BFACTOR_; i++){
+               if(first + i >= CmiNumPes())
+                       break;
+               children[i] = first + i;
+               numChildren++;
+       }
+#endif
+
+
   if (opts.reductionClient.type != CkCallback::invalid && CkMyPe() == 0)
       ckSetReductionClient(&opts.reductionClient);
 }
@@ -1242,8 +1276,19 @@ void CkArray::sendBroadcast(CkMessage *msg)
 {
        CK_MAGICNUMBER_CHECK
        if(CkMyPe() == CpvAccess(serializer)){
+#if _MLOG_BCAST_TREE_
+               // Using the spanning tree to broadcast the message
+               for(int i=0; i<numChildren; i++){
+                       CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
+                       thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
+               }
+       
+               // delivering message locally
+               recvBroadcast(msg);     
+#else
                //Broadcast the message to all processors
                thisProxy.recvBroadcast(msg);
+#endif
        }else{
                thisProxy[CpvAccess(serializer)].sendBroadcast(msg);
        }
@@ -1258,6 +1303,21 @@ void CkArray::sendExpeditedBroadcast(CkMessage *msg)
 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
 int _tempBroadcastCount=0;
 
+// Delivers a message using the spanning tree
+void CkArray::recvBroadcastViaTree(CkMessage *msg)
+{
+       CK_MAGICNUMBER_CHECK
+
+       // Using the spanning tree to broadcast the message
+       for(int i=0; i<numChildren; i++){
+               CkMessage *copyMsg = (CkMessage *) CkCopyMsg((void **)&msg);
+               thisProxy[children[i]].recvBroadcastViaTree(copyMsg);
+       }
+
+       // delivering message locally
+       recvBroadcast(msg);     
+}
+
 void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index){
     if(homePe(*index)==CmiMyPe()){
         CkArrayMessage *bcast = (CkArrayMessage *)data;
@@ -1285,6 +1345,9 @@ void CkArray::broadcastHomeElements(void *data,CkLocRec *rec,CkArrayIndex *index
 void CkArray::staticBroadcastHomeElements(CkArray *arr,void *data,CkLocRec *rec,CkArrayIndex *index){
     arr->broadcastHomeElements(data,rec,index);
 }
+#else
+// No spanning tree exists in this build: deliver locally instead of discarding msg without delivering or freeing it
+void CkArray::recvBroadcastViaTree(CkMessage *msg){ recvBroadcast(msg); }
 #endif
 
 
@@ -1338,8 +1401,13 @@ void CkArray::recvBroadcast(CkMessage *m)
 
        // CkArrayBroadcaster doesn't have msg buffered, and there was
        // no last delivery to transfer ownership
+#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+       if (stableLocations)
+         delete msg;
+#else
        if (stableLocations && len == 0)
          delete msg;
+#endif
 }
 
 #include "CkArray.def.h"
index 2b6306451a2262c0484588cd8a925b2c9e7d1d85..3b14a9450bb7d2a6ed227fb91516985a5df421fa 100644 (file)
@@ -13,6 +13,7 @@ module CkArray {
        //Broadcast
        entry void sendBroadcast(CkMessage *);
        entry void recvBroadcast(CkMessage *);
+       entry void recvBroadcastViaTree(CkMessage *);
        entry [expedited] void sendExpeditedBroadcast(CkMessage *);
        entry [expedited] void recvExpeditedBroadcast(CkMessage *);
   };
index 2d3ab6aacdd821664ee323b5f5056024df61270c..7aba76112d58937bb9b9cb0eb6143c919712996f 100644 (file)
@@ -35,6 +35,10 @@ Orion Sky Lawlor, olawlor@acm.org
 extern void _registerCkArray(void);
 CpvExtern (int ,serializer);
 
+#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#define _MLOG_BCAST_TREE_ 1
+#define _MLOG_BCAST_BFACTOR_ 8
+#endif
 
 /** This flag is true when in the system there is anytime migration, false when
  *  the user code guarantees that no migration happens except during load balancing
@@ -642,6 +646,10 @@ class CkArray : public CkReductionMgr, public CkArrMgr {
   CProxy_CkArray thisProxy;
   typedef CkMigratableListT<ArrayElement> ArrayElementList;
   ArrayElementList *elements;
+#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+    int *children;
+    int numChildren;
+#endif
 private:
   bool stableLocations;
 
@@ -694,6 +702,7 @@ public:
   void recvBroadcast(CkMessage *msg);
   void sendExpeditedBroadcast(CkMessage *msg);
   void recvExpeditedBroadcast(CkMessage *msg) { recvBroadcast(msg); }
+  void recvBroadcastViaTree(CkMessage *msg);
 
   void pup(PUP::er &p);
   void ckJustMigrated(void){ doneInserting(); }
index fbfb930a4a0384356c6c6d382350642a084908c7..3a34c31de61e3593b4e654b7a0f3ad845adea012 100644 (file)
@@ -1,11 +1,13 @@
 /**
- * Message Logging Fault Tolerance Protocol
- * It includes the main functions for the basic and team-based schemes.
- */
+  * Simple Causal Message Logging Fault Tolerance Protocol.
+  * Features:
+       * Reduces the latency overhead of the pessimistic approach.
+       * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
+  */
 
 #include "charm.h"
 #include "ck.h"
-#include "ckmessagelogging.h"
+#include "ckcausalmlog.h"
 #include "queueing.h"
 #include <sys/types.h>
 #include <signal.h>
 
 #ifdef _FAULT_CAUSAL_
 
-//#define DEBUG(x)  if(_restartFlag) {x;}
-#define DEBUG_MEM(x) //x
-#define DEBUG(x)  //x
-#define DEBUGRESTART(x)  //x
-#define DEBUGLB(x) // x
+// Collects some statistics about message logging. Beware of the high cost of accounting for
+// duplicated determinants (every determinant received triggers a linear search
+// through a potentially big list).
+#define COLLECT_STATS_MSGS 0
+#define COLLECT_STATS_MSGS_TOTAL 0
+#define COLLECT_STATS_MSG_COUNT 0
+#define COLLECT_STATS_DETS 0
+#define COLLECT_STATS_DETS_DUP 0
+#define COLLECT_STATS_MEMORY 0
+#define COLLECT_STATS_TEAM 0
+
+#define RECOVERY_SEND "SEND"
+#define RECOVERY_PROCESS "PROCESS"
+
+#define DEBUG_MEM(x)  //x
+#define DEBUG(x) // x
+#define DEBUG_RESTART(x) // x
+#define DEBUGLB(x)   // x
 #define DEBUG_TEAM(x)  // x
-
-#define BUFFERED_LOCAL
-#define BUFFERED_REMOTE 
+#define DEBUG_PERF(x) // x
+#define DEBUG_CHECKPOINT 1
+#define DEBUG_NOW(x) x
+#define DEBUG_PE(x,y) if(CkMyPe() == x) y
+#define DEBUG_RECOVERY(x) //x
 
 extern const char *idx2str(const CkArrayIndex &ind);
 extern const char *idx2str(const ArrayElement *el);
-const char *idx2str(const CkArrayIndex &ind){
-       return idx2str((const CkArrayIndex &)ind);
-};
 
 void getGlobalStep(CkGroupID gID);
 
@@ -36,20 +50,15 @@ void sendCheckpointData(int mode);
 void createObjIDList(void *data,ChareMlogData *mlogData);
 inline bool isLocal(int destPE);
 inline bool isTeamLocal(int destPE);
+void printLog(TProcessedLog *log);
 
 int _restartFlag=0;
-//ERASE int restarted=0; // it's not being used anywhere
-
-//TML: variables for measuring savings with teams in message logging
-float MLOGFT_totalLogSize = 0.0;
-float MLOGFT_totalMessages = 0.0;
-float MLOGFT_totalObjects = 0.0;
+int _numRestartResponses=0;
 
 //TODO: remove for perf runs
 int countHashRefs=0; //count the number of gets
 int countHashCollisions=0;
 
-//#define CHECKPOINT_DISK
 char *checkpointDirectory=".";
 int unAckedCheckpoint=0;
 
@@ -61,50 +70,86 @@ int countUpdateHomeAcks=0;
 
 extern int teamSize;
 extern int chkptPeriod;
-extern bool parallelRestart;
+extern bool fastRecovery;
+extern int parallelRecovery;
 
 char *killFile;
+char *faultFile;
 int killFlag=0;
+int faultFlag=0;
 int restartingMlogFlag=0;
 void readKillFile();
 double killTime=0.0;
+double faultMean;
 int checkpointCount=0;
 
-
 CpvDeclare(Chare *,_currentObj);
-CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
-CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
-CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
+CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
 CpvDeclare(Queue, _outOfOrderMessageQueue);
-CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
-//CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
+CpvDeclare(Queue, _delayedRemoteMessageQueue);
 CpvDeclare(char **,_bufferedTicketRequests);
 CpvDeclare(int *,_numBufferedTicketRequests);
-CpvDeclare(char *,_bufferTicketReply);
-
 
+/***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
+/** @note All the determinants generated by a PE are stored in variable _localDets.
+ * As soon as a message is sent, then all the determinants are appended to the message,
+ * but those determinants are not deleted. We must wait until an ACK comes from the receiver
+ * to delete the determinants. In the meantime the same determinants may be appended
+ * to other messages and more determinants can be added to _localDets.
+ * A simple solution to this problem was to have a primitive array and keep adding determinants
+ * at the end. However, to avoid multiple copies of determinants, we will keep a pointer
+ * to the first 'valid' determinant in the array. Alternatively, we can keep a pointer to the latest
+ * determinant and a number of how many valid determinants there are behind it. We do not remove
+ * determinants until a checkpoint is made, since these determinants may have to be added to
+ * messages in case of a recovery.
+ */
+// temporary storage for the outgoing determinants
+CpvDeclare(char *, _localDets);
+// number of buffered determinants
+int _numBufferedDets;
+// current index of first determinant in _localDets
+int _indexBufferedDets;
+// current phase of determinants
+int _phaseBufferedDets;
+
+// stores the determinants from other nodes, to send them in case of a crash
+CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
+// max number of buffered determinants
+int _maxBufferedDets;
+
+// stores the incarnation number from every other processor
+CpvDeclare(char *, _incarnation);
+
+/***** *****/
+
+#if COLLECT_STATS_MSGS
+int *numMsgsTarget;
+int *sizeMsgsTarget;
+int totalMsgsTarget;
+float totalMsgsSize;
+#endif
+#if COLLECT_STATS_DETS
+int numPiggyDets;
+int numDets;
+int numDupDets;
+#endif
+#if COLLECT_STATS_MEMORY
+int msgLogSize;
+int bufferedDetsSize;
+int storedDetsSize;
+#endif
+//TML: variables for measuring savings with teams in message logging
+#if COLLECT_STATS_TEAM
+float MLOGFT_totalLogSize = 0.0;
+float MLOGFT_totalMessages = 0.0;
+#endif
 
 static double adjustChkptPeriod=0.0; //in ms
 static double nextCheckpointTime=0.0;//in seconds
+static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);
 
-double lastBufferedLocalMessageCopyTime;
-
-int _maxBufferedMessages;
-int _maxBufferedTicketRequests;
-int BUFFER_TIME=2; // in ms
-
-
-int _ticketRequestHandlerIdx;
-int _ticketHandlerIdx;
-int _localMessageCopyHandlerIdx;
-int _localMessageAckHandlerIdx;
 int _pingHandlerIdx;
-int _bufferedLocalMessageCopyHandlerIdx;
-int _bufferedLocalMessageAckHandlerIdx;
-int _bufferedTicketRequestHandlerIdx;
-int _bufferedTicketHandlerIdx;
-
 
 char objString[100];
 int _checkpointRequestHandlerIdx;
@@ -127,7 +172,10 @@ int _updateHomeAckHandlerIdx;
 int _resendMessagesHandlerIdx;
 int _resendReplyHandlerIdx;
 int _receivedTNDataHandlerIdx;
+int _receivedDetDataHandlerIdx;
 int _distributedLocationHandlerIdx;
+int _storeDeterminantsHandlerIdx;
+int _removeDeterminantsHandlerIdx;
 
 //TML: integer constants for team-based message logging
 int _restartHandlerIdx;
@@ -143,18 +191,12 @@ int verifyAckedRequests=0;
 
 RestartRequest *storedRequest;
 
-
-
 int _falseRestart =0; /**
                                                                                                        For testing on clusters we might carry out restarts on 
                                                                                                        a porcessor without actually starting it
                                                                                                        1 -> false restart
                                                                                                        0 -> restart after an actual crash
-                                                                                               */                                                                                                                              
-
-//lock for the ticketRequestHandler and ticketLogLocalMessage methods;
-int _lockNewTicket=0;
-
+                                                                                               */              
 
 //Load balancing globals
 int onGoingLoadBalancing=0;
@@ -177,32 +219,21 @@ int globalResumeCount=0;
 CkGroupID globalLBID;
 int restartDecisionNumber=-1;
 
-
 double lastCompletedAlarm=0;
 double lastRestart=0;
 
-
 //update location globals
 int _receiveLocationHandlerIdx;
 
-
-
-// initialize message logging datastructures and register handlers
+/** 
+ * @brief Initialize message logging data structures and register handlers
+ */
 void _messageLoggingInit(){
        //current object
        CpvInitialize(Chare *,_currentObj);
        
        //registering handlers for message logging
-       _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
-       _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
-       _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
-       _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
        _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
-       _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
-       _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
-       _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
-       _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
-
                
        //handlers for checkpointing
        _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
@@ -210,7 +241,6 @@ void _messageLoggingInit(){
        _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
        _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
 
-
        //handlers for restart
        _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
        _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
@@ -219,6 +249,7 @@ void _messageLoggingInit(){
        _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
        _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
        _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
+       _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
        _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
        _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
        _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
@@ -229,6 +260,9 @@ void _messageLoggingInit(){
        _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
        _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
 
+       // handlers for causal message logging
+       _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
+       _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
        
        //handlers for load balancing
        _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
@@ -236,26 +270,19 @@ void _messageLoggingInit(){
        _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
        _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
        _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
-       _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
-       _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
        _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
        _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
-
        
        //handlers for updating locations
        _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
        
        //Cpv variables for message logging
-       CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
-       CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
-       CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
-       CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
-       CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
-       CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
+       CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
+       CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
        CpvInitialize(Queue, _outOfOrderMessageQueue);
+       CpvInitialize(Queue, _delayedRemoteMessageQueue);
        CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
-       CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
-       CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
+       CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
        
        CpvInitialize(char **,_bufferedTicketRequests);
        CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
@@ -264,34 +291,27 @@ void _messageLoggingInit(){
                CpvAccess(_bufferedTicketRequests)[i]=NULL;
                CpvAccess(_numBufferedTicketRequests)[i]=0;
        }
-  CpvInitialize(char *,_bufferTicketReply);
-       CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
-       
-//     CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
-       CcdCallFnAfter(retryTicketRequest,NULL,100);    
-       
+       // Cpv variables for causal protocol
+       _numBufferedDets = 0;
+       _indexBufferedDets = 0;
+       _phaseBufferedDets = 0;
+       _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
+       CpvInitialize(char *, _localDets);
+       CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
+       CpvInitialize(char *, _incarnation);
+       CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
+       CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
+       CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
+       for(int i=0; i<CmiNumPes(); i++){
+               CpvAccess(_incarnation)[i] = 0;
+       }
        
        //Cpv variables for checkpoint
        CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
        CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
-       
-//     CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
-//     printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
-//     CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
-       if(CkMyPe() == 0){
-//             CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
-#ifdef         BUFFERED_LOCAL
-               if(CmiMyPe() == 0){
-                       printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
-               }
-#endif
-       }
-#ifdef         BUFFERED_REMOTE
-       if(CmiMyPe() == 0){
-               printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
-       }
-#endif
 
+       // registering user events for projections      
        traceRegisterUserEvent("Remove Logs", 20);
        traceRegisterUserEvent("Ticket Request Handler", 21);
        traceRegisterUserEvent("Ticket Handler", 22);
@@ -303,7 +323,6 @@ void _messageLoggingInit(){
        traceRegisterUserEvent("Checkpoint",28);
        traceRegisterUserEvent("Checkpoint Store",29);
        traceRegisterUserEvent("Checkpoint Ack",30);
-       
        traceRegisterUserEvent("Send Ticket Request",31);
        traceRegisterUserEvent("Generalticketrequest1",32);
        traceRegisterUserEvent("TicketLogLocal",33);
@@ -315,8 +334,32 @@ void _messageLoggingInit(){
        
        lastCompletedAlarm=CmiWallTimer();
        lastRestart = CmiWallTimer();
-//     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
-       CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
+
+#if COLLECT_STATS_MSGS
+#if COLLECT_STATS_MSGS_TOTAL
+       totalMsgsTarget = 0;
+       totalMsgsSize = 0.0;
+#else
+       numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
+       sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
+       for(int i=0; i<CmiNumPes(); i++){
+               numMsgsTarget[i] = 0;
+               sizeMsgsTarget[i] = 0;
+       }
+#endif
+#endif
+#if COLLECT_STATS_DETS
+       numPiggyDets = 0;
+       numDets = 0;
+       numDupDets = 0;
+#endif
+#if COLLECT_STATS_MEMORY
+       msgLogSize = 0;
+       bufferedDetsSize = 0;
+       storedDetsSize = 0;
+#endif
+
+
 }
 
 void killLocal(void *_dummy,double curWallTime);       
@@ -338,6 +381,26 @@ void readKillFile(){
        fclose(fp);
 }
 
+/**
+ * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
+ * We assume an exponential distribution for the mean-time-between-failures.
+ */
+void readFaultFile(){
+        FILE *fp=fopen(faultFile,"r");
+        if(!fp){
+                return;
+        }
+        int proc;
+        double sec;
+        fscanf(fp,"%d %lf",&proc,&sec);
+       faultMean = sec;
+       if(proc == CkMyPe()){
+               printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
+               CcdCallFnAfter(killLocal,NULL,sec*1000);
+       }
+        fclose(fp);
+}
+
 void killLocal(void *_dummy,double curWallTime){
        printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
        if(CmiWallTimer()<killTime-1){
@@ -347,12 +410,50 @@ void killLocal(void *_dummy,double curWallTime){
        }
 }
 
+/*** Auxiliary Functions ***/
 
+/**
+ * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
+ * determinants needs to be extended.
+ */
+inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
+       Determinant *det, *auxDet;
+       char *aux;
+
+       DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
+
+       // checking if we are overflowing the buffered determinants array
+       if(_indexBufferedDets >= _maxBufferedDets){
+               aux = CpvAccess(_localDets);
+               _maxBufferedDets *= 2;
+               CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
+               memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
+               CmiFree(aux);
+       }
+
+       // adding the new determinant at the end
+       det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
+       det->sender = sender;
+       det->receiver = receiver;
+       det->SN = SN;
+       det->TN = TN;
+       _numBufferedDets++;
+       _indexBufferedDets++;
+
+#if COLLECT_STATS_MEMORY
+       bufferedDetsSize++;
+#endif
+#if COLLECT_STATS_DETS
+       numDets++;
+#endif
+}
 
 /************************ Message logging methods ****************/
 
-// send a ticket request to a group on a processor
-void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
+/**
+ * Sends a group message that might be a broadcast.
+ */
+void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
        if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
                DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
                void *origMsg = EnvToUsr(env);
@@ -364,23 +465,28 @@ void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
                                copyEnv->TN=0;
                                copyEnv->sender.type = TypeInvalid;
                                DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
-                               sendTicketGroupRequest(copyEnv,i,_infoIdx);
+                               sendGroupMsg(copyEnv,i,_infoIdx);
                        }
                }
                return;
        }
+
+       // initializing values of envelope
+       env->SN=0;
+       env->TN=0;
+       env->sender.type = TypeInvalid;
+
        CkObjID recver;
        recver.type = TypeGroup;
        recver.data.group.id = env->getGroupNum();
        recver.data.group.onPE = destPE;
-/*     if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
-               CmiPrintStackTrace(0);
-       }*/
-       generateCommonTicketRequest(recver,env,destPE,_infoIdx);
+       sendCommonMsg(recver,env,destPE,_infoIdx);
 }
 
-//send a ticket request to a nodegroup
-void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
+/**
+ * Sends a nodegroup message that might be a broadcast.
+ */
+void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
        if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
                DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
                void *origMsg = EnvToUsr(env);
@@ -391,24 +497,32 @@ void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
                                copyEnv->SN=0;
                                copyEnv->TN=0;
                                copyEnv->sender.type = TypeInvalid;
-                               sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
+                               sendNodeGroupMsg(copyEnv,i,_infoIdx);
                        }
                }
                return;
        }
+
+       // initializing values of envelope
+       env->SN=0;
+       env->TN=0;
+       env->sender.type = TypeInvalid;
+
        CkObjID recver;
        recver.type = TypeNodeGroup;
        recver.data.group.id = env->getGroupNum();
        recver.data.group.onPE = destNode;
-       generateCommonTicketRequest(recver,env,destNode,_infoIdx);
+       sendCommonMsg(recver,env,destNode,_infoIdx);
 }
 
-//send a ticket request to an array element
-void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
+/**
+ * Sends a message to an array element.
+ */
+void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
        CkObjID recver;
        recver.type = TypeArray;
        recver.data.array.id = env->getsetArrayMgr();
-       recver.data.array.idx = *(&env->getsetArrayIndex());
+       recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
 
        if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
                char recverString[100],senderString[100];
@@ -416,19 +530,25 @@ void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
                DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
        }
 
-       generateCommonTicketRequest(recver,env,destPE,_infoIdx);
+       // initializing values of envelope
+       env->SN = 0;
+       env->TN = 0;
+
+       sendCommonMsg(recver,env,destPE,_infoIdx);
 };
 
 /**
  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
  */
-void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
+void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
        envelope *env = _env;
        MCount ticketNumber = 0;
        int resend=0; //is it a resend
        char recverName[100];
        double _startTime=CkWallTimer();
        
+       DEBUG_MEM(CmiMemoryCheck());
+
        if(CpvAccess(_currentObj) == NULL){
 //             CkAssert(0);
                DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
@@ -436,9 +556,10 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
                return;
        }
        
+       // setting message logging data in the envelope
+       env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
        if(env->sender.type == TypeInvalid){
                env->sender = CpvAccess(_currentObj)->mlogData->objID;
-               //Set message logging data in the envelope
        }else{
                envelope *copyEnv = copyEnvelope(env);
                env = copyEnv;
@@ -446,12 +567,15 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
                env->SN = 0;
        }
        
+       DEBUG_MEM(CmiMemoryCheck());
+
        CkObjID &sender = env->sender;
        env->recver = recver;
 
        Chare *obj = (Chare *)env->sender.getObject();
          
        if(env->SN == 0){
+               DEBUG_MEM(CmiMemoryCheck());
                env->SN = obj->mlogData->nextSN(recver);
        }else{
                resend = 1;
@@ -462,7 +586,7 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
                DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
        //      CmiPrintStackTrace(0);
 /*     }else{
-               DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
+               DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
        }*/
                
        MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
@@ -471,9 +595,9 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
        
        _startTime = CkWallTimer();
 
-       // uses the proper ticketing mechanism for local, group and general messages
+       // uses the proper ticketing mechanism for local, team and general messages
        if(isLocal(destPE)){
-               ticketLogLocalMessage(mEntry);
+               sendLocalMsg(mEntry);
        }else{
                if((teamSize > 1) && isTeamLocal(destPE)){
 
@@ -489,8 +613,8 @@ void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _
                        }
                }
                
-               // sending the ticket request
-               sendTicketRequest(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
+               // sending the message
+               sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
                
        }
 }
@@ -520,257 +644,249 @@ inline bool isTeamLocal(int destPE){
        return false;
 }
 
-
-
 /**
  * Method that does the actual send by creating a ticket request filling it up and sending it.
  */
-void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
-       char recverString[100],senderString[100];
+void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
+       DEBUG(char recverString[100]);
+       DEBUG(char senderString[100]);
+
+       int totalSize;
+       StoreDeterminantsHeader header;
+       int sizes[2];
+       char *ptrs[2];
+
        envelope *env = entry->env;
-       DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
-/*     envelope *env = entry->env;
-       printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
+       DEBUG(printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
 
+       // setting all the information
        Chare *obj = (Chare *)entry->env->sender.getObject();
+       entry->env->recver = recver;
+       entry->env->SN = SN;
+       entry->env->TN = TN;
        if(!resend){
                //TML: only stores message if either it goes to this processor or to a processor in a different group
                if(!isTeamLocal(entry->destPE)){
                        obj->mlogData->addLogEntry(entry);
+#if COLLECT_STATS_TEAM
                        MLOGFT_totalMessages += 1.0;
                        MLOGFT_totalLogSize += entry->env->getTotalsize();
+#endif
                }else{
                        // the message has to be deleted after it has been sent
                        entry->env->freeMsg = true;
                }
        }
 
-#ifdef BUFFERED_REMOTE
-       //buffer the ticket request 
-       if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
-               //first message to this processor, buffer needs to be created
-               int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
-               CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
-               DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
-       }
-       //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
-       //Buffer the ticketrequests
-       TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
-       ticketRequest->sender = sender;
-       ticketRequest->recver = recver;
-       ticketRequest->logEntry = entry;
-       ticketRequest->SN = SN;
-       ticketRequest->TN = TN;
-       ticketRequest->senderPE = CkMyPe();
+       // sending the determinants that causally affect this message
+       if(_numBufferedDets > 0){
 
-       CpvAccess(_numBufferedTicketRequests)[destPE]++;
+               // modifying the actual number of determinants sent in message
+               header.number = _numBufferedDets;
+               header.index = _indexBufferedDets;
+               header.phase = _phaseBufferedDets;
+               header.PE = CmiMyPe();
        
-       
-       if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
-               sendBufferedTicketRequests(destPE);
-       }else{
-               if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
-                       int *checkPE = new int;
-                       *checkPE = destPE;
-                       CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
-               }
+               // sending the determinants as a two-part vector message (header + determinant array)
+               sizes[0] = sizeof(StoreDeterminantsHeader);
+               sizes[1] = _numBufferedDets * sizeof(Determinant);
+               ptrs[0] = (char *) &header;
+               ptrs[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
+               DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
+               CmiSetHandler(&header, _storeDeterminantsHandlerIdx);
+               CmiSyncVectorSend(destPE, 2, &sizes[0], &ptrs[0]);
        }
-#else
 
-       TicketRequest ticketRequest;
-       ticketRequest.sender = sender;
-       ticketRequest.recver = recver;
-       ticketRequest.logEntry = entry;
-       ticketRequest.SN = SN;
-       ticketRequest.TN = TN;
-       ticketRequest.senderPE = CkMyPe();
-       
-       CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
-//     CmiBecomeImmediate(&ticketRequest);
-       CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
-#endif
-       DEBUG_MEM(CmiMemoryCheck());
-};
+       // updating its message log entry
+       entry->indexBufDets = _indexBufferedDets;
+       entry->numBufDets = _numBufferedDets;
 
-/**
- * Send the ticket requests buffered for processor PE
- **/
-void sendBufferedTicketRequests(int destPE){
-       DEBUG_MEM(CmiMemoryCheck());
-       int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
-       if(numberRequests == 0){
-               return;
-       }
-       DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
-       int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
-       void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
-       BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
-       header->numberLogs = numberRequests;
-       
-       CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
-       CmiSyncSend(destPE,totalSize,(char *)buf);
-       
-       CpvAccess(_numBufferedTicketRequests)[destPE]=0;
-       DEBUG_MEM(CmiMemoryCheck());
-};
+       // sending the message
+       generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
 
-void checkBufferedTicketRequests(void *_destPE,double curWallTime){
-       int destPE = *(int *)_destPE;
-  if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
-               sendBufferedTicketRequests(destPE);
-//             traceUserEvent(35);
-       }
-       delete (int *)_destPE;
        DEBUG_MEM(CmiMemoryCheck());
+#if COLLECT_STATS_MSGS
+#if COLLECT_STATS_MSGS_TOTAL
+       totalMsgsTarget++;
+       totalMsgsSize += (float)env->getTotalsize();
+#else
+       numMsgsTarget[destPE]++;
+       sizeMsgsTarget[destPE] += env->getTotalsize();
+#endif
+#endif
+#if COLLECT_STATS_DETS
+       numPiggyDets += _numBufferedDets;
+#endif
+#if COLLECT_STATS_MEMORY
+       msgLogSize += env->getTotalsize();
+#endif
 };
 
+
 /**
- * Gets a ticket for a local message and then sends a copy to the buddy.
- * This method is always in the main thread(not interrupt).. so it should 
- * never find itself locked out of a newTicket.
+ * @brief Function to send a local message. It is meant to first get a ticket
+ * (delaying the message while recovering), but that logic is currently commented
+ * out below: the message is passed directly to _skipCldEnqueue for this PE.
  */
-void ticketLogLocalMessage(MlogEntry *entry){
-       double _startTime=CkWallTimer();
+void sendLocalMsg(MlogEntry *entry){
+       DEBUG_PERF(double _startTime=CkWallTimer());
        DEBUG_MEM(CmiMemoryCheck());
+       DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
+       DEBUG(char senderString[100]);
+       DEBUG(char recverString[100]);
+       Ticket ticket;
 
+       DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
+
+       // getting the receiver local object
        Chare *recverObj = (Chare *)entry->env->recver.getObject();
-       DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
+
+       // if receiver object is not NULL, we will ask it for a ticket
        if(recverObj){
-               //Consider the case, after a restart when this message has already been allotted a ticket number
-               // and should get the same one as the old one.
-               Ticket ticket;
-               if(recverObj->mlogData->mapTable.numObjects() > 0){
-                       ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
-               }else{
-                       ticket.TN = 0;
+
+/*HERE         // if we are recovering, then we must put this off for later
+               if(recverObj->mlogData->restartFlag){
+                       CpvAccess(_delayedLocalMsgs)->enq(entry);
+                       return;
                }
-               
-               char senderString[100], recverString[100] ;
-               
+
+               // check if this combination of sender and SN has already been given a ticket
+               ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
+                       
+               // assigning a new ticket in case this message is completely new
                if(ticket.TN == 0){
                        ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
-       
+
+                       // if TN == 0 we enqueue this message since we are recovering from a crash
                        if(ticket.TN == 0){
-                               CpvAccess(_delayedLocalTicketRequests)->enq(entry);
+                               CpvAccess(_delayedLocalMsgs)->enq(entry);
                                DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
                                
-       //              _lockNewTicket = 0;
 //                             traceUserBracketEvent(33,_startTime,CkWallTimer());
                                return;
                        }
-               }       
+               }
+
                //TODO: check for the case when an invalid ticket is returned
                //TODO: check for OLD or RECEIVED TICKETS
                entry->env->TN = ticket.TN;
                CkAssert(entry->env->TN > 0);
                DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
        
-               // sends a copy of the metadata to the buddy    
-               sendLocalMessageCopy(entry);
-               
                DEBUG_MEM(CmiMemoryCheck());
 
-               // sets the unackedLocal flag and stores the message in the log
-               entry->unackedLocal = 1;
-               CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
+               // adding the determinant to _localDets
+               addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
+*/
+               // sends the local message
+               _skipCldEnqueue(CmiMyPe(),entry->env,entry->_infoIdx);  
 
                DEBUG_MEM(CmiMemoryCheck());
        }else{
-               CkPrintf("[%d] Local message in team-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
-               DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
+               DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
        }
-       _lockNewTicket=0;
 //     traceUserBracketEvent(33,_startTime,CkWallTimer());
 };
 
+/****
+       The handler functions
+*****/
+
+/*** CAUSAL PROTOCOL HANDLERS ***/
+
 /**
- * Sends the metadata of a local message to its buddy.
+ * @brief Removes the determinants after a particular index in the _localDets array.
  */
-void sendLocalMessageCopy(MlogEntry *entry){
-       LocalMessageLog msgLog;
-       msgLog.sender = entry->env->sender;
-       msgLog.recver = entry->env->recver;
-       msgLog.SN = entry->env->SN;
-       msgLog.TN = entry->env->TN;
-       msgLog.entry = entry;
-       msgLog.senderPE = CkMyPe();
-       
-       char recvString[100];
-       char senderString[100];
-       DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
+void _removeDeterminantsHandler(char *buffer){
+       RemoveDeterminantsHeader *header;
+       int index, phase;
 
-#ifdef BUFFERED_LOCAL
-       countLocal++;
-       CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
-       if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
-               sendBufferedLocalMessageCopy();
-       }else{
-               if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
-                       lastBufferedLocalMessageCopyTime = CkWallTimer();
-                       CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
-                       countClearBufferedLocalCalls++;
-               }       
-       }
-#else  
-       CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
-       
-       CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
-#endif
-       DEBUG_MEM(CmiMemoryCheck());
-};
+       // getting the header from the message
+       header = (RemoveDeterminantsHeader *)buffer;
+       index = header->index;
+       phase = header->phase;
 
+       // fprintf(stderr,"ACK index=%d\n",index);
 
-void sendBufferedLocalMessageCopy(){
-       int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
-       if(numberLogs == 0){
-               return;
+       // updating the number of buffered determinants
+       if(phase == _phaseBufferedDets){
+               if(index > _indexBufferedDets)
+                       CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
+               CmiAssert(index <= _indexBufferedDets);
+               _numBufferedDets = _indexBufferedDets - index;
        }
-       countBuffered++;
-       int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
-       void *buf=CmiAlloc(totalSize);
-       BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
-       header->numberLogs=numberLogs;
 
-       DEBUG_MEM(CmiMemoryCheck());
-       DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
-       
-       char *ptr = (char *)buf;
-       ptr = &ptr[sizeof(BufferedLocalLogHeader)];
-       
-       for(int i=0;i<numberLogs;i++){
-               LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
-               memcpy(ptr,&log,sizeof(LocalMessageLog));
-               ptr = &ptr[sizeof(LocalMessageLog)];
-       }
+       // releasing memory
+       CmiFree(buffer);
 
-       CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
+}
 
-       CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
+/**
+ * @brief Stores the determinants coming from another processor into _remoteDets (one CkVec per receiver), frees the incoming buffer and sends an ACK carrying the header's index and phase back to header->PE.
+ */
+void _storeDeterminantsHandler(char *buffer){
+       StoreDeterminantsHeader *header;
+       RemoveDeterminantsHeader removeHeader;
+       Determinant *detPtr;
+       int i, n, index, phase, destPE;
+       CkVec<Determinant> *vec;
+
+
+       // getting the header from the message (header->PE is where the ACK is sent) and pointing to the first determinant
+       header = (StoreDeterminantsHeader *)buffer;
+       n = header->number;
+       index = header->index;
+       phase = header->phase;
+       destPE = header->PE;
+       detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
+
+       DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
        DEBUG_MEM(CmiMemoryCheck());
-};
 
-void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
-       countClearBufferedLocalCalls--;
-       if(countClearBufferedLocalCalls > 10){
-               CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
-       }
-       DEBUG_MEM(CmiMemoryCheck());
-       DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
-       if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
-               if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
-                       sendBufferedLocalMessageCopy();
-//                     traceUserEvent(36);
+       // going through all determinants and storing them into _remoteDets
+       for(i = 0; i < n; i++){
+               // copying into a stack value (pushed by value below), so nothing is heap-allocated or leaked
+               Determinant detCopy = Determinant();
+               detCopy.sender = detPtr->sender;
+               detCopy.receiver = detPtr->receiver;
+               detCopy.SN = detPtr->SN;
+               detCopy.TN = detPtr->TN;
+               // getting the determinant array for this receiver, creating it on first use
+               vec = CpvAccess(_remoteDets)->get(detCopy.receiver);
+               if(vec == NULL){
+                       vec = new CkVec<Determinant>();
+                       CpvAccess(_remoteDets)->put(detCopy.receiver) = vec;
+               }
+#if COLLECT_STATS_DETS
+#if COLLECT_STATS_DETS_DUP
+               for(int j=0; j<vec->size(); j++){
+                       if(isSameDet(&(*vec)[j],&detCopy)){
+                               numDupDets++;
+                               break;
+                       }
                }
+#endif
+#endif
+#if COLLECT_STATS_MEMORY
+               storedDetsSize++;
+#endif
+               vec->push_back(detCopy);
+               detPtr++;       // advancing to the next determinant in the buffer
        }
+
        DEBUG_MEM(CmiMemoryCheck());
-}
 
-/****
-       The handler functions
-*****/
+       // freeing buffer
+       CmiFree(buffer);
 
+       // sending the ACK back to the sender
+       removeHeader.index = index;
+       removeHeader.phase = phase;
+       CmiSetHandler(&removeHeader,_removeDeterminantsHandlerIdx);
+       CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)&removeHeader);
+       
+}
 
-inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
 /**
  *  If there are any delayed requests, process them first before 
  *  processing this request
@@ -778,491 +894,70 @@ inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *repl
 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
        DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
        double  _startTime = CkWallTimer();
-       if(CpvAccess(_delayedTicketRequests)->length() > 0){
-               retryTicketRequest(NULL,_startTime);
-       }
-       _processTicketRequest(ticketRequest);
        CmiFree(ticketRequest);
 //     traceUserBracketEvent(21,_startTime,CkWallTimer());                     
 }
-/** Handler used for dealing with a bunch of ticket requests
- * from one processor. The replies are also bunched together
- * Does not use _ticketRequestHandler
- * */
-void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
-       DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
-       DEBUG_MEM(CmiMemoryCheck());
-       double _startTime = CkWallTimer();
-       if(CpvAccess(_delayedTicketRequests)->length() > 0){
-               retryTicketRequest(NULL,_startTime);
-       }
-       DEBUG_MEM(CmiMemoryCheck());
-  int numRequests = recvdHeader->numberLogs;
-       char *msg = (char *)recvdHeader;
-       msg = &msg[sizeof(BufferedTicketRequestHeader)];
-       int senderPE=((TicketRequest *)msg)->senderPE;
 
-       
-       int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
-       void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
-       
-       char *ptr = (char *)buf;
-       BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
-       header->numberLogs = 0;
 
+/**
+ * @brief Gets a ticket for a recently received message.
+ * @pre env->recver has to be on this processor.
+ * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
+ */
+inline bool _getTicket(envelope *env, int *flag){
        DEBUG_MEM(CmiMemoryCheck());
+       DEBUG(char recverName[100]);
+       DEBUG(char senderString[100]);
+       Ticket ticket;
        
-       ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
-       
-       for(int i=0;i<numRequests;i++){
-               TicketRequest *request = (TicketRequest *)msg;
-               msg = &msg[sizeof(TicketRequest)];
-               bool replied = _processTicketRequest(request,(TicketReply *)ptr);
-
-               if(replied){
-                       //the ticket request has been processed and 
-                       //the reply will be stored in the ptr
-                       header->numberLogs++;
-                       ptr = &ptr[sizeof(TicketReply)];
-               }
-       }
-/*     if(header->numberLogs == 0){
-                       printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
-       }*/
-
-       CmiSetHandler(buf,_bufferedTicketHandlerIdx);
-       CmiSyncSend(senderPE,totalSize,(char *)buf);
-       CmiFree(recvdHeader);
-//     traceUserBracketEvent(21,_startTime,CkWallTimer());                     
-       DEBUG_MEM(CmiMemoryCheck());
-};
-
-/**Process the ticket request. 
- * If it is processed and a reply is being sent 
- * by this processor return true
- * else return false.
- * If a reply buffer is specified put the reply into that
- * else send the reply
- * */
-inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
-
-/*     if(_lockNewTicket){
-               printf("ddeded %d\n",CkMyPe());
-               if(CmiIsImmediate(ticketRequest)){
-                       CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
-               }
-               CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
-               
-       }else{
-               _lockNewTicket = 1;
-       }*/
-
-       DEBUG_MEM(CmiMemoryCheck());
-
        // getting information from request
-       CkObjID sender = ticketRequest->sender;
-       CkObjID recver = ticketRequest->recver;
-       MCount SN = ticketRequest->SN;
-       MCount TN = ticketRequest->TN;
+       CkObjID sender = env->sender;
+       CkObjID recver = env->recver;
+       MCount SN = env->SN;
+       MCount TN = env->TN;
        Chare *recverObj = (Chare *)recver.getObject();
        
-       DEBUG(char recverName[100]);
        DEBUG(recver.toString(recverName);)
 
-       if(recverObj == NULL){
-               int estPE = recver.guessPE();
-               if(estPE == CkMyPe() || estPE == -1){           
-                       //try to fulfill the request after some time
-                       char senderString[100];
-                       DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
-                       if(estPE == CkMyPe() && recver.type == TypeArray){
-                               CkArrayID aid(recver.data.array.id);            
-                               CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
-                               DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx)));
-                       }
-                       TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
-                       *delayed = *ticketRequest;
-                       CpvAccess(_delayedTicketRequests)->enq(delayed);
-                       
-               }else{
-                       DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
-                       TicketRequest forward = *ticketRequest;
-                       CmiSetHandler(&forward,_ticketRequestHandlerIdx);
-                       CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);
-               }
-       DEBUG_MEM(CmiMemoryCheck());
-               return false; // if the receverObj does not exist the ticket request cannot have been 
-                             // processed successfully
-       }else{
-               char senderString[100];
-               
-               Ticket ticket;
-
-               // checking if the message is team local and if it has a ticket already assigned
-               if(teamSize > 1 && TN != 0){
-                       DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
-                       ticket.TN = TN;
-                       recverObj->mlogData->verifyTicket(sender,SN,TN);
-               }
-
-               //check if a ticket for this has been already handed out to an object that used to be local but 
-               // is no longer so.. need for parallel restart
-               if(recverObj->mlogData->mapTable.numObjects() > 0){
-                       
-                       ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
-               }
-               
-               if(ticket.TN == 0){
-                       ticket = recverObj->mlogData->next_ticket(sender,SN);
-               }
-               if(ticket.TN > recverObj->mlogData->tProcessed){
-                       ticket.state = NEW_TICKET;
-               }else{
-                       ticket.state = OLD_TICKET;
-               }
-               //TODO: check for the case when an invalid ticket is returned
-               if(ticket.TN == 0){
-                       DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
-                       TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
-                       *delayed = *ticketRequest;
-                       CpvAccess(_delayedTicketRequests)->enq(delayed);
-                       return false;
-               }
-/*             if(ticket.TN < SN){ //error state this really should not happen
-                       recver.toString(recverName);
-                 printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
-               }*/
-//             CkAssert(ticket.TN >= SN);
-               DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
-//             TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
-    if(reply == NULL){ 
-                       //There is no reply buffer and the ticketreply is going to be 
-                       //sent immediately
-                       TicketReply ticketReply;
-                       ticketReply.request = *ticketRequest;
-                       ticketReply.ticket = ticket;
-                       ticketReply.recverPE = CkMyPe();
-                       CmiSetHandler(&ticketReply,_ticketHandlerIdx);
-//             CmiBecomeImmediate(&ticketReply);
-                       CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
-        }else{ // Store ticket reply in the buffer provided
-                reply->request = *ticketRequest;
-                reply->ticket = ticket;
-                reply->recverPE = CkMyPe();
-                CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
-                                                        // in case the ticket needs to be forwarded or something
-        }
-               DEBUG_MEM(CmiMemoryCheck());
-               return true;
-       }
-//     _lockNewTicket=0;
-};
-
-
-/**
- * @brief This function handles the ticket received after a request.
- */
-inline void _ticketHandler(TicketReply *ticketReply){
-
-       double _startTime = CkWallTimer();
-       DEBUG_MEM(CmiMemoryCheck());    
-       
-       char senderString[100];
-       CkObjID sender = ticketReply->request.sender;
-       CkObjID recver = ticketReply->request.recver;
-       
-       if(sender.guessPE() != CkMyPe()){
-               DEBUG(CkAssert(sender.guessPE()>= 0));
-               DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
-       //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
-               ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
-               CmiSetHandler(ticketReply,_ticketHandlerIdx);
-#ifdef BUFFERED_REMOTE
-               //will be freed by the buffered ticket handler most of the time
-               //this might lead to a leak just after migration
-               //when the ticketHandler is directly used without going through the buffered handler
-               CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
-#else
-               CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
-#endif 
-       }else{
-               char recverName[100];
-               DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
-               MlogEntry *logEntry=NULL;
-               if(ticketReply->ticket.state & FORWARDED_TICKET){
-                       // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
-                       DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
-                       Chare *senderObj = (Chare *)sender.getObject();
-                       if(senderObj){
-                               CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
-                               for(int i=0;i<mlog->length();i++){
-                                       MlogEntry *tempEntry = (*mlog)[i];
-                                       if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
-                                               logEntry = tempEntry;
-                                               break;
-                                       }
-                               }
-                               if(logEntry == NULL){
-#ifdef BUFFERED_REMOTE
-#else
-                                       CmiFree(ticketReply);
-#endif                                 
-                                       return;
-                               }
-                       }else{
-                               CmiAbort("This processor thinks it should have the sender\n");
-                       }
-                       ticketReply->ticket.state ^= FORWARDED_TICKET;
-               }else{
-                       logEntry = ticketReply->request.logEntry;
-               }
-               if(logEntry->env->TN <= 0){
-                       //This logEntry has not received a TN earlier
-                       char recverString[100];
-                       logEntry->env->TN = ticketReply->ticket.TN;
-                       logEntry->env->setSrcPe(CkMyPe());
-                       if(ticketReply->ticket.state == NEW_TICKET){
-
-                               // if message is group local, we store its metadata in teamTable
-                               if(isTeamLocal(ticketReply->recverPE)){
-                                       //DEBUG_TEAM(CkPrintf("[%d] Storing meta data for intragroup message %u\n",CkMyPe(),ticketReply->request.SN);)
-                                       Chare *senderObj = (Chare *)sender.getObject();
-                                       SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
-                                       if(ticketRow == NULL){
-                                               ticketRow = new SNToTicket();
-                                               senderObj->mlogData->teamTable.put(recver) = ticketRow; 
-                                       }
-                                       ticketRow->put(ticketReply->request.SN) = ticketReply->ticket;
-                               }
-
-                               DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
-                               if(ticketReply->recverPE != CkMyPe()){
-                                       generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
-                               }else{
-                                       //It is now a local message use the local message protocol
-                                       sendLocalMessageCopy(logEntry);
-                               }       
-                       }
-               }else{
-                       DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
-               }
-               recver.updatePosition(ticketReply->recverPE);
-#ifdef BUFFERED_REMOTE
-#else
-               CmiFree(ticketReply);
-#endif
-       }
-       CmiMemoryCheck();
-
-//     traceUserBracketEvent(22,_startTime,CkWallTimer());     
-};
-
-/**
- * Message to handle the bunch of tickets 
- * that we get from one processor. We send 
- * the tickets to be handled one at a time
- * */
-
-void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
-       double _startTime=CmiWallTimer();
-       int numTickets = recvdHeader->numberLogs;
-       char *msg = (char *)recvdHeader;
-       msg = &msg[sizeof(BufferedTicketRequestHeader)];
-       DEBUG_MEM(CmiMemoryCheck());
-       
-       TicketReply *_reply = (TicketReply *)msg;
+       // verifying whether we are recovering from a failure or not
+       if(recverObj->mlogData->restartFlag)
+               return false;
 
+       // checking ticket table to see if this message already received a ticket
+       ticket = recverObj->mlogData->getTicket(sender, SN);
+       TN = ticket.TN;
        
-       for(int i=0;i<numTickets;i++){
-               TicketReply *reply = (TicketReply *)msg;
-               _ticketHandler(reply);
-               
-               msg = &msg[sizeof(TicketReply)];
+       // checking if the message is team local and if it has a ticket already assigned
+       if(teamSize > 1 && TN != 0){
+               DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
+               recverObj->mlogData->verifyTicket(sender,SN,TN);
        }
-       
-       CmiFree(recvdHeader);
-//     traceUserBracketEvent(22,_startTime,CkWallTimer());
-       DEBUG_MEM(CmiMemoryCheck());
-};
-
-/**
- * Stores the metadata of a local message from its buddy.
- */
-void _localMessageCopyHandler(LocalMessageLog *msgLog){
-       double _startTime = CkWallTimer();
-       
-       char senderString[100],recverString[100];
-       DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
-/*     if(!fault_aware(msgLog->recver)){
-               CmiAbort("localMessageCopyHandler with non fault aware local message copy");
-       }*/
-       CpvAccess(_localMessageLog)->enq(*msgLog);
-       
-       LocalMessageLogAck ack;
-       ack.entry = msgLog->entry;
-       DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
-       CmiSetHandler(&ack,_localMessageAckHandlerIdx);
-       CmiSyncSend(msgLog->senderPE,sizeof(LocalMessageLogAck),(char *)&ack);
-       
-//     traceUserBracketEvent(23,_startTime,CkWallTimer());
-};
-
-void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
-       double _startTime = CkWallTimer();
-       DEBUG_MEM(CmiMemoryCheck());
-       
-       int numLogs = recvdHeader->numberLogs;
-       char *msg = (char *)recvdHeader;
 
-       //piggy back the logs already stored on this processor
-       int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
-       numPiggyLogs=0; //uncomment to turn off piggy backing of acks
-/*     if(numPiggyLogs > 0){
-               if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
-                       CmiAssert(0);
-               }
-       }*/
-       DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
-       
-       int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
-       void *buf = CmiAlloc(totalSize);
-       char *ptr = (char *)buf;
-       memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
-       
-       msg = &msg[sizeof(BufferedLocalLogHeader)];
-       ptr = &ptr[sizeof(BufferedLocalLogHeader)];
-
-       DEBUG_MEM(CmiMemoryCheck());
-       int PE;
-       for(int i=0;i<numLogs;i++){
-               LocalMessageLog *msgLog = (LocalMessageLog *)msg;
-               CpvAccess(_localMessageLog)->enq(*msgLog);
-               PE = msgLog->senderPE;
-               DEBUG(CmiAssert( PE == getCheckPointPE()));
-
-               LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
-               ack->entry = msgLog->entry;
-               
-               msg = &msg[sizeof(LocalMessageLog)];
-               ptr = &ptr[sizeof(LocalMessageLogAck)];
+       // check if a ticket for this message has already been handed out to an object that used to be local but
+       // is no longer so; this is needed for parallel restart
+       if(TN == 0){
+               ticket = recverObj->mlogData->next_ticket(sender,SN);
+               *flag = NEW_TICKET;
+       } else {
+               *flag = OLD_TICKET;
        }
-       DEBUG_MEM(CmiMemoryCheck());
-
-       BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
-       piggyHeader->numberLogs = numPiggyLogs;
-       ptr = &ptr[sizeof(BufferedLocalLogHeader)];
-       if(numPiggyLogs > 0){
-               countPiggy++;
+       if(ticket.TN > recverObj->mlogData->tProcessed){
+               ticket.state = NEW_TICKET;
+       } else {
+               ticket.state = OLD_TICKET;
        }
-
-       for(int i=0;i<numPiggyLogs;i++){
-               LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
-               memcpy(ptr,&log,sizeof(LocalMessageLog));
-               ptr = &ptr[sizeof(LocalMessageLog)];
+       // @todo: check for the case when an invalid ticket is returned
+       if(ticket.TN == 0){
+               DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
+               return false;
        }
-       DEBUG_MEM(CmiMemoryCheck());
-       
-       CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
-       CmiSyncSendAndFree(PE,totalSize,(char *)buf);
                
-/*     for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
-                       LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
-                       if(!fault_aware(localLogEntry.recver)){
-                               CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
-                       }
-       }*/
-       if(freeHeader){
-               CmiFree(recvdHeader);
-       }
-       DEBUG_MEM(CmiMemoryCheck());
-//     traceUserBracketEvent(23,_startTime,CkWallTimer());
-}
-
-
-void _localMessageAckHandler(LocalMessageLogAck *ack){
-       
-       double _startTime = CkWallTimer();
-       
-       MlogEntry *entry = ack->entry;
-       if(entry == NULL){
-               CkExit();
-       }
-       entry->unackedLocal = 0;
-       envelope *env = entry->env;
-       char recverName[100];
-       char senderString[100];
-       DEBUG_MEM(CmiMemoryCheck());
-       
-       DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
-       if(env == NULL)
-               return;
-       CkAssert(env->SN > 0);
-       CkAssert(env->TN > 0);
-       env->sender.toString(senderString);
-       DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
-       env->recver.toString(recverName);
-
-       DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
-       
-/*     
-       void *origMsg = EnvToUsr(env);
-       void *copyMsg = CkCopyMsg(&origMsg);
-       envelope *copyEnv = UsrToEnv(copyMsg);
-       entry->env = UsrToEnv(origMsg);*/
+       // setting the ticket number in the envelope
+       env->TN = ticket.TN;
+       DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
+       return true;
 
-//     envelope *copyEnv = copyEnvelope(env);
-
-       envelope *copyEnv = env;
-       copyEnv->localMlogEntry = entry;
-
-       DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
-       if(CmiMyPe() != entry->destPE){
-               DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
-       }
-//     generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
-       _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
-       
-       
-#ifdef BUFFERED_LOCAL
-#else
-       CmiFree(ack);
-//     traceUserBracketEvent(24,_startTime,CkWallTimer());
-#endif
-       
-       DEBUG_MEM(CmiMemoryCheck());
-       DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
-}
-
-
-void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
-
-       double _startTime=CkWallTimer();
-       DEBUG_MEM(CmiMemoryCheck());
-
-       int numLogs = recvdHeader->numberLogs;
-       char *msg = (char *)recvdHeader;
-       msg = &msg[sizeof(BufferedLocalLogHeader)];
-
-       DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
-       
-       for(int i=0;i<numLogs;i++){
-               LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
-               _localMessageAckHandler(ack);
-               
-               msg = &msg[sizeof(LocalMessageLogAck)]; 
-       }
-
-       //deal with piggy backed local logs
-       BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
-       //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
-       if(piggyHeader->numberLogs > 0){
-               _bufferedLocalMessageCopyHandler(piggyHeader,0);
-       }
-       
-       CmiFree(recvdHeader);
-       DEBUG_MEM(CmiMemoryCheck());
-//     traceUserBracketEvent(24,_startTime,CkWallTimer());
-}
+};
 
 bool fault_aware(CkObjID &recver){
        switch(recver.type){
@@ -1279,27 +974,70 @@ bool fault_aware(CkObjID &recver){
        }
 };
 
-int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
-       char recverString[100];
-       char senderString[100];
-       
+/* Preprocesses a received message */
+int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
+       DEBUG_NOW(char recverString[100]);
+       DEBUG_NOW(char senderString[100]);
        DEBUG_MEM(CmiMemoryCheck());
+       int flag;
+       bool ticketSuccess;
+
+       // getting the receiver object
        CkObjID recver = env->recver;
        if(!fault_aware(recver))
                return 1;
 
-
        Chare *obj = (Chare *)recver.getObject();
        *objPointer = obj;
        if(obj == NULL){
                int possiblePE = recver.guessPE();
                if(possiblePE != CkMyPe()){
-                       int totalSize = env->getTotalsize();                    
+                       int totalSize = env->getTotalsize();
                        CmiSyncSend(possiblePE,totalSize,(char *)env);
+                       
+                       DEBUG(printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
+               }else{
+                       // this is the case where a message is received but the receiver object has not been initialized yet;
+                       // we delay the delivery of the message by placing it in the out-of-order message queue
+                       CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
+                       
+                       DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
                }
                return 0;
        }
 
+       // checking if message comes from an old incarnation
+       // message must be discarded
+       if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
+               CmiFree(env);
+               return 0;
+       }
+
+       DEBUG_MEM(CmiMemoryCheck());
+       DEBUG(printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
+
+       // getting a ticket for this message
+       ticketSuccess = _getTicket(env,&flag);
+
+       // we might be in the middle of recovery when this message arrives
+       if(!ticketSuccess){
+               
+               // adding the message to a delay queue
+               CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
+               DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
+
+               return 0;
+       }
+       
+       //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
+       //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
+       
+       if(flag == NEW_TICKET){
+               // storing determinant of message in data structure
+               addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
+       }
+
+       DEBUG_MEM(CmiMemoryCheck());
 
        double _startTime = CkWallTimer();
 //env->sender.updatePosition(env->getSrcPe());
@@ -1308,15 +1046,12 @@ int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEn
                DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
                // once we find a message that we can process we put back all the messages in the out of order queue
                // back into the main scheduler queue. 
-               if(env->sender.guessPE() == CkMyPe()){
-                       *logEntryPointer = env->localMlogEntry;
-               }
        DEBUG_MEM(CmiMemoryCheck());
                while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
                        void *qMsgPtr;
                        CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
                        envelope *qEnv = (envelope *)qMsgPtr;
-                       CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
+                       CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
        DEBUG_MEM(CmiMemoryCheck());
                }
 //             traceUserBracketEvent(25,_startTime,CkWallTimer());
@@ -1325,16 +1060,18 @@ int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEn
        DEBUG_MEM(CmiMemoryCheck());
                return 1;
        }
+
+       // checking if message has already been processed
+       // message must be discarded
        if(env->TN <= obj->mlogData->tProcessed){
-               //message already processed
-               DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
-//             traceUserBracketEvent(26,_startTime,CkWallTimer());
-       DEBUG_MEM(CmiMemoryCheck());
+               DEBUG(printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
+               
+               CmiFree(env);
                return 0;
        }
        //message that needs to be processed in the future
 
-//     DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
+       DEBUG(printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
        //the message cant be processed now put it back in the out of order message Q, 
        //It will be transferred to the main queue later
        CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
@@ -1347,7 +1084,7 @@ int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEn
 /**
  * @brief Updates a few variables once a message has been processed.
  */
-void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
+void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
        DEBUG(char senderString[100]);
        if(obj){
                if(sender.guessPE() == CkMyPe()){
@@ -1367,7 +1104,7 @@ void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *
        Helpers for the handlers and message logging methods
 ***/
 
-void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
+void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
 //     double _startTime = CkWallTimer();
        if(env->recver.type != TypeNodeGroup){
        //This repeats a step performed in skipCldEnq for messages sent to
@@ -1388,53 +1125,6 @@ void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
 
 int calledRetryTicketRequest=0;
 
-void retryTicketRequestTimer(void *_dummy,double _time){
-               calledRetryTicketRequest=0;
-               retryTicketRequest(_dummy,_time);
-}
-
-void retryTicketRequest(void *_dummy,double curWallTime){      
-       double start = CkWallTimer();
-       DEBUG_MEM(CmiMemoryCheck());
-       int length = CpvAccess(_delayedTicketRequests)->length();
-       for(int i=0;i<length;i++){
-               TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
-               if(ticketRequest){
-                       char senderString[100],recverString[100];
-                       DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
-                       DEBUG_MEM(CmiMemoryCheck());
-                       _processTicketRequest(ticketRequest);
-                 CmiFree(ticketRequest);
-                       DEBUG_MEM(CmiMemoryCheck());
-               }       
-       }       
-       for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
-               MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
-               ticketLogLocalMessage(entry);
-       }
-       int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
-//     int converse_qLength = CmiGetNonLocalLength();
-       
-//     DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
-
-/*     PingMsg pingMsg;
-       pingMsg.PE = CkMyPe();
-       CmiSetHandler(&pingMsg,_pingHandlerIdx);
-       if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
-               for(int i=0;i<CkNumPes();i++){
-                       if(i != CkMyPe()){
-                               CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
-                       }
-               }
-       }*/     
-       //TODO: change this back to 100
-       if(calledRetryTicketRequest == 0){
-               CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
-               calledRetryTicketRequest =1;
-       }
-       DEBUG_MEM(CmiMemoryCheck());
-}
-
 void _pingHandler(CkPingMsg *msg){
        printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
        CmiFree(msg);
@@ -1468,23 +1158,31 @@ void checkpointAlarm(void *_dummy,double curWallTime){
 };
 
 void _checkpointRequestHandler(CheckpointRequest *request){
-       startMlogCheckpoint(NULL,CmiWallTimer());       
+       startMlogCheckpoint(NULL,CmiWallTimer());
 }
 
-void startMlogCheckpoint(void *_dummy,double curWallTime){
+/**
+ * @brief Starts the checkpoint phase after migration.
+ */
+void startMlogCheckpoint(void *_dummy, double curWallTime){
        double _startTime = CkWallTimer();
+
+       // increasing the checkpoint counter
        checkpointCount++;
-/*     if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
-               kill(getpid(),SIGKILL);
-       }*/
-       if(CmiNumPes() < 256 || CmiMyPe() == 0){
+       
+#if DEBUG_CHECKPOINT
+       if(CmiMyPe() == 0){
                printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
        }
-       PUP::sizer psizer;
+#endif
+
        DEBUG_MEM(CmiMemoryCheck());
 
+       PUP::sizer psizer;
        psizer | checkpointCount;
-       
+       for(int i=0; i<CmiNumPes(); i++){
+               psizer | CpvAccess(_incarnation)[i];
+       }
        CkPupROData(psizer);
        DEBUG_MEM(CmiMemoryCheck());
        CkPupGroupData(psizer,CmiTrue);
@@ -1500,13 +1198,14 @@ void startMlogCheckpoint(void *_dummy,double curWallTime){
        CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
        chkMsg->PE = CkMyPe();
        chkMsg->dataSize = dataSize;
-
        
        char *buf = &msg[sizeof(CheckPointDataMsg)];
-       PUP::toMem pBuf(buf);   
+       PUP::toMem pBuf(buf);
 
        pBuf | checkpointCount;
-       
+       for(int i=0; i<CmiNumPes(); i++){
+               pBuf | CpvAccess(_incarnation)[i];
+       }
        CkPupROData(pBuf);
        CkPupGroupData(pBuf,CmiTrue);
        CkPupNodeGroupData(pBuf,CmiTrue);
@@ -1521,9 +1220,19 @@ void startMlogCheckpoint(void *_dummy,double curWallTime){
        */
        processedTicketLog.removeAll();
        forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
-       if(CmiNumPes() < 256 || CmiMyPe() == 0){
+
+#if DEBUG_CHECKPOINT
+       if(CmiMyPe() == 0){
                printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
        }
+#endif
+
+#if COLLECT_STATS_MEMORY
+       CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
+       msgLogSize = 0;
+       bufferedDetsSize = 0;
+       storedDetsSize = 0;
+#endif
 
        if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
                lastCompletedAlarm = curWallTime;
@@ -1532,13 +1241,18 @@ void startMlogCheckpoint(void *_dummy,double curWallTime){
        traceUserBracketEvent(28,_startTime,CkWallTimer());
 };
 
-void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
-       CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
+/**
+ * @brief Appends a chare's object ID and its latest processed ticket number to the processed-ticket log.
+ */
+void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
+       DEBUG(char objString[100]);
+
+       CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
        TProcessedLog logEntry;
        logEntry.recver = mlogData->objID;
        logEntry.tProcessed = mlogData->tProcessed;
        log->push_back(logEntry);
-       char objString[100];
+
        DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
 }
 
@@ -1547,14 +1261,14 @@ private:
        CkLocMgr *locMgr;
        PUP::er &p;
 public:
-               ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
-               void addLocation(CkLocation &loc) {
-                       CkArrayIndex idx=loc.getIndex();
-                       CkGroupID gID = locMgr->ckGetGroupID();
-                       p|gID;      // store loc mgr's GID as well for easier restore
-                       p|idx;
-                       p|loc;
-    }
+       ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
+       void addLocation(CkLocation &loc) {
+               CkArrayIndexMax idx=loc.getIndex();
+               CkGroupID gID = locMgr->ckGetGroupID();
+               p|gID;      // store loc mgr's GID as well for easier restore
+               p|idx;
+               p|loc;
+       }
 };
 
 /**
@@ -1584,7 +1298,7 @@ void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSki
        
                for (int i=0; i<numElements; i++) {
                        CkGroupID gID;
-                       CkArrayIndex idx;
+                       CkArrayIndexMax idx;
                        p|gID;
                p|idx;
                        int flag=0;
@@ -1629,13 +1343,11 @@ void writeCheckpointToDisk(int size,char *chkpt){
        unlink(fName);
 
        rename(fNameTemp,fName);
-       
 }
 
 //handler that receives the checkpoint from a processor
 //it stores it and acks it
 void _storeCheckpointHandler(char *msg){
-       
        double _startTime=CkWallTimer();
                
        CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
@@ -1656,13 +1368,6 @@ void _storeCheckpointHandler(char *msg){
        CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
        CpvAccess(_storedCheckpointData)->PE = sendingPE;
 
-#ifdef CHECKPOINT_DISK
-       //store the checkpoint on disk
-       writeCheckpointToDisk(chkMsg->dataSize,chkpt);
-       CpvAccess(_storedCheckpointData)->buf = NULL;
-       CmiFree(msg);
-#endif
-
        int count=0;
        for(int j=migratedNoticeList.size()-1;j>=0;j--){
                if(migratedNoticeList[j].fromPE == sendingPE){
@@ -1684,26 +1389,37 @@ void _storeCheckpointHandler(char *msg){
        CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
        CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
        
-       
-       
        traceUserBracketEvent(29,_startTime,CkWallTimer());
 };
 
 
+/**
+ * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.
+ * @note The remove log request message is laid out as:
+ *             |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
+ */
 void sendRemoveLogRequests(){
+#if SYNCHRONIZED_CHECKPOINT
+       CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
+#endif
        double _startTime = CkWallTimer();      
-       //send out the messages asking senders to throw away message logs below a certain ticket number
-       /*
-               The remove log request message looks like
-               |RemoveLogRequest||List of TProcessedLog|
-       */
-       int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
+       // message size = header + one TProcessedLog per processed-ticket entry + the int determinant count
+       // + room for a single Determinant (the visible code below writes a count of zero and no determinant)
+       int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
        char *requestMsg = (char *)CmiAlloc(totalSize);
+
+       // filling up the message: header fields first, then a raw copy of the processed-ticket list
        RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
        request->PE = CkMyPe();
        request->numberObjects = processedTicketLog.size();
        char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
        memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
+       char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
+       int *numDeterminants = (int *)listDeterminants;
+       numDeterminants[0] = 0; // the determinant count is fixed at zero here
+       listDeterminants = (char *)&numDeterminants[1]; // now points just past the count, where the determinant list would begin
+
+       // setting the handler in the message
        CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
        
        DEBUG_MEM(CmiMemoryCheck());
@@ -1716,6 +1432,7 @@ void sendRemoveLogRequests(){
        //TODO: clear ticketTable
        
        traceUserBracketEvent(30,_startTime,CkWallTimer());
+
        DEBUG_MEM(CmiMemoryCheck());
 }
 
@@ -1723,7 +1440,7 @@ void sendRemoveLogRequests(){
 void _checkpointAckHandler(CheckPointAck *ackMsg){
        DEBUG_MEM(CmiMemoryCheck());
        unAckedCheckpoint=0;
-       DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
+       DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
        DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
        if(onGoingLoadBalancing){
                onGoingLoadBalancing = 0;
@@ -1735,20 +1452,79 @@ void _checkpointAckHandler(CheckPointAck *ackMsg){
        
 };
 
+
+/**
+ * @brief Inserts all the determinants carried in a remove-log request into detTable (sender -> receiver -> SN -> ticket).
+ * @param data start of the request message: RemoveLogRequest header, TProcessedLog list, determinant count, determinant list. */
+inline void populateDeterminantTable(char *data){
+       DEBUG(char recverString[100]);
+       DEBUG(char senderString[100]);
+       int numDets, *numDetsPtr;
+       Determinant *detList;
+       CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
+       SNToTicket *tickets;
+       Ticket ticket;
+       // the TProcessedLog list starts right after the RemoveLogRequest header
+       RemoveLogRequest *request = (RemoveLogRequest *)data;
+       TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
+       // the determinant count sits right after the last TProcessedLog entry, followed by the determinants themselves
+       numDetsPtr = (int *)&list[request->numberObjects];
+       numDets = numDetsPtr[0];
+       detList = (Determinant *)&numDetsPtr[1];
+
+       // inserting determinants into hashtable, creating the per-sender and per-receiver levels on first use
+       for(int i=0; i<numDets; i++){
+               table = detTable.get(detList[i].sender);
+               if(table == NULL){
+                       table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
+                       detTable.put(detList[i].sender) = table;
+               }
+               tickets = table->get(detList[i].receiver);
+               if(tickets == NULL){
+                       tickets = new SNToTicket();
+                       table->put(detList[i].receiver) = tickets; 
+               }
+               ticket.TN = detList[i].TN; // only the TN field of the ticket is assigned here
+               tickets->put(detList[i].SN) = ticket;
+       }
+
+       DEBUG_MEM(CmiMemoryCheck());    
+
+}
+
 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
+       int total;
+       DEBUG(char nameString[100]);
        DEBUG_MEM(CmiMemoryCheck());
        CmiMemoryCheck();
        char *data = (char *)_data;
        RemoveLogRequest *request = (RemoveLogRequest *)data;
        TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
        CkQ<MlogEntry *> *mlog = mlogData->getMlog();
+       CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
+       SNToTicket *tickets;
+       MCount TN;
 
        int count=0;
-       for(int i=0;i<mlog->length();i++){
+       total = mlog->length();
+       for(int i=0; i<total; i++){
                MlogEntry *logEntry = mlog->deq();
+
+               // looking message in determinant table
+               table = detTable.get(logEntry->env->sender);
+               if(table != NULL){
+                       tickets = table->get(logEntry->env->recver);
+                       if(tickets != NULL){
+                               TN = tickets->get(logEntry->env->SN).TN;
+                               if(TN != 0){
+                                       logEntry->env->TN = TN;
+                               }
+                       }
+               }
+       
                int match=0;
                for(int j=0;j<request->numberObjects;j++){
-                       if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
+                       if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
                                //this log Entry should be removed
                                match = 1;
                                break;
@@ -1764,56 +1540,34 @@ void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
                }
        }
        if(count > 0){
-               char nameString[100];
                DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
        }
        DEBUG_MEM(CmiMemoryCheck());
        CmiMemoryCheck();
 }
 
+/**
+ * @brief Removes messages in the log according to the received ticket numbers
+ */
 void _removeProcessedLogHandler(char *requestMsg){
        double start = CkWallTimer();
+
+       // building map for determinants
+       populateDeterminantTable(requestMsg);
+
+       // removing processed messages from the message log
        forAllCharesDo(removeProcessedLogs,requestMsg);
+
+       // @todo: clean determinant table 
+       
        // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
        RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
        DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
        //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
-       if(request->PE == getCheckPointPE()){
-               TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
-               CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
-               CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
-               int count=0;
-/*             DEBUG(for(int j=0;j<request->numberObjects;j++){)
-               DEBUG(char nameString[100];)
-                       DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
-               DEBUG(})*/
-               for(int i=0;i<localQ->length();i++){
-                       LocalMessageLog localLogEntry = (*localQ)[i];
-                       if(!fault_aware(localLogEntry.recver)){
-                               CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
-                       }
-                       bool keep = true;
-                       for(int j=0;j<request->numberObjects;j++){                              
-                               if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
-                                       keep = false;
-                                       break;
-                               }
-                       }       
-                       if(keep){
-                               tempQ->enq(localLogEntry);
-                       }else{
-                               count++;
-                       }
-               }
-               delete localQ;
-               CpvAccess(_localMessageLog) = tempQ;
-               DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
-       }
-
        /*
                Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
        */
-       CmiMemoryCheck();
+       DEBUG_MEM(CmiMemoryCheck());
        clearUpMigratedRetainedLists(request->PE);
        
        traceUserBracketEvent(20,start,CkWallTimer());
@@ -1866,6 +1620,7 @@ void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
 
        // setting the restart flag
        _restartFlag = 1;
+       _numRestartResponses = 0;
 
        // if we are using team-based message logging, all members of the group have to be restarted
        if(teamSize > 1){
@@ -1898,6 +1653,7 @@ void _restartHandler(RestartRequest *restartMsg){
 
     // setting the restart flag
        _restartFlag = 1;
+       _numRestartResponses = 0;
 
        // flushing all buffers
        //TEST END
@@ -1986,12 +1742,20 @@ void _recvRestartCheckpointHandler(char *_restartData){
        
        PUP::fromMem pBuf(buf);
        pBuf | checkpointCount;
+       for(int i=0; i<CmiNumPes(); i++){
+               pBuf | CpvAccess(_incarnation)[i];
+       }
        CkPupROData(pBuf);
        CkPupGroupData(pBuf,CmiFalse);
        CkPupNodeGroupData(pBuf,CmiFalse);
        pupArrayElementsSkip(pBuf,CmiFalse,NULL);
        CkAssert(pBuf.size() == restartData->checkPointSize);
        printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
+
+       // increasing the incarnation number of all the team members
+       for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
+               CpvAccess(_incarnation)[i]++;
+       }
        
        // turning off the team recovery flag
        forAllCharesDo(unsetTeamRecovery,NULL);
@@ -1999,26 +1763,6 @@ void _recvRestartCheckpointHandler(char *_restartData){
        // initializing a few variables for handling local messages
        forAllCharesDo(initializeRestart,NULL);
        
-       //store the restored local message log in a vector
-       buf = &buf[restartData->checkPointSize];        
-       for(int i=0;i<restartData->numLocalMessages;i++){
-               LocalMessageLog logEntry;
-               memcpy(&logEntry,buf,sizeof(LocalMessageLog));
-               
-               Chare *recverObj = (Chare *)logEntry.recver.getObject();
-               if(recverObj!=NULL){
-                       recverObj->mlogData->addToRestoredLocalQ(&logEntry);
-                       recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
-                       char senderString[100];
-                       char recverString[100];
-                       DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
-               }else{
-//                     DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
-               }
-               buf = &buf[sizeof(LocalMessageLog)];
-       }
-
-       forAllCharesDo(sortRestoredLocalMsgLog,NULL);
        CmiFree(_restartData);  
 
        /*HERE _initDone();
@@ -2056,7 +1800,7 @@ void _recvRestartCheckpointHandler(char *_restartData){
        
 
        /* test for parallel restart migrate away object**/
-//     if(parallelRestart){
+//     if(fastRecovery){
 //             distributeRestartedObjects();
 //             printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
 //     }
@@ -2092,22 +1836,6 @@ void CkMlogRestartLocal(){
     CkMlogRestart(NULL,NULL);
 };
 
-
-void readCheckpointFromDisk(int size,char *buf){
-       char fName[100];
-       sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
-
-       int fd = open(fName,O_RDONLY);
-       int count=0;
-       while(count < size){
-               count += read(fd,&buf[count],size-count);
-       }
-       close(fd);
-       
-};
-
-
-
 /**
  * Gets the stored checkpoint for its buddy processor.
  */
@@ -2194,7 +1922,7 @@ void _verifyAckHandler(VerifyAckMsg *verifyReply){
  */
 void sendCheckpointData(int mode){     
        RestartRequest *restartMsg = storedRequest;
-       StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
+       StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
        int numMigratedAwayElements = migratedNoticeList.size();
        if(migratedNoticeList.size() != 0){
                        printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
@@ -2204,11 +1932,9 @@ void sendCheckpointData(int mode){
        
        int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
        
-       DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
+       DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
        CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
        
-       CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
-       totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
        totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
        
        char *msg = (char *)CmiAlloc(totalSize);
@@ -2237,24 +1963,9 @@ void sendCheckpointData(int mode){
        }
        
 
-#ifdef CHECKPOINT_DISK
-       readCheckpointFromDisk(storedChkpt->bufSize,buf);
-#else  
        memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
-#endif
        buf = &buf[storedChkpt->bufSize];
 
-
-       //store localmessage Log
-       dataMsg->numLocalMessages = localMsgQ->length();
-       for(int i=0;i<localMsgQ->length();i++){
-               if(!fault_aware(((*localMsgQ)[i]).recver )){
-                       CmiAbort("Non fault aware localMsgQ");
-               }
-               memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
-               buf = &buf[sizeof(LocalMessageLog)];
-       }
-       
        if(mode == MLOG_RESTARTED){
                CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
                CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
@@ -2278,6 +1989,7 @@ void createObjIDList(void *data,ChareMlogData *mlogData){
        list->push_back(entry);
        DEBUG_TEAM(char objString[100]);
        DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
+       DEBUG_RECOVERY(printLog(&entry));
 }
 
 
@@ -2310,39 +2022,22 @@ void _recvCheckpointHandler(char *_restartData){
        PUP::fromMem pBuf(buf);
 
        pBuf | checkpointCount;
-
+       for(int i=0; i<CmiNumPes(); i++){
+               pBuf | CpvAccess(_incarnation)[i];
+       }
        CkPupROData(pBuf);
        CkPupGroupData(pBuf,CmiTrue);
        CkPupNodeGroupData(pBuf,CmiTrue);
        pupArrayElementsSkip(pBuf,CmiTrue,NULL);
-       CkAssert(pBuf.size() == restartData->checkPointSize);
-       printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
-       
-       forAllCharesDo(initializeRestart,NULL);
-       
-       //store the restored local message log in a vector
-       buf = &buf[restartData->checkPointSize];        
-       for(int i=0;i<restartData->numLocalMessages;i++){
-               LocalMessageLog logEntry;
-               memcpy(&logEntry,buf,sizeof(LocalMessageLog));
-               
-               Chare *recverObj = (Chare *)logEntry.recver.getObject();
-               if(recverObj!=NULL){
-                       recverObj->mlogData->addToRestoredLocalQ(&logEntry);
-                       recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
-                       char senderString[100];
-                       char recverString[100];
-                       DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
-               }else{
-//                     DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
-               }
-               buf = &buf[sizeof(LocalMessageLog)];
-       }
-
-       forAllCharesDo(sortRestoredLocalMsgLog,NULL);
+       CkAssert(pBuf.size() == restartData->checkPointSize);
+       printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
 
-       CmiFree(_restartData);
+       // increases the incarnation number
+       CpvAccess(_incarnation)[CmiMyPe()]++;
+       
+       forAllCharesDo(initializeRestart,NULL);
        
+       CmiFree(_restartData);
        
        _initDone();
 
@@ -2388,7 +2083,7 @@ void _updateHomeAckHandler(RestartRequest *updateHomeAck){
        memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
 
        /* test for parallel restart migrate away object**/
-       if(parallelRestart){
+       if(fastRecovery){
                distributeRestartedObjects();
                printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
        }
@@ -2402,16 +2097,17 @@ void _updateHomeAckHandler(RestartRequest *updateHomeAck){
        CpvAccess(_currentObj) = lb;
        lb->ReceiveDummyMigration(restartDecisionNumber);
 
-       sleep(10);
+//HERE sleep(10);
        
        CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
        for(int i=0;i<CkNumPes();i++){
                if(i != CkMyPe()){
                        CmiSyncSend(i,totalSize,resendMsg);
-               }       
+               }
        }
        _resendMessagesHandler(resendMsg);
        CmiFree(resendMsg);
+
 };
 
 /**
@@ -2421,8 +2117,6 @@ void initializeRestart(void *data, ChareMlogData *mlogData){
        mlogData->resendReplyRecvd = 0;
        mlogData->receivedTNs = new CkVec<MCount>;
        mlogData->restartFlag = 1;
-       mlogData->restoredLocalMsgLog.removeAll();
-       mlogData->mapTable.empty();
 };
 
 /**
@@ -2436,13 +2130,13 @@ void updateHomePE(void *data,ChareMlogData *mlogData){
        if(mlogData->objID.type == TypeArray){
                //it is an array element
                CkGroupID myGID = mlogData->objID.data.array.id;
-               CkArrayIndex myIdx =  mlogData->objID.data.array.idx;
+               CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
                CkArrayID aid(mlogData->objID.data.array.id);           
                //check if the restarted processor is the home processor for this object
                CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
                if(locMgr->homePe(myIdx) == PE){
-                       DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
-                       DEBUGRESTART(myIdx.print());
+                       DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
+                       DEBUG_RESTART(myIdx.print());
                        informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
                }
        }
@@ -2482,6 +2176,7 @@ void _updateHomeRequestHandler(RestartRequest *updateRequest){
  * @brief Fills up the ticket vector for each chare.
  */
 void fillTicketForChare(void *data, ChareMlogData *mlogData){
+       DEBUG(char name[100]);
        ResendData *resendData = (ResendData *)data;
        int PE = resendData->PE; //restarted PE
        int count=0;
@@ -2500,15 +2195,11 @@ void fillTicketForChare(void *data, ChareMlogData *mlogData){
                // traversing the resendData structure to add ticket numbers
                for(int j=0;j<resendData->numberObjects;j++){
                        if((*objID) == (resendData->listObjects)[j].recver){
-char name[100];
                                snToTicket = *(SNToTicket **)objp;
 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
                                for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
                                        ticket = snToTicket->get(snIndex);      
-                                       if(ticket.TN > resendData->maxTickets[j]){
-                                               resendData->maxTickets[j] = ticket.TN;
-                                       }
-                                       if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
+                               if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
                                                //store the TNs that have been since the recver last checkpointed
                                                resendData->ticketVecs[j].push_back(ticket.TN);
                                        }
@@ -2538,95 +2229,94 @@ void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
        mlogData->teamRecoveryFlag = 0;
 }
 
-//the data argument is of type ResendData which contains the 
-//array of objects on  the restartedProcessor
-//this method resends the messages stored in this chare's message log 
-//to the restarted processor. It also accumulates the maximum TN
-//for all the objects on the restarted processor
-void resendMessageForChare(void *data,ChareMlogData *mlogData){
-       char nameString[100];
+/**
+ * @brief Prints a processed log on one "[RECOVERY]" line: this PE, the receiver object's name and its tProcessed value (labelled TN).
+ * @param log entry to print; it is dereferenced without a NULL check. */
+void printLog(TProcessedLog *log){
+       char recverString[100]; // scratch buffer handed to recver.toString() for the object's name
+       CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
+}
+
+/**
+ * @brief Prints information about a message on one "[RECOVERY]" line: this PE, sender name, receiver name and SN.
+ * @param env envelope of the message (dereferenced without a NULL check). @param par tag printed after "MSG-". */
+void printMsg(envelope *env, const char* par){
+       char senderString[100]; // scratch buffer for the sender's name
+       char recverString[100]; // scratch buffer for the receiver's name
+       CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
+}
+
+/**
+ * @brief Prints information about a determinant on one "[RECOVERY]" line: this PE, sender name, receiver name, SN and TN.
+ * @param det determinant to print (dereferenced without a NULL check). @param par tag printed after "DET-". */
+void printDet(Determinant *det, const char* par){
+       char senderString[100]; // scratch buffer for the sender's name
+       char recverString[100]; // scratch buffer for the receiver's name
+       CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
+}
+
+/**
+ * @brief Resends every logged message of one local chare whose receiver is in the restarted processor's object list (no ticket-number filtering).
+ * @param data is of type ResendData: the restarted PE and the array of objects on the restarted processor.
+ * @param mlogData message-logging data of a particular chare living in this processor; its message log is the one traversed.
+ */
+void resendMessageForChare(void *data, ChareMlogData *mlogData){
+       DEBUG_RESTART(char nameString[100]);
+       DEBUG_RESTART(char recverString[100]);
+       DEBUG_RESTART(char senderString[100]);
+
        ResendData *resendData = (ResendData *)data;
        int PE = resendData->PE; //restarted PE
-       DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
        int count=0;
        int ticketRequests=0;
        CkQ<MlogEntry *> *log = mlogData->getMlog();
-       
+
+       DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
+
+       // traversing the message log to see if we must resend a message
        for(int i=0;i<log->length();i++){
                MlogEntry *logEntry = (*log)[i];
                
-               // if we sent out the logs of a local message to buddy and he crashed
-               //before acking
+               // log entries that carry no envelope are skipped:
+               // there is no message to resend for them
                envelope *env = logEntry->env;
                if(env == NULL){
                        continue;
                }
-               if(logEntry->unackedLocal){
-                       char recverString[100];
-                       DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
-                       sendLocalMessageCopy(logEntry);
-               }
-               //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
-               if(env->TN <= 0){
-                       //ticket not yet replied send it out again
-                       sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,0,1);
-               }
-               
+       
+               // resend only when the receiver type is valid, once per matching entry of the restarted object list
                if(env->recver.type != TypeInvalid){
-                       int flag = 0;//marks if any of the restarted objects matched this log entry
                        for(int j=0;j<resendData->numberObjects;j++){
                                if(env->recver == (resendData->listObjects)[j].recver){
-                                       flag = 1;
-                                       //message has a valid TN
-                                       if(env->TN > 0){
-                                               //store maxTicket
-                                               if(env->TN > resendData->maxTickets[j]){
-                                                       resendData->maxTickets[j] = env->TN;
+                                       if(PE != CkMyPe()){ // restarted PE is remote: send the logged envelope itself
+                                               DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
+                                               if(env->recver.type == TypeNodeGroup){
+                                                       CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
+                                               }else{
+                                                       CmiSetHandler(env,CmiGetXHandler(env));
+                                                       CmiSyncSend(PE,env->getTotalsize(),(char *)env);
                                                }
-                                               //if the TN for this entry is more than the TN processed, send the message out
-                                               if(env->TN >= (resendData->listObjects)[j].tProcessed){
-                                                       //store the TNs that have been since the recver last checkpointed
-                                                       resendData->ticketVecs[j].push_back(env->TN);
-                                                       
-                                                       if(PE != CkMyPe()){
-                                                               if(env->recver.type == TypeNodeGroup){
-                                                                       CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
-                                                               }else{
-                                                                       CmiSetHandler(env,CmiGetXHandler(env));
-                                                                       CmiSyncSend(PE,env->getTotalsize(),(char *)env);
-                                                               }
-                                                       }else{
-                                                               envelope *copyEnv = copyEnvelope(env);
-                                                               CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
-                                                       }
-                                                       char senderString[100];
-                                                       DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
-                                                       count++;
-                                               }       
                                        }else{
-/*                                     //the message didnt get a ticket the last time and needs to start with a ticket request
-                                               DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
-                                               //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
-                                               CkAssert(logEntry->destPE != CkMyPe());
-                                               
-                                               sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
-                                               
-                                               ticketRequests++;*/
+                                               envelope *copyEnv = copyEnvelope(env); // restarted PE is this one: enqueue a copy on the local scheduler queue
+                                               CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
                                        }
+                                       DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
+                                       count++;
                                }
                        }//end of for loop of objects
                        
                }       
        }
-       DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
+       DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);) // ticketRequests is never incremented in this version, so the value in parentheses is always 0
 }
 
 /**
- * Resends the messages since the last checkpoint to the list of objects included in the 
- * request.
+ * Resends messages since last checkpoint to the list of objects included in the 
+ * request. It also sends stored remote determinants to the particular failed PE.
  */
 void _resendMessagesHandler(char *msg){
        ResendData d;
+       CkVec<Determinant> *detVec;
        ResendRequest *resendReq = (ResendRequest *)msg;
 
        // building the reply message
@@ -2634,11 +2324,8 @@ void _resendMessagesHandler(char *msg){
        d.numberObjects = resendReq->numberObjects;
        d.PE = resendReq->PE;
        d.listObjects = (TProcessedLog *)listObjects;
-       d.maxTickets = new MCount[d.numberObjects];
        d.ticketVecs = new CkVec<MCount>[d.numberObjects];
-       for(int i=0;i<d.numberObjects;i++){
-               d.maxTickets[i] = 0;
-       }
+       detVec = new CkVec<Determinant>[d.numberObjects];
 
        //Check if any of the retained objects need to be recreated
        //If they have not been recreated on the restarted processor
@@ -2655,7 +2342,7 @@ void _resendMessagesHandler(char *msg){
                                CkArrayID aid(d.listObjects[j].recver.data.array.id);           
                                CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
                                if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
-                                       if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx){
+                                       if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asChild()){
                                                recreate = 0;
                                                break;
                                        }
@@ -2699,6 +2386,34 @@ void _resendMessagesHandler(char *msg){
        else
                forAllCharesDo(resendMessageForChare,&d);
 
+       // adding the stored determinants to the resendReplyMsg
+       // traversing all the stored determinants
+       CkVec<Determinant> *vec;
+       for(int i=0; i<d.numberObjects; i++){
+               vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
+               if(vec != NULL){
+                       for(int j=0; j<vec->size(); j++){
+
+                               // only relevant determinants are to be sent
+                               if((*vec)[j].TN > d.listObjects[i].tProcessed){
+
+                                       // adding the ticket in the ticket vector
+                                       d.ticketVecs[i].push_back((*vec)[j].TN);
+
+                                       // adding the determinant in the determinant vector
+                                       detVec[i].push_back((*vec)[j]);
+
+                                       DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
+                               }
+                       }
+               }
+       }
+
+       int totalDetStored = 0;
+       for(int i=0;i<d.numberObjects;i++){
+               totalDetStored += detVec[i].size();
+       }
+
        //send back the maximum ticket number for a message sent to each object on the 
        //restarted processor
        //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
@@ -2708,7 +2423,7 @@ void _resendMessagesHandler(char *msg){
                totalTNStored += d.ticketVecs[i].size();
        }
        
-       int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
+       int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
        char *resendReplyMsg = (char *)CmiAlloc(totalSize);
        
        ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
@@ -2728,7 +2443,16 @@ void _resendMessagesHandler(char *msg){
                ticketList = &ticketList[sizeof(int)];
                memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
                ticketList = &ticketList[sizeof(MCount)*vecsize];
-       }       
+       }
+
+       // adding the stored remote determinants to the message
+       for(int i=0;i<d.numberObjects;i++){
+               int vecsize = detVec[i].size();
+               memcpy(ticketList,&vecsize,sizeof(int));
+               ticketList = &ticketList[sizeof(int)];
+               memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
+               ticketList = &ticketList[sizeof(Determinant)*vecsize];
+       }
 
        CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
        CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
@@ -2745,8 +2469,11 @@ void _resendMessagesHandler(char *msg){
        
        verifyAckRequestsUnacked=0;*/
        
-       delete [] d.maxTickets;
        delete [] d.ticketVecs;
+       delete [] detVec;
+
+       DEBUG_MEM(CmiMemoryCheck());
+
        if(resendReq->PE != CkMyPe()){
                CmiFree(msg);
        }       
@@ -2758,33 +2485,48 @@ void sortVec(CkVec<MCount> *TNvec);
 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
 
 /**
- * @brief Receives the tickets assigned to message to other objects.
+ * @brief Processes the messages in the delayed remote message queue
  */
-void _resendReplyHandler(char *msg){   
-       /**
-               need to rewrite this method to deal with parallel restart
-       */
-       ResendRequest *resendReply = (ResendRequest *)msg;
-       CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
+void processDelayedRemoteMsgQueue(){
+       DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
+       
+       while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
+               void *qMsgPtr;
+               CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
+               envelope *qEnv = (envelope *)qMsgPtr;
+               DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
+               CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
+               DEBUG_MEM(CmiMemoryCheck());
+       }
+
+}
 
+/**
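+ * @brief Receives the ticket numbers (TNs) and determinants stored on remote nodes.
+ * Message format: |Header|ObjID list|TN list|Determinant list|
+ * TN list = |number of TNs|list of TNs|...| with one entry per object; the determinant list has the same layout (|number of determinants|list of determinants|...|).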
+ */
+void _resendReplyHandler(char *msg){
+       ResendRequest *resendReply = (ResendRequest *)msg;
+       CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
        char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
        
-//     DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
+//     DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
        DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
-       for(int i =0; i< resendReply->numberObjects;i++){       
+       for(int i =0; i< resendReply->numberObjects;i++){
                Chare *obj = (Chare *)listObjects[i].getObject();
                
                int vecsize;
                memcpy(&vecsize,listTickets,sizeof(int));
                listTickets = &listTickets[sizeof(int)];
-               MCount *listTNs = (MCount *)listTickets;        
+               MCount *listTNs = (MCount *)listTickets;
                listTickets = &listTickets[vecsize*sizeof(MCount)];
-               
+       
                if(obj != NULL){
                        //the object was restarted on the processor on which it existed
                        processReceivedTN(obj,vecsize,listTNs);
                }else{
-               //pack up objID vecsize and listTNs and send it to the correct processor
+                       //pack up objID vecsize and listTNs and send it to the correct processor
                        int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
                        char *TNMsg = (char *)CmiAlloc(totalSize);
                        ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
@@ -2792,27 +2534,102 @@ void _resendReplyHandler(char *msg){
                        receivedTNData->numTNs = vecsize;
                        char *tnList = &TNMsg[sizeof(ReceivedTNData)];
                        memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
-
                        CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
                        CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
-               }       
+               }
+
+       }
+
+       // traversing all the retrieved determinants
+       for(int i = 0; i < resendReply->numberObjects; i++){
+               Chare *obj = (Chare *)listObjects[i].getObject();
+               
+               int vecsize;
+               memcpy(&vecsize,listTickets,sizeof(int));
+               listTickets = &listTickets[sizeof(int)];
+               Determinant *listDets = (Determinant *)listTickets;     
+               listTickets = &listTickets[vecsize*sizeof(Determinant)];
+       
+               if(obj != NULL){
+                       //the object was restarted on the processor on which it existed
+                       processReceivedDet(obj,vecsize,listDets);
+               } else {
+                       // pack the determinants and ship them to the other processor
+                       // pack up objID vecsize and listDets and send it to the correct processor
+                       int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
+                       char *detMsg = (char *)CmiAlloc(totalSize);
+                       ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
+                       receivedDetData->recver = listObjects[i];
+                       receivedDetData->numDets = vecsize;
+                       char *detList = &detMsg[sizeof(ReceivedDetData)];
+                       memcpy(detList,listDets,sizeof(Determinant)*vecsize);
+                       CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
+                       CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
+               }
+
+       }
+
+       // checking if the restart is over
+       _numRestartResponses++;
+       if(_numRestartResponses == CkNumPes()){
+               _numRestartResponses = 0;
+               processDelayedRemoteMsgQueue();
        }
+
 };
 
+/**
+ * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
+ */
+void _receivedDetDataHandler(ReceivedDetData *msg){
+       DEBUG_NOW(char objName[100]);
+       Chare *obj = (Chare *) msg->recver.getObject();
+       if(obj){                
+               char *_msg = (char *)msg;
+               DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
+               Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
+               processReceivedDet(obj,msg->numDets,listDets);
+               CmiFree(msg);
+       }else{
+               int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
+               CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
+       }
+}
+
+/**
+ * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
+ */
 void _receivedTNDataHandler(ReceivedTNData *msg){
-       char objName[100];
+       DEBUG_NOW(char objName[100]);
        Chare *obj = (Chare *) msg->recver.getObject();
        if(obj){                
                char *_msg = (char *)msg;
-               DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
+               DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
                MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
                processReceivedTN(obj,msg->numTNs,listTNs);
+               CmiFree(msg);
        }else{
                int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
                CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
        }
 };
 
+/**
+ * @brief Processes the received list of determinants from a particular PE.
+ */
+void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
+       Determinant *det;
+
+       // traversing the whole list of determinants
+       for(int i=0; i<listSize; i++){
+               det = &listDets[i];
+               obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
+               DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
+       }
+       
+       DEBUG_MEM(CmiMemoryCheck());
+}
+       
 /**
  * @brief Processes the received list of tickets from a particular PE.
  */
@@ -2889,8 +2706,10 @@ void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
                obj->mlogData->receivedTNs = NULL;
                obj->mlogData->restartFlag = 0;
 
-               DEBUGRESTART(char objString[100]);
-               DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
+               processDelayedRemoteMsgQueue();
+
+               DEBUG_RESTART(char objString[100]);
+               DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
        }
 
 }
@@ -2964,74 +2783,90 @@ int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
        Method to do parallel restart. Distribute some of the array elements to other processors.
        The problem is that we cant use to charm entry methods to do migration as it will get
        stuck in the protocol that is going to restart
+       Note: in order to avoid interference between the objects being recovered, the current PE
+       will NOT keep any object; it is devoted to forwarding messages to the recovering objects.
+       Otherwise it would have to both recover objects and forward messages, and the objects would get in each other's way (interference).
 */
 
 class ElementDistributor: public CkLocIterator{
        CkLocMgr *locMgr;
        int *targetPE;
+
        void pupLocation(CkLocation &loc,PUP::er &p){
-               CkArrayIndex idx=loc.getIndex();
+               CkArrayIndexMax idx=loc.getIndex();
                CkGroupID gID = locMgr->ckGetGroupID();
                p|gID;      // store loc mgr's GID as well for easier restore
                p|idx;
                p|loc;
        };
-       public:
-               ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
-               void addLocation(CkLocation &loc){
-                       if(*targetPE == CkMyPe()){
-                               *targetPE = (*targetPE +1)%CkNumPes();                          
-                               return;
-                       }
+public:
+       ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
+
+       void addLocation(CkLocation &loc){
+
+               // leaving object on this PE
+               if(*targetPE == CkMyPe()){
+                       *targetPE = (*targetPE +1)%CkNumPes();
+                       return;
+               }
                        
-                       CkArrayIndex idx=loc.getIndex();
-                       CkLocRec_local *rec = loc.getLocalRecord();
+               CkArrayIndexMax idx = loc.getIndex();
+               CkLocRec_local *rec = loc.getLocalRecord();
                        
-                       CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
-                       idx.print();
+               CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
+               idx.print();
                        
-
-                       //TODO: an element that is being moved should leave some trace behind so that
-                       // the arraybroadcaster can forward messages to it
+               //TODO: an element that is being moved should leave some trace behind so that
+               // the arraybroadcaster can forward messages to it
                        
-                       //pack up this location and send it across
-                       PUP::sizer psizer;
-                       pupLocation(loc,psizer);
-                       int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
-                       char *msg = (char *)CmiAlloc(totalSize);
-                       char *buf = &msg[CmiMsgHeaderSizeBytes];
-                       PUP::toMem pmem(buf);
-                       pmem.becomeDeleting();
-                       pupLocation(loc,pmem);
+               //pack up this location and send it across
+               PUP::sizer psizer;
+               pupLocation(loc,psizer);
+               int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
+               char *msg = (char *)CmiAlloc(totalSize);
+               char *buf = &msg[CmiMsgHeaderSizeBytes];
+               PUP::toMem pmem(buf);
+               pmem.becomeDeleting();
+               pupLocation(loc,pmem);
                        
-                       locMgr->setDuringMigration(CmiTrue);                    
-                       delete rec;
-                       locMgr->setDuringMigration(CmiFalse);                   
-                       locMgr->inform(idx,*targetPE);
+               locMgr->setDuringMigration(CmiTrue);
+               delete rec;
+               locMgr->setDuringMigration(CmiFalse);
+               locMgr->inform(idx,*targetPE);
 
-                       CmiSetHandler(msg,_distributedLocationHandlerIdx);
-                       CmiSyncSendAndFree(*targetPE,totalSize,msg);
+               CmiSetHandler(msg,_distributedLocationHandlerIdx);
+               CmiSyncSendAndFree(*targetPE,totalSize,msg);
 
-                       CmiAssert(locMgr->lastKnown(idx) == *targetPE);
-                       //decide on the target processor for the next object
-                       *targetPE = (*targetPE +1)%CkNumPes();
+               CmiAssert(locMgr->lastKnown(idx) == *targetPE);
+
+               //decide on the target processor for the next object
+               *targetPE = *targetPE + 1;
+               if(*targetPE > (CkMyPe() + parallelRecovery)){
+                       *targetPE = CkMyPe() + 1;
                }
-               
+       }
+
 };
 
+/**
+ * Distributes objects to accelerate recovery after a failure.
+ */
 void distributeRestartedObjects(){
        int numGroups = CkpvAccess(_groupIDTable)->size();      
        int i;
-       int targetPE=CkMyPe();
+       int targetPE=CkMyPe()+1;
        CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
 };
 
+/**
+ * Handler to update information about an object just received.
+ */
 void _distributedLocationHandler(char *receivedMsg){
        printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
        char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
        PUP::fromMem pmem(buf);
        CkGroupID gID;
-       CkArrayIndex idx;
+       CkArrayIndexMax idx;
        pmem |gID;
        pmem |idx;
        CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
@@ -3053,14 +2888,12 @@ void _distributedLocationHandler(char *receivedMsg){
                        eltList[i]->ResumeFromSync();
                }
        }
-       
-       
 }
 
 
 /** this method is used to send messages to a restarted processor to tell
  * it that a particular expected object is not going to get to it */
-void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndex &idx,int locationPE){
+void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
        DummyMigrationMsg buf;
        buf.flag = MLOG_OBJECT;
        buf.lbID = lbID;
@@ -3096,12 +2929,12 @@ void sendDummyMigrationCounts(int *dummyCounts){
 void _dummyMigrationHandler(DummyMigrationMsg *msg){
        CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
        if(msg->flag == MLOG_OBJECT){
-               DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
+               DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
                LDObjHandle h;
                lb->Migrated(h,1);
        }
        if(msg->flag == MLOG_COUNT){
-               DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
+               DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
                msg->count -= verifyAckedRequests;
                for(int i=0;i<msg->count;i++){
                        LDObjHandle h;
@@ -3144,7 +2977,7 @@ public:
 /**
  * Map function pointed by fnPointer over all the chares living in this processor.
  */
-void forAllCharesDo(MlogFn fnPointer,void *data){
+void forAllCharesDo(MlogFn fnPointer, void *data){
        int numGroups = CkpvAccess(_groupIDTable)->size();
        for(int i=0;i<numGroups;i++){
                Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
@@ -3164,6 +2997,11 @@ void forAllCharesDo(MlogFn fnPointer,void *data){
  Load Balancing
 ******************************************************************/
 
+/**
+ * This is the first point at which Converse is called after the AtSync method has been called by every local object.
+ * It is a good place to insert optimizations for synchronized checkpointing. In the case of causal
+ * message logging, we take advantage of this and garbage collect here (when SYNCHRONIZED_CHECKPOINT is enabled).
+ */
 void initMlogLBStep(CkGroupID gid){
        DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
        countLBMigratedAway = 0;
@@ -3175,6 +3013,9 @@ void initMlogLBStep(CkGroupID gid){
                CmiAssert(globalLBID.idx == gid.idx);
        }
        globalLBID = gid;
+#if SYNCHRONIZED_CHECKPOINT
+       garbageCollectMlog();
+#endif
 }
 
 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
@@ -3202,11 +3043,11 @@ void finishedCheckpointLoadBalancing(){
 };
 
 
-void sendMlogLocation(int targetPE,envelope *env){
+void sendMlogLocation(int targetPE, envelope *env){
+#if !SYNCHRONIZED_CHECKPOINT
        void *_msg = EnvToUsr(env);
        CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
 
-
        int existing = 0;
        //if this object is already in the retainedobjectlust destined for this
        //processor it should not be sent
@@ -3224,7 +3065,6 @@ void sendMlogLocation(int targetPE,envelope *env){
                return;
        }
        
-       
        countLBToMigrate++;
        
        MigrationNotice migMsg;
@@ -3251,7 +3091,7 @@ void sendMlogLocation(int targetPE,envelope *env){
        CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
        
        DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
-
+#endif
 }
 
 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
@@ -3309,6 +3149,9 @@ void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
        }*/
 }
 
+/**
+ * @brief Processor 0 sends a broadcast to every other processor after checkpoint barrier.
+ */
 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
        if(checkpointBarrierCount == CmiNumPes()){
                CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
@@ -3318,6 +3161,9 @@ inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
        }
 }
 
+/**
+ * @brief Processor 0 receives a contribution from every other processor after checkpoint.
+ */ 
 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
        DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
        if(msg->checkpointCount == checkpointCount){
@@ -3332,23 +3178,80 @@ void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
                        CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
                }
        }
+
+       // deleting the received message
        CmiFree(msg);
 }
 
 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
        DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
        DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
+
+#if !SYNCHRONIZED_CHECKPOINT
+       // sending a notice to all senders to remove message logs
        sendRemoveLogRequests();
+#endif
+
+       // resuming LB function pointer
        (*resumeLbFnPtr)(centralLb);
+
+       // deleting message
        CmiFree(msg);
 }
 
+/**
+ * @brief Function to remove all messages in the message log of a particular chare.
+ */
+void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
+       int total;
+       MlogEntry *logEntry;
+       CkQ<MlogEntry *> *mlog = mlogData->getMlog();
+
+       // traversing the whole message log and removing all elements
+       total = mlog->length();
+       for(int i=0; i<total; i++){
+               logEntry = mlog->deq();
+               delete logEntry;
+       }
+
+}
+
+/**
+ * @brief Garbage collects the message log and other data structures.
+ * With synchronized checkpointing, this optimization spares the causal message logging protocol
+ * from having to communicate all determinants to the rest of the processors.
+ */
+void garbageCollectMlog(){
+       CkHashtableIterator *iterator;
+       CkVec<Determinant> *detArray;
+
+       DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
+
+       // cleaning up the buffered determinants, since they belong to a previous checkpoint period
+       _indexBufferedDets = 0;
+       _numBufferedDets = 0;
+       _phaseBufferedDets++;
+
+       // cleaning up remote determinants, since they belong to a previous checkpoint period
+       iterator = CpvAccess(_remoteDets)->iterator();
+       while(iterator->hasNext()){
+               detArray = *(CkVec<Determinant> **)iterator->next();
+               detArray->removeAll();
+       }
+       
+       // deleting the iterator
+       delete iterator;
+
+       // removing all messages in message log for every chare
+       forAllCharesDo(garbageCollectMlogForChare, NULL);
+}
+
 /**
        method that informs an array elements home processor of its current location
        It is a converse method to bypass the charm++ message logging framework
 */
 
-void informLocationHome(CkGroupID locMgrID,CkArrayIndex idx,int homePE,int currentPE){
+void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
        double _startTime = CmiWallTimer();
        CurrentLocationMsg msg;
        msg.mgrID = locMgrID;
@@ -3424,19 +3327,42 @@ void _recvGlobalStepHandler(LBStepMsg *msg){
  * @brief Function to wrap up performance information.
  */
 void _messageLoggingExit(){
-/*     if(CkMyPe() == 0){
-               if(countBuffered != 0){
-                       printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
-               }
+       
+       // printing the signature for causal message logging
+       if(CkMyPe() == 0)
+               printf("[%d] _causalMessageLoggingExit \n",CmiMyPe());
+
+       //TML: printing some statistics for group approach
+#if COLLECT_STATS_TEAM
+       printf("[%d] LOGGED MESSAGES: %.0f\n",CkMyPe(),MLOGFT_totalMessages);
+       printf("[%d] MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalLogSize/(float)MEGABYTE);
+#endif
 
-//             printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
+#if COLLECT_STATS_MSGS
+#if COLLECT_STATS_MSGS_TOTAL
+       printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
+       printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
+#else
+       printf("[%d] TARGETS: ",CmiMyPe());
+       for(int i=0; i<CmiNumPes(); i++){
+#if COLLECT_STATS_MSG_COUNT
+               printf("%d ",numMsgsTarget[i]);
+#else
+               printf("%d ",sizeMsgsTarget[i]);
+#endif
        }
-       printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
-       printf("[%d] _messageLoggingExit \n",CmiMyPe());
+       printf("\n");
+#endif
+#endif
 
-       //TML: printing some statistics for group approach
-       //if(teamSize > 1)
-               CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
+#if COLLECT_STATS_DETS
+       printf("\n");
+       printf("[%d] DETS: %d\n",CmiMyPe(),numDets);
+       printf("[%d] PIGGYBACKED DETS: %d\n",CmiMyPe(),numPiggyDets);
+#if COLLECT_STATS_DETS_DUP
+       printf("[%d] DUPLICATED DETS: %d\n",CmiMyPe(),numDupDets);
+#endif
+#endif
 
 }
 
@@ -3474,7 +3400,7 @@ void* CkObjID::getObject(){
        
                                        if(aid.ckLocalBranch() == NULL){ return NULL;}
        
-                                       CProxyElement_ArrayBase aProxy(aid,data.array.idx);
+                                       CProxyElement_ArrayBase aProxy(aid,data.array.idx.asChild());
        
                                        return aProxy.ckLocal();
                                }
@@ -3498,7 +3424,7 @@ int CkObjID::guessPE(){
                                        if(aid.ckLocalBranch() == NULL){
                                                return -1;
                                        }
-                                       return aid.ckLocalBranch()->lastKnown(data.array.idx);
+                                       return aid.ckLocalBranch()->lastKnown(data.array.idx.asChild());
                                }
                        default:
                                CkAssert(0);
@@ -3522,7 +3448,7 @@ char *CkObjID::toString(char *buf) const {
                        break;
                case TypeArray:
                        {
-                               const CkArrayIndex &idx = data.array.idx;
+                               const CkArrayIndexMax &idx = data.array.idx.asChild();
                                const int *indexData = idx.data();
                                sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
                                break;
@@ -3548,14 +3474,14 @@ void CkObjID::updatePosition(int PE){
                                                char str[100];
                                                CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
 //                                             CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
-                                               CkLocRec *rec = mgr->elementNrec(data.array.idx);
+                                               CkLocRec *rec = mgr->elementNrec(data.array.idx.asChild());
                                                if(rec != NULL){
                                                        if(rec->type() == CkLocRec::local){
                                                                CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
                                                                return;
                                                        }
                                                }
-                                               mgr->inform(data.array.idx,PE);
+                                               mgr->inform(data.array.idx.asChild(),PE);
                                        }       
                                }
 
@@ -3578,7 +3504,6 @@ void CkObjID::updatePosition(int PE){
 void MlogEntry::pup(PUP::er &p){
        p | destPE;
        p | _infoIdx;
-       p | unackedLocal;
        int size;
        if(!p.isUnpacking()){
 /*             CkAssert(env);
@@ -3602,10 +3527,6 @@ void MlogEntry::pup(PUP::er &p){
        }
        if(size > 0){
                p((char *)env,size);
-       
-               if(p.isUnpacking()){
-                       env->localMlogEntry = NULL;
-               }
        }
 };
 
@@ -3631,8 +3552,10 @@ MCount ChareMlogData::nextSN(const CkObjID &recver){
 /*     MCount SN = snTable.get(recver);
        snTable.put(recver) = SN+1;
        return SN+1;*/
+       DEBUG_MEM(CmiMemoryCheck());
        double _startTime = CmiWallTimer();
        MCount *SN = snTable.getPointer(recver);
+       DEBUG_MEM(CmiMemoryCheck());
        if(SN==NULL){
                snTable.put(recver) = 1;
                return 1;
@@ -3642,8 +3565,10 @@ MCount ChareMlogData::nextSN(const CkObjID &recver){
        }
 //     traceUserBracketEvent(34,_startTime,CkWallTimer());
 };
-
-
+/**
+ * @brief Gets a new ticket for a particular object.
+ */
 MCount ChareMlogData::newTN(){
        MCount TN;
        if(currentHoles > 0){
@@ -3656,10 +3581,23 @@ MCount ChareMlogData::newTN(){
                }
        }else{
                TN = ++tCount;
-       }       
+       }
        return TN;
 };
 
+/**
+ * @brief Looks up the ticket stored for (sender, SN); when sender has no row in ticketTable a default-constructed Ticket is returned.
+ */
+inline Ticket ChareMlogData::getTicket(CkObjID &sender, MCount SN){
+       Ticket ticket; // default-constructed fallback, returned only when sender has no row
+
+       SNToTicket *ticketRow = ticketTable.get(sender);
+       if(ticketRow != NULL){
+               return ticketRow->get(SN); // whatever the sender's row holds for this SN
+       }
+       return ticket;
+}
+
 /**
  * Inserts a ticket in the ticketTable if it is not already there.
  */
@@ -3684,10 +3622,10 @@ inline void ChareMlogData::verifyTicket(CkObjID &sender, MCount SN, MCount TN){
 /**
  * Generates the next ticket for a request.
  */
-inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
+inline Ticket ChareMlogData::next_ticket(CkObjID &sender, MCount SN){
        DEBUG(char senderName[100];)
        DEBUG(char recverName[100];)
-       double _startTime =CmiWallTimer();
+       double _startTime = CmiWallTimer();
        Ticket ticket;
 
        // if a ticket is requested during restart, 0 is returned to make the requester to ask for it later.
@@ -3695,17 +3633,7 @@ inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
                ticket.TN = 0;
                return ticket;
        }
-/*     SNToTicket &ticketRow = ticketTable.put(sender);
-       Ticket earlierTicket = ticketRow.get(SN);
-       if(earlierTicket.TN == 0){
-               //This SN has not been ever alloted a ticket
-               ticket.TN = newTN();
-               ticketRow.put(SN)=ticket;
-       }else{
-               ticket.TN = earlierTicket.TN;
-       }*/
        
-
        SNToTicket *ticketRow = ticketTable.get(sender);
        if(ticketRow != NULL){
                Ticket earlierTicket = ticketRow->get(SN);
@@ -3728,12 +3656,8 @@ inline Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
                ticketTable.put(sender) = newRow;
                DEBUG(printf("[%d] next_ticket new row ticket sender %s recver %s SN %d TN %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN));
        }
-/*TODO: check if the message for this SN has already been received
-       in the table of received SNs 
-       If it was received before the last checkpoint mark it as old
-       other wise received
-       */
        ticket.state = NEW_TICKET;
+
 //     traceUserBracketEvent(34,_startTime,CkWallTimer());
        return ticket;  
 };
@@ -3760,15 +3684,6 @@ double totalSearchRestoredCount=0;
 MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN){
        double start= CkWallTimer();
        MCount TN=0;    
-       if(mapTable.numObjects() > 0){
-               RestoredLocalMap *map = mapTable.get(sender);
-               if(map){
-                       int index = SN - map->minSN;
-                       if(index < map->count){
-                               TN = map->TNArray[index];
-                       }
-               }
-       }
        
        DEBUG(char senderName[100]);
        DEBUG(char recverName[100]);
@@ -3779,52 +3694,6 @@ MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCoun
        return TN;
 }
 
-void ChareMlogData::addToRestoredLocalQ(LocalMessageLog *logEntry){
-       restoredLocalMsgLog.push_back(*logEntry);
-}
-
-void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData){
-       mlogData->sortRestoredLocalMsgLog();
-}
-
-void ChareMlogData::sortRestoredLocalMsgLog(){
-       //sort it ->its bloddy bubble sort
-       
-       for(int i=0;i<restoredLocalMsgLog.size();i++){
-               LocalMessageLog &logEntry = restoredLocalMsgLog[i];
-               RestoredLocalMap *map = mapTable.get(logEntry.sender);
-               if(map == NULL){
-                       map = new RestoredLocalMap;
-                       mapTable.put(logEntry.sender)=map;
-               }
-               map->count++;
-               if(map->minSN == 0){
-                       map->minSN = logEntry.SN;
-               }else{
-                       if(logEntry.SN < map->minSN){
-                               map->minSN = logEntry.SN;
-                       }
-               }
-               if(logEntry.SN > map->maxSN){
-                       map->maxSN = logEntry.SN;
-               }
-
-       }
-       for(int i=0;i< restoredLocalMsgLog.size();i++){
-               LocalMessageLog &logEntry = restoredLocalMsgLog[i];
-               RestoredLocalMap *map = mapTable.get(logEntry.sender);
-               CkAssert(map != NULL);
-               if(map->TNArray == NULL){
-                       map->TNArray = new MCount[map->maxSN-map->minSN+1];                     
-                       CkAssert(map->count == map->maxSN-map->minSN+1);
-                       map->count = 0;
-               }
-               map->TNArray[map->count] = logEntry.TN;
-               map->count++;
-       }
-       restoredLocalMsgLog.free();
-}
-
 /**
  * Pup method for the metadata.
  * We are preventing the whole message log to be stored (as proposed by Sayantan for dealing with multiple failures).
@@ -3882,7 +3751,6 @@ void ChareMlogData::pup(PUP::er &p){
                }
        }
        
-       
        p | currentHoles;
        p | numberHoles;
        if(p.isUnpacking()){
@@ -3898,31 +3766,6 @@ void ChareMlogData::pup(PUP::er &p){
        
        snTable.pup(p);
 
-       // pupping only the unacked local messages in the message log
-       int length = 0;
-       MlogEntry *entry;
-       if(!p.isUnpacking()){
-               for(int i=0; i<mlog.length(); i++){
-                       entry = mlog[i];
-                       if(entry->unackedLocal)
-                               length++;
-               }
-       }
-       p | length;
-       if(p.isUnpacking()){
-               for(int i=0; i<length; i++){
-                       entry = new MlogEntry();
-                       mlog.enq(entry);
-                       entry->pup(p);
-               }
-       }else{
-               for(int i=0; i<mlog.length(); i++){
-                       entry = mlog[i];
-                       if(entry->unackedLocal){
-                               entry->pup(p);
-                       }
-               }
-       }
 
 /*     int length;
        if(!p.isUnpacking()){           
@@ -3942,12 +3785,11 @@ void ChareMlogData::pup(PUP::er &p){
                entry->pup(p);
        }*/
        
-       p | restoredLocalMsgLog;
        p | resendReplyRecvd;
        p | restartFlag;
 
        // pup the mapTable
-       int tableSize;
+/*     int tableSize;
        if(!p.isUnpacking()){
                tableSize = mapTable.numObjects();
        }
@@ -3970,7 +3812,7 @@ void ChareMlogData::pup(PUP::er &p){
                        map->pup(p);
                        mapTable.put(objID) = map;
                }
-       }
+       }*/
 
        //pup the ticketTable
        {
@@ -4020,13 +3862,11 @@ void ChareMlogData::pup(PUP::er &p){
 
 /**
  * Getting the pe number of the current processor's buddy.
+ * In the team-based approach each processor might checkpoint in the next team, but currently
+ * teams are only meant to reduce memory overhead.
  */
 int getCheckPointPE(){
-       //TML: assigning a team-based buddy
-       if(teamSize != 1){
-               return (CmiMyPe() + teamSize) % CmiNumPes();
-       }
-       return (CmiNumPes() -1 - CmiMyPe());
+       return (CmiMyPe() + 1) % CmiNumPes(); // next PE in ring order; the last PE wraps around to PE 0
 }
 
 //assume it is a packed envelope
@@ -4036,4 +3876,9 @@ envelope *copyEnvelope(envelope *env){
        return newEnv;
 }
 
+/* Returns non-zero iff both determinants agree on sender, receiver, SN and TN; neither pointer is checked for NULL. */
+inline int isSameDet(Determinant *first, Determinant *second){
+       return first->sender == second->sender && first->receiver == second->receiver && first->SN == second->SN && first->TN == second->TN;
+}
+
 #endif
index 4c44bbe6cccb4a6a9ba3a31065fecf8d804d07aa..542bb799e9c71a54a5f45e1d6641f5625b4178be 100644 (file)
@@ -24,8 +24,57 @@ CpvExtern(Chare *,_currentObj);
 //array on which we print the formatted string representing an object id
 extern char objString[100];
 
+// defines the initial size of _bufferedDets
+#define INITIAL_BUFFERED_DETERMINANTS 1024
+
+// constant to define the type of checkpoint used (synchronized or not)
+#define SYNCHRONIZED_CHECKPOINT 1
+
 /**
- * @brief
+ * @brief Struct to store the determinant of a particular message.
+ * The determinant remembers all the information necessary for a
+ * message to be replayed in the same order as in the execution prior
+ * to the failure.
+ */
+typedef struct {
+       // ID of the sending object
+       CkObjID sender;
+       // ID of the receiving object
+       CkObjID receiver;
+       // SSN: sender sequence number
+       MCount SN;
+       // TN: ticket number (RSN: receiver sequence number)
+       MCount TN;
+} Determinant;
+
+/**
+ * @brief Typedef for the hashtable type that maps an object ID (CkObjID) to a pointer to a vector of determinants.
+ */
+typedef CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> CkDeterminantHashtableT;
+
+/**
+ * @brief Struct for the header of the removeDeterminants handler (presumably the front of the buffer given to _removeDeterminantsHandler; confirm)
+ */
+typedef struct {
+       char header[CmiMsgHeaderSizeBytes]; // room for the message header, CmiMsgHeaderSizeBytes bytes
+       int phase; // NOTE(review): meaning not visible in this header; confirm against the handler
+       int index; // NOTE(review): meaning not visible in this header; confirm against the handler
+} RemoveDeterminantsHeader;
+
+
+/**
+ * @brief Struct for the header of the storeDeterminants handler (presumably the front of the buffer given to _storeDeterminantsHandler; confirm)
+ */
+typedef struct {
+       char header[CmiMsgHeaderSizeBytes]; // room for the message header, CmiMsgHeaderSizeBytes bytes
+       int number; // presumably how many determinants follow this header; TODO confirm
+       int index; // NOTE(review): meaning not visible in this header; confirm against the handler
+       int phase; // NOTE(review): meaning not visible in this header; confirm against the handler
+       int PE; // a processor number; which side (sender or destination) is not visible here; confirm
+} StoreDeterminantsHeader;
+
+/**
+ * @brief Structure for a ticket assigned to a particular message.
  */
 class Ticket {
 public:
@@ -43,26 +92,6 @@ public:
 PUPbytes(Ticket)
 class MlogEntry;
 
-/**
- * Log entry for local messages, can also be sent as a message.
- * A message is local if:
- * 1) It is sent between objects in the same processor.
- * 2) It is sent between objects residing in processors of the same group
- * (whenever group-based message logging is used).
- */
-typedef struct{
-       char header[CmiMsgHeaderSizeBytes];
-       CkObjID sender;
-       CkObjID recver;
-       MCount SN;
-       MCount TN;
-       MlogEntry *entry;
-       int senderPE;
-       int recverPE;
-} LocalMessageLog;
-PUPbytes(LocalMessageLog)
-
-class MlogEntry;
 class RestoredLocalMap;
 
 #define INITSIZE_SNTOTICKET 100
@@ -168,14 +197,14 @@ public:
        MCount *ticketHoles;
        int numberHoles;
        int currentHoles;
-       CkVec<LocalMessageLog> restoredLocalMsgLog;
-       int maxRestoredLocalTN;
-       int resendReplyRecvd;// variable that keeps a count of the processors that have replied to a requests to resend messages. 
-       int restartFlag; /*0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time */
-    int teamRecoveryFlag; // 0 -> normal state .. 1 -> recovery of a team member       
-       CkHashtableT<CkHashtableAdaptorT<CkObjID>,RestoredLocalMap *> mapTable;
+       // variable that keeps a count of the processors that have replied to a request to resend messages.
+       int resendReplyRecvd;
+       // 0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time 
+       int restartFlag;
+       // 0 -> normal state .. 1 -> recovery of a team member 
+    int teamRecoveryFlag;      
        //TML: teamTable, stores the SN to TN mapping for messages intra team
-       CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> teamTable;
+       CkHashtableT<CkHashtableAdaptorT<CkObjID>, SNToTicket *> teamTable;
 
        int toResumeOrNot;
        int resumeCount;
@@ -206,42 +235,43 @@ public:
                teamRecoveryFlag=0;
                receivedTNs = NULL;
                resendReplyRecvd=0;
-               maxRestoredLocalTN=0;
                toResumeOrNot=0;
                resumeCount=0;
        };
        inline MCount nextSN(const CkObjID &recver);
        inline Ticket next_ticket(CkObjID &sender,MCount SN);
        inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
+       inline Ticket getTicket(CkObjID &sender, MCount SN);
        void addLogEntry(MlogEntry *entry);
        virtual void pup(PUP::er &p);
        CkQ<MlogEntry *> *getMlog(){ return &mlog;};
        MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
-       void addToRestoredLocalQ(LocalMessageLog *logEntry);
-       void sortRestoredLocalMsgLog();
 };
 
 /**
- * @brief Entry in a message log
+ * @brief Entry in a message log. It also includes the index of the buffered
+ * determinants array and the number of appended determinants.
+ * @note: this message appended numBufDets determinants, counting downwards from indexBufDets.
+ * In other words, if indexBufDets == 5 and numBufDets == 3, it means that
+ * determinants bufDets[2], bufDets[3] and bufDets[4] were piggybacked.
  */
 class MlogEntry{
 public:
        envelope *env;
        int destPE;
        int _infoIdx;
-       char unackedLocal;
+       int indexBufDets;
+       int numBufDets;
        
        MlogEntry(envelope *_env,int _destPE,int __infoIdx){
                env = _env;
                destPE = _destPE;
                _infoIdx = __infoIdx;
-               unackedLocal = 0;
        }
        MlogEntry(){
                env = 0;
                destPE = -1;
                _infoIdx = 0;
-               unackedLocal = 0;
        }
        ~MlogEntry(){
                if(env){
@@ -256,7 +286,7 @@ public:
  */
 class LocationID{
 public:
-       CkArrayIndex idx;
+       CkArrayIndexMax idx;
        CkGroupID gid;
 };
 
@@ -315,12 +345,6 @@ typedef struct{
        int recverPE;
 } TicketReply;
 
-
-CpvExtern(CkQ<LocalMessageLog> *,_localMessageLog); // used on buddy to store local message logs
-
-CpvExtern(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
-extern int _maxBufferedMessages; //Number of local message logs  to be buffered
-
 CpvExtern(char**,_bufferedTicketRequests);
 extern int _maxBufferedTicketRequests; //Number of ticket requests to be buffered
 
@@ -333,11 +357,6 @@ typedef struct {
 
 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
 
-typedef struct{
-       char header[CmiMsgHeaderSizeBytes];
-       MlogEntry *entry;               
-} LocalMessageLogAck;
-
 typedef struct{
        char header[CmiMsgHeaderSizeBytes];
        int PE;
@@ -357,6 +376,7 @@ typedef struct{
        MCount tProcessed;
 } TProcessedLog;
 
+
 /**
  * Struct to request a particular action during restart.
  */
@@ -394,17 +414,23 @@ typedef struct {
        int numTNs;
 } ReceivedTNData;
 
+// Structure to forward determinants in parallel restart; header of the message taken by _receivedDetDataHandler
+typedef struct {
+       char header[CmiMsgHeaderSizeBytes]; // room for the message header, CmiMsgHeaderSizeBytes bytes
+       CkObjID recver; // ID of the receiving object
+       int numDets; // presumably the number of Determinant entries carried with this message; TODO confirm
+} ReceivedDetData;
+
 typedef struct{
        int PE;
        int numberObjects;
        TProcessedLog *listObjects;
-       MCount *maxTickets;
        CkVec<MCount> *ticketVecs;
 } ResendData;
 
 typedef struct {
        CkGroupID gID;
-       CkArrayIndex idx;
+       CkArrayIndexMax idx;
        int fromPE,toPE;
        char ackFrom,ackTo;
 } MigrationRecord;
@@ -445,7 +471,7 @@ typedef struct {
 typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        CkGroupID mgrID;
-       CkArrayIndex idx;
+       CkArrayIndexMax idx;
        int locationPE;
        int fromPE;
 } CurrentLocationMsg;
@@ -468,7 +494,7 @@ typedef struct {
        int count;// if just count
        /**if object **/
        CkGroupID mgrID;
-       CkArrayIndex idx;
+       CkArrayIndexMax idx;
        int locationPE;
 } DummyMigrationMsg;
 
@@ -481,41 +507,23 @@ typedef void (*MlogFn)(void *,ChareMlogData *);
 void _messageLoggingInit();
 
 //Methods for sending ticket requests
-void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx);
-void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx);
-void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx);
-void generateCommonTicketRequest(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
-void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
-void ticketLogLocalMessage(MlogEntry *entry);
-void sendLocalMessageCopy(MlogEntry *entry);
-void sendBufferedLocalMessageCopy();
-void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime);
-void sendBufferedTicketRequests(int destPE);
-void checkBufferedTicketRequests(void *_destPE,double curWallTime);
-
-
-
-
-//handler idxs
-extern int _ticketRequestHandlerIdx;
-extern int _ticketHandlerIdx;
-extern int _localMessageCopyHandlerIdx;
-extern int _localMessageAckHandlerIdx;
-extern int _bufferedLocalMessageCopyHandlerIdx;
-extern int _bufferedLocalMessageAckHandlerIdx;
-extern int _bufferedTicketRequestHandlerIdx;
-extern int _bufferedTicketHandlerIdx;
+void sendGroupMsg(envelope *env,int destPE,int _infoIdx);
+void sendArrayMsg(envelope *env,int destPE,int _infoIdx);
+void sendNodeGroupMsg(envelope *env,int destNode,int _infoIdx);
+void sendCommonMsg(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
+void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
+void sendLocalMsg(MlogEntry *entry);
 
 //handler functions
 void _ticketRequestHandler(TicketRequest *);
 void _ticketHandler(TicketReply *);
-void _localMessageCopyHandler(LocalMessageLog *);
-void _localMessageAckHandler(LocalMessageLogAck *);
 void _pingHandler(CkPingMsg *msg);
 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
+void _storeDeterminantsHandler(char *buffer);
+void _removeDeterminantsHandler(char *buffer);
 
 
 //methods for sending messages
@@ -542,6 +550,7 @@ void _checkpointRequestHandler(CheckpointRequest *request);
 void _storeCheckpointHandler(char *msg);
 void _checkpointAckHandler(CheckPointAck *ackMsg);
 void _removeProcessedLogHandler(char *requestMsg);
+void garbageCollectMlog();
 
 //handler idxs for checkpoint
 extern int _checkpointRequestHandlerIdx;
@@ -556,10 +565,10 @@ extern int _removeProcessedLogHandlerIdx;
 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
 void CkMlogRestartDouble(void *,double);
 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
+void processReceivedDet(Chare *obj,int vecsize, Determinant *listDets);
 void initializeRestart(void *data,ChareMlogData *mlogData);
 void distributeRestartedObjects();
-void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData);
-void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndex &idx,int locationPE);
+void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
 
 //TML: function for locally calling the restart
 void CkMlogRestartLocal();
@@ -570,6 +579,7 @@ void _recvCheckpointHandler(char *_restartData);
 void _resendMessagesHandler(char *msg);
 void _resendReplyHandler(char *msg);
 void _receivedTNDataHandler(ReceivedTNData *msg);
+void _receivedDetDataHandler(ReceivedDetData *msg);
 void _distributedLocationHandler(char *receivedMsg);
 void _updateHomeRequestHandler(RestartRequest *updateRequest);
 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
@@ -582,13 +592,13 @@ void _restartHandler(RestartRequest *restartMsg);
 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
 void _recvRestartCheckpointHandler(char *_restartData);
 
-
 //handler idxs for restart
 extern int _getCheckpointHandlerIdx;
 extern int _recvCheckpointHandlerIdx;
 extern int _resendMessagesHandlerIdx;
 extern int _resendReplyHandlerIdx;
 extern int _receivedTNDataHandlerIdx;
+extern int _receivedDetDataHandlerIdx;
 extern int _distributedLocationHandlerIdx;
 extern int _updateHomeRequestHandlerIdx;
 extern int _updateHomeAckHandlerIdx;
@@ -596,7 +606,6 @@ extern int _verifyAckRequestHandlerIdx;
 extern int _verifyAckHandlerIdx;
 extern int _dummyMigrationHandlerIdx;
 
-
 /// Load Balancing
 
 //methods for load balancing
@@ -614,7 +623,6 @@ void _recvGlobalStepHandler(LBStepMsg *msg);
 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
 
-
 //globals used for loadBalancing
 extern int onGoingLoadBalancing;
 extern void *centralLb;
@@ -632,6 +640,7 @@ extern CkVec<MigrationRecord> migratedNoticeList;
 extern CkVec<RetainedMigratedObject *> retainedObjectList;
 
 int getCheckPointPE();
+inline int isSameDet(Determinant *first, Determinant *second);
 void forAllCharesDo(MlogFn fnPointer,void *data);
 envelope *copyEnvelope(envelope *env);
 extern void _initDone(void);
@@ -640,7 +649,7 @@ extern void _initDone(void);
 extern void _resetNodeBocInitVec(void);
 
 //methods for updating location
-void informLocationHome(CkGroupID mgrID,CkArrayIndex idx,int homePE,int currentPE);
+void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
 
 //handlers for updating locations
 void _receiveLocationHandler(CurrentLocationMsg *data);
@@ -650,8 +659,5 @@ extern int _receiveLocationHandlerIdx;
 
 
 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
-inline void processRemoteMlogMessages(){
-       CmiDeliverRemoteMsgHandlerRange(_ticketRequestHandlerIdx,_receiveLocationHandlerIdx);
-}
 
 #endif
index f026c0f52cdd158dd1ff7d59af989b64e9dbf46f..82872b3d38e2bba1b35f83f137dfe5ee49d9f4ed 100644 (file)
@@ -2024,7 +2024,8 @@ void CkLocMgr::informHome(const CkArrayIndex &idx,int nowOnPe)
        if (home!=CkMyPe() && home!=nowOnPe) {
                //Let this element's home Pe know it lives here now
                DEBC((AA"  Telling %s's home %d that it lives on %d.\n"AB,idx2str(idx),home,nowOnPe));
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+//#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if defined(_FAULT_MLOG_)
         informLocationHome(thisgroup,idx,home,CkMyPe());
 #else
                thisProxy[home].updateLocation(idx,nowOnPe);
@@ -2193,7 +2194,8 @@ int CkLocMgr::deliver(CkMessage *m,CkDeliver_t type,int opts) {
        }else{
                DEBS((AA"deliver %s rec is null\n"AB,idx2str(idx)));
        }
-#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
+//#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
+#if !defined(_FAULT_MLOG_)
 #if CMK_LBDB_ON
        if (type==CkDeliver_queue) {
                if (!(opts & CK_MSG_LB_NOTRACE) && the_lbdb->CollectingCommStats()) {
@@ -2612,7 +2614,8 @@ void CkLocMgr::emigrate(CkLocRec_local *rec,int toPe)
 
        DEBM((AA"Migrated index size %s to %d \n"AB,idx2str(idx),toPe));        
 
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+//#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if defined(_FAULT_MLOG_)
     sendMlogLocation(toPe,UsrToEnv(msg));
 #else
        //Send off message and delete old copy
@@ -2626,7 +2629,8 @@ void CkLocMgr::emigrate(CkLocRec_local *rec,int toPe)
        duringMigration=CmiFalse;
        //The element now lives on another processor-- tell ourselves and its home
        inform(idx,toPe);
-#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
+//#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
+#if !defined(_FAULT_MLOG_)    
        informHome(idx,toPe);
 #endif
        CK_MAGICNUMBER_CHECK
@@ -2653,7 +2657,8 @@ void CkLocMgr::immigrate(CkArrayElementMigrateMessage *msg)
        }
 
        //Create a record for this element
-#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
+//#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))    
+#if !defined(_FAULT_MLOG_)     
        CkLocRec_local *rec=createLocal(idx,CmiTrue,msg->ignoreArrival,CmiFalse /* home told on departure */ );
 #else
     CkLocRec_local *rec=createLocal(idx,CmiTrue,CmiTrue,CmiFalse /* home told on departure */ );
index 83c1accee3de1d1db6083722e7ebac70dcf547e7..004c1df1d45ab7c804c028b7bca6faa041e96b4d 100644 (file)
@@ -192,7 +192,7 @@ CkReductionMgr::CkReductionMgr()//Constructor
   gcount=lcount=0;
   nContrib=nRemote=0;
   maxStartRequest=0;
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
     if(CkMyPe() != 0){
         perProcessorCounts = NULL;
     }else{
@@ -395,7 +395,7 @@ void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
   m->sourceFlag=-1;//A single contribution
   m->gcount=0;
 
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
     if(lcount == 0){
         m->sourceProcessorCount = 1;
     }else{
@@ -411,7 +411,7 @@ void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
 #endif
 }
 
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
     CmiAssert(CmiMyPe() == 0);
     DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
@@ -458,7 +458,7 @@ void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
  }
  DEBR((AA" Group ReductionStarting called for redNo %d\n"AB,m->num));
  int srcPE = (UsrToEnv(m))->getSrcPe();
-#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_)) 
+#if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
   if (isPresent(m->num) && !inProgress)
   {
     DEBR((AA"Starting reduction #%d at parent's request\n"AB,m->num));
@@ -551,8 +551,8 @@ void CkReductionMgr::startReduction(int number,int srcPE)
   }
 
   if(disableNotifyChildrenStart) return;
-  
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
   if(CmiMyPe() == 0 && redNo == 0){
             for(int j=0;j<CkNumPes();j++){
                 if(j != CkMyPe() && j != srcPE){
@@ -672,7 +672,7 @@ void CkReductionMgr::finishReduction(void)
        return;
   }
   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
-#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
+#if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
 
   if (nContrib<(lcount+adj(redNo).lcount)){
          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
@@ -709,7 +709,7 @@ void CkReductionMgr::finishReduction(void)
   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
   CkReductionMsg *result=reduceMessages();
   result->redNo=redNo;
-#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
+#if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
 
 #if GROUP_LEVEL_REDUCTION
   if (hasParent())
@@ -780,7 +780,7 @@ void CkReductionMgr::finishReduction(void)
   redNo++;
   //Shift the count adjustment vector down one slot (to match new redNo)
   int i;
-#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_)) && !GROUP_LEVEL_REDUCTION
+#if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_) && !GROUP_LEVEL_REDUCTION
     /* nodegroup reduction will adjust adjVec in endArrayReduction on PE 0 */
   if(CkMyPe()!=0)
 #endif
@@ -815,7 +815,7 @@ void CkReductionMgr::finishReduction(void)
   }
 #endif
 
-#if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))   
+#if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
   if(maxStartRequest >= redNo){
          startReduction(redNo,CkMyPe());
          finishReduction();
@@ -1008,7 +1008,7 @@ void CkReductionMgr::pup(PUP::er &p)
   // we can not pup because inserting array elems will add the counters again
 //  p|lcount;
 //  p|gcount;
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
+#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
 //  p|lcount;
 //  //  p|gcount;
 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
index 1282b7c4e42ea1943324d153796cdd5814e98dcf..0e18601a58fefdf3a3b845c4ffe8d46be2e8aac9 100644 (file)
@@ -32,6 +32,7 @@ The calls needed to use the reduction manager are:
 
 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
 #define MAX_INT 5000000
+#define _MLOG_REDUCE_P2P_ 0
 #endif
 
 //This message is sent between group objects on a single PE
@@ -268,7 +269,7 @@ private:
                >0 indicates this is a reduced contribution.
        */
        int nSources(void) {return sourceFlag<0?-sourceFlag:sourceFlag;}
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
+#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_ )
     int sourceProcessorCount;
     int fromPE;
 #endif
@@ -621,7 +622,7 @@ protected:
 
 //Checkpointing utilities
 public:
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
     int *perProcessorCounts;
     int processorCount;
     int totalCount;
index d43652330f539a04cd66a7c55876158231769358..e3cb95449255d1de795c0bd24c4bd875cf450c37 100644 (file)
@@ -184,6 +184,7 @@ public:
     CkObjID recver;
     MCount SN;
     MCount TN;
+       int incarnation;
     MlogEntry *localMlogEntry;
     bool freeMsg;
 #endif
@@ -263,6 +264,7 @@ private:
       env->recver.type = TypeInvalid;
       env->SN = 0;
       env->TN = 0;
+         env->incarnation = -1;
       env->localMlogEntry = NULL;
 #endif
 
index d68368e213d63050106afee9aacb8d15214cb2c5..f73f8b98df12c9b46c8a45fb172046013447ea7d 100644 (file)
@@ -192,7 +192,8 @@ static char* _restartDir;
 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
 int teamSize=1;
 int chkptPeriod=1000;
-bool parallelRestart=false;
+bool fastRecovery = false;
+int parallelRecovery = 1;
 extern int BUFFER_TIME; //time spent waiting for buffered messages
 #endif
 
@@ -293,9 +294,12 @@ static inline void _parseCommandLineOpts(char **argv)
     if(!CmiGetArgIntDesc(argv,"+chkptPeriod",&chkptPeriod,"Set the checkpoint period for the message logging fault tolerance algorithm in seconds")){
         chkptPeriod = 100;
     }
-    if(CmiGetArgFlagDesc(argv,"+Parallelrestart", "Parallel Restart with message logging protocol")){
-        parallelRestart = true;
+       if(CmiGetArgIntDesc(argv,"+fastRecovery", &parallelRecovery, "Parallel recovery with message logging protocol")){
+        fastRecovery = true;
     }
+#endif
+
+#if defined(_FAULT_MLOG_)
     if(!CmiGetArgIntDesc(argv,"+mlog_local_buffer",&_maxBufferedMessages,"# of local messages buffered in the message logging protoocl")){
         _maxBufferedMessages = 2;
     }
@@ -305,7 +309,6 @@ static inline void _parseCommandLineOpts(char **argv)
     if(!CmiGetArgIntDesc(argv,"+mlog_buffer_time",&BUFFER_TIME,"# Time spent waiting for messages to be buffered in the message logging protoocl")){
         BUFFER_TIME = 2;
     }
-
 #endif 
        /* Anytime migration flag */
        _isAnytimeMigration = true;
index 79e68a202534060038feed88475ac55b7e0b2071..1f96f1936e638fdb145fb123b9e4fa5635334275 100644 (file)
@@ -215,7 +215,7 @@ CKHEADERS=ck.h ckstream.h envelope.h init.h qd.h charm.h charm++.h \
          ckarrayreductionmgr.h cksection.h \
          ckarrayindex.h ckarray.h cklocation.h ckreduction.h \
          ckcheckpoint.h ckmemcheckpoint.h ckevacuation.h\
-          ckmessagelogging.h ckobjid.h\
+          ckmessagelogging.h ckcausalmlog.h ckobjid.h\
          ckobjQ.h readonly.h charisma.h \
           comlib.h ComlibArrayListener.h ComlibStrategy.h \
          ComlibLearner.h $(UTILHEADERS) \
@@ -706,7 +706,7 @@ LIBCK_CORE=trace-common.o tracec.o tracef.o init.o register.o qd.o ck.o main.o \
           msgalloc.o ckfutures.o ckIgetControl.o debug-message.o debug-charm.o ckcallback.o \
           cklocation.o ckarray.o ckreduction.o ckarrayreductionmgr.o \
            tempo.o waitqd.o LBDatabase.o lbdb.o lbdbf.o charisma.o ckobjQ.o  \
-          LBAgent.o LBProfit.o ckcheckpoint.o ckmemcheckpoint.o ckevacuation.o ckmessagelogging.o\
+          LBAgent.o LBProfit.o ckcheckpoint.o ckmemcheckpoint.o ckevacuation.o ckmessagelogging.o ckcausalmlog.o\
            LBDBManager.o LBComm.o LBObj.o LBMachineUtil.o CentralPredictor.o \
           BaseLB.o CentralLB.o HybridBaseLB.o NborBaseLB.o \
            ckgraph.o LButil.o Refiner.o RefinerApprox.o  \
index 61525ea99509d035f9d5cd5bd47a1e355da981b4..7630047f612f0d8592b951a0b838aeacc66f317d 100644 (file)
@@ -15,6 +15,11 @@ bgtest:
                (cd $$d && $(MAKE) bgtest OPTS='$(OPTS)' || exit 1) || exit 1; \
        done
 
+fttest:
+       for d in charm++ ampi; do \
+               (cd $$d && $(MAKE) fttest OPTS='$(OPTS)' || exit 1) || exit 1; \
+       done
+
 test-converse:
        cd converse; $(MAKE) test OPTS='$(OPTS)'