06afc80b894ccd0a9a23037cd41ccbc60b9c5870
[charm.git] / src / ck-core / ckcausalmlog.C
1 /**
2   * Simple Causal Message Logging Fault Tolerance Protocol.
3   * Features:
4         * Reduces the latency overhead of the pessimistic approach.
5         * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
6   */
7
8 #include "charm.h"
9 #include "ck.h"
10 #include "ckcausalmlog.h"
11 #include "queueing.h"
12 #include <sys/types.h>
13 #include <signal.h>
14 #include "CentralLB.h"
15
#ifdef _FAULT_CAUSAL_

// Collects some statistics about message logging. Beware of the high cost of accounting for
// duplicated determinants (for every determinant received, it consists in a linear search 
// through a potentially big list).
// Each COLLECT_STATS_* switch is tested with #if below: set it to 1 to compile in that group
// of counters, 0 to leave it out.
#define COLLECT_STATS_MSGS 0
#define COLLECT_STATS_MSGS_TOTAL 0
#define COLLECT_STATS_MSG_COUNT 0
#define COLLECT_STATS_DETS 0
#define COLLECT_STATS_DETS_DUP 0
#define COLLECT_STATS_MEMORY 0
#define COLLECT_STATS_TEAM 0

// String labels; their users are not visible in this part of the file.
#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

// Debug macros. A macro whose body is commented out expands to nothing; remove the
// leading "//" in its body to enable it. DEBUG_NOW is always enabled.
// DEBUG_PE(x,y), when enabled, runs y only on PE x.
#define DEBUG_MEM(x)  //x
#define DEBUG(x) // x
#define DEBUG_RESTART(x) // x
#define DEBUGLB(x)   // x
#define DEBUG_TEAM(x)  // x
#define DEBUG_PERF(x) // x
#define DEBUG_CHECKPOINT 1
#define DEBUG_NOW(x) x
#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
#define DEBUG_RECOVERY(x) //x

43 extern const char *idx2str(const CkArrayIndex &ind);
44 extern const char *idx2str(const ArrayElement *el);
45
46 void getGlobalStep(CkGroupID gID);
47
48 bool fault_aware(CkObjID &recver);
49 void sendCheckpointData(int mode);
50 void createObjIDList(void *data,ChareMlogData *mlogData);
51 inline bool isLocal(int destPE);
52 inline bool isTeamLocal(int destPE);
53 void printLog(TProcessedLog *log);
54
55 int _restartFlag=0;
56 int _numRestartResponses=0;
57
58 //TODO: remove for perf runs
59 int countHashRefs=0; //count the number of gets
60 int countHashCollisions=0;
61
62 char *checkpointDirectory=".";
63 int unAckedCheckpoint=0;
64
65 int countLocal=0,countBuffered=0;
66 int countPiggy=0;
67 int countClearBufferedLocalCalls=0;
68
69 int countUpdateHomeAcks=0;
70
71 extern int teamSize;
72 extern int chkptPeriod;
73 extern bool fastRecovery;
74 extern int parallelRecovery;
75
76 char *killFile;
77 char *faultFile;
78 int killFlag=0;
79 int faultFlag=0;
80 int restartingMlogFlag=0;
81 void readKillFile();
82 double killTime=0.0;
83 double faultMean;
84 int checkpointCount=0;
85
86 CpvDeclare(Chare *,_currentObj);
87 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
88 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
89 CpvDeclare(Queue, _outOfOrderMessageQueue);
90 CpvDeclare(Queue, _delayedRemoteMessageQueue);
91 CpvDeclare(char **,_bufferedTicketRequests);
92 CpvDeclare(int *,_numBufferedTicketRequests);
93
/***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
/** @note All the determinants generated by a PE are stored in variable _localDets.
 * Whenever a message is sent, the currently buffered determinants are also sent to the
 * same destination (see sendMsg()), but those determinants are not deleted. We must wait
 * until an ACK comes from the receiver to delete the determinants. In the meantime the same
 * determinants may be sent along with other messages, and more determinants can be added
 * to _localDets.
 * Determinants are kept in a primitive array and new ones are always added at the end.
 * To avoid multiple copies of determinants, we keep the index just past the latest
 * determinant (_indexBufferedDets) and the number of valid determinants behind it
 * (_numBufferedDets). The valid determinants therefore occupy the slots
 * [_indexBufferedDets - _numBufferedDets, _indexBufferedDets).
 * We do not remove determinants until a checkpoint is made, since these determinants
 * may have to be added to messages in case of a recovery.
 */
// temporary storage for the outgoing determinants (raw bytes holding Determinant records)
CpvDeclare(char *, _localDets);
// number of buffered (still valid) determinants
int _numBufferedDets;
// index one past the most recent determinant in _localDets, i.e. the next free slot
int _indexBufferedDets;
// current phase of determinants (sent in the header of every determinant batch)
int _phaseBufferedDets;

// stores the determinants from other nodes, to send them in case of a crash
CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
// capacity of _localDets, counted in determinants; doubled by addBufferedDeterminant() when full
int _maxBufferedDets;

// stores the incarnation number of every processor (one char per PE, indexed by PE)
CpvDeclare(char *, _incarnation);

/***** *****/
125
#if COLLECT_STATS_MSGS
// message counters: per destination PE, or global totals when COLLECT_STATS_MSGS_TOTAL is set
int *numMsgsTarget;
int *sizeMsgsTarget;
int totalMsgsTarget;
float totalMsgsSize;
#endif
#if COLLECT_STATS_DETS
// determinant counters: piggybacked on sends, generated locally, duplicated
int numPiggyDets;
int numDets;
int numDupDets;
#endif
#if COLLECT_STATS_MEMORY
int msgLogSize;
int bufferedDetsSize;
int storedDetsSize;
#endif
//TML: variables for measuring savings with teams in message logging
#if COLLECT_STATS_TEAM
float MLOGFT_totalLogSize = 0.0;
float MLOGFT_totalMessages = 0.0;
#endif

static double adjustChkptPeriod=0.0; //in ms
static double nextCheckpointTime=0.0;//in seconds
static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);

// Converse handler indices below (the *HandlerIdx variables) are assigned by
// _messageLoggingInit() through CkRegisterHandler.
int _pingHandlerIdx;

char objString[100];
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _removeProcessedLogHandlerIdx;

int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;


int     _getGlobalStepHandlerIdx;
int     _recvGlobalStepHandlerIdx;

int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _resendReplyHandlerIdx;
int _receivedTNDataHandlerIdx;
int _receivedDetDataHandlerIdx;
int _distributedLocationHandlerIdx;
int _storeDeterminantsHandlerIdx;
int _removeDeterminantsHandlerIdx;

//TML: handler indices for team-based message logging
int _restartHandlerIdx;
int _getRestartCheckpointHandlerIdx;
int _recvRestartCheckpointHandlerIdx;
void setTeamRecovery(void *data, ChareMlogData *mlogData);
void unsetTeamRecovery(void *data, ChareMlogData *mlogData);

int verifyAckTotal;
int verifyAckCount;

int verifyAckedRequests=0;

RestartRequest *storedRequest;

int _falseRestart =0; /**
	For testing on clusters we might carry out restarts on
	a processor without actually starting it:
	1 -> false restart
	0 -> restart after an actual crash
*/

//Load balancing globals
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
int _receiveMlogLocationHandlerIdx;
int _receiveMigrationNoticeHandlerIdx;
int _receiveMigrationNoticeAckHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;

CkVec<MigrationRecord> migratedNoticeList;
CkVec<RetainedMigratedObject *> retainedObjectList;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;

// wall-clock timestamps; both are reset to CmiWallTimer() in _messageLoggingInit()
double lastCompletedAlarm=0;
double lastRestart=0;

//update location globals
int _receiveLocationHandlerIdx;
227
228 /** 
229  * @brief Initialize message logging data structures and register handlers
230  */
231 void _messageLoggingInit(){
232         //current object
233         CpvInitialize(Chare *,_currentObj);
234         
235         //registering handlers for message logging
236         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
237                 
238         //handlers for checkpointing
239         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
240         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
241         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
242         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
243
244         //handlers for restart
245         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
246         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
247         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
248         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
249         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
250         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
251         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
252         _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
253         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
254         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
255         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
256         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
257
258         //TML: handlers for team-based message logging
259         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
260         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
261         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
262
263         // handlers for causal message logging
264         _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
265         _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
266         
267         //handlers for load balancing
268         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
269         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
270         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
271         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
272         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
273         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
274         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
275         
276         //handlers for updating locations
277         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
278         
279         //Cpv variables for message logging
280         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
281         CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
282         CpvInitialize(Queue, _outOfOrderMessageQueue);
283         CpvInitialize(Queue, _delayedRemoteMessageQueue);
284         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
285         CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
286         
287         CpvInitialize(char **,_bufferedTicketRequests);
288         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
289         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
290         for(int i=0;i<CkNumPes();i++){
291                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
292                 CpvAccess(_numBufferedTicketRequests)[i]=0;
293         }
294  
295         // Cpv variables for causal protocol
296         _numBufferedDets = 0;
297         _indexBufferedDets = 0;
298         _phaseBufferedDets = 0;
299         _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
300         CpvInitialize(char *, _localDets);
301         CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
302         CpvInitialize(char *, _incarnation);
303         CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
304         CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
305         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
306         for(int i=0; i<CmiNumPes(); i++){
307                 CpvAccess(_incarnation)[i] = 0;
308         }
309         
310         //Cpv variables for checkpoint
311         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
312         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
313
314         // registering user events for projections      
315         traceRegisterUserEvent("Remove Logs", 20);
316         traceRegisterUserEvent("Ticket Request Handler", 21);
317         traceRegisterUserEvent("Ticket Handler", 22);
318         traceRegisterUserEvent("Local Message Copy Handler", 23);
319         traceRegisterUserEvent("Local Message Ack Handler", 24);        
320         traceRegisterUserEvent("Preprocess current message",25);
321         traceRegisterUserEvent("Preprocess past message",26);
322         traceRegisterUserEvent("Preprocess future message",27);
323         traceRegisterUserEvent("Checkpoint",28);
324         traceRegisterUserEvent("Checkpoint Store",29);
325         traceRegisterUserEvent("Checkpoint Ack",30);
326         traceRegisterUserEvent("Send Ticket Request",31);
327         traceRegisterUserEvent("Generalticketrequest1",32);
328         traceRegisterUserEvent("TicketLogLocal",33);
329         traceRegisterUserEvent("next_ticket and SN",34);
330         traceRegisterUserEvent("Timeout for buffered remote messages",35);
331         traceRegisterUserEvent("Timeout for buffered local messages",36);
332         traceRegisterUserEvent("Inform Location Home",37);
333         traceRegisterUserEvent("Receive Location Handler",38);
334         
335         lastCompletedAlarm=CmiWallTimer();
336         lastRestart = CmiWallTimer();
337
338 #if COLLECT_STATS_MSGS
339 #if COLLECT_STATS_MSGS_TOTAL
340         totalMsgsTarget = 0;
341         totalMsgsSize = 0.0;
342 #else
343         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
344         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
345         for(int i=0; i<CmiNumPes(); i++){
346                 numMsgsTarget[i] = 0;
347                 sizeMsgsTarget[i] = 0;
348         }
349 #endif
350 #endif
351 #if COLLECT_STATS_DETS
352         numPiggyDets = 0;
353         numDets = 0;
354         numDupDets = 0;
355 #endif
356 #if COLLECT_STATS_MEMORY
357         msgLogSize = 0;
358         bufferedDetsSize = 0;
359         storedDetsSize = 0;
360 #endif
361
362
363 }
364
365 void killLocal(void *_dummy,double curWallTime);        
366
367 void readKillFile(){
368         FILE *fp=fopen(killFile,"r");
369         if(!fp){
370                 return;
371         }
372         int proc;
373         double sec;
374         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
375                 if(proc == CkMyPe()){
376                         killTime = CmiWallTimer()+sec;
377                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
378                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
379                 }
380         }
381         fclose(fp);
382 }
383
384 /**
385  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
386  * We assume an exponential distribution for the mean-time-between-failures.
387  */
388 void readFaultFile(){
389         FILE *fp=fopen(faultFile,"r");
390         if(!fp){
391                 return;
392         }
393         int proc;
394         double sec;
395         fscanf(fp,"%d %lf",&proc,&sec);
396         faultMean = sec;
397         if(proc == CkMyPe()){
398                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
399                 CcdCallFnAfter(killLocal,NULL,sec*1000);
400         }
401         fclose(fp);
402 }
403
404 void killLocal(void *_dummy,double curWallTime){
405         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
406         if(CmiWallTimer()<killTime-1){
407                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
408         }else{  
409                 kill(getpid(),SIGKILL);
410         }
411 }
412
413 /*** Auxiliary Functions ***/
414
415 /**
416  * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
417  * determinants needs to be extended.
418  */
419 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
420         Determinant *det, *auxDet;
421         char *aux;
422
423         DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
424
425         // checking if we are overflowing the buffered determinants array
426         if(_indexBufferedDets >= _maxBufferedDets){
427                 aux = CpvAccess(_localDets);
428                 _maxBufferedDets *= 2;
429                 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
430                 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
431                 CmiFree(aux);
432         }
433
434         // adding the new determinant at the end
435         det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
436         det->sender = sender;
437         det->receiver = receiver;
438         det->SN = SN;
439         det->TN = TN;
440         _numBufferedDets++;
441         _indexBufferedDets++;
442
443 #if COLLECT_STATS_MEMORY
444         bufferedDetsSize++;
445 #endif
446 #if COLLECT_STATS_DETS
447         numDets++;
448 #endif
449 }
450
451 /************************ Message logging methods ****************/
452
453 /**
454  * Sends a group message that might be a broadcast.
455  */
456 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
457         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
458                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
459                 void *origMsg = EnvToUsr(env);
460                 for(int i=0;i<CmiNumPes();i++){
461                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
462                                 void *copyMsg = CkCopyMsg(&origMsg);
463                                 envelope *copyEnv = UsrToEnv(copyMsg);
464                                 copyEnv->SN=0;
465                                 copyEnv->TN=0;
466                                 copyEnv->sender.type = TypeInvalid;
467                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
468                                 sendGroupMsg(copyEnv,i,_infoIdx);
469                         }
470                 }
471                 return;
472         }
473
474         // initializing values of envelope
475         env->SN=0;
476         env->TN=0;
477         env->sender.type = TypeInvalid;
478
479         CkObjID recver;
480         recver.type = TypeGroup;
481         recver.data.group.id = env->getGroupNum();
482         recver.data.group.onPE = destPE;
483         sendCommonMsg(recver,env,destPE,_infoIdx);
484 }
485
486 /**
487  * Sends a nodegroup message that might be a broadcast.
488  */
489 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
490         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
491                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
492                 void *origMsg = EnvToUsr(env);
493                 for(int i=0;i<CmiNumNodes();i++){
494                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
495                                 void *copyMsg = CkCopyMsg(&origMsg);
496                                 envelope *copyEnv = UsrToEnv(copyMsg);
497                                 copyEnv->SN=0;
498                                 copyEnv->TN=0;
499                                 copyEnv->sender.type = TypeInvalid;
500                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
501                         }
502                 }
503                 return;
504         }
505
506         // initializing values of envelope
507         env->SN=0;
508         env->TN=0;
509         env->sender.type = TypeInvalid;
510
511         CkObjID recver;
512         recver.type = TypeNodeGroup;
513         recver.data.group.id = env->getGroupNum();
514         recver.data.group.onPE = destNode;
515         sendCommonMsg(recver,env,destNode,_infoIdx);
516 }
517
518 /**
519  * Sends a message to an array element.
520  */
521 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
522         CkObjID recver;
523         recver.type = TypeArray;
524         recver.data.array.id = env->getsetArrayMgr();
525         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
526
527         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
528                 char recverString[100],senderString[100];
529                 
530                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
531         }
532
533         // initializing values of envelope
534         env->SN = 0;
535         env->TN = 0;
536
537         sendCommonMsg(recver,env,destPE,_infoIdx);
538 };
539
540 /**
541  * Sends a message to a singleton chare.
542  */
543 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
544         CkObjID recver;
545         recver.type = TypeChare;
546         recver.data.chare.id = *pCid;
547
548         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
549                 char recverString[100],senderString[100];
550                 
551                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
552         }
553
554         // initializing values of envelope
555         env->SN = 0;
556         env->TN = 0;
557
558         sendCommonMsg(recver,env,destPE,_infoIdx);
559 };
560
561 /**
562  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
563  */
564 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
565         envelope *env = _env;
566         MCount ticketNumber = 0;
567         int resend=0; //is it a resend
568         char recverName[100];
569         double _startTime=CkWallTimer();
570         
571         DEBUG_MEM(CmiMemoryCheck());
572
573         if(CpvAccess(_currentObj) == NULL){
574 //              CkAssert(0);
575                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
576                 generalCldEnqueue(destPE,env,_infoIdx);
577                 return;
578         }
579         
580         // setting message logging data in the envelope
581         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
582         if(env->sender.type == TypeInvalid){
583                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
584         }else{
585                 envelope *copyEnv = copyEnvelope(env);
586                 env = copyEnv;
587                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
588                 env->SN = 0;
589         }
590         
591         DEBUG_MEM(CmiMemoryCheck());
592
593         CkObjID &sender = env->sender;
594         env->recver = recver;
595
596         Chare *obj = (Chare *)env->sender.getObject();
597           
598         if(env->SN == 0){
599                 DEBUG_MEM(CmiMemoryCheck());
600                 env->SN = obj->mlogData->nextSN(recver);
601         }else{
602                 resend = 1;
603         }
604         
605         char senderString[100];
606 //      if(env->SN != 1){
607                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
608         //      CmiPrintStackTrace(0);
609 /*      }else{
610                 DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
611         }*/
612                 
613         MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
614 //      CkPackMessage(&(mEntry->env));
615 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
616         
617         _startTime = CkWallTimer();
618
619         // uses the proper ticketing mechanism for local, team and general messages
620         if(isLocal(destPE)){
621                 sendLocalMsg(mEntry);
622         }else{
623                 if((teamSize > 1) && isTeamLocal(destPE)){
624
625                         // look to see if this message already has a ticket in the team-table
626                         Chare *senderObj = (Chare *)sender.getObject();
627                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
628                         if(ticketRow != NULL){
629                                 Ticket ticket = ticketRow->get(env->SN);
630                                 if(ticket.TN != 0){
631                                         ticketNumber = ticket.TN;
632                                         DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
633                                 }
634                         }
635                 }
636                 
637                 // sending the message
638                 sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
639                 
640         }
641 }
642
643 /**
644  * Determines if the message is local or not. A message is local if:
645  * 1) Both the destination and origin are the same PE.
646  */
647 inline bool isLocal(int destPE){
648         // both the destination and the origin are the same PE
649         if(destPE == CkMyPe())
650                 return true;
651
652         return false;
653 }
654
655 /**
656  * Determines if the message is group local or not. A message is group local if:
657  * 1) They belong to the same group in the group-based message logging.
658  */
659 inline bool isTeamLocal(int destPE){
660
661         // they belong to the same group
662         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
663                 return true;
664
665         return false;
666 }
667
668 /**
669  * Method that does the actual send by creating a ticket request filling it up and sending it.
670  */
671 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
672         DEBUG_NOW(char recverString[100]);
673         DEBUG_NOW(char senderString[100]);
674
675         int totalSize;
676         StoreDeterminantsHeader header;
677         int sizes[2];
678         char *ptrs[2];
679
680         envelope *env = entry->env;
681         DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
682
683         // setting all the information
684         Chare *obj = (Chare *)entry->env->sender.getObject();
685         entry->env->recver = recver;
686         entry->env->SN = SN;
687         entry->env->TN = TN;
688         if(!resend){
689                 //TML: only stores message if either it goes to this processor or to a processor in a different group
690                 if(!isTeamLocal(entry->destPE)){
691                         obj->mlogData->addLogEntry(entry);
692 #if COLLECT_STATS_TEAM
693                         MLOGFT_totalMessages += 1.0;
694                         MLOGFT_totalLogSize += entry->env->getTotalsize();
695 #endif
696                 }else{
697                         // the message has to be deleted after it has been sent
698                         entry->env->freeMsg = true;
699                 }
700         }
701
702         // sending the determinants that causally affect this message
703         if(_numBufferedDets > 0){
704
705                 // modifying the actual number of determinants sent in message
706                 header.number = _numBufferedDets;
707                 header.index = _indexBufferedDets;
708                 header.phase = _phaseBufferedDets;
709                 header.PE = CmiMyPe();
710         
711                 // sending the message
712                 sizes[0] = sizeof(StoreDeterminantsHeader);
713                 sizes[1] = _numBufferedDets * sizeof(Determinant);
714                 ptrs[0] = (char *) &header;
715                 ptrs[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
716                 DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
717                 CmiSetHandler(&header, _storeDeterminantsHandlerIdx);
718                 CmiSyncVectorSend(destPE, 2, &sizes[0], &ptrs[0]);
719         }
720
721         // updating its message log entry
722         entry->indexBufDets = _indexBufferedDets;
723         entry->numBufDets = _numBufferedDets;
724
725         // sending the message
726         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
727
728         DEBUG_MEM(CmiMemoryCheck());
729 #if COLLECT_STATS_MSGS
730 #if COLLECT_STATS_MSGS_TOTAL
731         totalMsgsTarget++;
732         totalMsgsSize += (float)env->getTotalsize();
733 #else
734         numMsgsTarget[destPE]++;
735         sizeMsgsTarget[destPE] += env->getTotalsize();
736 #endif
737 #endif
738 #if COLLECT_STATS_DETS
739         numPiggyDets += _numBufferedDets;
740 #endif
741 #if COLLECT_STATS_MEMORY
742         msgLogSize += env->getTotalsize();
743 #endif
744 };
745
746
747 /**
748  * @brief Function to send a local message. It first gets a ticket and
749  * then enqueues the message. If we are recovering, then the message 
750  * is enqueued in a delay queue.
751  */
752 void sendLocalMsg(MlogEntry *entry){
753         DEBUG_PERF(double _startTime=CkWallTimer());
754         DEBUG_MEM(CmiMemoryCheck());
755         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
756         DEBUG(char senderString[100]);
757         DEBUG(char recverString[100]);
758         Ticket ticket;
759
760         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
761
762         // getting the receiver local object
763         Chare *recverObj = (Chare *)entry->env->recver.getObject();
764
765         // if receiver object is not NULL, we will ask it for a ticket
766         if(recverObj){
767
768 /*HERE          // if we are recovery, then we must put this off for later
769                 if(recverObj->mlogData->restartFlag){
770                         CpvAccess(_delayedLocalMsgs)->enq(entry);
771                         return;
772                 }
773
774                 // check if this combination of sender and SN has been already a ticket
775                 ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
776                         
777                 // assigned a new ticket in case this message is completely new
778                 if(ticket.TN == 0){
779                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
780
781                         // if TN == 0 we enqueue this message since we are recovering from a crash
782                         if(ticket.TN == 0){
783                                 CpvAccess(_delayedLocalMsgs)->enq(entry);
784                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
785                                 
786 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
787                                 return;
788                         }
789                 }
790
791                 //TODO: check for the case when an invalid ticket is returned
792                 //TODO: check for OLD or RECEIVED TICKETS
793                 entry->env->TN = ticket.TN;
794                 CkAssert(entry->env->TN > 0);
795                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
796         
797                 DEBUG_MEM(CmiMemoryCheck());
798
799                 // adding the determinant to _localDets
800                 addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
801 */
802                 // sends the local message
803                 _skipCldEnqueue(CmiMyPe(),entry->env,entry->_infoIdx);  
804
805                 DEBUG_MEM(CmiMemoryCheck());
806         }else{
807                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
808         }
809 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
810 };
811
812 /****
813         The handler functions
814 *****/
815
816 /*** CAUSAL PROTOCOL HANDLERS ***/
817
818 /**
819  * @brief Removes the determinants after a particular index in the _localDets array.
820  */
821 void _removeDeterminantsHandler(char *buffer){
822         RemoveDeterminantsHeader *header;
823         int index, phase;
824
825         // getting the header from the message
826         header = (RemoveDeterminantsHeader *)buffer;
827         index = header->index;
828         phase = header->phase;
829
830         // fprintf(stderr,"ACK index=%d\n",index);
831
832         // updating the number of buffered determinants
833         if(phase == _phaseBufferedDets){
834                 if(index > _indexBufferedDets)
835                         CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
836                 CmiAssert(index <= _indexBufferedDets);
837                 _numBufferedDets = _indexBufferedDets - index;
838         }
839
840         // releasing memory
841         CmiFree(buffer);
842
843 }
844
845 /**
846  * @brief Stores the determinants coming from other processor.
847  */
848 void _storeDeterminantsHandler(char *buffer){
849         StoreDeterminantsHeader *header;
850         RemoveDeterminantsHeader removeHeader;
851         Determinant *detPtr, *det;
852         int i, n, index, phase, destPE;
853         CkVec<Determinant> *vec;
854
855
856         // getting the header from the message and pointing to the first determinant
857         header = (StoreDeterminantsHeader *)buffer;
858         n = header->number;
859         index = header->index;
860         phase = header->phase;
861         destPE = header->PE;
862         detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
863
864         DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
865         DEBUG_MEM(CmiMemoryCheck());
866
867         // going through all determinants and storing them into _remoteDets
868         for(i = 0; i < n; i++){
869                 det = new Determinant();
870                 det->sender = detPtr->sender;
871                 det->receiver = detPtr->receiver;
872                 det->SN = detPtr->SN;
873                 det->TN = detPtr->TN;
874
875                 // getting the determinant array
876                 vec = CpvAccess(_remoteDets)->get(det->receiver);
877                 if(vec == NULL){
878                         vec = new CkVec<Determinant>();
879                         CpvAccess(_remoteDets)->put(det->receiver) = vec;
880                 }
881 #if COLLECT_STATS_DETS
882 #if COLLECT_STATS_DETS_DUP
883                 for(int j=0; j<vec->size(); j++){
884                         if(isSameDet(&(*vec)[j],det)){
885                                 numDupDets++;
886                                 break;
887                         }
888                 }
889 #endif
890 #endif
891 #if COLLECT_STATS_MEMORY
892                 storedDetsSize++;
893 #endif
894                 vec->push_back(*det);
895                 detPtr = detPtr++;
896         }
897
898         DEBUG_MEM(CmiMemoryCheck());
899
900         // freeing buffer
901         CmiFree(buffer);
902
903         // sending the ACK back to the sender
904         removeHeader.index = index;
905         removeHeader.phase = phase;
906         CmiSetHandler(&removeHeader,_removeDeterminantsHandlerIdx);
907         CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)&removeHeader);
908         
909 }
910
911 /**
912  *  If there are any delayed requests, process them first before 
913  *  processing this request
914  * */
915 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
916         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
917         double  _startTime = CkWallTimer();
918         CmiFree(ticketRequest);
919 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
920 }
921
922
923 /**
924  * @brief Gets a ticket for a recently received message.
925  * @pre env->recver has to be on this processor.
926  * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
927  */
928 inline bool _getTicket(envelope *env, int *flag){
929         DEBUG_MEM(CmiMemoryCheck());
930         DEBUG(char recverName[100]);
931         DEBUG(char senderString[100]);
932         Ticket ticket;
933         
934         // getting information from request
935         CkObjID sender = env->sender;
936         CkObjID recver = env->recver;
937         MCount SN = env->SN;
938         MCount TN = env->TN;
939         Chare *recverObj = (Chare *)recver.getObject();
940         
941         DEBUG(recver.toString(recverName);)
942
943         // verifying whether we are recovering from a failure or not
944         if(recverObj->mlogData->restartFlag)
945                 return false;
946
947         // checking ticket table to see if this message already received a ticket
948         ticket = recverObj->mlogData->getTicket(sender, SN);
949         TN = ticket.TN;
950         
951         // checking if the message is team local and if it has a ticket already assigned
952         if(teamSize > 1 && TN != 0){
953                 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
954                 recverObj->mlogData->verifyTicket(sender,SN,TN);
955         }
956
957         //check if a ticket for this has been already handed out to an object that used to be local but 
958         // is no longer so.. need for parallel restart
959         if(TN == 0){
960                 ticket = recverObj->mlogData->next_ticket(sender,SN);
961                 *flag = NEW_TICKET;
962         } else {
963                 *flag = OLD_TICKET;
964         }
965         if(ticket.TN > recverObj->mlogData->tProcessed){
966                 ticket.state = NEW_TICKET;
967         } else {
968                 ticket.state = OLD_TICKET;
969         }
970         // @todo: check for the case when an invalid ticket is returned
971         if(ticket.TN == 0){
972                 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
973                 return false;
974         }
975                 
976         // setting the ticket number in the envelope
977         env->TN = ticket.TN;
978         DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
979         return true;
980
981 };
982
983 bool fault_aware(CkObjID &recver){
984         switch(recver.type){
985                 case TypeChare:
986                         return true;
987                 case TypeMainChare:
988                         return false;
989                 case TypeGroup:
990                 case TypeNodeGroup:
991                 case TypeArray:
992                         return true;
993                 default:
994                         return false;
995         }
996 };
997
998 /* Preprocesses a received message */
999 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
1000         DEBUG_NOW(char recverString[100]);
1001         DEBUG_NOW(char senderString[100]);
1002         DEBUG_MEM(CmiMemoryCheck());
1003         int flag;
1004         bool ticketSuccess;
1005
1006         // getting the receiver object
1007         CkObjID recver = env->recver;
1008         if(!fault_aware(recver)){
1009                 return 1;
1010                 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
1011         }
1012
1013         Chare *obj = (Chare *)recver.getObject();
1014         *objPointer = obj;
1015         if(obj == NULL){
1016                 int possiblePE = recver.guessPE();
1017                 if(possiblePE != CkMyPe()){
1018                         int totalSize = env->getTotalsize();
1019                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1020                         
1021                         DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
1022                 }else{
1023                         // this is the case where a message is received and the object has not been initialized
1024                         // we delayed the delivery of the message
1025                         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1026                         
1027                         DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
1028                 }
1029                 return 0;
1030         }
1031
1032         // checking if message comes from an old incarnation
1033         // message must be discarded
1034         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
1035                 CmiFree(env);
1036                 return 0;
1037         }
1038
1039         DEBUG_MEM(CmiMemoryCheck());
1040         DEBUG_PE(0,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1041
1042         // getting a ticket for this message
1043         ticketSuccess = _getTicket(env,&flag);
1044
1045         // we might be during recovery when this message arrives
1046         if(!ticketSuccess){
1047                 
1048                 // adding the message to a delay queue
1049                 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
1050                 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
1051
1052                 return 0;
1053         }
1054         
1055         //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
1056         //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
1057         
1058         if(flag == NEW_TICKET){
1059                 // storing determinant of message in data structure
1060                 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
1061         }
1062
1063         DEBUG_MEM(CmiMemoryCheck());
1064
1065         double _startTime = CkWallTimer();
1066 //env->sender.updatePosition(env->getSrcPe());
1067         if(env->TN == obj->mlogData->tProcessed+1){
1068                 //the message that needs to be processed now
1069                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1070                 // once we find a message that we can process we put back all the messages in the out of order queue
1071                 // back into the main scheduler queue. 
1072         DEBUG_MEM(CmiMemoryCheck());
1073                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1074                         void *qMsgPtr;
1075                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1076                         envelope *qEnv = (envelope *)qMsgPtr;
1077                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
1078         DEBUG_MEM(CmiMemoryCheck());
1079                 }
1080 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1081                 //TODO: this might be a problem.. change made for leanMD
1082 //              CpvAccess(_currentObj) = obj;
1083         DEBUG_MEM(CmiMemoryCheck());
1084                 return 1;
1085         }
1086
1087         // checking if message has already been processed
1088         // message must be discarded
1089         if(env->TN <= obj->mlogData->tProcessed){
1090                 DEBUG(printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
1091                 
1092                 CmiFree(env);
1093                 return 0;
1094         }
1095         //message that needs to be processed in the future
1096
1097         DEBUG(printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1098         //the message cant be processed now put it back in the out of order message Q, 
1099         //It will be transferred to the main queue later
1100         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1101 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1102         DEBUG_MEM(CmiMemoryCheck());
1103         
1104         return 0;
1105 }
1106
1107 /**
1108  * @brief Updates a few variables once a message has been processed.
1109  */
1110 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
1111         DEBUG(char senderString[100]);
1112         if(obj){
1113                 if(sender.guessPE() == CkMyPe()){
1114                         if(entry != NULL){
1115                                 entry->env = NULL;
1116                         }
1117                 }
1118                 obj->mlogData->tProcessed++;
1119 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1120                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1121 //              CpvAccess(_currentObj)= NULL;
1122         }
1123         DEBUG_MEM(CmiMemoryCheck());
1124 }
1125
1126 /***
1127         Helpers for the handlers and message logging methods
1128 ***/
1129
1130 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
1131 //      double _startTime = CkWallTimer();
1132         if(env->recver.type != TypeNodeGroup){
1133         //This repeats a step performed in skipCldEnq for messages sent to
1134         //other processors. I do this here so that messages to local processors
1135         //undergo the same transformation.. It lets the resend be uniform for 
1136         //all messages
1137 //              CmiSetXHandler(env,CmiGetHandler(env));
1138                 _skipCldEnqueue(destPE,env,_infoIdx);
1139         }else{
1140                 _noCldNodeEnqueue(destPE,env);
1141         }
1142 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1143 }
//extern "C" int CmiGetNonLocalLength();
// Counter whose name suggests it tracks how often queued ticket requests were
// retried. NOTE(review): nothing in this part of the file reads or writes it,
// and no retry method follows here; confirm it is still used elsewhere.

int calledRetryTicketRequest=0;
1150
1151 void _pingHandler(CkPingMsg *msg){
1152         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1153         CmiFree(msg);
1154 }
1155
1156
/*****************************************************************************
	Checkpointing methods.
		Pack all the data on a processor and send it to the buddy processor
		periodically. Once the buddy acknowledges a checkpoint, the message
		logs older than that checkpoint can be thrown away.
*****************************************************************************/
// Highest ticket processed by each local chare. Rebuilt at every checkpoint
// (startMlogCheckpoint) and copied into remove-log requests (sendRemoveLogRequests).
CkVec<TProcessedLog> processedTicketLog;
void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
void clearUpMigratedRetainedLists(int PE);
1165
1166 void checkpointAlarm(void *_dummy,double curWallTime){
1167         double diff = curWallTime-lastCompletedAlarm;
1168         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1169 /*      if(CkWallTimer()-lastRestart < 50){
1170                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1171                 return;
1172         }*/
1173         if(diff < ((chkptPeriod) - 2)){
1174                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1175                 return;
1176         }
1177         CheckpointRequest request;
1178         request.PE = CkMyPe();
1179         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1180         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1181 };
1182
1183 void _checkpointRequestHandler(CheckpointRequest *request){
1184         startMlogCheckpoint(NULL,CmiWallTimer());
1185 }
1186
1187 /**
1188  * @brief Starts the checkpoint phase after migration.
1189  */
1190 void startMlogCheckpoint(void *_dummy, double curWallTime){
1191         double _startTime = CkWallTimer();
1192
1193         // increasing the checkpoint counter
1194         checkpointCount++;
1195         
1196 #if DEBUG_CHECKPOINT
1197         if(CmiMyPe() == 0){
1198                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1199         }
1200 #endif
1201
1202         DEBUG_MEM(CmiMemoryCheck());
1203
1204         PUP::sizer psizer;
1205         psizer | checkpointCount;
1206         for(int i=0; i<CmiNumPes(); i++){
1207                 psizer | CpvAccess(_incarnation)[i];
1208         }
1209         CkPupROData(psizer);
1210         DEBUG_MEM(CmiMemoryCheck());
1211         CkPupGroupData(psizer,CmiTrue);
1212         DEBUG_MEM(CmiMemoryCheck());
1213         CkPupNodeGroupData(psizer,CmiTrue);
1214         DEBUG_MEM(CmiMemoryCheck());
1215         pupArrayElementsSkip(psizer,CmiTrue,NULL);
1216         DEBUG_MEM(CmiMemoryCheck());
1217
1218         int dataSize = psizer.size();
1219         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1220         char *msg = (char *)CmiAlloc(totalSize);
1221         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1222         chkMsg->PE = CkMyPe();
1223         chkMsg->dataSize = dataSize;
1224         
1225         char *buf = &msg[sizeof(CheckPointDataMsg)];
1226         PUP::toMem pBuf(buf);
1227
1228         pBuf | checkpointCount;
1229         for(int i=0; i<CmiNumPes(); i++){
1230                 pBuf | CpvAccess(_incarnation)[i];
1231         }
1232         CkPupROData(pBuf);
1233         CkPupGroupData(pBuf,CmiTrue);
1234         CkPupNodeGroupData(pBuf,CmiTrue);
1235         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1236
1237         unAckedCheckpoint=1;
1238         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1239         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1240         
1241         /*
1242                 Store the highest Ticket number processed for each chare on this processor
1243         */
1244         processedTicketLog.removeAll();
1245         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1246
1247 #if DEBUG_CHECKPOINT
1248         if(CmiMyPe() == 0){
1249                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1250         }
1251 #endif
1252
1253 #if COLLECT_STATS_MEMORY
1254         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
1255         msgLogSize = 0;
1256         bufferedDetsSize = 0;
1257         storedDetsSize = 0;
1258 #endif
1259
1260         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1261                 lastCompletedAlarm = curWallTime;
1262                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1263         }
1264         traceUserBracketEvent(28,_startTime,CkWallTimer());
1265 };
1266
1267 /**
1268  * @brief A chare adds the latest ticket number processed.
1269  */
1270 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
1271         DEBUG(char objString[100]);
1272
1273         CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
1274         TProcessedLog logEntry;
1275         logEntry.recver = mlogData->objID;
1276         logEntry.tProcessed = mlogData->tProcessed;
1277         log->push_back(logEntry);
1278
1279         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1280 }
1281
1282 class ElementPacker : public CkLocIterator {
1283 private:
1284         CkLocMgr *locMgr;
1285         PUP::er &p;
1286 public:
1287         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1288         void addLocation(CkLocation &loc) {
1289                 CkArrayIndexMax idx=loc.getIndex();
1290                 CkGroupID gID = locMgr->ckGetGroupID();
1291                 p|gID;      // store loc mgr's GID as well for easier restore
1292                 p|idx;
1293                 p|loc;
1294         }
1295 };
1296
1297 /**
1298  * Pups all the array elements in this processor.
1299  */
1300 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
1301         int numElements,i;
1302         int numGroups = CkpvAccess(_groupIDTable)->size();      
1303         if(!p.isUnpacking()){
1304                 numElements = CkCountArrayElements();
1305         }       
1306         p | numElements;
1307         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1308         if(!p.isUnpacking()){
1309                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1310         }else{
1311                 //Flush all recs of all LocMgrs before putting in new elements
1312 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1313                 for(int j=0;j<listsize;j++){
1314                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1315                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1316                                 listToSkip[j].idx.print();
1317                         }
1318                 }
1319                 
1320                 printf("numElements = %d\n",numElements);
1321         
1322                 for (int i=0; i<numElements; i++) {
1323                         CkGroupID gID;
1324                         CkArrayIndexMax idx;
1325                         p|gID;
1326                 p|idx;
1327                         int flag=0;
1328                         int matchedIdx=0;
1329                         for(int j=0;j<listsize;j++){
1330                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1331                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1332                                                 matchedIdx = j;
1333                                                 flag = 1;
1334                                                 break;
1335                                         }
1336                                 }
1337                         }
1338                         if(flag == 1){
1339                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1340                         }else{
1341                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1342                         }
1343                                 
1344                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1345                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
1346                         mgr->resume(idx,p,create,flag);
1347                         if(flag == 1){
1348                                 int homePE = mgr->homePe(idx);
1349                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1350                         }
1351                 }
1352         }
1353 };
1354
1355
1356 void writeCheckpointToDisk(int size,char *chkpt){
1357         char fNameTemp[100];
1358         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1359         int fd = creat(fNameTemp,S_IRWXU);
1360         int ret = write(fd,chkpt,size);
1361         CkAssert(ret == size);
1362         close(fd);
1363         
1364         char fName[100];
1365         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1366         unlink(fName);
1367
1368         rename(fNameTemp,fName);
1369 }
1370
//handler that receives the checkpoint from a processor
//it stores it and acks it
/*
 * Message layout: |CheckPointDataMsg||dataSize bytes of checkpoint data|.
 * The whole received buffer is retained (_storedCheckpointData->buf points just
 * past its header) and the previously retained buffer, if any, is freed.
 * migratedNoticeList entries whose fromPE is the sender get ackFrom = 1; entries
 * with both ackFrom and ackTo set are removed. A CheckPointAck carrying the
 * stored size is then sent back to the sender.
 */
void _storeCheckpointHandler(char *msg){
	double _startTime=CkWallTimer();

	CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
	DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)

	// the checkpoint bytes follow the header inside the same buffer
	char *chkpt = &msg[sizeof(CheckPointDataMsg)];

	// release the previously stored checkpoint: buf points past its header,
	// so step back to the start of the original message buffer
	char *oldChkpt =	CpvAccess(_storedCheckpointData)->buf;
	if(oldChkpt != NULL){
		char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
		CmiFree(oldmsg);
	}
	// keep the new checkpoint; msg is deliberately not freed here

	int sendingPE = chkMsg->PE;

	CpvAccess(_storedCheckpointData)->buf = chkpt;
	CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
	CpvAccess(_storedCheckpointData)->PE = sendingPE;

	// iterate backwards so that remove(j) presumably does not shift entries not yet visited
	int count=0;
	for(int j=migratedNoticeList.size()-1;j>=0;j--){
		if(migratedNoticeList[j].fromPE == sendingPE){
			migratedNoticeList[j].ackFrom = 1;
		}else{
			// NOTE(review): a string literal is a non-NULL pointer, so this
			// assertion can never fail; if entries from a PE other than the buddy
			// must be flagged, it needs a real condition.
			CmiAssert("migratedNoticeList entry for processor other than buddy");
		}
		if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
			migratedNoticeList.remove(j);
			count++;
		}

	}
	DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));

	// acknowledge with the size actually stored
	CheckPointAck ackMsg;
	ackMsg.PE = CkMyPe();
	ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
	CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
	CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);

	traceUserBracketEvent(29,_startTime,CkWallTimer());
};
1417
1418
1419 /**
1420  * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.       
1421  * @note The remove log request message looks like
1422                 |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
1423  */
1424 void sendRemoveLogRequests(){
1425 #if SYNCHRONIZED_CHECKPOINT
1426         CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
1427 #endif
1428         double _startTime = CkWallTimer();      
1429
1430         // computing total message size
1431         int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
1432         char *requestMsg = (char *)CmiAlloc(totalSize);
1433
1434         // filling up the message
1435         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1436         request->PE = CkMyPe();
1437         request->numberObjects = processedTicketLog.size();
1438         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1439         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1440         char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
1441         int *numDeterminants = (int *)listDeterminants;
1442         numDeterminants[0] = 0; 
1443         listDeterminants = (char *)&numDeterminants[1];
1444
1445         // setting the handler in the message
1446         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1447         
1448         DEBUG_MEM(CmiMemoryCheck());
1449         for(int i=0;i<CkNumPes();i++){
1450                 CmiSyncSend(i,totalSize,requestMsg);
1451         }
1452         CmiFree(requestMsg);
1453
1454         clearUpMigratedRetainedLists(CmiMyPe());
1455         //TODO: clear ticketTable
1456         
1457         traceUserBracketEvent(30,_startTime,CkWallTimer());
1458
1459         DEBUG_MEM(CmiMemoryCheck());
1460 }
1461
1462
1463 void _checkpointAckHandler(CheckPointAck *ackMsg){
1464         DEBUG_MEM(CmiMemoryCheck());
1465         unAckedCheckpoint=0;
1466         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1467         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1468         if(onGoingLoadBalancing){
1469                 onGoingLoadBalancing = 0;
1470                 finishedCheckpointLoadBalancing();
1471         }else{
1472                 sendRemoveLogRequests();
1473         }
1474         CmiFree(ackMsg);
1475         
1476 };
1477
1478
1479 /**
1480  * @brief Inserts all the determinants into a hash table.
1481  */
1482 inline void populateDeterminantTable(char *data){
1483         DEBUG(char recverString[100]);
1484         DEBUG(char senderString[100]);
1485         int numDets, *numDetsPtr;
1486         Determinant *detList;
1487         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1488         SNToTicket *tickets;
1489         Ticket ticket;
1490
1491         RemoveLogRequest *request = (RemoveLogRequest *)data;
1492         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1493         
1494         numDetsPtr = (int *)&list[request->numberObjects];
1495         numDets = numDetsPtr[0];
1496         detList = (Determinant *)&numDetsPtr[1];
1497
1498         // inserting determinants into hashtable
1499         for(int i=0; i<numDets; i++){
1500                 table = detTable.get(detList[i].sender);
1501                 if(table == NULL){
1502                         table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
1503                         detTable.put(detList[i].sender) = table;
1504                 }
1505                 tickets = table->get(detList[i].receiver);
1506                 if(tickets == NULL){
1507                         tickets = new SNToTicket();
1508                         table->put(detList[i].receiver) = tickets; 
1509                 }
1510                 ticket.TN = detList[i].TN;
1511                 tickets->put(detList[i].SN) = ticket;
1512         }
1513
1514         DEBUG_MEM(CmiMemoryCheck());    
1515
1516 }
1517
1518 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1519         int total;
1520         DEBUG(char nameString[100]);
1521         DEBUG_MEM(CmiMemoryCheck());
1522         CmiMemoryCheck();
1523         char *data = (char *)_data;
1524         RemoveLogRequest *request = (RemoveLogRequest *)data;
1525         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1526         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1527         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1528         SNToTicket *tickets;
1529         MCount TN;
1530
1531         int count=0;
1532         total = mlog->length();
1533         for(int i=0; i<total; i++){
1534                 MlogEntry *logEntry = mlog->deq();
1535
1536                 // looking message in determinant table
1537                 table = detTable.get(logEntry->env->sender);
1538                 if(table != NULL){
1539                         tickets = table->get(logEntry->env->recver);
1540                         if(tickets != NULL){
1541                                 TN = tickets->get(logEntry->env->SN).TN;
1542                                 if(TN != 0){
1543                                         logEntry->env->TN = TN;
1544                                 }
1545                         }
1546                 }
1547         
1548                 int match=0;
1549                 for(int j=0;j<request->numberObjects;j++){
1550                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
1551                                 //this log Entry should be removed
1552                                 match = 1;
1553                                 break;
1554                         }
1555                 }
1556                 char senderString[100],recverString[100];
1557 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1558                 if(match){
1559                         count++;
1560                         delete logEntry;
1561                 }else{
1562                         mlog->enq(logEntry);
1563                 }
1564         }
1565         if(count > 0){
1566                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1567         }
1568         DEBUG_MEM(CmiMemoryCheck());
1569         CmiMemoryCheck();
1570 }
1571
1572 /**
1573  * @brief Removes messages in the log according to the received ticket numbers
1574  */
1575 void _removeProcessedLogHandler(char *requestMsg){
1576         double start = CkWallTimer();
1577
1578         // building map for determinants
1579         populateDeterminantTable(requestMsg);
1580
1581         // removing processed messages from the message log
1582         forAllCharesDo(removeProcessedLogs,requestMsg);
1583
1584         // @todo: clean determinant table 
1585         
1586         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1587         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1588         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1589         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1590         /*
1591                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1592         */
1593         DEBUG_MEM(CmiMemoryCheck());
1594         clearUpMigratedRetainedLists(request->PE);
1595         
1596         traceUserBracketEvent(20,start,CkWallTimer());
1597         CmiFree(requestMsg);    
1598 };
1599
1600
1601 void clearUpMigratedRetainedLists(int PE){
1602         int count=0;
1603         CmiMemoryCheck();
1604         
1605         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1606                 if(migratedNoticeList[j].toPE == PE){
1607                         migratedNoticeList[j].ackTo = 1;
1608                 }
1609                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1610                         migratedNoticeList.remove(j);
1611                         count++;
1612                 }
1613         }
1614         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1615         
1616         for(int j=retainedObjectList.size()-1;j>=0;j--){
1617                 if(retainedObjectList[j]->migRecord.toPE == PE){
1618                         RetainedMigratedObject *obj = retainedObjectList[j];
1619                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1620                         retainedObjectList.remove(j);
1621                         if(obj->msg != NULL){
1622                                 CmiMemoryCheck();
1623                                 CmiFree(obj->msg);
1624                         }
1625                         delete obj;
1626                 }
1627         }
1628 }
1629
1630 /***************************************************************
1631         Restart Methods and handlers
1632 ***************************************************************/        
1633
1634 /**
1635  * Function for restarting the crashed processor.
1636  * It sets the restart flag and contacts the buddy
1637  * processor to get the latest checkpoint.
1638  */
1639 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1640         RestartRequest msg;
1641
1642         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1643
1644         // setting the restart flag
1645         _restartFlag = 1;
1646         _numRestartResponses = 0;
1647
1648         // if we are using team-based message logging, all members of the group have to be restarted
1649         if(teamSize > 1){
1650                 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
1651                         if(i != CkMyPe() && i < CkNumPes()){
1652                                 // sending a message to the team member
1653                                 msg.PE = CkMyPe();
1654                             CmiSetHandler(&msg,_restartHandlerIdx);
1655                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1656                         }
1657                 }
1658         }
1659
1660         // requesting the latest checkpoint from its buddy
1661         msg.PE = CkMyPe();
1662         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1663         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1664 };
1665
1666 /**
1667  * Function to restart this processor.
1668  * The handler is invoked by a member of its same team in message logging.
1669  */
1670 void _restartHandler(RestartRequest *restartMsg){
1671         int i;
1672         int numGroups = CkpvAccess(_groupIDTable)->size();
1673         RestartRequest msg;
1674         
1675         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1676
1677     // setting the restart flag
1678         _restartFlag = 1;
1679         _numRestartResponses = 0;
1680
1681         // flushing all buffers
1682         //TEST END
1683 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1684         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1685         for(int i=0;i<numGroups;i++){
1686         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1687                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1688                 obj->flushStates();
1689                 obj->ckJustMigrated();
1690         }*/
1691
1692     // requesting the latest checkpoint from its buddy
1693         msg.PE = CkMyPe();
1694         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1695         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1696 }
1697
1698
1699 /**
1700  * Gets the stored checkpoint but calls another function in the sender.
1701  */
1702 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1703
1704         // retrieving the stored checkpoint
1705         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1706
1707         // making sure it is its buddy who is requesting the checkpoint
1708         CkAssert(restartMsg->PE == storedChkpt->PE);
1709
1710         storedRequest = restartMsg;
1711         verifyAckTotal = 0;
1712
1713         for(int i=0;i<migratedNoticeList.size();i++){
1714                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1715 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1716                         if(migratedNoticeList[i].ackFrom == 0){
1717                                 //need to verify if the object actually exists .. it might not
1718                                 //have been acked but it might exist on it
1719                                 VerifyAckMsg msg;
1720                                 msg.migRecord = migratedNoticeList[i];
1721                                 msg.index = i;
1722                                 msg.fromPE = CmiMyPe();
1723                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1724                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1725                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1726                                 verifyAckTotal++;
1727                         }
1728                 }
1729         }
1730
1731         // sending the checkpoint back to its buddy     
1732         if(verifyAckTotal == 0){
1733                 sendCheckpointData(MLOG_RESTARTED);
1734         }
1735         verifyAckCount = 0;
1736 }
1737
1738 /**
1739  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1740  */
1741 void _recvRestartCheckpointHandler(char *_restartData){
1742         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1743         MigrationRecord *migratedAwayElements;
1744
1745         globalLBID = restartData->lbGroupID;
1746         
1747         restartData->restartWallTime *= 1000;
1748         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1749         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1750         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1751
1752         
1753         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1754         char *buf = &_restartData[sizeof(RestartProcessorData)];
1755         
1756         if(restartData->numMigratedAwayElements != 0){
1757                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1758                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1759                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1760                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1761         }
1762
1763         // turning on the team recovery flag
1764         forAllCharesDo(setTeamRecovery,NULL);
1765         
1766         PUP::fromMem pBuf(buf);
1767         pBuf | checkpointCount;
1768         for(int i=0; i<CmiNumPes(); i++){
1769                 pBuf | CpvAccess(_incarnation)[i];
1770         }
1771         CkPupROData(pBuf);
1772         CkPupGroupData(pBuf,CmiFalse);
1773         CkPupNodeGroupData(pBuf,CmiFalse);
1774         pupArrayElementsSkip(pBuf,CmiFalse,NULL);
1775         CkAssert(pBuf.size() == restartData->checkPointSize);
1776         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1777
1778         // increasing the incarnation number of all the team members
1779         for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
1780                 CpvAccess(_incarnation)[i]++;
1781         }
1782         
1783         // turning off the team recovery flag
1784         forAllCharesDo(unsetTeamRecovery,NULL);
1785
1786         // initializing a few variables for handling local messages
1787         forAllCharesDo(initializeRestart,NULL);
1788         
1789         CmiFree(_restartData);  
1790
1791         /*HERE _initDone();
1792
1793         getGlobalStep(globalLBID);
1794         
1795         countUpdateHomeAcks = 0;
1796         RestartRequest updateHomeRequest;
1797         updateHomeRequest.PE = CmiMyPe();
1798         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1799         for(int i=0;i<CmiNumPes();i++){
1800                 if(i != CmiMyPe()){
1801                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
1802                 }
1803         }
1804 */
1805
1806
1807         // Send out the request to resend logged messages to all other processors
1808         CkVec<TProcessedLog> objectVec;
1809         forAllCharesDo(createObjIDList, (void *)&objectVec);
1810         int numberObjects = objectVec.size();
1811         
1812         /*
1813                 resendMsg layout |ResendRequest|Array of TProcessedLog|
1814         */
1815         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
1816         char *resendMsg = (char *)CmiAlloc(totalSize);  
1817
1818         ResendRequest *resendReq = (ResendRequest *)resendMsg;
1819         resendReq->PE =CkMyPe(); 
1820         resendReq->numberObjects = numberObjects;
1821         char *objList = &resendMsg[sizeof(ResendRequest)];
1822         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
1823         
1824
1825         /* test for parallel restart migrate away object**/
1826 //      if(fastRecovery){
1827 //              distributeRestartedObjects();
1828 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
1829 //      }
1830         
1831         /*      To make restart work for load balancing.. should only
1832         be used when checkpoint happens along with load balancing
1833         **/
1834 //      forAllCharesDo(resumeFromSyncRestart,NULL);
1835
1836         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
1837         CpvAccess(_currentObj) = lb;
1838         lb->ReceiveDummyMigration(restartDecisionNumber);
1839
1840         sleep(10);
1841         
1842         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
1843         for(int i=0;i<CkNumPes();i++){
1844                 if(i != CkMyPe()){
1845                         CmiSyncSend(i,totalSize,resendMsg);
1846                 }       
1847         }
1848         _resendMessagesHandler(resendMsg);
1849
1850 }
1851
1852
1853 void CkMlogRestartDouble(void *,double){
1854         CkMlogRestart(NULL,NULL);
1855 };
1856
1857 //TML: restarting from local (group) failure
1858 void CkMlogRestartLocal(){
1859     CkMlogRestart(NULL,NULL);
1860 };
1861
1862 /**
1863  * Gets the stored checkpoint for its buddy processor.
1864  */
1865 void _getCheckpointHandler(RestartRequest *restartMsg){
1866
1867         // retrieving the stored checkpoint
1868         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1869
1870         // making sure it is its buddy who is requesting the checkpoint
1871         CkAssert(restartMsg->PE == storedChkpt->PE);
1872
1873         storedRequest = restartMsg;
1874         verifyAckTotal = 0;
1875
1876         for(int i=0;i<migratedNoticeList.size();i++){
1877                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1878 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1879                         if(migratedNoticeList[i].ackFrom == 0){
1880                                 //need to verify if the object actually exists .. it might not
1881                                 //have been acked but it might exist on it
1882                                 VerifyAckMsg msg;
1883                                 msg.migRecord = migratedNoticeList[i];
1884                                 msg.index = i;
1885                                 msg.fromPE = CmiMyPe();
1886                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1887                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1888                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1889                                 verifyAckTotal++;
1890                         }
1891                 }
1892         }
1893
1894         // sending the checkpoint back to its buddy     
1895         if(verifyAckTotal == 0){
1896                 sendCheckpointData(MLOG_CRASHED);
1897         }
1898         verifyAckCount = 0;
1899 }
1900
1901
1902 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1903         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1904         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1905         if(rec != NULL && rec->type() == CkLocRec::local){
1906                         //this location exists on this processor
1907                         //and needs to be removed       
1908                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1909                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1910                         
1911                         int localIdx = localRec->getLocalIndex();
1912                         LBDatabase *lbdb = localRec->getLBDB();
1913                         LDObjHandle ldHandle = localRec->getLdHandle();
1914                                 
1915                         locMgr->setDuringMigration(true);
1916                         
1917                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1918                         lbdb->UnregisterObj(ldHandle);
1919                         
1920                         locMgr->setDuringMigration(false);
1921                         
1922                         verifyAckedRequests++;
1923
1924         }
1925         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1926         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1927 };
1928
1929
1930 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1931         int index =     verifyReply->index;
1932         migratedNoticeList[index] = verifyReply->migRecord;
1933         verifyAckCount++;
1934         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1935         if(verifyAckCount == verifyAckTotal){
1936                 sendCheckpointData(MLOG_CRASHED);
1937         }
1938 }
1939
1940
1941 /**
1942  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
1943  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
1944  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
1945  */
1946 void sendCheckpointData(int mode){      
1947         RestartRequest *restartMsg = storedRequest;
1948         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1949         int numMigratedAwayElements = migratedNoticeList.size();
1950         if(migratedNoticeList.size() != 0){
1951                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1952 //                      CkAssert(migratedNoticeList.size() == 0);
1953         }
1954         
1955         
1956         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1957         
1958         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1959         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
1960         
1961         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1962         
1963         char *msg = (char *)CmiAlloc(totalSize);
1964         
1965         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1966         dataMsg->PE = CkMyPe();
1967         dataMsg->restartWallTime = CmiTimer();
1968         dataMsg->checkPointSize = storedChkpt->bufSize;
1969         
1970         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1971 //      dataMsg->numMigratedAwayElements = 0;
1972         
1973         dataMsg->numMigratedInElements = 0;
1974         dataMsg->migratedElementSize = 0;
1975         dataMsg->lbGroupID = globalLBID;
1976         /*msg layout 
1977                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1978                 Local MessageLog|
1979         */
1980         //store checkpoint data
1981         char *buf = &msg[sizeof(RestartProcessorData)];
1982
1983         if(dataMsg->numMigratedAwayElements != 0){
1984                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1985                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1986         }
1987         
1988
1989         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1990         buf = &buf[storedChkpt->bufSize];
1991
1992         if(mode == MLOG_RESTARTED){
1993                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
1994                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1995                 CmiFree(restartMsg);
1996         }else{
1997                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
1998                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1999                 CmiFree(restartMsg);
2000         }
2001 };
2002
2003
2004 // this list is used to create a vector of the object ids of all
2005 //the chares on this processor currently and the highest TN processed by them 
2006 //the first argument is actually a CkVec<TProcessedLog> *
2007 void createObjIDList(void *data,ChareMlogData *mlogData){
2008         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2009         TProcessedLog entry;
2010         entry.recver = mlogData->objID;
2011         entry.tProcessed = mlogData->tProcessed;
2012         list->push_back(entry);
2013         DEBUG_TEAM(char objString[100]);
2014         DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2015         DEBUG_RECOVERY(printLog(&entry));
2016 }
2017
2018
2019 /**
2020  * Receives the checkpoint data from its buddy, restores the state of all the objects
2021  * and asks everyone else to update its home.
2022  */
2023 void _recvCheckpointHandler(char *_restartData){
2024         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2025         MigrationRecord *migratedAwayElements;
2026
2027         globalLBID = restartData->lbGroupID;
2028         
2029         restartData->restartWallTime *= 1000;
2030         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2031         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2032         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2033
2034         
2035         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2036         char *buf = &_restartData[sizeof(RestartProcessorData)];
2037         
2038         if(restartData->numMigratedAwayElements != 0){
2039                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2040                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2041                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2042                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2043         }
2044         
2045         PUP::fromMem pBuf(buf);
2046
2047         pBuf | checkpointCount;
2048         for(int i=0; i<CmiNumPes(); i++){
2049                 pBuf | CpvAccess(_incarnation)[i];
2050         }
2051         CkPupROData(pBuf);
2052         CkPupGroupData(pBuf,CmiTrue);
2053         CkPupNodeGroupData(pBuf,CmiTrue);
2054         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
2055         CkAssert(pBuf.size() == restartData->checkPointSize);
2056         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2057
2058         // increases the incarnation number
2059         CpvAccess(_incarnation)[CmiMyPe()]++;
2060         
2061         forAllCharesDo(initializeRestart,NULL);
2062         
2063         CmiFree(_restartData);
2064         
2065         _initDone();
2066
2067         getGlobalStep(globalLBID);
2068         
2069         countUpdateHomeAcks = 0;
2070         RestartRequest updateHomeRequest;
2071         updateHomeRequest.PE = CmiMyPe();
2072         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2073         for(int i=0;i<CmiNumPes();i++){
2074                 if(i != CmiMyPe()){
2075                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2076                 }
2077         }
2078
2079 }
2080
2081 /**
2082  * Receives the updateHome ACKs from all other processors. Once everybody
2083  * has replied, it sends a request to resend the logged messages.
2084  */
2085 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2086         countUpdateHomeAcks++;
2087         CmiFree(updateHomeAck);
2088         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2089         if(countUpdateHomeAcks != CmiNumPes()){
2090                 return;
2091         }
2092
2093         // Send out the request to resend logged messages to all other processors
2094         CkVec<TProcessedLog> objectVec;
2095         forAllCharesDo(createObjIDList, (void *)&objectVec);
2096         int numberObjects = objectVec.size();
2097         
2098         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2099         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2100         char *resendMsg = (char *)CmiAlloc(totalSize);  
2101
2102         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2103         resendReq->PE =CkMyPe(); 
2104         resendReq->numberObjects = numberObjects;
2105         char *objList = &resendMsg[sizeof(ResendRequest)];
2106         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2107
2108         /* test for parallel restart migrate away object**/
2109         if(fastRecovery){
2110                 distributeRestartedObjects();
2111                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2112         }
2113         
2114         /*      To make restart work for load balancing.. should only
2115         be used when checkpoint happens along with load balancing
2116         **/
2117 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2118
2119         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2120         CpvAccess(_currentObj) = lb;
2121         lb->ReceiveDummyMigration(restartDecisionNumber);
2122
2123 //HERE  sleep(10);
2124         
2125         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2126         for(int i=0;i<CkNumPes();i++){
2127                 if(i != CkMyPe()){
2128                         CmiSyncSend(i,totalSize,resendMsg);
2129                 }
2130         }
2131         _resendMessagesHandler(resendMsg);
2132         CmiFree(resendMsg);
2133
2134 };
2135
2136 /**
2137  * @brief Initializes variables and flags for restarting procedure.
2138  */
2139 void initializeRestart(void *data, ChareMlogData *mlogData){
2140         mlogData->resendReplyRecvd = 0;
2141         mlogData->receivedTNs = new CkVec<MCount>;
2142         mlogData->restartFlag = 1;
2143 };
2144
2145 /**
2146  * Updates the homePe of chare array elements.
2147  */
2148 void updateHomePE(void *data,ChareMlogData *mlogData){
2149         RestartRequest *updateRequest = (RestartRequest *)data;
2150         int PE = updateRequest->PE; //restarted PE
2151         //if this object is an array Element and its home is the restarted processor
2152         // the home processor needs to know its current location
2153         if(mlogData->objID.type == TypeArray){
2154                 //it is an array element
2155                 CkGroupID myGID = mlogData->objID.data.array.id;
2156                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
2157                 CkArrayID aid(mlogData->objID.data.array.id);           
2158                 //check if the restarted processor is the home processor for this object
2159                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2160                 if(locMgr->homePe(myIdx) == PE){
2161                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2162                         DEBUG_RESTART(myIdx.print());
2163                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2164                 }
2165         }
2166 };
2167
2168
2169 /**
2170  * Updates the homePe for all chares in this processor.
2171  */
2172 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2173         int sender = updateRequest->PE;
2174         
2175         forAllCharesDo(updateHomePE,updateRequest);
2176         
2177         updateRequest->PE = CmiMyPe();
2178         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2179         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2180         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2181                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2182                 checkpointCount--;
2183                 startMlogCheckpoint(NULL,0);
2184         }
2185         if(sender == getCheckPointPE()){
2186                 for(int i=0;i<retainedObjectList.size();i++){
2187                         if(retainedObjectList[i]->acked == 0){
2188                                 MigrationNotice migMsg;
2189                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2190                                 migMsg.record = retainedObjectList[i];
2191                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2192                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2193                         }
2194                 }
2195         }
2196 }
2197
2198 /**
2199  * @brief Fills up the ticket vector for each chare.
2200  */
2201 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2202         DEBUG(char name[100]);
2203         ResendData *resendData = (ResendData *)data;
2204         int PE = resendData->PE; //restarted PE
2205         int count=0;
2206         CkHashtableIterator *iterator;
2207         void *objp;
2208         void *objkey;
2209         CkObjID *objID;
2210         SNToTicket *snToTicket;
2211         Ticket ticket;
2212         
2213         // traversing the team table looking up for the maximum TN received     
2214         iterator = mlogData->teamTable.iterator();
2215         while( (objp = iterator->next(&objkey)) != NULL ){
2216                 objID = (CkObjID *)objkey;
2217         
2218                 // traversing the resendData structure to add ticket numbers
2219                 for(int j=0;j<resendData->numberObjects;j++){
2220                         if((*objID) == (resendData->listObjects)[j].recver){
2221                                 snToTicket = *(SNToTicket **)objp;
2222 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2223                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2224                                         ticket = snToTicket->get(snIndex);      
2225                                 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2226                                                 //store the TNs that have been since the recver last checkpointed
2227                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2228                                         }
2229                                 }
2230                         }
2231                 }
2232         }
2233
2234         //releasing the memory for the iterator
2235         delete iterator;
2236 }
2237
2238
2239 /**
2240  * @brief Turns on the flag for team recovery that selectively restores
2241  * particular metadata information.
2242  */
2243 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2244         char name[100];
2245         mlogData->teamRecoveryFlag = 1; 
2246 }
2247
2248 /**
2249  * @brief Turns off the flag for team recovery.
2250  */
2251 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2252         mlogData->teamRecoveryFlag = 0;
2253 }
2254
2255 /**
2256  * Prints a processed log.
2257  */
2258 void printLog(TProcessedLog *log){
2259         char recverString[100];
2260         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
2261 }
2262
2263 /**
2264  * Prints information about a message.
2265  */
2266 void printMsg(envelope *env, const char* par){
2267         char senderString[100];
2268         char recverString[100];
2269         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
2270 }
2271
2272 /**
2273  * Prints information about a determinant.
2274  */
2275 void printDet(Determinant *det, const char* par){
2276         char senderString[100];
2277         char recverString[100];
2278         CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
2279 }
2280
2281 /**
2282  * @brief Resends all the logged messages to a particular chare list.
2283  * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
2284  * @param mlogData a particular chare living in this processor.
2285  */
2286 void resendMessageForChare(void *data, ChareMlogData *mlogData){
2287         DEBUG_RESTART(char nameString[100]);
2288         DEBUG_RESTART(char recverString[100]);
2289         DEBUG_RESTART(char senderString[100]);
2290
2291         ResendData *resendData = (ResendData *)data;
2292         int PE = resendData->PE; //restarted PE
2293         int count=0;
2294         int ticketRequests=0;
2295         CkQ<MlogEntry *> *log = mlogData->getMlog();
2296
2297         DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2298
2299         // traversing the message log to see if we must resend a message        
2300         for(int i=0;i<log->length();i++){
2301                 MlogEntry *logEntry = (*log)[i];
2302                 
2303                 // if we sent out the logs of a local message to buddy and it crashed
2304                 //before acknowledging 
2305                 envelope *env = logEntry->env;
2306                 if(env == NULL){
2307                         continue;
2308                 }
2309         
2310                 // resend if type is not invalid        
2311                 if(env->recver.type != TypeInvalid){
2312                         for(int j=0;j<resendData->numberObjects;j++){
2313                                 if(env->recver == (resendData->listObjects)[j].recver){
2314                                         if(PE != CkMyPe()){
2315                                                 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
2316                                                 if(env->recver.type == TypeNodeGroup){
2317                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2318                                                 }else{
2319                                                         CmiSetHandler(env,CmiGetXHandler(env));
2320                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2321                                                 }
2322                                         }else{
2323                                                 envelope *copyEnv = copyEnvelope(env);
2324                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2325                                         }
2326                                         DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2327                                         count++;
2328                                 }
2329                         }//end of for loop of objects
2330                         
2331                 }       
2332         }
2333         DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
2334 }
2335
2336 /**
2337  * Resends messages since last checkpoint to the list of objects included in the 
2338  * request. It also sends stored remote determinants to the particular failed PE.
2339  */
2340 void _resendMessagesHandler(char *msg){
2341         ResendData d;
2342         CkVec<Determinant> *detVec;
2343         ResendRequest *resendReq = (ResendRequest *)msg;
2344
2345         // building the reply message
2346         char *listObjects = &msg[sizeof(ResendRequest)];
2347         d.numberObjects = resendReq->numberObjects;
2348         d.PE = resendReq->PE;
2349         d.listObjects = (TProcessedLog *)listObjects;
2350         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2351         detVec = new CkVec<Determinant>[d.numberObjects];
2352
2353         //Check if any of the retained objects need to be recreated
2354         //If they have not been recreated on the restarted processor
2355         //they need to be recreated on this processor
2356         int count=0;
2357         for(int i=0;i<retainedObjectList.size();i++){
2358                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2359                         count++;
2360                         int recreate=1;
2361                         for(int j=0;j<d.numberObjects;j++){
2362                                 if(d.listObjects[j].recver.type != TypeArray ){
2363                                         continue;
2364                                 }
2365                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2366                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2367                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2368                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asChild()){
2369                                                 recreate = 0;
2370                                                 break;
2371                                         }
2372                                 }
2373                         }
2374                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2375                         if(recreate){
2376                                 donotCountMigration=1;
2377                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2378                                 donotCountMigration=0;
2379                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2380                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2381                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2382                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2383                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2384                                 CmiAssert(rec->type() == CkLocRec::local);
2385                                 CkVec<CkMigratable *> eltList;
2386                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2387                                 for(int j=0;j<eltList.size();j++){
2388                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2389                                                 CpvAccess(_currentObj) = eltList[j];
2390                                                 eltList[j]->ResumeFromSync();
2391                                         }
2392                                 }
2393                                 retainedObjectList[i]->msg=NULL;        
2394                         }
2395                 }
2396         }
2397         
2398         if(count > 0){
2399 //              CmiAbort("retainedObjectList for restarted processor not empty");
2400         }
2401         
2402         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2403
2404
2405         //TML: examines the origin processor to determine if it belongs to the same group.
2406         // In that case, it only returns the maximum ticket received for each object in the list.
2407         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2408                 forAllCharesDo(fillTicketForChare,&d);
2409         else
2410                 forAllCharesDo(resendMessageForChare,&d);
2411
2412         // adding the stored determinants to the resendReplyMsg
2413         // traversing all the stored determinants
2414         CkVec<Determinant> *vec;
2415         for(int i=0; i<d.numberObjects; i++){
2416                 vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
2417                 if(vec != NULL){
2418                         for(int j=0; j<vec->size(); j++){
2419
2420                                 // only relevant determinants are to be sent
2421                                 if((*vec)[j].TN > d.listObjects[i].tProcessed){
2422
2423                                         // adding the ticket in the ticket vector
2424                                         d.ticketVecs[i].push_back((*vec)[j].TN);
2425
2426                                         // adding the determinant in the determinant vector
2427                                         detVec[i].push_back((*vec)[j]);
2428
2429                                         DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
2430                                 }
2431                         }
2432                 }
2433         }
2434
2435         int totalDetStored = 0;
2436         for(int i=0;i<d.numberObjects;i++){
2437                 totalDetStored += detVec[i].size();
2438         }
2439
2440         //send back the maximum ticket number for a message sent to each object on the 
2441         //restarted processor
2442         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2443         
2444         int totalTNStored=0;
2445         for(int i=0;i<d.numberObjects;i++){
2446                 totalTNStored += d.ticketVecs[i].size();
2447         }
2448         
2449         int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
2450         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2451         
2452         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2453         resendReply->PE = CkMyPe();
2454         resendReply->numberObjects = d.numberObjects;
2455         
2456         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2457         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2458         for(int i=0;i<d.numberObjects;i++){
2459                 replyObjects[i] = d.listObjects[i].recver;
2460         }
2461         
2462         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2463         for(int i=0;i<d.numberObjects;i++){
2464                 int vecsize = d.ticketVecs[i].size();
2465                 memcpy(ticketList,&vecsize,sizeof(int));
2466                 ticketList = &ticketList[sizeof(int)];
2467                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2468                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2469         }
2470
2471         // adding the stored remote determinants to the message
2472         for(int i=0;i<d.numberObjects;i++){
2473                 int vecsize = detVec[i].size();
2474                 memcpy(ticketList,&vecsize,sizeof(int));
2475                 ticketList = &ticketList[sizeof(int)];
2476                 memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
2477                 ticketList = &ticketList[sizeof(Determinant)*vecsize];
2478         }
2479
2480         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2481         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2482         
2483 /*      
2484         if(verifyAckRequestsUnacked){
2485                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2486                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2487                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2488                         LDObjHandle h;
2489                         lb->Migrated(h,1);
2490                 }
2491         }
2492         
2493         verifyAckRequestsUnacked=0;*/
2494         
2495         delete [] d.ticketVecs;
2496         delete [] detVec;
2497
2498         DEBUG_MEM(CmiMemoryCheck());
2499
2500         if(resendReq->PE != CkMyPe()){
2501                 CmiFree(msg);
2502         }       
2503 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2504         lastRestart = CmiWallTimer();
2505 }
2506
2507 void sortVec(CkVec<MCount> *TNvec);
2508 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2509
2510 /**
2511  * @brief Processes the messages in the delayed remote message queue
2512  */
2513 void processDelayedRemoteMsgQueue(){
2514         DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
2515         
2516         while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
2517                 void *qMsgPtr;
2518                 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
2519                 envelope *qEnv = (envelope *)qMsgPtr;
2520                 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
2521                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
2522                 DEBUG_MEM(CmiMemoryCheck());
2523         }
2524
2525 }
2526
2527 /**
2528  * @brief Receives determinants stored on remote nodes.
2529  * Message format: |Header|ObjID list|TN list|Determinant list|
2530  * TN list = |number of TNs|list of TNs|...|
2531  */
2532 void _resendReplyHandler(char *msg){
2533         ResendRequest *resendReply = (ResendRequest *)msg;
2534         CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
2535         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2536         
2537 //      DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2538         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2539         for(int i =0; i< resendReply->numberObjects;i++){
2540                 Chare *obj = (Chare *)listObjects[i].getObject();
2541                 
2542                 int vecsize;
2543                 memcpy(&vecsize,listTickets,sizeof(int));
2544                 listTickets = &listTickets[sizeof(int)];
2545                 MCount *listTNs = (MCount *)listTickets;
2546                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2547         
2548                 if(obj != NULL){
2549                         //the object was restarted on the processor on which it existed
2550                         processReceivedTN(obj,vecsize,listTNs);
2551                 }else{
2552                         //pack up objID vecsize and listTNs and send it to the correct processor
2553                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2554                         char *TNMsg = (char *)CmiAlloc(totalSize);
2555                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2556                         receivedTNData->recver = listObjects[i];
2557                         receivedTNData->numTNs = vecsize;
2558                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2559                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2560                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2561                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2562                 }
2563
2564         }
2565
2566         // traversing all the retrieved determinants
2567         for(int i = 0; i < resendReply->numberObjects; i++){
2568                 Chare *obj = (Chare *)listObjects[i].getObject();
2569                 
2570                 int vecsize;
2571                 memcpy(&vecsize,listTickets,sizeof(int));
2572                 listTickets = &listTickets[sizeof(int)];
2573                 Determinant *listDets = (Determinant *)listTickets;     
2574                 listTickets = &listTickets[vecsize*sizeof(Determinant)];
2575         
2576                 if(obj != NULL){
2577                         //the object was restarted on the processor on which it existed
2578                         processReceivedDet(obj,vecsize,listDets);
2579                 } else {
2580                         // pack the determinants and ship them to the other processor
2581                         // pack up objID vecsize and listDets and send it to the correct processor
2582                         int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
2583                         char *detMsg = (char *)CmiAlloc(totalSize);
2584                         ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
2585                         receivedDetData->recver = listObjects[i];
2586                         receivedDetData->numDets = vecsize;
2587                         char *detList = &detMsg[sizeof(ReceivedDetData)];
2588                         memcpy(detList,listDets,sizeof(Determinant)*vecsize);
2589                         CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
2590                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
2591                 }
2592
2593         }
2594
2595         // checking if the restart is over
2596         _numRestartResponses++;
2597         if(_numRestartResponses == CkNumPes()){
2598                 _numRestartResponses = 0;
2599                 processDelayedRemoteMsgQueue();
2600         }
2601
2602 };
2603
2604 /**
2605  * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
2606  */
2607 void _receivedDetDataHandler(ReceivedDetData *msg){
2608         DEBUG_NOW(char objName[100]);
2609         Chare *obj = (Chare *) msg->recver.getObject();
2610         if(obj){                
2611                 char *_msg = (char *)msg;
2612                 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2613                 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
2614                 processReceivedDet(obj,msg->numDets,listDets);
2615                 CmiFree(msg);
2616         }else{
2617                 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
2618                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2619         }
2620 }
2621
2622 /**
2623  * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
2624  */
2625 void _receivedTNDataHandler(ReceivedTNData *msg){
2626         DEBUG_NOW(char objName[100]);
2627         Chare *obj = (Chare *) msg->recver.getObject();
2628         if(obj){                
2629                 char *_msg = (char *)msg;
2630                 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2631                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2632                 processReceivedTN(obj,msg->numTNs,listTNs);
2633                 CmiFree(msg);
2634         }else{
2635                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2636                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2637         }
2638 };
2639
2640 /**
2641  * @brief Processes the received list of determinants from a particular PE.
2642  */
2643 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
2644         Determinant *det;
2645
2646         // traversing the whole list of determinants
2647         for(int i=0; i<listSize; i++){
2648                 det = &listDets[i];
2649                 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
2650                 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
2651         }
2652         
2653         DEBUG_MEM(CmiMemoryCheck());
2654 }
2655         
2656 /**
2657  * @brief Processes the received list of tickets from a particular PE.
2658  */
2659 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2660         // increases the number of resendReply received
2661         obj->mlogData->resendReplyRecvd++;
2662
2663         DEBUG(char objName[100]);
2664         DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
2665         //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
2666         //if(obj->mlogData->receivedTNs == NULL)
2667         //      CkPrintf("NULL\n");     
2668         //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
2669
2670         // includes the tickets into the receivedTN structure
2671         for(int j=0;j<listSize;j++){
2672                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2673         }
2674         
2675         //if this object has received all the replies find the ticket numbers
2676         //that senders know about. Those less than the ticket number processed 
2677         //by the receiver can be thrown away. The rest need not be consecutive
2678         // ie there can be holes in the list of ticket numbers seen by senders
2679         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2680                 obj->mlogData->resendReplyRecvd = 0;
2681                 //sort the received TNS
2682                 sortVec(obj->mlogData->receivedTNs);
2683         
2684                 //after all the received tickets are in we need to sort them and then 
2685                 // calculate the holes  
2686                 if(obj->mlogData->receivedTNs->size() > 0){
2687                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2688                         int vecsize = obj->mlogData->receivedTNs->size();
2689                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2690                         
2691                         // updating tCount with the highest ticket handed out
2692                         if(teamSize > 1){
2693                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2694                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2695                         }else{
2696                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2697                         }
2698                         
2699                         if(numberHoles == 0){
2700                         }else{
2701                                 char objName[100];                                      
2702                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2703                                 obj->mlogData->numberHoles = numberHoles;
2704                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2705                                 int countHoles=0;
2706                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2707                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2708                                                 //the TNs are not consecutive at this point
2709                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2710                                                         DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
2711                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2712                                                         countHoles++;
2713                                                 }       
2714                                         }
2715                                 }
2716                                 //Holes have been given new TN
2717                                 if(countHoles != numberHoles){
2718                                         char str[100];
2719                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2720                                 }
2721                                 CkAssert(countHoles == numberHoles);                                    
2722                                 obj->mlogData->currentHoles = numberHoles;
2723                         }
2724                 }
2725         
2726                 // cleaning up structures and getting ready to continue execution       
2727                 delete obj->mlogData->receivedTNs;
2728                 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
2729                 obj->mlogData->receivedTNs = NULL;
2730                 obj->mlogData->restartFlag = 0;
2731
2732                 processDelayedRemoteMsgQueue();
2733
2734                 DEBUG_RESTART(char objString[100]);
2735                 DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2736         }
2737
2738 }
2739
2740
2741 void sortVec(CkVec<MCount> *TNvec){
2742         //sort it ->its bloddy bubble sort
2743         //TODO: use quicksort
2744         for(int i=0;i<TNvec->size();i++){
2745                 for(int j=i+1;j<TNvec->size();j++){
2746                         if((*TNvec)[j] < (*TNvec)[i]){
2747                                 MCount temp;
2748                                 temp = (*TNvec)[i];
2749                                 (*TNvec)[i] = (*TNvec)[j];
2750                                 (*TNvec)[j] = temp;
2751                         }
2752                 }
2753         }
2754         //make it unique .. since its sorted all equal units will be consecutive
2755         MCount *tempArray = new MCount[TNvec->size()];
2756         int     uniqueCount=-1;
2757         for(int i=0;i<TNvec->size();i++){
2758                 tempArray[i] = 0;
2759                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2760                         uniqueCount++;
2761                         tempArray[uniqueCount] = (*TNvec)[i];
2762                 }
2763         }
2764         uniqueCount++;
2765         TNvec->removeAll();
2766         for(int i=0;i<uniqueCount;i++){
2767                 TNvec->push_back(tempArray[i]);
2768         }
2769         delete [] tempArray;
2770 }       
2771
2772 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2773         if(TNVec->size() == 0){
2774                 return -1; //not found in an empty vec
2775         }
2776         //binary search to find 
2777         int left=0;
2778         int right = TNVec->size();
2779         int mid = (left +right)/2;
2780         while(searchTN != (*TNVec)[mid] && left < right){
2781                 if((*TNVec)[mid] > searchTN){
2782                         right = mid-1;
2783                 }else{
2784                         left = mid+1;
2785                 }
2786                 mid = (left + right)/2;
2787         }
2788         if(left < right){
2789                 //mid is the element to be returned
2790                 return mid;
2791         }else{
2792                 if(mid < TNVec->size() && mid >=0){
2793                         if((*TNVec)[mid] == searchTN){
2794                                 return mid;
2795                         }else{
2796                                 return -1;
2797                         }
2798                 }else{
2799                         return -1;
2800                 }
2801         }
2802 };
2803
2804
2805 /*
2806         Method to do parallel restart. Distribute some of the array elements to other processors.
2807         The problem is that we cant use to charm entry methods to do migration as it will get
2808         stuck in the protocol that is going to restart
2809         Note: in order to avoid interference between the objects being recovered, the current PE
2810     will NOT keep any object. It will be devoted to forward the messages to recovering objects.    Otherwise, the current PE has to do both things, recover objects and forward messages and 
2811     objects end up stepping into each other's shoes (interference).
2812 */
2813
2814 class ElementDistributor: public CkLocIterator{
2815         CkLocMgr *locMgr;
2816         int *targetPE;
2817
2818         void pupLocation(CkLocation &loc,PUP::er &p){
2819                 CkArrayIndexMax idx=loc.getIndex();
2820                 CkGroupID gID = locMgr->ckGetGroupID();
2821                 p|gID;      // store loc mgr's GID as well for easier restore
2822                 p|idx;
2823                 p|loc;
2824         };
2825 public:
2826         ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2827
2828         void addLocation(CkLocation &loc){
2829
2830                 // leaving object on this PE
2831                 if(*targetPE == CkMyPe()){
2832                         *targetPE = (*targetPE +1)%CkNumPes();
2833                         return;
2834                 }
2835                         
2836                 CkArrayIndexMax idx = loc.getIndex();
2837                 CkLocRec_local *rec = loc.getLocalRecord();
2838                         
2839                 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2840                 idx.print();
2841                         
2842                 //TODO: an element that is being moved should leave some trace behind so that
2843                 // the arraybroadcaster can forward messages to it
2844                         
2845                 //pack up this location and send it across
2846                 PUP::sizer psizer;
2847                 pupLocation(loc,psizer);
2848                 int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
2849                 char *msg = (char *)CmiAlloc(totalSize);
2850                 char *buf = &msg[CmiMsgHeaderSizeBytes];
2851                 PUP::toMem pmem(buf);
2852                 pmem.becomeDeleting();
2853                 pupLocation(loc,pmem);
2854                         
2855                 locMgr->setDuringMigration(CmiTrue);
2856                 delete rec;
2857                 locMgr->setDuringMigration(CmiFalse);
2858                 locMgr->inform(idx,*targetPE);
2859
2860                 CmiSetHandler(msg,_distributedLocationHandlerIdx);
2861                 CmiSyncSendAndFree(*targetPE,totalSize,msg);
2862
2863                 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2864
2865                 //decide on the target processor for the next object
2866                 *targetPE = *targetPE + 1;
2867                 if(*targetPE > (CkMyPe() + parallelRecovery)){
2868                         *targetPE = CkMyPe() + 1;
2869                 }
2870         }
2871
2872 };
2873
2874 /**
2875  * Distributes objects to accelerate recovery after a failure.
2876  */
2877 void distributeRestartedObjects(){
2878         int numGroups = CkpvAccess(_groupIDTable)->size();      
2879         int i;
2880         int targetPE=CkMyPe()+1;
2881         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2882 };
2883
2884 /**
2885  * Handler to update information about an object just received.
2886  */
2887 void _distributedLocationHandler(char *receivedMsg){
2888         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2889         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2890         PUP::fromMem pmem(buf);
2891         CkGroupID gID;
2892         CkArrayIndexMax idx;
2893         pmem |gID;
2894         pmem |idx;
2895         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2896         donotCountMigration=1;
2897         mgr->resume(idx,pmem,CmiTrue);
2898         donotCountMigration=0;
2899         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2900         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2901         idx.print();
2902
2903         CkLocRec *rec = mgr->elementRec(idx);
2904         CmiAssert(rec->type() == CkLocRec::local);
2905         
2906         CkVec<CkMigratable *> eltList;
2907         mgr->migratableList((CkLocRec_local *)rec,eltList);
2908         for(int i=0;i<eltList.size();i++){
2909                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2910                         CpvAccess(_currentObj) = eltList[i];
2911                         eltList[i]->ResumeFromSync();
2912                 }
2913         }
2914 }
2915
2916
2917 /** this method is used to send messages to a restarted processor to tell
2918  * it that a particular expected object is not going to get to it */
2919 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2920         DummyMigrationMsg buf;
2921         buf.flag = MLOG_OBJECT;
2922         buf.lbID = lbID;
2923         buf.mgrID = locMgrID;
2924         buf.idx = idx;
2925         buf.locationPE = locationPE;
2926         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2927         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
2928 };
2929
2930
2931 /**this method is used by a restarted processor to tell other processors
2932  * that they are not going to receive these many objects.. just the count
2933  * not the objects themselves ***/
2934
2935 void sendDummyMigrationCounts(int *dummyCounts){
2936         DummyMigrationMsg buf;
2937         buf.flag = MLOG_COUNT;
2938         buf.lbID = globalLBID;
2939         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2940         for(int i=0;i<CmiNumPes();i++){
2941                 if(i != CmiMyPe() && dummyCounts[i] != 0){
2942                         buf.count = dummyCounts[i];
2943                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
2944                 }
2945         }
2946 }
2947
2948
2949 /** this handler is used to process a dummy migration msg.
2950  * it looks up the load balancer and calls migrated for it */
2951
2952 void _dummyMigrationHandler(DummyMigrationMsg *msg){
2953         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2954         if(msg->flag == MLOG_OBJECT){
2955                 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
2956                 LDObjHandle h;
2957                 lb->Migrated(h,1);
2958         }
2959         if(msg->flag == MLOG_COUNT){
2960                 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
2961                 msg->count -= verifyAckedRequests;
2962                 for(int i=0;i<msg->count;i++){
2963                         LDObjHandle h;
2964                         lb->Migrated(h,1);
2965                 }
2966         }
2967         verifyAckedRequests=0;
2968         CmiFree(msg);
2969 };
2970
2971 /*****************************************************
2972         Implementation of a method that can be used to call
2973         any method on the ChareMlogData of all the chares on
2974         a processor currently
2975 ******************************************************/
2976
2977
2978 class ElementCaller :  public CkLocIterator {
2979 private:
2980         CkLocMgr *locMgr;
2981         MlogFn fnPointer;
2982         void *data;
2983 public:
2984         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
2985                 locMgr = _locMgr;
2986                 fnPointer = _fnPointer;
2987                 data = _data;
2988         };
2989         void addLocation(CkLocation &loc){
2990                 CkVec<CkMigratable *> list;
2991                 CkLocRec_local *local = loc.getLocalRecord();
2992                 locMgr->migratableList (local,list);
2993                 for(int i=0;i<list.size();i++){
2994                         CkMigratable *migratableElement = list[i];
2995                         fnPointer(data,migratableElement->mlogData);
2996                 }
2997         }
2998 };
2999
3000 /**
3001  * Map function pointed by fnPointer over all the chares living in this processor.
3002  */
3003 void forAllCharesDo(MlogFn fnPointer, void *data){
3004         int numGroups = CkpvAccess(_groupIDTable)->size();
3005         for(int i=0;i<numGroups;i++){
3006                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3007                 fnPointer(data,obj->mlogData);
3008         }
3009         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3010         for(int i=0;i<numNodeGroups;i++){
3011                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3012                 fnPointer(data,obj->mlogData);
3013         }
3014         int i;
3015         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3016 };
3017
3018
3019 /******************************************************************
3020  Load Balancing
3021 ******************************************************************/
3022
3023 /**
3024  * This is the first time Converse is called after AtSync method has been called by every local object.
3025  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
3026  * message logging, we can take advantage of this situation and garbage collect at this point.
3027  */
3028 void initMlogLBStep(CkGroupID gid){
3029         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3030         countLBMigratedAway = 0;
3031         countLBToMigrate=0;
3032         onGoingLoadBalancing=1;
3033         migrationDoneCalled=0;
3034         checkpointBarrierCount=0;
3035         if(globalLBID.idx != 0){
3036                 CmiAssert(globalLBID.idx == gid.idx);
3037         }
3038         globalLBID = gid;
3039 #if SYNCHRONIZED_CHECKPOINT
3040         garbageCollectMlog();
3041 #endif
3042 }
3043
3044 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3045         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3046         DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3047         
3048         resumeLbFnPtr = _fnPtr;
3049         centralLb = _centralLb;
3050         migrationDoneCalled = 1;
3051         if(countLBToMigrate == countLBMigratedAway){
3052                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3053                 startMlogCheckpoint(NULL,CmiWallTimer());       
3054         }
3055 };
3056
3057 void finishedCheckpointLoadBalancing(){
3058         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3059         CheckpointBarrierMsg msg;
3060         msg.fromPE = CmiMyPe();
3061         msg.checkpointCount = checkpointCount;
3062
3063         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3064         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3065         
3066 };
3067
3068
3069 void sendMlogLocation(int targetPE, envelope *env){
3070 #if !SYNCHRONIZED_CHECKPOINT
3071         void *_msg = EnvToUsr(env);
3072         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3073
3074         int existing = 0;
3075         //if this object is already in the retainedobjectlust destined for this
3076         //processor it should not be sent
3077         
3078         for(int i=0;i<retainedObjectList.size();i++){
3079                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3080                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3081                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3082                         existing = 1;
3083                         break;
3084                 }
3085         }
3086
3087         if(existing){
3088                 return;
3089         }
3090         
3091         countLBToMigrate++;
3092         
3093         MigrationNotice migMsg;
3094         migMsg.migRecord.gID = msg->gid;
3095         migMsg.migRecord.idx = msg->idx;
3096         migMsg.migRecord.fromPE = CkMyPe();
3097         migMsg.migRecord.toPE =  targetPE;
3098         
3099         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3100         
3101         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3102         retainedObject->migRecord = migMsg.migRecord;
3103         retainedObject->acked  = 0;
3104         
3105         CkPackMessage(&env);
3106         
3107         migMsg.record = retainedObject;
3108         retainedObject->msg = env;
3109         int size = retainedObject->size = env->getTotalsize();
3110         
3111         retainedObjectList.push_back(retainedObject);
3112         
3113         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3114         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3115         
3116         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3117 #endif
3118 }
3119
3120 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3121         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3122         migratedNoticeList.push_back(msg->migRecord);
3123
3124         MigrationNoticeAck buf;
3125         buf.record = msg->record;
3126         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3127         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3128 }
3129
3130 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3131         
3132         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3133         retainedObject->acked = 1;
3134
3135         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3136         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3137
3138         //inform home about the new location of this object
3139         CkGroupID gID = retainedObject->migRecord.gID ;
3140         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3141         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3142         
3143         countLBMigratedAway++;
3144         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3145                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3146                 startMlogCheckpoint(NULL,CmiWallTimer());
3147         }
3148 };
3149
3150 void _receiveMlogLocationHandler(void *buf){
3151         envelope *env = (envelope *)buf;
3152         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3153         CkUnpackMessage(&env);
3154         void *_msg = EnvToUsr(env);
3155         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3156         CkGroupID gID= msg->gid;
3157         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
3158         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3159         CpvAccess(_currentObj)=mgr;
3160         mgr->immigrate(msg);
3161 };
3162
3163
3164 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
3165 /*      if(mlogData->objID.type == TypeArray){
3166                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
3167         //      TODO: make sure later that atSync has been called and it needs 
3168         //      to be resumed from sync
3169         //
3170                 CpvAccess(_currentObj) = elt;
3171                 elt->ResumeFromSync();
3172         }*/
3173 }
3174
3175 /**
3176  * @brief Processor 0 sends a broadcast to every other processor after checkpoint barrier.
3177  */
3178 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
3179         if(checkpointBarrierCount == CmiNumPes()){
3180                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
3181                 for(int i=0;i<CmiNumPes();i++){
3182                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
3183                 }
3184         }
3185 }
3186
3187 /**
3188  * @brief Processor 0 receives a contribution from every other processor after checkpoint.
3189  */ 
3190 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
3191         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
3192         if(msg->checkpointCount == checkpointCount){
3193                 checkpointBarrierCount++;
3194                 checkAndSendCheckpointBarrierAcks(msg);
3195         }else{
3196                 if(msg->checkpointCount-1 == checkpointCount){
3197                         checkpointBarrierCount++;
3198                         checkAndSendCheckpointBarrierAcks(msg);
3199                 }else{
3200                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
3201                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
3202                 }
3203         }
3204
3205         // deleting the received message
3206         CmiFree(msg);
3207 }
3208
3209 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
3210         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
3211         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
3212
3213 #if !SYNCHRONIZED_CHECKPOINT
3214         // sending a notice to all senders to remove message logs
3215         sendRemoveLogRequests();
3216 #endif
3217
3218         // resuming LB function pointer
3219         (*resumeLbFnPtr)(centralLb);
3220
3221         // deleting message
3222         CmiFree(msg);
3223 }
3224
3225 /**
3226  * @brief Function to remove all messages in the message log of a particular chare.
3227  */
3228 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
3229         int total;
3230         MlogEntry *logEntry;
3231         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
3232
3233         // traversing the whole message log and removing all elements
3234         total = mlog->length();
3235         for(int i=0; i<total; i++){
3236                 logEntry = mlog->deq();
3237                 delete logEntry;
3238         }
3239
3240 }
3241
3242 /**
3243  * @brief Garbage collects the message log and other data structures.
3244  * In case of synchronized checkpoint, we use an optimization to avoid causal message logging protocol
3245  * to communicate all determinants to the rest of the processors.
3246  */
3247 void garbageCollectMlog(){
3248         CkHashtableIterator *iterator;
3249         CkVec<Determinant> *detArray;
3250
3251         DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
3252
3253         // cleaning up the buffered determinants, since they belong to a previous checkpoint period
3254         _indexBufferedDets = 0;
3255         _numBufferedDets = 0;
3256         _phaseBufferedDets++;
3257
3258         // cleaning up remote determinants, since they belong to a previous checkpoint period
3259         iterator = CpvAccess(_remoteDets)->iterator();
3260         while(iterator->hasNext()){
3261                 detArray = *(CkVec<Determinant> **)iterator->next();
3262                 detArray->removeAll();
3263         }
3264         
3265         // deleting the iterator
3266         delete iterator;
3267
3268         // removing all messages in message log for every chare
3269         forAllCharesDo(garbageCollectMlogForChare, NULL);
3270 }
3271
3272 /**
3273         method that informs an array elements home processor of its current location
3274         It is a converse method to bypass the charm++ message logging framework
3275 */
3276
3277 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
3278         double _startTime = CmiWallTimer();
3279         CurrentLocationMsg msg;
3280         msg.mgrID = locMgrID;
3281         msg.idx = idx;
3282         msg.locationPE = currentPE;
3283         msg.fromPE = CkMyPe();
3284
3285         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
3286         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
3287         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
3288         traceUserBracketEvent(37,_startTime,CmiWallTimer());
3289 }
3290
3291
3292 void _receiveLocationHandler(CurrentLocationMsg *data){
3293         double _startTime = CmiWallTimer();
3294         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
3295         if(mgr == NULL){
3296                 CmiFree(data);
3297                 return;
3298         }
3299         CkLocRec *rec = mgr->elementNrec(data->idx);
3300         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
3301         if(rec != NULL){
3302                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
3303                         if(data->fromPE == data->locationPE){
3304                                 CmiAbort("Another processor has the same object");
3305                         }
3306                 }
3307         }
3308         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3309                 int targetPE = data->fromPE;
3310                 data->fromPE = CmiMyPe();
3311                 data->locationPE = CmiMyPe();
3312                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3313                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3314         }else{
3315                 mgr->inform(data->idx,data->locationPE);
3316         }
3317         CmiFree(data);
3318         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3319 }
3320
3321
3322
3323 void getGlobalStep(CkGroupID gID){
3324         LBStepMsg msg;
3325         int destPE = 0;
3326         msg.lbID = gID;
3327         msg.fromPE = CmiMyPe();
3328         msg.step = -1;
3329         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3330         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3331 };
3332
3333 void _getGlobalStepHandler(LBStepMsg *msg){
3334         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3335         msg->step = lb->step();
3336         CmiAssert(msg->fromPE != CmiMyPe());
3337         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3338         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3339         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3340 };
3341
3342 void _recvGlobalStepHandler(LBStepMsg *msg){
3343         
3344         restartDecisionNumber=msg->step;
3345         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
3346         _updateHomeAckHandler(dummyAck);
3347 };
3348
3349 /**
3350  * @brief Function to wrap up performance information.
3351  */
3352 void _messageLoggingExit(){
3353         
3354         // printing the signature for causal message logging
3355         if(CkMyPe() == 0)
3356                 printf("[%d] _causalMessageLoggingExit \n",CmiMyPe());
3357
3358         //TML: printing some statistics for group approach
3359 #if COLLECT_STATS_TEAM
3360         printf("[%d] LOGGED MESSAGES: %.0f\n",CkMyPe(),MLOGFT_totalMessages);
3361         printf("[%d] MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalLogSize/(float)MEGABYTE);
3362 #endif
3363
3364 #if COLLECT_STATS_MSGS
3365 #if COLLECT_STATS_MSGS_TOTAL
3366         printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
3367         printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
3368 #else
3369         printf("[%d] TARGETS: ",CmiMyPe());
3370         for(int i=0; i<CmiNumPes(); i++){
3371 #if COLLECT_STATS_MSG_COUNT
3372                 printf("%d ",numMsgsTarget[i]);
3373 #else
3374                 printf("%d ",sizeMsgsTarget[i]);
3375 #endif
3376         }
3377         printf("\n");
3378 #endif
3379 #endif
3380
3381 #if COLLECT_STATS_DETS
3382         printf("\n");
3383         printf("[%d] DETS: %d\n",CmiMyPe(),numDets);
3384         printf("[%d] PIGGYBACKED DETS: %d\n",CmiMyPe(),numPiggyDets);
3385 #if COLLECT_STATS_DETS_DUP
3386         printf("[%d] DUPLICATED DETS: %d\n",CmiMyPe(),numDupDets);
3387 #endif
3388 #endif
3389
3390 }
3391
3392 /**
3393         The method for returning the actual object pointed to by an id
3394         If the object doesnot exist on the processor it returns NULL
3395 **/
3396
3397 void* CkObjID::getObject(){
3398         
3399                 switch(type){
3400                         case TypeChare: 
3401                                 return CkLocalChare(&data.chare.id);
3402                         case TypeMainChare:
3403                                 return CkLocalChare(&data.chare.id);
3404                         case TypeGroup:
3405         
3406                                 CkAssert(data.group.onPE == CkMyPe());
3407                                 return CkLocalBranch(data.group.id);
3408                         case TypeNodeGroup:
3409                                 CkAssert(data.group.onPE == CkMyNode());
3410                                 //CkLocalNodeBranch(data.group.id);
3411                                 {
3412                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
3413                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
3414                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
3415         
3416                                         return retval;
3417                                 }       
3418                         case TypeArray:
3419                                 {
3420         
3421         
3422                                         CkArrayID aid(data.array.id);
3423         
3424                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
3425         
3426                                         CProxyElement_ArrayBase aProxy(aid,data.array.idx.asChild());
3427         
3428                                         return aProxy.ckLocal();
3429                                 }
3430                         default:
3431                                 CkAssert(0);
3432                 }
3433 }
3434
3435
3436 int CkObjID::guessPE(){
3437                 switch(type){
3438                         case TypeChare:
3439                         case TypeMainChare:
3440                                 return data.chare.id.onPE;
3441                         case TypeGroup:
3442                         case TypeNodeGroup:
3443                                 return data.group.onPE;
3444                         case TypeArray:
3445                                 {
3446                                         CkArrayID aid(data.array.id);
3447                                         if(aid.ckLocalBranch() == NULL){
3448                                                 return -1;
3449                                         }
3450                                         return aid.ckLocalBranch()->lastKnown(data.array.idx.asChild());
3451                                 }
3452                         default:
3453                                 CkAssert(0);
3454                 }
3455 };
3456
3457 char *CkObjID::toString(char *buf) const {
3458         
3459         switch(type){
3460                 case TypeChare:
3461                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
3462                         break;
3463                 case TypeMainChare:
3464                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);       
3465                         break;
3466                 case TypeGroup:
3467                         sprintf(buf,"Group %d   PE %d \0",data.group.id.idx,data.group.onPE);
3468                         break;
3469                 case TypeNodeGroup:
3470                         sprintf(buf,"NodeGroup %d       Node %d \0",data.group.id.idx,data.group.onPE);
3471                         break;
3472                 case TypeArray:
3473                         {
3474                                 const CkArrayIndexMax &idx = data.array.idx.asChild();
3475                                 const int *indexData = idx.data();
3476                                 sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
3477                                 break;
3478                         }
3479                 default:
3480                         CkAssert(0);
3481         }
3482         
3483         return buf;
3484 };
3485
3486 void CkObjID::updatePosition(int PE){
3487         if(guessPE() == PE){
3488                 return;
3489         }
3490         switch(type){
3491                 case TypeArray:
3492                         {
3493                                         CkArrayID aid(data.array.id);
3494                                         if(aid.ckLocalBranch() == NULL){
3495                                                 
3496                                         }else{
3497                                                 char str[100];
3498                                                 CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
3499 //                                              CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
3500                                                 CkLocRec *rec = mgr->elementNrec(data.array.idx.asChild());
3501                                                 if(rec != NULL){
3502                                                         if(rec->type() == CkLocRec::local){
3503                                                                 CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
3504                                                                 return;
3505                                                         }
3506                                                 }
3507                                                 mgr->inform(data.array.idx.asChild(),PE);
3508                                         }       
3509                                 }
3510
3511                         break;
3512                 case TypeChare:
3513                 case TypeMainChare:
3514                         CkAssert(data.chare.id.onPE == PE);
3515                         break;
3516                 case TypeGroup:
3517                 case TypeNodeGroup:
3518                         CkAssert(data.group.onPE == PE);
3519                         break;
3520                 default:
3521                         CkAssert(0);
3522         }
3523 }
3524
3525
3526
3527 void MlogEntry::pup(PUP::er &p){
3528         p | destPE;
3529         p | _infoIdx;
3530         int size;
3531         if(!p.isUnpacking()){
3532 /*              CkAssert(env);
3533                 if(!env->isPacked()){
3534                         CkPackMessage(&env);
3535                 }*/
3536                 if(env == NULL){
3537                         //message was probably local and has been removed from logs
3538                         size = 0;
3539                 }else{
3540                         size = env->getTotalsize();
3541                 }       
3542         }
3543         p | size;
3544         if(p.isUnpacking()){
3545                 if(size > 0){
3546                         env = (envelope *)_allocEnv(ForChareMsg,size);
3547                 }else{
3548                         env = NULL;
3549                 }
3550         }
3551         if(size > 0){
3552                 p((char *)env,size);
3553         }
3554 };
3555
3556 void RestoredLocalMap::pup(PUP::er &p){
3557         p | minSN;
3558         p | maxSN;
3559         p | count;
3560         if(p.isUnpacking()){
3561                 TNArray = new MCount[count];
3562         }
3563         p(TNArray,count);
3564 };
3565
3566
3567
3568
3569 /**********************************
3570         * The methods of the message logging
3571         * data structure stored in each chare
3572         ********************************/
3573
3574 MCount ChareMlogData::nextSN(const CkObjID &recver){
3575 /*      MCount SN = snTable.get(recver);
3576         snTable.put(recver) = SN+1;
3577         return SN+1;*/
3578         DEBUG_MEM(CmiMemoryCheck());
3579         double _startTime = CmiWallTimer();
3580         MCount *SN = snTable.getPointer(recver);
3581         DEBUG_MEM(CmiMemoryCheck());
3582         if(SN==NULL){
3583                 snTable.put(recver) = 1;
3584                 return 1;
3585         }else{
3586                 (*SN)++;
3587                 return *SN;
3588         }
3589 //      traceUserBracketEvent(34,_startTime,CkWallTimer());
3590 };
3591  
3592 /**
3593  * @brief Gets a new ticket for a particular object.
3594  */
3595 MCount ChareMlogData::newTN(){
3596         MCount TN;
3597         if(currentHoles > 0){
3598                 int holeidx = numberHoles-currentHoles;
3599                 TN = ticketHoles[holeidx];
3600                 currentHoles--;
3601                 if(currentHoles == 0){
3602                         delete []ticketHoles;
3603                         numberHoles = 0;
3604                 }
3605         }else{
3606                 TN = ++tCount;
3607         }
3608         return TN;
3609 };
3610
3611 /**
3612  * @brief Get the ticket associated with a combination of sender and SN, if any.
3613  */
3614 inline Ticket ChareMlogData::getTicket(CkObjID &sender, MCount SN){
3615         Ticket ticket;
3616
3617         SNToTicket *ticketRow = ticketTable.get(sender);
3618         if(ticketRow != NULL){
3619                 return ticketRow->get(SN);
3620         }
3621         return ticket;
3622 }
3623
3624 /**
3625  * Inserts a ticket in the ticketTable if it is not already there.
3626  */
3627 inline void ChareMlogData::verifyTicket(CkObjID &sender, MCount SN, MCount TN){
3628         Ticket ticket;
3629
3630         SNToTicket *ticketRow = ticketTable.get(sender);
3631         if(ticketRow != NULL){
3632                 Ticket earlierTicket = ticketRow->get(SN);
3633                 if(earlierTicket.TN != 0){
3634                         CkAssert(earlierTicket.TN == TN);
3635                         return;
3636                 }
3637         }else{
3638                 ticketRow = new SNToTicket();
3639                 ticketTable.put(sender) = ticketRow;
3640         }
3641         ticket.TN = TN;
3642         ticketRow->put(SN) = ticket;
3643 }
3644
3645 /**
3646  * Generates the next ticket for a request.
3647  */
3648 inline Ticket ChareMlogData::next_ticket(CkObjID &sender, MCount SN){
3649         DEBUG(char senderName[100];)
3650         DEBUG(char recverName[100];)
3651         double _startTime = CmiWallTimer();
3652         Ticket ticket;
3653
3654         // if a ticket is requested during restart, 0 is returned to make the requester to ask for it later.
3655         if(restartFlag){
3656                 ticket.TN = 0;
3657                 return ticket;
3658         }
3659         
3660         SNToTicket *ticketRow = ticketTable.get(sender);
3661         if(ticketRow != NULL){
3662                 Ticket earlierTicket = ticketRow->get(SN);
3663                 if(earlierTicket.TN == 0){
3664                         ticket.TN = newTN();
3665                         ticketRow->put(SN) = ticket;
3666                         DEBUG(CkAssert((ticketRow->get(SN)).TN == ticket.TN));
3667                 }else{
3668                         ticket.TN = earlierTicket.TN;
3669                         if(ticket.TN > tCount){
3670                                 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
3671                         }
3672                                 CmiAssert(ticket.TN <= tCount);
3673                 }
3674                 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
3675         }else{
3676                 SNToTicket *newRow = new SNToTicket;            
3677                 ticket.TN = newTN();
3678                 newRow->put(SN) = ticket;
3679                 ticketTable.put(sender) = newRow;
3680                 DEBUG(printf("[%d] next_ticket new row ticket sender %s recver %s SN %d TN %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN));
3681         }
3682         ticket.state = NEW_TICKET;
3683
3684 //      traceUserBracketEvent(34,_startTime,CkWallTimer());
3685         return ticket;  
3686 };
3687
3688 /**
3689  * Adds an entry into the message log.
3690  */
3691 void ChareMlogData::addLogEntry(MlogEntry *entry){
3692         DEBUG(char nameString[100]);
3693         DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
3694         DEBUG_MEM(CmiMemoryCheck());
3695
3696         // enqueuing the entry in the message log
3697         mlog.enq(entry);
3698 };
3699
// Global accumulators, presumably for time spent searching the restored-local
// map and the number of such searches. Their updates are not visible in this
// chunk; verify against the search routine.
// NOTE(review): the count is stored as a double.
double totalSearchRestoredTime=0;
double totalSearchRestoredCount=0;
3702
3703 /**
3704  * Searches the restoredlocal map to see if the combination of sender and sequence number
3705  * shows up in the map. Returns the ticket if found, or 0 otherwise.
3706  */
3707 MCount ChareMlo