Adding support for collectives and parallel recovery.
[charm.git] / src / ck-core / ckcausalmlog.C
1 /**
2   * Simple Causal Message Logging Fault Tolerance Protocol.
3   * Features:
4         * Reduces the latency overhead of the pessimistic approach.
5         * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
6   */
7
8 #include "charm.h"
9 #include "ck.h"
10 #include "ckcausalmlog.h"
11 #include "queueing.h"
12 #include <sys/types.h>
13 #include <signal.h>
14 #include "CentralLB.h"
15
16 #ifdef _FAULT_CAUSAL_
17
/*
 * Statistics collection switches (0 = off, 1 = on).
 * Beware of the high cost of accounting for duplicated determinants: every
 * received determinant triggers a linear search through a potentially big list.
 */
#define COLLECT_STATS_MSGS 0
#define COLLECT_STATS_MSGS_TOTAL 0
#define COLLECT_STATS_MSG_COUNT 0
#define COLLECT_STATS_DETS 0
#define COLLECT_STATS_DETS_DUP 0
#define COLLECT_STATS_MEMORY 0
#define COLLECT_STATS_TEAM 0

/* String tags named after recovery steps (their use is not visible in this chunk). */
#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

/*
 * Debug-output switches. A switch with an empty expansion discards its argument;
 * to enable one, make it expand to its argument (the form shown in its comment).
 */
#define DEBUG_MEM(x)          /* x */
#define DEBUG(x)              /* x */
#define DEBUG_RESTART(x)      /* x */
#define DEBUGLB(x)            /* x */
#define DEBUG_TEAM(x)         /* x */
#define DEBUG_PERF(x)         /* x */
#define DEBUG_CHECKPOINT 1
#define DEBUG_NOW(x) x
#define DEBUG_PE(x,y)         /* if(CkMyPe() == x) y */
#define DEBUG_RECOVERY(x)     /* x */
42
43 extern const char *idx2str(const CkArrayIndex &ind);
44 extern const char *idx2str(const ArrayElement *el);
45
46 void getGlobalStep(CkGroupID gID);
47
48 bool fault_aware(CkObjID &recver);
49 void sendCheckpointData(int mode);
50 void createObjIDList(void *data,ChareMlogData *mlogData);
51 inline bool isLocal(int destPE);
52 inline bool isTeamLocal(int destPE);
53 void printLog(TProcessedLog *log);
54
55 int _restartFlag=0;
56 int _numRestartResponses=0;
57
58 //TODO: remove for perf runs
59 int countHashRefs=0; //count the number of gets
60 int countHashCollisions=0;
61
62 char *checkpointDirectory=".";
63 int unAckedCheckpoint=0;
64
65 int countLocal=0,countBuffered=0;
66 int countPiggy=0;
67 int countClearBufferedLocalCalls=0;
68
69 int countUpdateHomeAcks=0;
70
71 extern int teamSize;
72 extern int chkptPeriod;
73 extern bool fastRecovery;
74 extern int parallelRecovery;
75
76 char *killFile;
77 char *faultFile;
78 int killFlag=0;
79 int faultFlag=0;
80 int restartingMlogFlag=0;
81 void readKillFile();
82 double killTime=0.0;
83 double faultMean;
84 int checkpointCount=0;
85
86 CpvDeclare(Chare *,_currentObj);
87 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
88 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
89 CpvDeclare(Queue, _outOfOrderMessageQueue);
90 CpvDeclare(Queue, _delayedRemoteMessageQueue);
91 CpvDeclare(char **,_bufferedTicketRequests);
92 CpvDeclare(int *,_numBufferedTicketRequests);
93
94 /***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
95 /** @note All the determinants generated by a PE are stored in variable _localDets.
96  * As soon as a message is sent, then all the determinants are appended to the message,
97  * but those determinants are not deleted. We must wait until an ACK comes from the receiver
98  * to delete the determinants. In the meantime the same determinants may be appended
99  * to other messages and more determinants can be added to _localDets.
100  * A simple solution to this problem was to have a primitive array and keep adding determinants
101  * at the end. However, to avoid multiple copies of determinants, we will keep a pointer
102  * to the first 'valid' determinant in the array. Alternatively, we can keep a pointer to the latest
103  * determinant and a number of how many valid determinants there are behind it. We do not remove
104  * determinants until a checkpoint is made, since these determinants may have to be added to
105  * messages in case of a recovery.
106  */
107 // temporal storage for the outgoing determinants
108 CpvDeclare(char *, _localDets);
109 // number of buffered determinants
110 int _numBufferedDets;
111 // current index of first determinant in _localDets
112 int _indexBufferedDets;
113 // current phase of determinants
114 int _phaseBufferedDets;
115
116 // stores the determinants from other nodes, to send them in case of a crash
117 CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
118 // max number of buffered determinants
119 int _maxBufferedDets;
120
121 // stores the incarnation number from every other processor
122 CpvDeclare(char *, _incarnation);
123
124 // storage for remove determinants header
125 CpvDeclare(RemoveDeterminantsHeader *, _removeDetsHeader);
126 // storage for store determinants header
127 CpvDeclare(StoreDeterminantsHeader *, _storeDetsHeader);
128 // storage for the sizes in vector-send to store determinants
129 CpvDeclare(int *, _storeDetsSizes);
130 // storage for the pointers in vector-send to store determinants
131 CpvDeclare(char **, _storeDetsPtrs);
132
133 /***** *****/
134
135 /***** VARIABLES FOR PARALLEL RECOVERY *****/
136 CpvDeclare(int, _numEmigrantRecObjs);
137 CpvDeclare(int, _numImmigrantRecObjs);
138 CpvDeclare(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
139 /***** *****/
140
141 #if COLLECT_STATS_MSGS
142 int *numMsgsTarget;
143 int *sizeMsgsTarget;
144 int totalMsgsTarget;
145 float totalMsgsSize;
146 #endif
147 #if COLLECT_STATS_DETS
148 int numPiggyDets;
149 int numDets;
150 int numDupDets;
151 #endif
152 #if COLLECT_STATS_MEMORY
153 int msgLogSize;
154 int bufferedDetsSize;
155 int storedDetsSize;
156 #endif
157 //TML: variables for measuring savings with teams in message logging
158 #if COLLECT_STATS_TEAM
159 float MLOGFT_totalLogSize = 0.0;
160 float MLOGFT_totalMessages = 0.0;
161 #endif
162
163 static double adjustChkptPeriod=0.0; //in ms
164 static double nextCheckpointTime=0.0;//in seconds
165 static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);
166
167 int _pingHandlerIdx;
168
169 char objString[100];
170 int _checkpointRequestHandlerIdx;
171 int _storeCheckpointHandlerIdx;
172 int _checkpointAckHandlerIdx;
173 int _getCheckpointHandlerIdx;
174 int _recvCheckpointHandlerIdx;
175 int _removeProcessedLogHandlerIdx;
176
177 int _verifyAckRequestHandlerIdx;
178 int _verifyAckHandlerIdx;
179 int _dummyMigrationHandlerIdx;
180
181
182 int     _getGlobalStepHandlerIdx;
183 int     _recvGlobalStepHandlerIdx;
184
185 int _updateHomeRequestHandlerIdx;
186 int _updateHomeAckHandlerIdx;
187 int _resendMessagesHandlerIdx;
188 int _sendDetsHandlerIdx;
189 int _sendDetsReplyHandlerIdx;
190 int _receivedTNDataHandlerIdx;
191 int _receivedDetDataHandlerIdx;
192 int _distributedLocationHandlerIdx;
193 int _storeDeterminantsHandlerIdx;
194 int _removeDeterminantsHandlerIdx;
195
196 //TML: integer constants for team-based message logging
197 int _restartHandlerIdx;
198 int _getRestartCheckpointHandlerIdx;
199 int _recvRestartCheckpointHandlerIdx;
200 void setTeamRecovery(void *data, ChareMlogData *mlogData);
201 void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
202
203 int verifyAckTotal;
204 int verifyAckCount;
205
206 int verifyAckedRequests=0;
207
208 RestartRequest *storedRequest;
209
210 int _falseRestart =0; /**
211                                                                                                         For testing on clusters we might carry out restarts on 
212                                                                                                         a porcessor without actually starting it
213                                                                                                         1 -> false restart
214                                                                                                         0 -> restart after an actual crash
215                                                                                                 */
216
217 //Load balancing globals
218 int onGoingLoadBalancing=0;
219 void *centralLb;
220 void (*resumeLbFnPtr)(void *);
221 int _receiveMlogLocationHandlerIdx;
222 int _receiveMigrationNoticeHandlerIdx;
223 int _receiveMigrationNoticeAckHandlerIdx;
224 int _checkpointBarrierHandlerIdx;
225 int _checkpointBarrierAckHandlerIdx;
226
227 CkVec<MigrationRecord> migratedNoticeList;
228 CkVec<RetainedMigratedObject *> retainedObjectList;
229 int donotCountMigration=0;
230 int countLBMigratedAway=0;
231 int countLBToMigrate=0;
232 int migrationDoneCalled=0;
233 int checkpointBarrierCount=0;
234 int globalResumeCount=0;
235 CkGroupID globalLBID;
236 int restartDecisionNumber=-1;
237
238 double lastCompletedAlarm=0;
239 double lastRestart=0;
240
241 //update location globals
242 int _receiveLocationHandlerIdx;
243
244
245 /** 
246  * @brief Initialize message logging data structures and register handlers
247  */
248 void _messageLoggingInit(){
249         //current object
250         CpvInitialize(Chare *,_currentObj);
251         
252         //registering handlers for message logging
253         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
254                 
255         //handlers for checkpointing
256         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
257         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
258         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
259         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
260
261         //handlers for restart
262         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
263         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
264         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
265         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
266         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
267         _sendDetsHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsHandler);
268         _sendDetsReplyHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsReplyHandler);
269         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
270         _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
271         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
272         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
273         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
274         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
275
276         //TML: handlers for team-based message logging
277         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
278         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
279         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
280
281         // handlers for causal message logging
282         _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
283         _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
284         
285         //handlers for load balancing
286         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
287         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
288         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
289         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
290         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
291         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
292         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
293         
294         //handlers for updating locations
295         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
296         
297         //Cpv variables for message logging
298         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
299         CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
300         CpvInitialize(Queue, _outOfOrderMessageQueue);
301         CpvInitialize(Queue, _delayedRemoteMessageQueue);
302         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
303         CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
304         
305         CpvInitialize(char **,_bufferedTicketRequests);
306         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
307         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
308         for(int i=0;i<CkNumPes();i++){
309                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
310                 CpvAccess(_numBufferedTicketRequests)[i]=0;
311         }
312  
313         // Cpv variables for causal protocol
314         _numBufferedDets = 0;
315         _indexBufferedDets = 0;
316         _phaseBufferedDets = 0;
317         _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
318         CpvInitialize(char *, _localDets);
319         CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
320         CpvInitialize(char *, _incarnation);
321         CpvInitialize(RemoveDeterminantsHeader *, _removeDetsHeader);
322         CpvInitialize(StoreDeterminantsHeader *, _storeDetsHeader);
323         CpvInitialize(int *, _storeDetsSizes);
324         CpvInitialize(char **, _storeDetsPtrs);
325         CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
326         CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
327         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
328         for(int i=0; i<CmiNumPes(); i++){
329                 CpvAccess(_incarnation)[i] = 0;
330         }
331         CpvAccess(_removeDetsHeader) = (RemoveDeterminantsHeader *) CmiAlloc(sizeof(RemoveDeterminantsHeader));
332         CpvAccess(_storeDetsHeader) = (StoreDeterminantsHeader *) CmiAlloc(sizeof(StoreDeterminantsHeader));
333         CpvAccess(_storeDetsSizes) = (int *) CmiAlloc(sizeof(int) * 2);
334         CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
335
336         // Cpv variables for parallel recovery
337         CpvInitialize(int, _numEmigrantRecObjs);
338     CpvAccess(_numEmigrantRecObjs) = 0;
339     CpvInitialize(int, _numImmigrantRecObjs);
340     CpvAccess(_numImmigrantRecObjs) = 0;
341
342     CpvInitialize(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
343     CpvAccess(_immigrantRecObjs) = new CkVec<CkLocRec_local *>;
344
345         //Cpv variables for checkpoint
346         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
347         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
348
349         // registering user events for projections      
350         traceRegisterUserEvent("Remove Logs", 20);
351         traceRegisterUserEvent("Ticket Request Handler", 21);
352         traceRegisterUserEvent("Ticket Handler", 22);
353         traceRegisterUserEvent("Local Message Copy Handler", 23);
354         traceRegisterUserEvent("Local Message Ack Handler", 24);        
355         traceRegisterUserEvent("Preprocess current message",25);
356         traceRegisterUserEvent("Preprocess past message",26);
357         traceRegisterUserEvent("Preprocess future message",27);
358         traceRegisterUserEvent("Checkpoint",28);
359         traceRegisterUserEvent("Checkpoint Store",29);
360         traceRegisterUserEvent("Checkpoint Ack",30);
361         traceRegisterUserEvent("Send Ticket Request",31);
362         traceRegisterUserEvent("Generalticketrequest1",32);
363         traceRegisterUserEvent("TicketLogLocal",33);
364         traceRegisterUserEvent("next_ticket and SN",34);
365         traceRegisterUserEvent("Timeout for buffered remote messages",35);
366         traceRegisterUserEvent("Timeout for buffered local messages",36);
367         traceRegisterUserEvent("Inform Location Home",37);
368         traceRegisterUserEvent("Receive Location Handler",38);
369         
370         lastCompletedAlarm=CmiWallTimer();
371         lastRestart = CmiWallTimer();
372
373 #if COLLECT_STATS_MSGS
374 #if COLLECT_STATS_MSGS_TOTAL
375         totalMsgsTarget = 0;
376         totalMsgsSize = 0.0;
377 #else
378         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
379         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
380         for(int i=0; i<CmiNumPes(); i++){
381                 numMsgsTarget[i] = 0;
382                 sizeMsgsTarget[i] = 0;
383         }
384 #endif
385 #endif
386 #if COLLECT_STATS_DETS
387         numPiggyDets = 0;
388         numDets = 0;
389         numDupDets = 0;
390 #endif
391 #if COLLECT_STATS_MEMORY
392         msgLogSize = 0;
393         bufferedDetsSize = 0;
394         storedDetsSize = 0;
395 #endif
396
397
398 }
399
400 void killLocal(void *_dummy,double curWallTime);        
401
402 void readKillFile(){
403         FILE *fp=fopen(killFile,"r");
404         if(!fp){
405                 return;
406         }
407         int proc;
408         double sec;
409         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
410                 if(proc == CkMyPe()){
411                         killTime = CmiWallTimer()+sec;
412                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
413                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
414                 }
415         }
416         fclose(fp);
417 }
418
419 /**
420  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
421  * We assume an exponential distribution for the mean-time-between-failures.
422  */
423 void readFaultFile(){
424         FILE *fp=fopen(faultFile,"r");
425         if(!fp){
426                 return;
427         }
428         int proc;
429         double sec;
430         fscanf(fp,"%d %lf",&proc,&sec);
431         faultMean = sec;
432         if(proc == CkMyPe()){
433                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
434                 CcdCallFnAfter(killLocal,NULL,sec*1000);
435         }
436         fclose(fp);
437 }
438
439 void killLocal(void *_dummy,double curWallTime){
440         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
441         if(CmiWallTimer()<killTime-1){
442                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
443         }else{  
444                 kill(getpid(),SIGKILL);
445         }
446 }
447
448 /*** Auxiliary Functions ***/
449
450 /**
451  * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
452  * determinants needs to be extended.
453  */
454 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
455         Determinant *det, *auxDet;
456         char *aux;
457
458         DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
459
460         // checking if we are overflowing the buffered determinants array
461         if(_indexBufferedDets >= _maxBufferedDets){
462                 aux = CpvAccess(_localDets);
463                 _maxBufferedDets *= 2;
464                 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
465                 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
466                 CmiFree(aux);
467         }
468
469         // adding the new determinant at the end
470         det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
471         det->sender = sender;
472         det->receiver = receiver;
473         det->SN = SN;
474         det->TN = TN;
475         _numBufferedDets++;
476         _indexBufferedDets++;
477
478 #if COLLECT_STATS_MEMORY
479         bufferedDetsSize++;
480 #endif
481 #if COLLECT_STATS_DETS
482         numDets++;
483 #endif
484 }
485
486 /************************ Message logging methods ****************/
487
488 /**
489  * Sends a group message that might be a broadcast.
490  */
491 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
492         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
493                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
494                 void *origMsg = EnvToUsr(env);
495                 for(int i=0;i<CmiNumPes();i++){
496                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
497                                 void *copyMsg = CkCopyMsg(&origMsg);
498                                 envelope *copyEnv = UsrToEnv(copyMsg);
499                                 copyEnv->SN=0;
500                                 copyEnv->TN=0;
501                                 copyEnv->sender.type = TypeInvalid;
502                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
503                                 sendGroupMsg(copyEnv,i,_infoIdx);
504                         }
505                 }
506                 return;
507         }
508
509         // initializing values of envelope
510         env->SN=0;
511         env->TN=0;
512         env->sender.type = TypeInvalid;
513
514         CkObjID recver;
515         recver.type = TypeGroup;
516         recver.data.group.id = env->getGroupNum();
517         recver.data.group.onPE = destPE;
518         sendCommonMsg(recver,env,destPE,_infoIdx);
519 }
520
521 /**
522  * Sends a nodegroup message that might be a broadcast.
523  */
524 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
525         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
526                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
527                 void *origMsg = EnvToUsr(env);
528                 for(int i=0;i<CmiNumNodes();i++){
529                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
530                                 void *copyMsg = CkCopyMsg(&origMsg);
531                                 envelope *copyEnv = UsrToEnv(copyMsg);
532                                 copyEnv->SN=0;
533                                 copyEnv->TN=0;
534                                 copyEnv->sender.type = TypeInvalid;
535                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
536                         }
537                 }
538                 return;
539         }
540
541         // initializing values of envelope
542         env->SN=0;
543         env->TN=0;
544         env->sender.type = TypeInvalid;
545
546         CkObjID recver;
547         recver.type = TypeNodeGroup;
548         recver.data.group.id = env->getGroupNum();
549         recver.data.group.onPE = destNode;
550         sendCommonMsg(recver,env,destNode,_infoIdx);
551 }
552
553 /**
554  * Sends a message to an array element.
555  */
556 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
557         CkObjID recver;
558         recver.type = TypeArray;
559         recver.data.array.id = env->getsetArrayMgr();
560         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
561
562         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
563                 char recverString[100],senderString[100];
564                 
565                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
566         }
567
568         // initializing values of envelope
569         env->SN = 0;
570         env->TN = 0;
571
572         sendCommonMsg(recver,env,destPE,_infoIdx);
573 };
574
575 /**
576  * Sends a message to a singleton chare.
577  */
578 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
579         CkObjID recver;
580         recver.type = TypeChare;
581         recver.data.chare.id = *pCid;
582
583         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
584                 char recverString[100],senderString[100];
585                 
586                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
587         }
588
589         // initializing values of envelope
590         env->SN = 0;
591         env->TN = 0;
592
593         sendCommonMsg(recver,env,destPE,_infoIdx);
594 };
595
596 /**
597  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
598  */
599 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
600         envelope *env = _env;
601         MCount ticketNumber = 0;
602         int resend=0; //is it a resend
603         char recverName[100];
604         double _startTime=CkWallTimer();
605         
606         DEBUG_MEM(CmiMemoryCheck());
607
608         if(CpvAccess(_currentObj) == NULL){
609 //              CkAssert(0);
610                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
611                 generalCldEnqueue(destPE,env,_infoIdx);
612                 return;
613         }
614         
615         // setting message logging data in the envelope
616         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
617         if(env->sender.type == TypeInvalid){
618                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
619         }else{
620                 envelope *copyEnv = copyEnvelope(env);
621                 env = copyEnv;
622                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
623                 env->SN = 0;
624         }
625         
626         DEBUG_MEM(CmiMemoryCheck());
627
628         CkObjID &sender = env->sender;
629         env->recver = recver;
630
631         Chare *obj = (Chare *)env->sender.getObject();
632           
633         if(env->SN == 0){
634                 DEBUG_MEM(CmiMemoryCheck());
635                 env->SN = obj->mlogData->nextSN(recver);
636         }else{
637                 resend = 1;
638         }
639         
640         char senderString[100];
641 //      if(env->SN != 1){
642                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
643         //      CmiPrintStackTrace(0);
644 /*      }else{
645                 DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
646         }*/
647                 
648 //      CkPackMessage(&(mEntry->env));
649 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
650         
651         _startTime = CkWallTimer();
652
653         // uses the proper ticketing mechanism for local, team and general messages
654         if(isLocal(destPE)){
655                 sendLocalMsg(env, _infoIdx);
656         }else{
657                 if((teamSize > 1) && isTeamLocal(destPE)){
658
659                         // look to see if this message already has a ticket in the team-table
660                         Chare *senderObj = (Chare *)sender.getObject();
661                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
662                         if(ticketRow != NULL){
663                                 Ticket ticket = ticketRow->get(env->SN);
664                                 if(ticket.TN != 0){
665                                         ticketNumber = ticket.TN;
666                                         DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
667                                 }
668                         }
669                 }
670                 
671                 // sending the message
672                 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
673                 sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
674                 
675         }
676 }
677
678 /**
679  * Determines if the message is local or not. A message is local if:
680  * 1) Both the destination and origin are the same PE.
681  */
682 inline bool isLocal(int destPE){
683         // both the destination and the origin are the same PE
684         if(destPE == CkMyPe())
685                 return true;
686
687         return false;
688 }
689
690 /**
691  * Determines if the message is group local or not. A message is group local if:
692  * 1) They belong to the same group in the group-based message logging.
693  */
694 inline bool isTeamLocal(int destPE){
695
696         // they belong to the same group
697         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
698                 return true;
699
700         return false;
701 }
702
703 /**
704  * Method that does the actual send by creating a ticket request filling it up and sending it.
705  */
706 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
707         DEBUG_NOW(char recverString[100]);
708         DEBUG_NOW(char senderString[100]);
709
710         int totalSize;
711
712         envelope *env = entry->env;
713         DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
714
715         // setting all the information
716         Chare *obj = (Chare *)entry->env->sender.getObject();
717         entry->env->recver = recver;
718         entry->env->SN = SN;
719         entry->env->TN = TN;
720         if(!resend){
721                 //TML: only stores message if either it goes to this processor or to a processor in a different group
722                 if(!isTeamLocal(entry->destPE)){
723                         obj->mlogData->addLogEntry(entry);
724 #if COLLECT_STATS_TEAM
725                         MLOGFT_totalMessages += 1.0;
726                         MLOGFT_totalLogSize += entry->env->getTotalsize();
727 #endif
728                 }else{
729                         // the message has to be deleted after it has been sent
730                         entry->env->freeMsg = true;
731                 }
732         }
733
734         // sending the determinants that causally affect this message
735         if(_numBufferedDets > 0){
736
737                 // modifying the actual number of determinants sent in message
738                 CpvAccess(_storeDetsHeader)->number = _numBufferedDets;
739                 CpvAccess(_storeDetsHeader)->index = _indexBufferedDets;
740                 CpvAccess(_storeDetsHeader)->phase = _phaseBufferedDets;
741                 CpvAccess(_storeDetsHeader)->PE = CmiMyPe();
742         
743                 // sending the message
744                 CpvAccess(_storeDetsSizes)[0] = sizeof(StoreDeterminantsHeader);
745                 CpvAccess(_storeDetsSizes)[1] = _numBufferedDets * sizeof(Determinant);
746                 CpvAccess(_storeDetsPtrs)[0] = (char *) CpvAccess(_storeDetsHeader);
747                 CpvAccess(_storeDetsPtrs)[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
748                 DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
749                 CmiSetHandler(CpvAccess(_storeDetsHeader), _storeDeterminantsHandlerIdx);
750                 CmiSyncVectorSend(destPE, 2, CpvAccess(_storeDetsSizes), CpvAccess(_storeDetsPtrs));
751         }
752
753         // updating its message log entry
754         entry->indexBufDets = _indexBufferedDets;
755         entry->numBufDets = _numBufferedDets;
756
757         // sending the message
758         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
759
760         DEBUG_MEM(CmiMemoryCheck());
761 #if COLLECT_STATS_MSGS
762 #if COLLECT_STATS_MSGS_TOTAL
763         totalMsgsTarget++;
764         totalMsgsSize += (float)env->getTotalsize();
765 #else
766         numMsgsTarget[destPE]++;
767         sizeMsgsTarget[destPE] += env->getTotalsize();
768 #endif
769 #endif
770 #if COLLECT_STATS_DETS
771         numPiggyDets += _numBufferedDets;
772 #endif
773 #if COLLECT_STATS_MEMORY
774         msgLogSize += env->getTotalsize();
775 #endif
776 };
777
778
779 /**
780  * @brief Function to send a local message. It first gets a ticket and
781  * then enqueues the message. If we are recovering, then the message 
782  * is enqueued in a delay queue.
783  */
784 void sendLocalMsg(envelope *env, int _infoIdx){
785         DEBUG_PERF(double _startTime=CkWallTimer());
786         DEBUG_MEM(CmiMemoryCheck());
787         DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
788         DEBUG(char senderString[100]);
789         DEBUG(char recverString[100]);
790         Ticket ticket;
791
792         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
793
794         // getting the receiver local object
795         Chare *recverObj = (Chare *)env->recver.getObject();
796
797         // if receiver object is not NULL, we will ask it for a ticket
798         if(recverObj){
799
800 /*HERE          // if we are recovery, then we must put this off for later
801                 if(recverObj->mlogData->restartFlag){
802                         CpvAccess(_delayedLocalMsgs)->enq(entry);
803                         return;
804                 }
805
806                 // check if this combination of sender and SN has been already a ticket
807                 ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
808                         
809                 // assigned a new ticket in case this message is completely new
810                 if(ticket.TN == 0){
811                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
812
813                         // if TN == 0 we enqueue this message since we are recovering from a crash
814                         if(ticket.TN == 0){
815                                 CpvAccess(_delayedLocalMsgs)->enq(entry);
816                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
817                                 
818 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
819                                 return;
820                         }
821                 }
822
823                 //TODO: check for the case when an invalid ticket is returned
824                 //TODO: check for OLD or RECEIVED TICKETS
825                 entry->env->TN = ticket.TN;
826                 CkAssert(entry->env->TN > 0);
827                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
828         
829                 DEBUG_MEM(CmiMemoryCheck());
830
831                 // adding the determinant to _localDets
832                 addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
833 */
834                 // sends the local message
835                 _skipCldEnqueue(CmiMyPe(),env,_infoIdx);        
836
837                 DEBUG_MEM(CmiMemoryCheck());
838         }else{
839                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
840         }
841 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
842 };
843
844 /****
845         The handler functions
846 *****/
847
848 /*** CAUSAL PROTOCOL HANDLERS ***/
849
850 /**
851  * @brief Removes the determinants after a particular index in the _localDets array.
852  */
853 void _removeDeterminantsHandler(char *buffer){
854         RemoveDeterminantsHeader *header;
855         int index, phase;
856
857         // getting the header from the message
858         header = (RemoveDeterminantsHeader *)buffer;
859         index = header->index;
860         phase = header->phase;
861
862         // fprintf(stderr,"ACK index=%d\n",index);
863
864         // updating the number of buffered determinants
865         if(phase == _phaseBufferedDets){
866                 if(index > _indexBufferedDets)
867                         CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
868                 CmiAssert(index <= _indexBufferedDets);
869                 _numBufferedDets = _indexBufferedDets - index;
870         }
871
872         // releasing memory
873         CmiFree(buffer);
874
875 }
876
877 /**
878  * @brief Stores the determinants coming from other processor.
879  */
880 void _storeDeterminantsHandler(char *buffer){
881         StoreDeterminantsHeader *header;
882         Determinant *detPtr, det;
883         int i, n, index, phase, destPE;
884         CkVec<Determinant> *vec;
885
886         // getting the header from the message and pointing to the first determinant
887         header = (StoreDeterminantsHeader *)buffer;
888         n = header->number;
889         index = header->index;
890         phase = header->phase;
891         destPE = header->PE;
892         detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
893
894         DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
895         DEBUG_MEM(CmiMemoryCheck());
896
897         // going through all determinants and storing them into _remoteDets
898         for(i = 0; i < n; i++){
899                 det.sender = detPtr->sender;
900                 det.receiver = detPtr->receiver;
901                 det.SN = detPtr->SN;
902                 det.TN = detPtr->TN;
903
904                 // getting the determinant array
905                 vec = CpvAccess(_remoteDets)->get(det.receiver);
906                 if(vec == NULL){
907                         vec = new CkVec<Determinant>();
908                         CpvAccess(_remoteDets)->put(det.receiver) = vec;
909                 }
910 #if COLLECT_STATS_DETS
911 #if COLLECT_STATS_DETS_DUP
912                 for(int j=0; j<vec->size(); j++){
913                         if(isSameDet(&(*vec)[j],&det)){
914                                 numDupDets++;
915                                 break;
916                         }
917                 }
918 #endif
919 #endif
920 #if COLLECT_STATS_MEMORY
921                 storedDetsSize++;
922 #endif
923                 vec->push_back(det);
924                 detPtr = detPtr++;
925         }
926
927         DEBUG_MEM(CmiMemoryCheck());
928
929         // freeing buffer
930         CmiFree(buffer);
931
932         // sending the ACK back to the sender
933         CpvAccess(_removeDetsHeader)->index = index;
934         CpvAccess(_removeDetsHeader)->phase = phase;
935         CmiSetHandler(CpvAccess(_removeDetsHeader),_removeDeterminantsHandlerIdx);
936         CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)CpvAccess(_removeDetsHeader));
937         
938 }
939
940 /**
941  *  If there are any delayed requests, process them first before 
942  *  processing this request
943  * */
944 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
945         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
946         double  _startTime = CkWallTimer();
947         CmiFree(ticketRequest);
948 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
949 }
950
951
952 /**
953  * @brief Gets a ticket for a recently received message.
954  * @pre env->recver has to be on this processor.
955  * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
956  */
957 inline bool _getTicket(envelope *env, int *flag){
958         DEBUG_MEM(CmiMemoryCheck());
959         DEBUG(char recverName[100]);
960         DEBUG(char senderString[100]);
961         Ticket ticket;
962         
963         // getting information from request
964         CkObjID sender = env->sender;
965         CkObjID recver = env->recver;
966         MCount SN = env->SN;
967         MCount TN = env->TN;
968         Chare *recverObj = (Chare *)recver.getObject();
969         
970         DEBUG(recver.toString(recverName);)
971
972         // verifying whether we are recovering from a failure or not
973         if(recverObj->mlogData->restartFlag)
974                 return false;
975
976         // checking ticket table to see if this message already received a ticket
977         ticket = recverObj->mlogData->getTicket(sender, SN);
978         TN = ticket.TN;
979         
980         // checking if the message is team local and if it has a ticket already assigned
981         if(teamSize > 1 && TN != 0){
982                 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
983                 recverObj->mlogData->verifyTicket(sender,SN,TN);
984         }
985
986         //check if a ticket for this has been already handed out to an object that used to be local but 
987         // is no longer so.. need for parallel restart
988         if(TN == 0){
989                 ticket = recverObj->mlogData->next_ticket(sender,SN);
990                 *flag = NEW_TICKET;
991         } else {
992                 *flag = OLD_TICKET;
993         }
994         if(ticket.TN > recverObj->mlogData->tProcessed){
995                 ticket.state = NEW_TICKET;
996         } else {
997                 ticket.state = OLD_TICKET;
998         }
999         // @todo: check for the case when an invalid ticket is returned
1000         if(ticket.TN == 0){
1001                 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
1002                 return false;
1003         }
1004                 
1005         // setting the ticket number in the envelope
1006         env->TN = ticket.TN;
1007         DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
1008         return true;
1009
1010 };
1011
1012 bool fault_aware(CkObjID &recver){
1013         switch(recver.type){
1014                 case TypeChare:
1015                         return true;
1016                 case TypeMainChare:
1017                         return false;
1018                 case TypeGroup:
1019                 case TypeNodeGroup:
1020                 case TypeArray:
1021                         return true;
1022                 default:
1023                         return false;
1024         }
1025 };
1026
1027 /* Preprocesses a received message */
1028 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
1029         DEBUG_NOW(char recverString[100]);
1030         DEBUG_NOW(char senderString[100]);
1031         DEBUG_MEM(CmiMemoryCheck());
1032         int flag;
1033         bool ticketSuccess;
1034
1035         // getting the receiver object
1036         CkObjID recver = env->recver;
1037         if(!fault_aware(recver)){
1038                 return 1;
1039                 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
1040         }
1041
1042         Chare *obj = (Chare *)recver.getObject();
1043         *objPointer = obj;
1044         if(obj == NULL){
1045                 int possiblePE = recver.guessPE();
1046                 if(possiblePE != CkMyPe()){
1047                         int totalSize = env->getTotalsize();
1048                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1049                         
1050                         DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
1051                 }else{
1052                         // this is the case where a message is received and the object has not been initialized
1053                         // we delayed the delivery of the message
1054                         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1055                         
1056                         DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
1057                 }
1058                 return 0;
1059         }
1060
1061         // checking if message comes from an old incarnation
1062         // message must be discarded
1063         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
1064                 CmiFree(env);
1065                 return 0;
1066         }
1067
1068         DEBUG_MEM(CmiMemoryCheck());
1069         DEBUG_PE(0,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1070
1071         // getting a ticket for this message
1072         ticketSuccess = _getTicket(env,&flag);
1073
1074         // we might be during recovery when this message arrives
1075         if(!ticketSuccess){
1076                 
1077                 // adding the message to a delay queue
1078                 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
1079                 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
1080
1081                 return 0;
1082         }
1083         
1084         //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
1085         //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
1086         
1087         if(flag == NEW_TICKET){
1088                 // storing determinant of message in data structure
1089                 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
1090         }
1091
1092         DEBUG_MEM(CmiMemoryCheck());
1093
1094         double _startTime = CkWallTimer();
1095 //env->sender.updatePosition(env->getSrcPe());
1096         if(env->TN == obj->mlogData->tProcessed+1){
1097                 //the message that needs to be processed now
1098                 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1099                 // once we find a message that we can process we put back all the messages in the out of order queue
1100                 // back into the main scheduler queue. 
1101         DEBUG_MEM(CmiMemoryCheck());
1102                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1103                         void *qMsgPtr;
1104                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1105                         envelope *qEnv = (envelope *)qMsgPtr;
1106                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
1107         DEBUG_MEM(CmiMemoryCheck());
1108                 }
1109 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1110                 //TODO: this might be a problem.. change made for leanMD
1111 //              CpvAccess(_currentObj) = obj;
1112         DEBUG_MEM(CmiMemoryCheck());
1113                 return 1;
1114         }
1115
1116         // checking if message has already been processed
1117         // message must be discarded
1118         if(env->TN <= obj->mlogData->tProcessed){
1119                 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
1120                 
1121                 CmiFree(env);
1122                 return 0;
1123         }
1124         //message that needs to be processed in the future
1125
1126         DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1127         //the message cant be processed now put it back in the out of order message Q, 
1128         //It will be transferred to the main queue later
1129         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1130 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1131         DEBUG_MEM(CmiMemoryCheck());
1132         
1133         return 0;
1134 }
1135
1136 /**
1137  * @brief Updates a few variables once a message has been processed.
1138  */
1139 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
1140         DEBUG(char senderString[100]);
1141         if(obj){
1142                 if(sender.guessPE() == CkMyPe()){
1143                         if(entry != NULL){
1144                                 entry->env = NULL;
1145                         }
1146                 }
1147                 obj->mlogData->tProcessed++;
1148 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1149                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1150 //              CpvAccess(_currentObj)= NULL;
1151         }
1152         DEBUG_MEM(CmiMemoryCheck());
1153 }
1154
1155 /***
1156         Helpers for the handlers and message logging methods
1157 ***/
1158
1159 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
1160 //      double _startTime = CkWallTimer();
1161         if(env->recver.type != TypeNodeGroup){
1162         //This repeats a step performed in skipCldEnq for messages sent to
1163         //other processors. I do this here so that messages to local processors
1164         //undergo the same transformation.. It lets the resend be uniform for 
1165         //all messages
1166 //              CmiSetXHandler(env,CmiGetHandler(env));
1167                 _skipCldEnqueue(destPE,env,_infoIdx);
1168         }else{
1169                 _noCldNodeEnqueue(destPE,env);
1170         }
1171 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1172 }
1173 //extern "C" int CmiGetNonLocalLength();
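/** Counter named after a routine that retried ticket requests which had
 *  been queued up earlier. NOTE(review): no such routine is visible near this
 *  declaration; confirm whether calledRetryTicketRequest is still used.
 * */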
1177
1178 int calledRetryTicketRequest=0;
1179
1180 void _pingHandler(CkPingMsg *msg){
1181         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1182         CmiFree(msg);
1183 }
1184
1185
1186 /*****************************************************************************
	Checkpointing methods.
		Periodically pack all the data on a processor and send it to its buddy processor.
		Checkpoints are also used to throw away message logs.
1190 *****************************************************************************/
// (recver, tProcessed) pairs for every chare on this processor. They are rebuilt by
// startMlogCheckpoint() through buildProcessedTicketLog() and copied into the
// remove-log request that sendRemoveLogRequests() sends to every PE.
CkVec<TProcessedLog> processedTicketLog;
void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
void clearUpMigratedRetainedLists(int PE);
1194
1195 void checkpointAlarm(void *_dummy,double curWallTime){
1196         double diff = curWallTime-lastCompletedAlarm;
1197         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1198 /*      if(CkWallTimer()-lastRestart < 50){
1199                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1200                 return;
1201         }*/
1202         if(diff < ((chkptPeriod) - 2)){
1203                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1204                 return;
1205         }
1206         CheckpointRequest request;
1207         request.PE = CkMyPe();
1208         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1209         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1210 };
1211
1212 void _checkpointRequestHandler(CheckpointRequest *request){
1213         startMlogCheckpoint(NULL,CmiWallTimer());
1214 }
1215
1216 /**
1217  * @brief Starts the checkpoint phase after migration.
1218  */
1219 void startMlogCheckpoint(void *_dummy, double curWallTime){
1220         double _startTime = CkWallTimer();
1221
1222         // increasing the checkpoint counter
1223         checkpointCount++;
1224         
1225 #if DEBUG_CHECKPOINT
1226         if(CmiMyPe() == 0){
1227                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1228         }
1229 #endif
1230
1231         DEBUG_MEM(CmiMemoryCheck());
1232
1233         PUP::sizer psizer;
1234         psizer | checkpointCount;
1235         for(int i=0; i<CmiNumPes(); i++){
1236                 psizer | CpvAccess(_incarnation)[i];
1237         }
1238         CkPupROData(psizer);
1239         DEBUG_MEM(CmiMemoryCheck());
1240         CkPupGroupData(psizer,CmiTrue);
1241         DEBUG_MEM(CmiMemoryCheck());
1242         CkPupNodeGroupData(psizer,CmiTrue);
1243         DEBUG_MEM(CmiMemoryCheck());
1244         pupArrayElementsSkip(psizer,CmiTrue,NULL);
1245         DEBUG_MEM(CmiMemoryCheck());
1246
1247         int dataSize = psizer.size();
1248         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1249         char *msg = (char *)CmiAlloc(totalSize);
1250         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1251         chkMsg->PE = CkMyPe();
1252         chkMsg->dataSize = dataSize;
1253         
1254         char *buf = &msg[sizeof(CheckPointDataMsg)];
1255         PUP::toMem pBuf(buf);
1256
1257         pBuf | checkpointCount;
1258         for(int i=0; i<CmiNumPes(); i++){
1259                 pBuf | CpvAccess(_incarnation)[i];
1260         }
1261         CkPupROData(pBuf);
1262         CkPupGroupData(pBuf,CmiTrue);
1263         CkPupNodeGroupData(pBuf,CmiTrue);
1264         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1265
1266         unAckedCheckpoint=1;
1267         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1268         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1269         
1270         /*
1271                 Store the highest Ticket number processed for each chare on this processor
1272         */
1273         processedTicketLog.removeAll();
1274         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1275
1276 #if DEBUG_CHECKPOINT
1277         if(CmiMyPe() == 0){
1278                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1279         }
1280 #endif
1281
1282 #if COLLECT_STATS_MEMORY
1283         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
1284         msgLogSize = 0;
1285         bufferedDetsSize = 0;
1286         storedDetsSize = 0;
1287 #endif
1288
1289         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1290                 lastCompletedAlarm = curWallTime;
1291                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1292         }
1293         traceUserBracketEvent(28,_startTime,CkWallTimer());
1294 };
1295
1296 /**
1297  * @brief A chare adds the latest ticket number processed.
1298  */
1299 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
1300         DEBUG(char objString[100]);
1301
1302         CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
1303         TProcessedLog logEntry;
1304         logEntry.recver = mlogData->objID;
1305         logEntry.tProcessed = mlogData->tProcessed;
1306         log->push_back(logEntry);
1307
1308         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1309 }
1310
1311 class ElementPacker : public CkLocIterator {
1312 private:
1313         CkLocMgr *locMgr;
1314         PUP::er &p;
1315 public:
1316         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1317         void addLocation(CkLocation &loc) {
1318                 CkArrayIndexMax idx=loc.getIndex();
1319                 CkGroupID gID = locMgr->ckGetGroupID();
1320                 p|gID;      // store loc mgr's GID as well for easier restore
1321                 p|idx;
1322                 p|loc;
1323         }
1324 };
1325
1326 /**
1327  * Pups all the array elements in this processor.
1328  */
1329 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
1330         int numElements,i;
1331         int numGroups = CkpvAccess(_groupIDTable)->size();      
1332         if(!p.isUnpacking()){
1333                 numElements = CkCountArrayElements();
1334         }       
1335         p | numElements;
1336         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1337         if(!p.isUnpacking()){
1338                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1339         }else{
1340                 //Flush all recs of all LocMgrs before putting in new elements
1341 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1342                 for(int j=0;j<listsize;j++){
1343                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1344                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1345                                 listToSkip[j].idx.print();
1346                         }
1347                 }
1348                 
1349                 printf("numElements = %d\n",numElements);
1350         
1351                 for (int i=0; i<numElements; i++) {
1352                         CkGroupID gID;
1353                         CkArrayIndexMax idx;
1354                         p|gID;
1355                 p|idx;
1356                         int flag=0;
1357                         int matchedIdx=0;
1358                         for(int j=0;j<listsize;j++){
1359                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1360                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1361                                                 matchedIdx = j;
1362                                                 flag = 1;
1363                                                 break;
1364                                         }
1365                                 }
1366                         }
1367                         if(flag == 1){
1368                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1369                         }else{
1370                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1371                         }
1372                                 
1373                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1374                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
1375                         mgr->resume(idx,p,create,flag);
1376                         if(flag == 1){
1377                                 int homePE = mgr->homePe(idx);
1378                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1379                         }
1380                 }
1381         }
1382 };
1383
1384
1385 void writeCheckpointToDisk(int size,char *chkpt){
1386         char fNameTemp[100];
1387         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1388         int fd = creat(fNameTemp,S_IRWXU);
1389         int ret = write(fd,chkpt,size);
1390         CkAssert(ret == size);
1391         close(fd);
1392         
1393         char fName[100];
1394         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1395         unlink(fName);
1396
1397         rename(fNameTemp,fName);
1398 }
1399
1400 //handler that receives the checkpoint from a processor
1401 //it stores it and acks it
1402 void _storeCheckpointHandler(char *msg){
1403         double _startTime=CkWallTimer();
1404                 
1405         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1406         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1407         
1408         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1409         
1410         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1411         if(oldChkpt != NULL){
1412                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1413                 CmiFree(oldmsg);
1414         }
1415         //turning off storing checkpoints
1416         
1417         int sendingPE = chkMsg->PE;
1418         
1419         CpvAccess(_storedCheckpointData)->buf = chkpt;
1420         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1421         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1422
1423         int count=0;
1424         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1425                 if(migratedNoticeList[j].fromPE == sendingPE){
1426                         migratedNoticeList[j].ackFrom = 1;
1427                 }else{
1428                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1429                 }
1430                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1431                         migratedNoticeList.remove(j);
1432                         count++;
1433                 }
1434                 
1435         }
1436         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1437         
1438         CheckPointAck ackMsg;
1439         ackMsg.PE = CkMyPe();
1440         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1441         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1442         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1443         
1444         traceUserBracketEvent(29,_startTime,CkWallTimer());
1445 };
1446
1447
1448 /**
1449  * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.       
1450  * @note The remove log request message looks like
1451                 |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
1452  */
1453 void sendRemoveLogRequests(){
1454 #if SYNCHRONIZED_CHECKPOINT
1455         CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
1456 #endif
1457         double _startTime = CkWallTimer();      
1458
1459         // computing total message size
1460         int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
1461         char *requestMsg = (char *)CmiAlloc(totalSize);
1462
1463         // filling up the message
1464         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1465         request->PE = CkMyPe();
1466         request->numberObjects = processedTicketLog.size();
1467         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1468         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1469         char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
1470         int *numDeterminants = (int *)listDeterminants;
1471         numDeterminants[0] = 0; 
1472         listDeterminants = (char *)&numDeterminants[1];
1473
1474         // setting the handler in the message
1475         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1476         
1477         DEBUG_MEM(CmiMemoryCheck());
1478         for(int i=0;i<CkNumPes();i++){
1479                 CmiSyncSend(i,totalSize,requestMsg);
1480         }
1481         CmiFree(requestMsg);
1482
1483         clearUpMigratedRetainedLists(CmiMyPe());
1484         //TODO: clear ticketTable
1485         
1486         traceUserBracketEvent(30,_startTime,CkWallTimer());
1487
1488         DEBUG_MEM(CmiMemoryCheck());
1489 }
1490
1491
1492 void _checkpointAckHandler(CheckPointAck *ackMsg){
1493         DEBUG_MEM(CmiMemoryCheck());
1494         unAckedCheckpoint=0;
1495         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1496         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1497         if(onGoingLoadBalancing){
1498                 onGoingLoadBalancing = 0;
1499                 finishedCheckpointLoadBalancing();
1500         }else{
1501                 sendRemoveLogRequests();
1502         }
1503         CmiFree(ackMsg);
1504         
1505 };
1506
1507
1508 /**
1509  * @brief Inserts all the determinants into a hash table.
1510  */
1511 inline void populateDeterminantTable(char *data){
1512         DEBUG(char recverString[100]);
1513         DEBUG(char senderString[100]);
1514         int numDets, *numDetsPtr;
1515         Determinant *detList;
1516         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1517         SNToTicket *tickets;
1518         Ticket ticket;
1519
1520         RemoveLogRequest *request = (RemoveLogRequest *)data;
1521         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1522         
1523         numDetsPtr = (int *)&list[request->numberObjects];
1524         numDets = numDetsPtr[0];
1525         detList = (Determinant *)&numDetsPtr[1];
1526
1527         // inserting determinants into hashtable
1528         for(int i=0; i<numDets; i++){
1529                 table = detTable.get(detList[i].sender);
1530                 if(table == NULL){
1531                         table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
1532                         detTable.put(detList[i].sender) = table;
1533                 }
1534                 tickets = table->get(detList[i].receiver);
1535                 if(tickets == NULL){
1536                         tickets = new SNToTicket();
1537                         table->put(detList[i].receiver) = tickets; 
1538                 }
1539                 ticket.TN = detList[i].TN;
1540                 tickets->put(detList[i].SN) = ticket;
1541         }
1542
1543         DEBUG_MEM(CmiMemoryCheck());    
1544
1545 }
1546
1547 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1548         int total;
1549         DEBUG(char nameString[100]);
1550         DEBUG_MEM(CmiMemoryCheck());
1551         CmiMemoryCheck();
1552         char *data = (char *)_data;
1553         RemoveLogRequest *request = (RemoveLogRequest *)data;
1554         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1555         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1556         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1557         SNToTicket *tickets;
1558         MCount TN;
1559
1560         int count=0;
1561         total = mlog->length();
1562         for(int i=0; i<total; i++){
1563                 MlogEntry *logEntry = mlog->deq();
1564
1565                 // looking message in determinant table
1566                 table = detTable.get(logEntry->env->sender);
1567                 if(table != NULL){
1568                         tickets = table->get(logEntry->env->recver);
1569                         if(tickets != NULL){
1570                                 TN = tickets->get(logEntry->env->SN).TN;
1571                                 if(TN != 0){
1572                                         logEntry->env->TN = TN;
1573                                 }
1574                         }
1575                 }
1576         
1577                 int match=0;
1578                 for(int j=0;j<request->numberObjects;j++){
1579                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
1580                                 //this log Entry should be removed
1581                                 match = 1;
1582                                 break;
1583                         }
1584                 }
1585                 char senderString[100],recverString[100];
1586 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1587                 if(match){
1588                         count++;
1589                         delete logEntry;
1590                 }else{
1591                         mlog->enq(logEntry);
1592                 }
1593         }
1594         if(count > 0){
1595                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1596         }
1597         DEBUG_MEM(CmiMemoryCheck());
1598         CmiMemoryCheck();
1599 }
1600
1601 /**
1602  * @brief Removes messages in the log according to the received ticket numbers
1603  */
1604 void _removeProcessedLogHandler(char *requestMsg){
1605         double start = CkWallTimer();
1606
1607         // building map for determinants
1608         populateDeterminantTable(requestMsg);
1609
1610         // removing processed messages from the message log
1611         forAllCharesDo(removeProcessedLogs,requestMsg);
1612
1613         // @todo: clean determinant table 
1614         
1615         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1616         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1617         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1618         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1619         /*
1620                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1621         */
1622         DEBUG_MEM(CmiMemoryCheck());
1623         clearUpMigratedRetainedLists(request->PE);
1624         
1625         traceUserBracketEvent(20,start,CkWallTimer());
1626         CmiFree(requestMsg);    
1627 };
1628
1629
1630 void clearUpMigratedRetainedLists(int PE){
1631         int count=0;
1632         CmiMemoryCheck();
1633         
1634         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1635                 if(migratedNoticeList[j].toPE == PE){
1636                         migratedNoticeList[j].ackTo = 1;
1637                 }
1638                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1639                         migratedNoticeList.remove(j);
1640                         count++;
1641                 }
1642         }
1643         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1644         
1645         for(int j=retainedObjectList.size()-1;j>=0;j--){
1646                 if(retainedObjectList[j]->migRecord.toPE == PE){
1647                         RetainedMigratedObject *obj = retainedObjectList[j];
1648                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1649                         retainedObjectList.remove(j);
1650                         if(obj->msg != NULL){
1651                                 CmiMemoryCheck();
1652                                 CmiFree(obj->msg);
1653                         }
1654                         delete obj;
1655                 }
1656         }
1657 }
1658
1659 /***************************************************************
1660         Restart Methods and handlers
1661 ***************************************************************/        
1662
1663 /**
1664  * Function for restarting the crashed processor.
1665  * It sets the restart flag and contacts the buddy
1666  * processor to get the latest checkpoint.
1667  */
1668 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1669         RestartRequest msg;
1670
1671         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1672
1673         // setting the restart flag
1674         _restartFlag = 1;
1675         _numRestartResponses = 0;
1676
1677         // if we are using team-based message logging, all members of the group have to be restarted
1678         if(teamSize > 1){
1679                 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
1680                         if(i != CkMyPe() && i < CkNumPes()){
1681                                 // sending a message to the team member
1682                                 msg.PE = CkMyPe();
1683                             CmiSetHandler(&msg,_restartHandlerIdx);
1684                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1685                         }
1686                 }
1687         }
1688
1689         // requesting the latest checkpoint from its buddy
1690         msg.PE = CkMyPe();
1691         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1692         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1693 };
1694
1695 /**
1696  * Function to restart this processor.
1697  * The handler is invoked by a member of its same team in message logging.
1698  */
1699 void _restartHandler(RestartRequest *restartMsg){
1700         int i;
1701         int numGroups = CkpvAccess(_groupIDTable)->size();
1702         RestartRequest msg;
1703         
1704         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1705
1706     // setting the restart flag
1707         _restartFlag = 1;
1708         _numRestartResponses = 0;
1709
1710         // flushing all buffers
1711         //TEST END
1712 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1713         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1714         for(int i=0;i<numGroups;i++){
1715         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1716                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1717                 obj->flushStates();
1718                 obj->ckJustMigrated();
1719         }*/
1720
1721     // requesting the latest checkpoint from its buddy
1722         msg.PE = CkMyPe();
1723         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1724         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1725 }
1726
1727
1728 /**
1729  * Gets the stored checkpoint but calls another function in the sender.
1730  */
1731 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1732
1733         // retrieving the stored checkpoint
1734         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1735
1736         // making sure it is its buddy who is requesting the checkpoint
1737         CkAssert(restartMsg->PE == storedChkpt->PE);
1738
1739         storedRequest = restartMsg;
1740         verifyAckTotal = 0;
1741
1742         for(int i=0;i<migratedNoticeList.size();i++){
1743                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1744 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1745                         if(migratedNoticeList[i].ackFrom == 0){
1746                                 //need to verify if the object actually exists .. it might not
1747                                 //have been acked but it might exist on it
1748                                 VerifyAckMsg msg;
1749                                 msg.migRecord = migratedNoticeList[i];
1750                                 msg.index = i;
1751                                 msg.fromPE = CmiMyPe();
1752                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1753                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1754                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1755                                 verifyAckTotal++;
1756                         }
1757                 }
1758         }
1759
1760         // sending the checkpoint back to its buddy     
1761         if(verifyAckTotal == 0){
1762                 sendCheckpointData(MLOG_RESTARTED);
1763         }
1764         verifyAckCount = 0;
1765 }
1766
1767 /**
1768  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1769  */
1770 void _recvRestartCheckpointHandler(char *_restartData){
1771         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1772         MigrationRecord *migratedAwayElements;
1773
1774         globalLBID = restartData->lbGroupID;
1775         
1776         restartData->restartWallTime *= 1000;
1777         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1778         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1779         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1780
1781         
1782         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1783         char *buf = &_restartData[sizeof(RestartProcessorData)];
1784         
1785         if(restartData->numMigratedAwayElements != 0){
1786                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1787                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1788                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1789                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1790         }
1791
1792         // turning on the team recovery flag
1793         forAllCharesDo(setTeamRecovery,NULL);
1794         
1795         PUP::fromMem pBuf(buf);
1796         pBuf | checkpointCount;
1797         for(int i=0; i<CmiNumPes(); i++){
1798                 pBuf | CpvAccess(_incarnation)[i];
1799         }
1800         CkPupROData(pBuf);
1801         CkPupGroupData(pBuf,CmiFalse);
1802         CkPupNodeGroupData(pBuf,CmiFalse);
1803         pupArrayElementsSkip(pBuf,CmiFalse,NULL);
1804         CkAssert(pBuf.size() == restartData->checkPointSize);
1805         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1806
1807         // increasing the incarnation number of all the team members
1808         for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
1809                 CpvAccess(_incarnation)[i]++;
1810         }
1811         
1812         // turning off the team recovery flag
1813         forAllCharesDo(unsetTeamRecovery,NULL);
1814
1815         // initializing a few variables for handling local messages
1816         forAllCharesDo(initializeRestart,NULL);
1817         
1818         CmiFree(_restartData);  
1819
1820         /*HERE _initDone();
1821
1822         getGlobalStep(globalLBID);
1823         
1824         countUpdateHomeAcks = 0;
1825         RestartRequest updateHomeRequest;
1826         updateHomeRequest.PE = CmiMyPe();
1827         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1828         for(int i=0;i<CmiNumPes();i++){
1829                 if(i != CmiMyPe()){
1830                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
1831                 }
1832         }
1833 */
1834
1835
1836         // Send out the request to resend logged messages to all other processors
1837         CkVec<TProcessedLog> objectVec;
1838         forAllCharesDo(createObjIDList, (void *)&objectVec);
1839         int numberObjects = objectVec.size();
1840         
1841         /*
1842                 resendMsg layout |ResendRequest|Array of TProcessedLog|
1843         */
1844         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
1845         char *resendMsg = (char *)CmiAlloc(totalSize);  
1846
1847         ResendRequest *resendReq = (ResendRequest *)resendMsg;
1848         resendReq->PE =CkMyPe(); 
1849         resendReq->numberObjects = numberObjects;
1850         char *objList = &resendMsg[sizeof(ResendRequest)];
1851         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
1852         
1853
1854         /* test for parallel restart migrate away object**/
1855 //      if(fastRecovery){
1856 //              distributeRestartedObjects();
1857 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
1858 //      }
1859         
1860         /*      To make restart work for load balancing.. should only
1861         be used when checkpoint happens along with load balancing
1862         **/
1863 //      forAllCharesDo(resumeFromSyncRestart,NULL);
1864
1865         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
1866         CpvAccess(_currentObj) = lb;
1867         lb->ReceiveDummyMigration(restartDecisionNumber);
1868
1869         sleep(10);
1870         
1871         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
1872         for(int i=0;i<CkNumPes();i++){
1873                 if(i != CkMyPe()){
1874                         CmiSyncSend(i,totalSize,resendMsg);
1875                 }       
1876         }
1877         _resendMessagesHandler(resendMsg);
1878
1879 }
1880
1881
1882 void CkMlogRestartDouble(void *,double){
1883         CkMlogRestart(NULL,NULL);
1884 };
1885
1886 //TML: restarting from local (group) failure
1887 void CkMlogRestartLocal(){
1888     CkMlogRestart(NULL,NULL);
1889 };
1890
1891 /**
1892  * Gets the stored checkpoint for its buddy processor.
1893  */
1894 void _getCheckpointHandler(RestartRequest *restartMsg){
1895
1896         // retrieving the stored checkpoint
1897         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1898
1899         // making sure it is its buddy who is requesting the checkpoint
1900         CkAssert(restartMsg->PE == storedChkpt->PE);
1901
1902         storedRequest = restartMsg;
1903         verifyAckTotal = 0;
1904
1905         for(int i=0;i<migratedNoticeList.size();i++){
1906                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1907 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1908                         if(migratedNoticeList[i].ackFrom == 0){
1909                                 //need to verify if the object actually exists .. it might not
1910                                 //have been acked but it might exist on it
1911                                 VerifyAckMsg msg;
1912                                 msg.migRecord = migratedNoticeList[i];
1913                                 msg.index = i;
1914                                 msg.fromPE = CmiMyPe();
1915                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1916                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1917                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1918                                 verifyAckTotal++;
1919                         }
1920                 }
1921         }
1922
1923         // sending the checkpoint back to its buddy     
1924         if(verifyAckTotal == 0){
1925                 sendCheckpointData(MLOG_CRASHED);
1926         }
1927         verifyAckCount = 0;
1928 }
1929
1930
1931 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1932         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1933         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1934         if(rec != NULL && rec->type() == CkLocRec::local){
1935                         //this location exists on this processor
1936                         //and needs to be removed       
1937                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1938                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1939                         
1940                         int localIdx = localRec->getLocalIndex();
1941                         LBDatabase *lbdb = localRec->getLBDB();
1942                         LDObjHandle ldHandle = localRec->getLdHandle();
1943                                 
1944                         locMgr->setDuringMigration(true);
1945                         
1946                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1947                         lbdb->UnregisterObj(ldHandle);
1948                         
1949                         locMgr->setDuringMigration(false);
1950                         
1951                         verifyAckedRequests++;
1952
1953         }
1954         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1955         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1956 };
1957
1958
1959 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1960         int index =     verifyReply->index;
1961         migratedNoticeList[index] = verifyReply->migRecord;
1962         verifyAckCount++;
1963         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1964         if(verifyAckCount == verifyAckTotal){
1965                 sendCheckpointData(MLOG_CRASHED);
1966         }
1967 }
1968
1969
1970 /**
1971  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
1972  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
1973  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
1974  */
1975 void sendCheckpointData(int mode){      
1976         RestartRequest *restartMsg = storedRequest;
1977         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1978         int numMigratedAwayElements = migratedNoticeList.size();
1979         if(migratedNoticeList.size() != 0){
1980                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1981 //                      CkAssert(migratedNoticeList.size() == 0);
1982         }
1983         
1984         
1985         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1986         
1987         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1988         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
1989         
1990         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1991         
1992         char *msg = (char *)CmiAlloc(totalSize);
1993         
1994         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1995         dataMsg->PE = CkMyPe();
1996         dataMsg->restartWallTime = CmiTimer();
1997         dataMsg->checkPointSize = storedChkpt->bufSize;
1998         
1999         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
2000 //      dataMsg->numMigratedAwayElements = 0;
2001         
2002         dataMsg->numMigratedInElements = 0;
2003         dataMsg->migratedElementSize = 0;
2004         dataMsg->lbGroupID = globalLBID;
2005         /*msg layout 
2006                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
2007                 Local MessageLog|
2008         */
2009         //store checkpoint data
2010         char *buf = &msg[sizeof(RestartProcessorData)];
2011
2012         if(dataMsg->numMigratedAwayElements != 0){
2013                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
2014                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
2015         }
2016         
2017
2018         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
2019         buf = &buf[storedChkpt->bufSize];
2020
2021         if(mode == MLOG_RESTARTED){
2022                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
2023                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2024                 CmiFree(restartMsg);
2025         }else{
2026                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2027                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2028                 CmiFree(restartMsg);
2029         }
2030 };
2031
2032
2033 // this list is used to create a vector of the object ids of all
2034 //the chares on this processor currently and the highest TN processed by them 
2035 //the first argument is actually a CkVec<TProcessedLog> *
2036 void createObjIDList(void *data,ChareMlogData *mlogData){
2037         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2038         TProcessedLog entry;
2039         entry.recver = mlogData->objID;
2040         entry.tProcessed = mlogData->tProcessed;
2041         list->push_back(entry);
2042         DEBUG_TEAM(char objString[100]);
2043         DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2044         DEBUG_RECOVERY(printLog(&entry));
2045 }
2046
2047
2048 /**
2049  * Receives the checkpoint data from its buddy, restores the state of all the objects
2050  * and asks everyone else to update its home.
2051  */
2052 void _recvCheckpointHandler(char *_restartData){
2053         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2054         MigrationRecord *migratedAwayElements;
2055
2056         globalLBID = restartData->lbGroupID;
2057         
2058         restartData->restartWallTime *= 1000;
2059         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2060         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2061         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2062
2063         
2064         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2065         char *buf = &_restartData[sizeof(RestartProcessorData)];
2066         
2067         if(restartData->numMigratedAwayElements != 0){
2068                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2069                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2070                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2071                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2072         }
2073         
2074         PUP::fromMem pBuf(buf);
2075
2076         pBuf | checkpointCount;
2077         for(int i=0; i<CmiNumPes(); i++){
2078                 pBuf | CpvAccess(_incarnation)[i];
2079         }
2080         CkPupROData(pBuf);
2081         CkPupGroupData(pBuf,CmiTrue);
2082         CkPupNodeGroupData(pBuf,CmiTrue);
2083         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
2084         CkAssert(pBuf.size() == restartData->checkPointSize);
2085         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2086
2087         // increases the incarnation number
2088         CpvAccess(_incarnation)[CmiMyPe()]++;
2089         
2090         forAllCharesDo(initializeRestart,NULL);
2091         
2092         CmiFree(_restartData);
2093         
2094         _initDone();
2095
2096         getGlobalStep(globalLBID);
2097
2098         // sending request to send determinants
2099         _numRestartResponses = 0;
2100         
2101         // Send out the request to resend logged determinants to all other processors
2102         CkVec<TProcessedLog> objectVec;
2103         forAllCharesDo(createObjIDList, (void *)&objectVec);
2104         int numberObjects = objectVec.size();
2105         
2106         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2107         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2108         char *resendMsg = (char *)CmiAlloc(totalSize);  
2109
2110         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2111         resendReq->PE =CkMyPe(); 
2112         resendReq->numberObjects = numberObjects;
2113         char *objList = &resendMsg[sizeof(ResendRequest)];
2114         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2115
2116         CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
2117         for(int i=0;i<CkNumPes();i++){
2118                 if(i != CkMyPe()){
2119                         CmiSyncSend(i,totalSize,resendMsg);
2120                 }
2121         }
2122         CmiFree(resendMsg);
2123         
2124 }
2125
2126 /**
2127  * Receives the updateHome ACKs from all other processors. Once everybody
2128  * has replied, it sends a request to resend the logged messages.
2129  */
2130 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2131         countUpdateHomeAcks++;
2132         CmiFree(updateHomeAck);
2133         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2134         if(countUpdateHomeAcks != CmiNumPes()){
2135                 return;
2136         }
2137
2138         // Send out the request to resend logged messages to all other processors
2139         CkVec<TProcessedLog> objectVec;
2140         forAllCharesDo(createObjIDList, (void *)&objectVec);
2141         int numberObjects = objectVec.size();
2142         
2143         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2144         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2145         char *resendMsg = (char *)CmiAlloc(totalSize);  
2146
2147         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2148         resendReq->PE =CkMyPe(); 
2149         resendReq->numberObjects = numberObjects;
2150         char *objList = &resendMsg[sizeof(ResendRequest)];
2151         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2152
2153         /* test for parallel restart migrate away object**/
2154         if(fastRecovery){
2155                 distributeRestartedObjects();
2156                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2157         }
2158         
2159         /*      To make restart work for load balancing.. should only
2160         be used when checkpoint happens along with load balancing
2161         **/
2162 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2163
2164         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2165         CpvAccess(_currentObj) = lb;
2166         lb->ReceiveDummyMigration(restartDecisionNumber);
2167
2168 //HERE  sleep(10);
2169         
2170         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2171         for(int i=0;i<CkNumPes();i++){
2172                 if(i != CkMyPe()){
2173                         CmiSyncSend(i,totalSize,resendMsg);
2174                 }
2175         }
2176         _resendMessagesHandler(resendMsg);
2177         CmiFree(resendMsg);
2178
2179 };
2180
2181 /**
2182  * @brief Initializes variables and flags for restarting procedure.
2183  */
2184 void initializeRestart(void *data, ChareMlogData *mlogData){
2185         mlogData->resendReplyRecvd = 0;
2186         mlogData->receivedTNs = new CkVec<MCount>;
2187         mlogData->restartFlag = 1;
2188 };
2189
2190 /**
2191  * Updates the homePe of chare array elements.
2192  */
2193 void updateHomePE(void *data,ChareMlogData *mlogData){
2194         RestartRequest *updateRequest = (RestartRequest *)data;
2195         int PE = updateRequest->PE; //restarted PE
2196         //if this object is an array Element and its home is the restarted processor
2197         // the home processor needs to know its current location
2198         if(mlogData->objID.type == TypeArray){
2199                 //it is an array element
2200                 CkGroupID myGID = mlogData->objID.data.array.id;
2201                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
2202                 CkArrayID aid(mlogData->objID.data.array.id);           
2203                 //check if the restarted processor is the home processor for this object
2204                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2205                 if(locMgr->homePe(myIdx) == PE){
2206                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2207                         DEBUG_RESTART(myIdx.print());
2208                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2209                 }
2210         }
2211 };
2212
2213
2214 /**
2215  * Updates the homePe for all chares in this processor.
2216  */
2217 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2218         int sender = updateRequest->PE;
2219         
2220         forAllCharesDo(updateHomePE,updateRequest);
2221         
2222         updateRequest->PE = CmiMyPe();
2223         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2224         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2225         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2226                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2227                 checkpointCount--;
2228                 startMlogCheckpoint(NULL,0);
2229         }
2230         if(sender == getCheckPointPE()){
2231                 for(int i=0;i<retainedObjectList.size();i++){
2232                         if(retainedObjectList[i]->acked == 0){
2233                                 MigrationNotice migMsg;
2234                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2235                                 migMsg.record = retainedObjectList[i];
2236                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2237                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2238                         }
2239                 }
2240         }
2241 }
2242
2243 /**
2244  * @brief Fills up the ticket vector for each chare.
2245  */
2246 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2247         DEBUG(char name[100]);
2248         ResendData *resendData = (ResendData *)data;
2249         int PE = resendData->PE; //restarted PE
2250         int count=0;
2251         CkHashtableIterator *iterator;
2252         void *objp;
2253         void *objkey;
2254         CkObjID *objID;
2255         SNToTicket *snToTicket;
2256         Ticket ticket;
2257         
2258         // traversing the team table looking up for the maximum TN received     
2259         iterator = mlogData->teamTable.iterator();
2260         while( (objp = iterator->next(&objkey)) != NULL ){
2261                 objID = (CkObjID *)objkey;
2262         
2263                 // traversing the resendData structure to add ticket numbers
2264                 for(int j=0;j<resendData->numberObjects;j++){
2265                         if((*objID) == (resendData->listObjects)[j].recver){
2266                                 snToTicket = *(SNToTicket **)objp;
2267 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2268                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2269                                         ticket = snToTicket->get(snIndex);      
2270                                 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2271                                                 //store the TNs that have been since the recver last checkpointed
2272                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2273                                         }
2274                                 }
2275                         }
2276                 }
2277         }
2278
2279         //releasing the memory for the iterator
2280         delete iterator;
2281 }
2282
2283
2284 /**
2285  * @brief Turns on the flag for team recovery that selectively restores
2286  * particular metadata information.
2287  */
2288 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2289         char name[100];
2290         mlogData->teamRecoveryFlag = 1; 
2291 }
2292
2293 /**
2294  * @brief Turns off the flag for team recovery.
2295  */
2296 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2297         mlogData->teamRecoveryFlag = 0;
2298 }
2299
2300 /**
2301  * Prints a processed log.
2302  */
2303 void printLog(TProcessedLog *log){
2304         char recverString[100];
2305         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
2306 }
2307
2308 /**
2309  * Prints information about a message.
2310  */
2311 void printMsg(envelope *env, const char* par){
2312         char senderString[100];
2313         char recverString[100];
2314         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
2315 }
2316
2317 /**
2318  * Prints information about a determinant.
2319  */
2320 void printDet(Determinant *det, const char* par){
2321         char senderString[100];
2322         char recverString[100];
2323         CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
2324 }
2325
2326 /**
2327  * @brief Resends all the logged messages to a particular chare list.
2328  * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
2329  * @param mlogData a particular chare living in this processor.
2330  */
2331 void resendMessageForChare(void *data, ChareMlogData *mlogData){
2332         DEBUG_RESTART(char nameString[100]);
2333         DEBUG_RESTART(char recverString[100]);
2334         DEBUG_RESTART(char senderString[100]);
2335
2336         ResendData *resendData = (ResendData *)data;
2337         int PE = resendData->PE; //restarted PE
2338         int count=0;
2339         int ticketRequests=0;
2340         CkQ<MlogEntry *> *log = mlogData->getMlog();
2341
2342         DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2343
2344         // traversing the message log to see if we must resend a message        
2345         for(int i=0;i<log->length();i++){
2346                 MlogEntry *logEntry = (*log)[i];
2347                 
2348                 // if we sent out the logs of a local message to buddy and it crashed
2349                 //before acknowledging 
2350                 envelope *env = logEntry->env;
2351                 if(env == NULL){
2352                         continue;
2353                 }
2354         
2355                 // resend if type is not invalid        
2356                 if(env->recver.type != TypeInvalid){
2357                         for(int j=0;j<resendData->numberObjects;j++){
2358                                 if(env->recver == (resendData->listObjects)[j].recver){
2359                                         if(PE != CkMyPe()){
2360                                                 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
2361                                                 if(env->recver.type == TypeNodeGroup){
2362                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2363                                                 }else{
2364                                                         CmiSetHandler(env,CmiGetXHandler(env));
2365                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2366                                                 }
2367                                         }else{
2368                                                 envelope *copyEnv = copyEnvelope(env);
2369                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2370                                         }
2371                                         DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2372                                         count++;
2373                                 }
2374                         }//end of for loop of objects
2375                         
2376                 }       
2377         }
2378         DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
2379 }
2380
2381 /**
2382  * Send all remote determinants to a particular failed PE.
2383  * It only sends determinants to those objects on the list.
2384  */
2385 void _sendDetsHandler(char *msg){
2386         ResendData d;
2387         CkVec<Determinant> *detVec;
2388         ResendRequest *resendReq = (ResendRequest *)msg;
2389
2390         // CkPrintf("[%d] Sending determinants\n",CkMyPe());
2391
2392         // building the reply message
2393         char *listObjects = &msg[sizeof(ResendRequest)];
2394         d.numberObjects = resendReq->numberObjects;
2395         d.PE = resendReq->PE;
2396         d.listObjects = (TProcessedLog *)listObjects;
2397         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2398         detVec = new CkVec<Determinant>[d.numberObjects];
2399
2400         // adding the remote determinants to the resendReplyMsg
2401         // traversing all the remote determinants
2402         CkVec<Determinant> *vec;
2403         for(int i=0; i<d.numberObjects; i++){
2404                 vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
2405                 if(vec != NULL){
2406                         for(int j=0; j<vec->size(); j++){
2407
2408                                 // only relevant determinants are to be sent
2409                                 if((*vec)[j].TN > d.listObjects[i].tProcessed){
2410
2411                                         // adding the ticket in the ticket vector
2412                                         d.ticketVecs[i].push_back((*vec)[j].TN);
2413
2414                                         // adding the determinant in the determinant vector
2415                                         detVec[i].push_back((*vec)[j]);
2416
2417                                         DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
2418                                 }
2419                         }
2420                 }
2421         }
2422
2423         int totalDetStored = 0;
2424         for(int i=0;i<d.numberObjects;i++){
2425                 totalDetStored += detVec[i].size();
2426         }
2427
2428         //send back the maximum ticket number for a message sent to each object on the 
2429         //restarted processor
2430         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2431         
2432         int totalTNStored=0;
2433         for(int i=0;i<d.numberObjects;i++){
2434                 totalTNStored += d.ticketVecs[i].size();
2435         }
2436         
2437         int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
2438         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2439         
2440         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2441         resendReply->PE = CkMyPe();
2442         resendReply->numberObjects = d.numberObjects;
2443         
2444         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2445         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2446         for(int i=0;i<d.numberObjects;i++){
2447                 replyObjects[i] = d.listObjects[i].recver;
2448         }
2449         
2450         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2451         for(int i=0;i<d.numberObjects;i++){
2452                 int vecsize = d.ticketVecs[i].size();
2453                 memcpy(ticketList,&vecsize,sizeof(int));
2454                 ticketList = &ticketList[sizeof(int)];
2455                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2456                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2457         }
2458
2459         // adding the stored remote determinants to the message
2460         for(int i=0;i<d.numberObjects;i++){
2461                 int vecsize = detVec[i].size();
2462                 memcpy(ticketList,&vecsize,sizeof(int));
2463                 ticketList = &ticketList[sizeof(int)];
2464                 memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
2465                 ticketList = &ticketList[sizeof(Determinant)*vecsize];
2466         }
2467
2468         CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
2469         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2470
2471         delete [] detVec;
2472         delete [] d.ticketVecs;
2473
2474         DEBUG_MEM(CmiMemoryCheck());
2475
2476         if(resendReq->PE != CkMyPe()){
2477                 CmiFree(msg);
2478         }       
2479 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2480         lastRestart = CmiWallTimer();
2481
2482 }
2483
2484 /**
2485  * Resends messages since last checkpoint to the list of objects included in the 
2486  * request. It also sends stored remote determinants to the particular failed PE.
2487  */
2488 void _resendMessagesHandler(char *msg){
2489         ResendData d;
2490         ResendRequest *resendReq = (ResendRequest *)msg;
2491
2492         //CkPrintf("[%d] Resending messages\n",CkMyPe());
2493
2494         // building the reply message
2495         char *listObjects = &msg[sizeof(ResendRequest)];
2496         d.numberObjects = resendReq->numberObjects;
2497         d.PE = resendReq->PE;
2498         d.listObjects = (TProcessedLog *)listObjects;
2499         
2500         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2501
2502         //TML: examines the origin processor to determine if it belongs to the same group.
2503         // In that case, it only returns the maximum ticket received for each object in the list.
2504         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2505                 forAllCharesDo(fillTicketForChare,&d);
2506         else
2507                 forAllCharesDo(resendMessageForChare,&d);
2508
2509         DEBUG_MEM(CmiMemoryCheck());
2510
2511         if(resendReq->PE != CkMyPe()){
2512                 CmiFree(msg);
2513         }       
2514 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2515         lastRestart = CmiWallTimer();
2516 }
2517
2518 MCount maxVec(CkVec<MCount> *TNvec);
2519 void sortVec(CkVec<MCount> *TNvec);
2520 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2521
2522 /**
2523  * @brief Processes the messages in the delayed remote message queue
2524  */
2525 void processDelayedRemoteMsgQueue(){
2526         DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
2527         
2528         while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
2529                 void *qMsgPtr;
2530                 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
2531                 envelope *qEnv = (envelope *)qMsgPtr;
2532                 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
2533                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
2534                 DEBUG_MEM(CmiMemoryCheck());
2535         }
2536
2537 }
2538
2539 /**
2540  * @brief Receives determinants stored on remote nodes.
2541  * Message format: |Header|ObjID list|TN list|Determinant list|
2542  * TN list = |number of TNs|list of TNs|...|
2543  */
2544 void _sendDetsReplyHandler(char *msg){
2545         ResendRequest *resendReply = (ResendRequest *)msg;
2546         CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
2547         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2548         
2549 //      DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2550         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2551         
2552         for(int i =0; i< resendReply->numberObjects;i++){
2553                 Chare *obj = (Chare *)listObjects[i].getObject();
2554                 
2555                 int vecsize;
2556                 memcpy(&vecsize,listTickets,sizeof(int));
2557                 listTickets = &listTickets[sizeof(int)];
2558                 MCount *listTNs = (MCount *)listTickets;
2559                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2560         
2561                 if(obj != NULL){
2562                         //the object was restarted on the processor on which it existed
2563                         processReceivedTN(obj,vecsize,listTNs);
2564                 }else{
2565                         //pack up objID vecsize and listTNs and send it to the correct processor
2566                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2567                         char *TNMsg = (char *)CmiAlloc(totalSize);
2568                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2569                         receivedTNData->recver = listObjects[i];
2570                         receivedTNData->numTNs = vecsize;
2571                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2572                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2573                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2574                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2575                 }
2576
2577         }
2578
2579         // traversing all the retrieved determinants
2580         for(int i = 0; i < resendReply->numberObjects; i++){
2581                 Chare *obj = (Chare *)listObjects[i].getObject();
2582                 
2583                 int vecsize;
2584                 memcpy(&vecsize,listTickets,sizeof(int));
2585                 listTickets = &listTickets[sizeof(int)];
2586                 Determinant *listDets = (Determinant *)listTickets;     
2587                 listTickets = &listTickets[vecsize*sizeof(Determinant)];
2588         
2589                 if(obj != NULL){
2590                         //the object was restarted on the processor on which it existed
2591                         processReceivedDet(obj,vecsize,listDets);
2592                 } else {
2593                         // pack the determinants and ship them to the other processor
2594                         // pack up objID vecsize and listDets and send it to the correct processor
2595                         int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
2596                         char *detMsg = (char *)CmiAlloc(totalSize);
2597                         ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
2598                         receivedDetData->recver = listObjects[i];
2599                         receivedDetData->numDets = vecsize;
2600                         char *detList = &detMsg[sizeof(ReceivedDetData)];
2601                         memcpy(detList,listDets,sizeof(Determinant)*vecsize);
2602                         CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
2603                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
2604                 }
2605
2606         }
2607
2608         // checking if we have received all replies
2609         _numRestartResponses++;
2610         if(_numRestartResponses != CkNumPes())
2611                 return;
2612         else 
2613                 _numRestartResponses = 0;
2614
2615         // continuing with restart process; send out the request to resend logged messages to all other processors
2616         CkVec<TProcessedLog> objectVec;
2617         forAllCharesDo(createObjIDList, (void *)&objectVec);
2618         int numberObjects = objectVec.size();
2619         
2620         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2621         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2622         char *resendMsg = (char *)CmiAlloc(totalSize);  
2623
2624         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2625         resendReq->PE =CkMyPe(); 
2626         resendReq->numberObjects = numberObjects;
2627         char *objList = &resendMsg[sizeof(ResendRequest)];
2628         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2629
2630         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2631         CpvAccess(_currentObj) = lb;
2632         lb->ReceiveDummyMigration(restartDecisionNumber);
2633
2634 //HERE  sleep(10);
2635 //      CkPrintf("[%d] RESUMING RECOVERY with %d \n",CkMyPe(),restartDecisionNumber);
2636         
2637         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2638         for(int i=0;i<CkNumPes();i++){
2639                 if(i != CkMyPe()){
2640                         CmiSyncSend(i,totalSize,resendMsg);
2641                 }
2642         }
2643         _resendMessagesHandler(resendMsg);
2644         CmiFree(resendMsg);
2645
2646         /* test for parallel restart migrate away object**/
2647         if(fastRecovery){
2648                 distributeRestartedObjects();
2649                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2650         }
2651
2652 //      processDelayedRemoteMsgQueue();
2653
2654 };
2655
2656 /**
2657  * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
2658  */
2659 void _receivedDetDataHandler(ReceivedDetData *msg){
2660         DEBUG_NOW(char objName[100]);
2661         Chare *obj = (Chare *) msg->recver.getObject();
2662         if(obj){                
2663                 char *_msg = (char *)msg;
2664                 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2665                 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
2666                 processReceivedDet(obj,msg->numDets,listDets);
2667                 CmiFree(msg);
2668         }else{
2669                 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
2670                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2671         }
2672 }
2673
2674 /**
2675  * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
2676  */
2677 void _receivedTNDataHandler(ReceivedTNData *msg){
2678         DEBUG_NOW(char objName[100]);
2679         Chare *obj = (Chare *) msg->recver.getObject();
2680         if(obj){                
2681                 char *_msg = (char *)msg;
2682                 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2683                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2684                 processReceivedTN(obj,msg->numTNs,listTNs);
2685                 CmiFree(msg);
2686         }else{
2687                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2688                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2689         }
2690 };
2691
2692 /**
2693  * @brief Processes the received list of determinants from a particular PE.
2694  */
2695 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
2696         Determinant *det;
2697
2698         // traversing the whole list of determinants
2699         for(int i=0; i<listSize; i++){
2700                 det = &listDets[i];
2701                 if(CkMyPe() == 4) printDet(det,"RECOVERY");
2702                 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
2703                 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
2704         }
2705         
2706         DEBUG_MEM(CmiMemoryCheck());
2707 }
2708         
2709 /**
2710  * @brief Processes the received list of tickets from a particular PE.
2711  */
2712 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2713         // increases the number of resendReply received
2714         obj->mlogData->resendReplyRecvd++;
2715
2716         DEBUG(char objName[100]);
2717         DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
2718         //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
2719         //if(obj->mlogData->receivedTNs == NULL)
2720         //      CkPrintf("NULL\n");     
2721         //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
2722
2723         // includes the tickets into the receivedTN structure
2724         for(int j=0;j<listSize;j++){
2725                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2726         }
2727         
2728         //if this object has received all the replies find the ticket numbers
2729         //that senders know about. Those less than the ticket number processed 
2730         //by the receiver can be thrown away. The rest need not be consecutive
2731         // ie there can be holes in the list of ticket numbers seen by senders
2732         if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
2733                 obj->mlogData->resendReplyRecvd = 0;
2734
2735 #if VERIFY_DETS
2736                 //sort the received TNS
2737                 sortVec(obj->mlogData->receivedTNs);
2738         
2739                 //after all the received tickets are in we need to sort them and then 
2740                 // calculate the holes  
2741                 if(obj->mlogData->receivedTNs->size() > 0){
2742                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2743                         int vecsize = obj->mlogData->receivedTNs->size();
2744                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2745                         
2746                         // updating tCount with the highest ticket handed out
2747                         if(teamSize > 1){
2748                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2749                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2750                         }else{
2751                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2752                         }
2753                         
2754                         if(numberHoles == 0){
2755                         }else{
2756                                 char objName[100];                                      
2757                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2758                                 obj->mlogData->numberHoles = numberHoles;
2759                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2760                                 int countHoles=0;
2761                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2762                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2763                                                 //the TNs are not consecutive at this point
2764                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2765                                                         DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
2766                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2767                                                         countHoles++;
2768                                                 }       
2769                                         }
2770                                 }
2771                                 //Holes have been given new TN
2772                                 if(countHoles != numberHoles){
2773                                         char str[100];
2774                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2775                                 }
2776                                 CkAssert(countHoles == numberHoles);                                    
2777                                 obj->mlogData->currentHoles = numberHoles;
2778                         }
2779                 }
2780 #else
2781                 if(obj->mlogData->receivedTNs->size() > 0){
2782                         obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
2783                 }
2784 #endif
2785                 // cleaning up structures and getting ready to continue execution       
2786                 delete obj->mlogData->receivedTNs;
2787                 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
2788                 obj->mlogData->receivedTNs = NULL;
2789                 obj->mlogData->restartFlag = 0;
2790
2791                 // processDelayedRemoteMsgQueue();
2792
2793                 DEBUG_RESTART(char objString[100]);
2794                 DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2795         }
2796
2797 }
2798
2799 /** @brief Returns the maximum ticket from a vector */
2800 MCount maxVec(CkVec<MCount> *TNvec){
2801         MCount max = 0;
2802         for(int i=0; i<TNvec->size(); i++){
2803                 if((*TNvec)[i] > max)
2804                         max = (*TNvec)[i];
2805         }
2806         return max;
2807 }
2808
2809 void sortVec(CkVec<MCount> *TNvec){
2810         //sort it ->its bloddy bubble sort
2811         //TODO: use quicksort
2812         for(int i=0;i<TNvec->size();i++){
2813                 for(int j=i+1;j<TNvec->size();j++){
2814                         if((*TNvec)[j] < (*TNvec)[i]){
2815                                 MCount temp;
2816                                 temp = (*TNvec)[i];
2817                                 (*TNvec)[i] = (*TNvec)[j];
2818                                 (*TNvec)[j] = temp;
2819                         }
2820                 }
2821         }
2822         //make it unique .. since its sorted all equal units will be consecutive
2823         MCount *tempArray = new MCount[TNvec->size()];
2824         int     uniqueCount=-1;
2825         for(int i=0;i<TNvec->size();i++){
2826                 tempArray[i] = 0;
2827                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2828                         uniqueCount++;
2829                         tempArray[uniqueCount] = (*TNvec)[i];
2830                 }
2831         }
2832         uniqueCount++;
2833         TNvec->removeAll();
2834         for(int i=0;i<uniqueCount;i++){
2835                 TNvec->push_back(tempArray[i]);
2836         }
2837         delete [] tempArray;
2838 }       
2839
2840 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2841         if(TNVec->size() == 0){
2842                 return -1; //not found in an empty vec
2843         }
2844         //binary search to find 
2845         int left=0;
2846         int right = TNVec->size();
2847         int mid = (left +right)/2;
2848         while(searchTN != (*TNVec)[mid] && left < right){
2849                 if((*TNVec)[mid] > searchTN){
2850                         right = mid-1;
2851                 }else{
2852                         left = mid+1;
2853                 }
2854                 mid = (left + right)/2;
2855         }
2856         if(left < right){
2857                 //mid is the element to be returned
2858                 return mid;
2859         }else{
2860                 if(mid < TNVec->size() && mid >=0){
2861                         if((*TNVec)[mid] == searchTN){
2862                                 return mid;
2863                         }else{
2864                                 return -1;
2865                         }
2866                 }else{
2867                         return -1;
2868                 }
2869         }
2870 };
2871
2872
/*
	Method to do parallel restart: distribute some of the array elements to other processors.
	The problem is that we can't use Charm++ entry methods to do the migration, as they would get
	stuck in the protocol that is being restarted.
	Note: in order to avoid interference between the objects being recovered, the current PE
	will NOT keep any object. It is devoted to forwarding messages to the recovering objects.
	Otherwise, the current PE would have to do both things (recover objects and forward messages),
	and the objects would end up stepping on each other's toes (interference).
*/
2881
2882 class ElementDistributor: public CkLocIterator{
2883         CkLocMgr *locMgr;
2884         int *targetPE;
2885
2886         void pupLocation(CkLocation &loc,PUP::er &p){
2887                 CkArrayIndexMax idx=loc.getIndex();
2888                 CkGroupID gID = locMgr->ckGetGroupID();
2889                 p|gID;      // store loc mgr's GID as well for easier restore
2890                 p|idx;
2891                 p|loc;
2892         };
2893 public:
2894         ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2895
2896         void addLocation(CkLocation &loc){
2897
2898                 // leaving object on this PE
2899                 if(*targetPE == CkMyPe()){
2900                         *targetPE = (*targetPE +1)%CkNumPes();
2901                         return;
2902                 }
2903                         
2904                 CkArrayIndexMax idx = loc.getIndex();
2905                 CkLocRec_local *rec = loc.getLocalRecord();
2906                         
2907                 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2908                 idx.print();
2909
2910                 // incrementing number of emigrant objects
2911                 CpvAccess(_numEmigrantRecObjs)++;
2912                         
2913                 //pack up this location and send it across
2914                 PUP::sizer psizer;
2915                 pupLocation(loc,psizer);
2916                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
2917                 char *msg = (char *)CmiAlloc(totalSize);
2918                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
2919                 distributeMsg->PE = CkMyPe();
2920                 char *buf = &msg[sizeof(DistributeObjectMsg)];
2921                 PUP::toMem pmem(buf);
2922                 pmem.becomeDeleting();
2923                 pupLocation(loc,pmem);
2924                         
2925                 locMgr->setDuringMigration(CmiTrue);
2926                 delete rec;
2927                 locMgr->setDuringMigration(CmiFalse);
2928                 locMgr->inform(idx,*targetPE);
2929
2930                 CmiSetHandler(msg,_distributedLocationHandlerIdx);
2931                 CmiSyncSendAndFree(*targetPE,totalSize,msg);
2932
2933                 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2934
2935                 //decide on the target processor for the next object
2936                 *targetPE = *targetPE + 1;
2937                 if(*targetPE > (CkMyPe() + parallelRecovery)){
2938                         *targetPE = CkMyPe() + 1;
2939                 }
2940         }
2941
2942 };
2943
/**
 * Distributes objects to accelerate recovery after a failure.
 * Runs an ElementDistributor over every location manager on this PE. The distributor ships
 * local array elements to the following PEs round-robin, starting at CkMyPe()+1.
 */
void distributeRestartedObjects(){
	// NOTE(review): numGroups and i are not referenced directly below; presumably the
	// CKLOCMGR_LOOP macro (defined elsewhere) iterates with them and provides `mgr` -- keep them.
	int numGroups = CkpvAccess(_groupIDTable)->size();
	int i;
	// First candidate destination. The same counter is passed to every distributor, so the
	// rotation continues across location managers.
	// NOTE(review): on the last PE this equals CkNumPes(), which is not itself a valid PE number.
	int targetPE=CkMyPe()+1;
	CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
};
2953
2954 /**
2955  * Handler to update information about an object just received.
2956  */
2957 void _distributedLocationHandler(char *receivedMsg){
2958         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2959         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
2960         int sourcePE = distributeMsg->PE;
2961         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
2962         PUP::fromMem pmem(buf);
2963         CkGroupID gID;
2964         CkArrayIndexMax idx;
2965         pmem |gID;
2966         pmem |idx;
2967         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2968         donotCountMigration=1;
2969         mgr->resume(idx,pmem,CmiTrue);
2970         donotCountMigration=0;
2971         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2972         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2973         idx.print();
2974
2975         CkLocRec *rec = mgr->elementRec(idx);
2976         CmiAssert(rec->type() == CkLocRec::local);
2977
2978         // adding object to the list of immigrant recovery objects
2979         CpvAccess(_immigrantRecObjs)->push_back((CkLocRec_local *)rec);
2980         CpvAccess(_numImmigrantRecObjs)++;
2981         
2982         CkVec<CkMigratable *> eltList;
2983         mgr->migratableList((CkLocRec_local *)rec,eltList);
2984         for(int i=0;i<eltList.size();i++){
2985                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2986                         CpvAccess(_currentObj) = eltList[i];
2987                         eltList[i]->mlogData->immigrantRecFlag = 1;
2988                         eltList[i]->mlogData->immigrantSourcePE = sourcePE;
2989                         eltList[i]->ResumeFromSync();
2990                 }
2991         }
2992 }
2993
2994
2995 /** this method is used to send messages to a restarted processor to tell
2996  * it that a particular expected object is not going to get to it */
2997 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2998         DummyMigrationMsg buf;
2999         buf.flag = MLOG_OBJECT;
3000         buf.lbID = lbID;
3001         buf.mgrID = locMgrID;
3002         buf.idx = idx;
3003         buf.locationPE = locationPE;
3004         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3005         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
3006 };
3007
3008
3009 /**this method is used by a restarted processor to tell other processors
3010  * that they are not going to receive these many objects.. just the count
3011  * not the objects themselves ***/
3012
3013 void sendDummyMigrationCounts(int *dummyCounts){
3014         DummyMigrationMsg buf;
3015         buf.flag = MLOG_COUNT;
3016         buf.lbID = globalLBID;
3017         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3018         for(int i=0;i<CmiNumPes();i++){
3019                 if(i != CmiMyPe() && dummyCounts[i] != 0){
3020                         buf.count = dummyCounts[i];
3021                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
3022                 }
3023         }
3024 }
3025
3026
3027 /** this handler is used to process a dummy migration msg.
3028  * it looks up the load balancer and calls migrated for it */
3029
3030 void _dummyMigrationHandler(DummyMigrationMsg *msg){
3031         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3032         if(msg->flag == MLOG_OBJECT){
3033                 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
3034                 LDObjHandle h;
3035                 lb->Migrated(h,1);
3036         }
3037         if(msg->flag == MLOG_COUNT){
3038                 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
3039                 msg->count -= verifyAckedRequests;
3040                 for(int i=0;i<msg->count;i++){
3041                         LDObjHandle h;
3042                         lb->Migrated(h,1);
3043                 }
3044         }
3045         verifyAckedRequests=0;
3046         CmiFree(msg);
3047 };
3048
/*****************************************************
	Implementation of a method that can be used to call
	any function on the ChareMlogData of all the chares
	currently living on this processor.
******************************************************/
3054
3055
3056 class ElementCaller :  public CkLocIterator {
3057 private:
3058         CkLocMgr *locMgr;
3059         MlogFn fnPointer;
3060         void *data;
3061 public:
3062         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3063                 locMgr = _locMgr;
3064                 fnPointer = _fnPointer;
3065                 data = _data;
3066         };
3067         void addLocation(CkLocation &loc){
3068                 CkVec<CkMigratable *> list;
3069                 CkLocRec_local *local = loc.getLocalRecord();
3070                 locMgr->migratableList (local,list);
3071                 for(int i=0;i<list.size();i++){
3072                         CkMigratable *migratableElement = list[i];
3073                         fnPointer(data,migratableElement->mlogData);
3074                 }
3075         }
3076 };
3077
3078 /**
3079  * Map function pointed by fnPointer over all the chares living in this processor.
3080  */
3081 void forAllCharesDo(MlogFn fnPointer, void *data){
3082         int numGroups = CkpvAccess(_groupIDTable)->size();
3083         for(int i=0;i<numGroups;i++){
3084                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3085                 fnPointer(data,obj->mlogData);
3086         }
3087         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3088         for(int i=0;i<numNodeGroups;i++){
3089                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3090                 fnPointer(data,obj->mlogData);
3091         }
3092         int i;
3093         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3094 };
3095
3096
3097 /******************************************************************
3098  Load Balancing
3099 ******************************************************************/
3100
3101 /**
3102  * This is the first time Converse is called after AtSync method has been called by every local object.
3103  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
3104  * message logging, we can take advantage of this situation and garbage collect at this point.
3105  */
3106 void initMlogLBStep(CkGroupID gid){
3107         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3108         countLBMigratedAway = 0;
3109         countLBToMigrate=0;
3110         onGoingLoadBalancing=1;
3111         migrationDoneCalled=0;
3112         checkpointBarrierCount=0;
3113         if(globalLBID.idx != 0){
3114                 CmiAssert(globalLBID.idx == gid.idx);
3115         }
3116         globalLBID = gid;
3117 #if SYNCHRONIZED_CHECKPOINT
3118         garbageCollectMlog();
3119 #endif
3120 }
3121
3122 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3123         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3124         DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3125         
3126         resumeLbFnPtr = _fnPtr;
3127         centralLb = _centralLb;
3128         migrationDoneCalled = 1;
3129         if(countLBToMigrate == countLBMigratedAway){
3130                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3131                 startMlogCheckpoint(NULL,CmiWallTimer());       
3132         }
3133 };
3134
3135 void finishedCheckpointLoadBalancing(){
3136         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3137         CheckpointBarrierMsg msg;
3138         msg.fromPE = CmiMyPe();
3139         msg.checkpointCount = checkpointCount;
3140
3141         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3142         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3143         
3144 };
3145
3146
3147 void sendMlogLocation(int targetPE, envelope *env){
3148 #if !SYNCHRONIZED_CHECKPOINT
3149         void *_msg = EnvToUsr(env);
3150         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3151
3152         int existing = 0;
3153         //if this object is already in the retainedobjectlust destined for this
3154         //processor it should not be sent
3155         
3156         for(int i=0;i<retainedObjectList.size();i++){
3157                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3158                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3159                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3160                         existing = 1;
3161                         break;
3162                 }
3163         }
3164
3165         if(existing){
3166                 return;
3167         }
3168         
3169         countLBToMigrate++;
3170         
3171         MigrationNotice migMsg;
3172         migMsg.migRecord.gID = msg->gid;
3173         migMsg.migRecord.idx = msg->idx;
3174         migMsg.migRecord.fromPE = CkMyPe();
3175         migMsg.migRecord.toPE =  targetPE;
3176         
3177         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3178         
3179         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3180         retainedObject->migRecord = migMsg.migRecord;
3181         retainedObject->acked  = 0;
3182         
3183         CkPackMessage(&env);
3184         
3185         migMsg.record = retainedObject;
3186         retainedObject->msg = env;
3187         int size = retainedObject->size = env->getTotalsize();
3188         
3189         retainedObjectList.push_back(retainedObject);
3190         
3191         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3192         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3193         
3194         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3195 #endif
3196 }
3197
3198 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3199         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3200         migratedNoticeList.push_back(msg->migRecord);
3201
3202         MigrationNoticeAck buf;
3203         buf.record = msg->record;
3204         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3205         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3206 }
3207
3208 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3209         
3210         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3211         retainedObject->acked = 1;
3212
3213         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3214         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3215
3216         //inform home about the new location of this object
3217         CkGroupID gID = retainedObject->migRecord.gID ;
3218         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3219         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3220         
3221         countLBMigratedAway++;
3222         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3223                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3224                 startMlogCheckpoint(NULL,CmiWallTimer());
3225         }
3226 };
3227
3228 void _receiveMlogLocationHandler(void *buf){
3229         envelope *env = (envelope *)buf;
3230         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3231         CkUnpackMessage(&env);
3232        &