Adding support for causal message logging.
[charm.git] / src / ck-core / ckcausalmlog.C
1 /**
2   * Simple Causal Message Logging Fault Tolerance Protocol.
3   * Features:
4         * Reduces the latency overhead of the pessimistic approach.
5         * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
6   */
7
8 #include "charm.h"
9 #include "ck.h"
10 #include "ckcausalmlog.h"
11 #include "queueing.h"
12 #include <sys/types.h>
13 #include <signal.h>
14 #include "CentralLB.h"
15
16 #ifdef _FAULT_CAUSAL_
17
18 // Collects some statistics about message logging. Beware of the high cost of accounting for
19 // duplicated determinants (for every determinant received, it consists in a linear search 
20 // through a potentially big list).
21 #define COLLECT_STATS_MSGS 0
22 #define COLLECT_STATS_MSGS_TOTAL 0
23 #define COLLECT_STATS_MSG_COUNT 0
24 #define COLLECT_STATS_DETS 0
25 #define COLLECT_STATS_DETS_DUP 0
26 #define COLLECT_STATS_MEMORY 0
27 #define COLLECT_STATS_TEAM 0
28
29 #define RECOVERY_SEND "SEND"
30 #define RECOVERY_PROCESS "PROCESS"
31
32 #define DEBUG_MEM(x)  //x
33 #define DEBUG(x) // x
34 #define DEBUG_RESTART(x) // x
35 #define DEBUGLB(x)   // x
36 #define DEBUG_TEAM(x)  // x
37 #define DEBUG_PERF(x) // x
38 #define DEBUG_CHECKPOINT 1
39 #define DEBUG_NOW(x) x
40 #define DEBUG_PE(x,y) if(CkMyPe() == x) y
41 #define DEBUG_RECOVERY(x) //x
42
43 extern const char *idx2str(const CkArrayIndex &ind);
44 extern const char *idx2str(const ArrayElement *el);
45
46 void getGlobalStep(CkGroupID gID);
47
48 bool fault_aware(CkObjID &recver);
49 void sendCheckpointData(int mode);
50 void createObjIDList(void *data,ChareMlogData *mlogData);
51 inline bool isLocal(int destPE);
52 inline bool isTeamLocal(int destPE);
53 void printLog(TProcessedLog *log);
54
55 int _restartFlag=0;
56 int _numRestartResponses=0;
57
58 //TODO: remove for perf runs
59 int countHashRefs=0; //count the number of gets
60 int countHashCollisions=0;
61
62 char *checkpointDirectory=".";
63 int unAckedCheckpoint=0;
64
65 int countLocal=0,countBuffered=0;
66 int countPiggy=0;
67 int countClearBufferedLocalCalls=0;
68
69 int countUpdateHomeAcks=0;
70
71 extern int teamSize;
72 extern int chkptPeriod;
73 extern bool fastRecovery;
74 extern int parallelRecovery;
75
76 char *killFile;
77 char *faultFile;
78 int killFlag=0;
79 int faultFlag=0;
80 int restartingMlogFlag=0;
81 void readKillFile();
82 double killTime=0.0;
83 double faultMean;
84 int checkpointCount=0;
85
86 CpvDeclare(Chare *,_currentObj);
87 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
88 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
89 CpvDeclare(Queue, _outOfOrderMessageQueue);
90 CpvDeclare(Queue, _delayedRemoteMessageQueue);
91 CpvDeclare(char **,_bufferedTicketRequests);
92 CpvDeclare(int *,_numBufferedTicketRequests);
93
94 /***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
95 /** @note All the determinants generated by a PE are stored in variable _localDets.
96  * As soon as a message is sent, then all the determinants are appended to the message,
97  * but those determinants are not deleted. We must wait until an ACK comes from the receiver
98  * to delete the determinants. In the meantime the same determinants may be appended
99  * to other messages and more determinants can be added to _localDets.
100  * A simple solution to this problem was to have a primitive array and keep adding determinants
101  * at the end. However, to avoid multiple copies of determinants, we will keep a pointer
102  * to the first 'valid' determinant in the array. Alternatively, we can keep a pointer to the latest
103  * determinant and a number of how many valid determinants there are behind it. We do not remove
104  * determinants until a checkpoint is made, since these determinants may have to be added to
105  * messages in case of a recovery.
106  */
107 // temporal storage for the outgoing determinants
108 CpvDeclare(char *, _localDets);
109 // number of buffered determinants
110 int _numBufferedDets;
111 // current index of first determinant in _localDets
112 int _indexBufferedDets;
113 // current phase of determinants
114 int _phaseBufferedDets;
115
116 // stores the determinants from other nodes, to send them in case of a crash
117 CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
118 // max number of buffered determinants
119 int _maxBufferedDets;
120
121 // stores the incarnation number from every other processor
122 CpvDeclare(char *, _incarnation);
123
124 /***** *****/
125
126 #if COLLECT_STATS_MSGS
127 int *numMsgsTarget;
128 int *sizeMsgsTarget;
129 int totalMsgsTarget;
130 float totalMsgsSize;
131 #endif
132 #if COLLECT_STATS_DETS
133 int numPiggyDets;
134 int numDets;
135 int numDupDets;
136 #endif
137 #if COLLECT_STATS_MEMORY
138 int msgLogSize;
139 int bufferedDetsSize;
140 int storedDetsSize;
141 #endif
142 //TML: variables for measuring savings with teams in message logging
143 #if COLLECT_STATS_TEAM
144 float MLOGFT_totalLogSize = 0.0;
145 float MLOGFT_totalMessages = 0.0;
146 #endif
147
148 static double adjustChkptPeriod=0.0; //in ms
149 static double nextCheckpointTime=0.0;//in seconds
150 static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);
151
152 int _pingHandlerIdx;
153
154 char objString[100];
155 int _checkpointRequestHandlerIdx;
156 int _storeCheckpointHandlerIdx;
157 int _checkpointAckHandlerIdx;
158 int _getCheckpointHandlerIdx;
159 int _recvCheckpointHandlerIdx;
160 int _removeProcessedLogHandlerIdx;
161
162 int _verifyAckRequestHandlerIdx;
163 int _verifyAckHandlerIdx;
164 int _dummyMigrationHandlerIdx;
165
166
167 int     _getGlobalStepHandlerIdx;
168 int     _recvGlobalStepHandlerIdx;
169
170 int _updateHomeRequestHandlerIdx;
171 int _updateHomeAckHandlerIdx;
172 int _resendMessagesHandlerIdx;
173 int _resendReplyHandlerIdx;
174 int _receivedTNDataHandlerIdx;
175 int _receivedDetDataHandlerIdx;
176 int _distributedLocationHandlerIdx;
177 int _storeDeterminantsHandlerIdx;
178 int _removeDeterminantsHandlerIdx;
179
180 //TML: integer constants for team-based message logging
181 int _restartHandlerIdx;
182 int _getRestartCheckpointHandlerIdx;
183 int _recvRestartCheckpointHandlerIdx;
184 void setTeamRecovery(void *data, ChareMlogData *mlogData);
185 void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
186
187 int verifyAckTotal;
188 int verifyAckCount;
189
190 int verifyAckedRequests=0;
191
192 RestartRequest *storedRequest;
193
194 int _falseRestart =0; /**
195                                                                                                         For testing on clusters we might carry out restarts on 
196                                                                                                         a porcessor without actually starting it
197                                                                                                         1 -> false restart
198                                                                                                         0 -> restart after an actual crash
199                                                                                                 */              
200
201 //Load balancing globals
202 int onGoingLoadBalancing=0;
203 void *centralLb;
204 void (*resumeLbFnPtr)(void *);
205 int _receiveMlogLocationHandlerIdx;
206 int _receiveMigrationNoticeHandlerIdx;
207 int _receiveMigrationNoticeAckHandlerIdx;
208 int _checkpointBarrierHandlerIdx;
209 int _checkpointBarrierAckHandlerIdx;
210
211 CkVec<MigrationRecord> migratedNoticeList;
212 CkVec<RetainedMigratedObject *> retainedObjectList;
213 int donotCountMigration=0;
214 int countLBMigratedAway=0;
215 int countLBToMigrate=0;
216 int migrationDoneCalled=0;
217 int checkpointBarrierCount=0;
218 int globalResumeCount=0;
219 CkGroupID globalLBID;
220 int restartDecisionNumber=-1;
221
222 double lastCompletedAlarm=0;
223 double lastRestart=0;
224
225 //update location globals
226 int _receiveLocationHandlerIdx;
227
228 /** 
229  * @brief Initialize message logging data structures and register handlers
230  */
231 void _messageLoggingInit(){
232         //current object
233         CpvInitialize(Chare *,_currentObj);
234         
235         //registering handlers for message logging
236         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
237                 
238         //handlers for checkpointing
239         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
240         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
241         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
242         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
243
244         //handlers for restart
245         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
246         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
247         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
248         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
249         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
250         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
251         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
252         _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
253         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
254         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
255         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
256         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
257
258         //TML: handlers for team-based message logging
259         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
260         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
261         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
262
263         // handlers for causal message logging
264         _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
265         _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
266         
267         //handlers for load balancing
268         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
269         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
270         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
271         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
272         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
273         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
274         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
275         
276         //handlers for updating locations
277         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
278         
279         //Cpv variables for message logging
280         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
281         CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
282         CpvInitialize(Queue, _outOfOrderMessageQueue);
283         CpvInitialize(Queue, _delayedRemoteMessageQueue);
284         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
285         CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
286         
287         CpvInitialize(char **,_bufferedTicketRequests);
288         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
289         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
290         for(int i=0;i<CkNumPes();i++){
291                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
292                 CpvAccess(_numBufferedTicketRequests)[i]=0;
293         }
294  
295         // Cpv variables for causal protocol
296         _numBufferedDets = 0;
297         _indexBufferedDets = 0;
298         _phaseBufferedDets = 0;
299         _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
300         CpvInitialize(char *, _localDets);
301         CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
302         CpvInitialize(char *, _incarnation);
303         CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
304         CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
305         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
306         for(int i=0; i<CmiNumPes(); i++){
307                 CpvAccess(_incarnation)[i] = 0;
308         }
309         
310         //Cpv variables for checkpoint
311         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
312         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
313
314         // registering user events for projections      
315         traceRegisterUserEvent("Remove Logs", 20);
316         traceRegisterUserEvent("Ticket Request Handler", 21);
317         traceRegisterUserEvent("Ticket Handler", 22);
318         traceRegisterUserEvent("Local Message Copy Handler", 23);
319         traceRegisterUserEvent("Local Message Ack Handler", 24);        
320         traceRegisterUserEvent("Preprocess current message",25);
321         traceRegisterUserEvent("Preprocess past message",26);
322         traceRegisterUserEvent("Preprocess future message",27);
323         traceRegisterUserEvent("Checkpoint",28);
324         traceRegisterUserEvent("Checkpoint Store",29);
325         traceRegisterUserEvent("Checkpoint Ack",30);
326         traceRegisterUserEvent("Send Ticket Request",31);
327         traceRegisterUserEvent("Generalticketrequest1",32);
328         traceRegisterUserEvent("TicketLogLocal",33);
329         traceRegisterUserEvent("next_ticket and SN",34);
330         traceRegisterUserEvent("Timeout for buffered remote messages",35);
331         traceRegisterUserEvent("Timeout for buffered local messages",36);
332         traceRegisterUserEvent("Inform Location Home",37);
333         traceRegisterUserEvent("Receive Location Handler",38);
334         
335         lastCompletedAlarm=CmiWallTimer();
336         lastRestart = CmiWallTimer();
337
338 #if COLLECT_STATS_MSGS
339 #if COLLECT_STATS_MSGS_TOTAL
340         totalMsgsTarget = 0;
341         totalMsgsSize = 0.0;
342 #else
343         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
344         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
345         for(int i=0; i<CmiNumPes(); i++){
346                 numMsgsTarget[i] = 0;
347                 sizeMsgsTarget[i] = 0;
348         }
349 #endif
350 #endif
351 #if COLLECT_STATS_DETS
352         numPiggyDets = 0;
353         numDets = 0;
354         numDupDets = 0;
355 #endif
356 #if COLLECT_STATS_MEMORY
357         msgLogSize = 0;
358         bufferedDetsSize = 0;
359         storedDetsSize = 0;
360 #endif
361
362
363 }
364
365 void killLocal(void *_dummy,double curWallTime);        
366
367 void readKillFile(){
368         FILE *fp=fopen(killFile,"r");
369         if(!fp){
370                 return;
371         }
372         int proc;
373         double sec;
374         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
375                 if(proc == CkMyPe()){
376                         killTime = CmiWallTimer()+sec;
377                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
378                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
379                 }
380         }
381         fclose(fp);
382 }
383
384 /**
385  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
386  * We assume an exponential distribution for the mean-time-between-failures.
387  */
388 void readFaultFile(){
389         FILE *fp=fopen(faultFile,"r");
390         if(!fp){
391                 return;
392         }
393         int proc;
394         double sec;
395         fscanf(fp,"%d %lf",&proc,&sec);
396         faultMean = sec;
397         if(proc == CkMyPe()){
398                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
399                 CcdCallFnAfter(killLocal,NULL,sec*1000);
400         }
401         fclose(fp);
402 }
403
404 void killLocal(void *_dummy,double curWallTime){
405         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
406         if(CmiWallTimer()<killTime-1){
407                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
408         }else{  
409                 kill(getpid(),SIGKILL);
410         }
411 }
412
413 /*** Auxiliary Functions ***/
414
415 /**
416  * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
417  * determinants needs to be extended.
418  */
419 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
420         Determinant *det, *auxDet;
421         char *aux;
422
423         DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
424
425         // checking if we are overflowing the buffered determinants array
426         if(_indexBufferedDets >= _maxBufferedDets){
427                 aux = CpvAccess(_localDets);
428                 _maxBufferedDets *= 2;
429                 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
430                 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
431                 CmiFree(aux);
432         }
433
434         // adding the new determinant at the end
435         det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
436         det->sender = sender;
437         det->receiver = receiver;
438         det->SN = SN;
439         det->TN = TN;
440         _numBufferedDets++;
441         _indexBufferedDets++;
442
443 #if COLLECT_STATS_MEMORY
444         bufferedDetsSize++;
445 #endif
446 #if COLLECT_STATS_DETS
447         numDets++;
448 #endif
449 }
450
451 /************************ Message logging methods ****************/
452
453 /**
454  * Sends a group message that might be a broadcast.
455  */
456 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
457         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
458                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
459                 void *origMsg = EnvToUsr(env);
460                 for(int i=0;i<CmiNumPes();i++){
461                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
462                                 void *copyMsg = CkCopyMsg(&origMsg);
463                                 envelope *copyEnv = UsrToEnv(copyMsg);
464                                 copyEnv->SN=0;
465                                 copyEnv->TN=0;
466                                 copyEnv->sender.type = TypeInvalid;
467                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
468                                 sendGroupMsg(copyEnv,i,_infoIdx);
469                         }
470                 }
471                 return;
472         }
473
474         // initializing values of envelope
475         env->SN=0;
476         env->TN=0;
477         env->sender.type = TypeInvalid;
478
479         CkObjID recver;
480         recver.type = TypeGroup;
481         recver.data.group.id = env->getGroupNum();
482         recver.data.group.onPE = destPE;
483         sendCommonMsg(recver,env,destPE,_infoIdx);
484 }
485
486 /**
487  * Sends a nodegroup message that might be a broadcast.
488  */
489 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
490         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
491                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
492                 void *origMsg = EnvToUsr(env);
493                 for(int i=0;i<CmiNumNodes();i++){
494                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
495                                 void *copyMsg = CkCopyMsg(&origMsg);
496                                 envelope *copyEnv = UsrToEnv(copyMsg);
497                                 copyEnv->SN=0;
498                                 copyEnv->TN=0;
499                                 copyEnv->sender.type = TypeInvalid;
500                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
501                         }
502                 }
503                 return;
504         }
505
506         // initializing values of envelope
507         env->SN=0;
508         env->TN=0;
509         env->sender.type = TypeInvalid;
510
511         CkObjID recver;
512         recver.type = TypeNodeGroup;
513         recver.data.group.id = env->getGroupNum();
514         recver.data.group.onPE = destNode;
515         sendCommonMsg(recver,env,destNode,_infoIdx);
516 }
517
518 /**
519  * Sends a message to an array element.
520  */
521 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
522         CkObjID recver;
523         recver.type = TypeArray;
524         recver.data.array.id = env->getsetArrayMgr();
525         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
526
527         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
528                 char recverString[100],senderString[100];
529                 
530                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
531         }
532
533         // initializing values of envelope
534         env->SN = 0;
535         env->TN = 0;
536
537         sendCommonMsg(recver,env,destPE,_infoIdx);
538 };
539
540 /**
541  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
542  */
543 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
544         envelope *env = _env;
545         MCount ticketNumber = 0;
546         int resend=0; //is it a resend
547         char recverName[100];
548         double _startTime=CkWallTimer();
549         
550         DEBUG_MEM(CmiMemoryCheck());
551
552         if(CpvAccess(_currentObj) == NULL){
553 //              CkAssert(0);
554                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
555                 generalCldEnqueue(destPE,env,_infoIdx);
556                 return;
557         }
558         
559         // setting message logging data in the envelope
560         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
561         if(env->sender.type == TypeInvalid){
562                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
563         }else{
564                 envelope *copyEnv = copyEnvelope(env);
565                 env = copyEnv;
566                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
567                 env->SN = 0;
568         }
569         
570         DEBUG_MEM(CmiMemoryCheck());
571
572         CkObjID &sender = env->sender;
573         env->recver = recver;
574
575         Chare *obj = (Chare *)env->sender.getObject();
576           
577         if(env->SN == 0){
578                 DEBUG_MEM(CmiMemoryCheck());
579                 env->SN = obj->mlogData->nextSN(recver);
580         }else{
581                 resend = 1;
582         }
583         
584         char senderString[100];
585 //      if(env->SN != 1){
586                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
587         //      CmiPrintStackTrace(0);
588 /*      }else{
589                 DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
590         }*/
591                 
592         MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
593 //      CkPackMessage(&(mEntry->env));
594 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
595         
596         _startTime = CkWallTimer();
597
598         // uses the proper ticketing mechanism for local, team and general messages
599         if(isLocal(destPE)){
600                 sendLocalMsg(mEntry);
601         }else{
602                 if((teamSize > 1) && isTeamLocal(destPE)){
603
604                         // look to see if this message already has a ticket in the team-table
605                         Chare *senderObj = (Chare *)sender.getObject();
606                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
607                         if(ticketRow != NULL){
608                                 Ticket ticket = ticketRow->get(env->SN);
609                                 if(ticket.TN != 0){
610                                         ticketNumber = ticket.TN;
611                                         DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
612                                 }
613                         }
614                 }
615                 
616                 // sending the message
617                 sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
618                 
619         }
620 }
621
622 /**
623  * Determines if the message is local or not. A message is local if:
624  * 1) Both the destination and origin are the same PE.
625  */
626 inline bool isLocal(int destPE){
627         // both the destination and the origin are the same PE
628         if(destPE == CkMyPe())
629                 return true;
630
631         return false;
632 }
633
634 /**
635  * Determines if the message is group local or not. A message is group local if:
636  * 1) They belong to the same group in the group-based message logging.
637  */
638 inline bool isTeamLocal(int destPE){
639
640         // they belong to the same group
641         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
642                 return true;
643
644         return false;
645 }
646
647 /**
648  * Method that does the actual send by creating a ticket request filling it up and sending it.
649  */
650 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
651         DEBUG(char recverString[100]);
652         DEBUG(char senderString[100]);
653
654         int totalSize;
655         StoreDeterminantsHeader header;
656         int sizes[2];
657         char *ptrs[2];
658
659         envelope *env = entry->env;
660         DEBUG(printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
661
662         // setting all the information
663         Chare *obj = (Chare *)entry->env->sender.getObject();
664         entry->env->recver = recver;
665         entry->env->SN = SN;
666         entry->env->TN = TN;
667         if(!resend){
668                 //TML: only stores message if either it goes to this processor or to a processor in a different group
669                 if(!isTeamLocal(entry->destPE)){
670                         obj->mlogData->addLogEntry(entry);
671 #if COLLECT_STATS_TEAM
672                         MLOGFT_totalMessages += 1.0;
673                         MLOGFT_totalLogSize += entry->env->getTotalsize();
674 #endif
675                 }else{
676                         // the message has to be deleted after it has been sent
677                         entry->env->freeMsg = true;
678                 }
679         }
680
681         // sending the determinants that causally affect this message
682         if(_numBufferedDets > 0){
683
684                 // modifying the actual number of determinants sent in message
685                 header.number = _numBufferedDets;
686                 header.index = _indexBufferedDets;
687                 header.phase = _phaseBufferedDets;
688                 header.PE = CmiMyPe();
689         
690                 // sending the message
691                 sizes[0] = sizeof(StoreDeterminantsHeader);
692                 sizes[1] = _numBufferedDets * sizeof(Determinant);
693                 ptrs[0] = (char *) &header;
694                 ptrs[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
695                 DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
696                 CmiSetHandler(&header, _storeDeterminantsHandlerIdx);
697                 CmiSyncVectorSend(destPE, 2, &sizes[0], &ptrs[0]);
698         }
699
700         // updating its message log entry
701         entry->indexBufDets = _indexBufferedDets;
702         entry->numBufDets = _numBufferedDets;
703
704         // sending the message
705         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
706
707         DEBUG_MEM(CmiMemoryCheck());
708 #if COLLECT_STATS_MSGS
709 #if COLLECT_STATS_MSGS_TOTAL
710         totalMsgsTarget++;
711         totalMsgsSize += (float)env->getTotalsize();
712 #else
713         numMsgsTarget[destPE]++;
714         sizeMsgsTarget[destPE] += env->getTotalsize();
715 #endif
716 #endif
717 #if COLLECT_STATS_DETS
718         numPiggyDets += _numBufferedDets;
719 #endif
720 #if COLLECT_STATS_MEMORY
721         msgLogSize += env->getTotalsize();
722 #endif
723 };
724
725
726 /**
727  * @brief Function to send a local message. It first gets a ticket and
728  * then enqueues the message. If we are recovering, then the message 
729  * is enqueued in a delay queue.
730  */
731 void sendLocalMsg(MlogEntry *entry){
732         DEBUG_PERF(double _startTime=CkWallTimer());
733         DEBUG_MEM(CmiMemoryCheck());
734         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
735         DEBUG(char senderString[100]);
736         DEBUG(char recverString[100]);
737         Ticket ticket;
738
739         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
740
741         // getting the receiver local object
742         Chare *recverObj = (Chare *)entry->env->recver.getObject();
743
744         // if receiver object is not NULL, we will ask it for a ticket
745         if(recverObj){
746
747 /*HERE          // if we are recovery, then we must put this off for later
748                 if(recverObj->mlogData->restartFlag){
749                         CpvAccess(_delayedLocalMsgs)->enq(entry);
750                         return;
751                 }
752
753                 // check if this combination of sender and SN has been already a ticket
754                 ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
755                         
756                 // assigned a new ticket in case this message is completely new
757                 if(ticket.TN == 0){
758                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
759
760                         // if TN == 0 we enqueue this message since we are recovering from a crash
761                         if(ticket.TN == 0){
762                                 CpvAccess(_delayedLocalMsgs)->enq(entry);
763                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
764                                 
765 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
766                                 return;
767                         }
768                 }
769
770                 //TODO: check for the case when an invalid ticket is returned
771                 //TODO: check for OLD or RECEIVED TICKETS
772                 entry->env->TN = ticket.TN;
773                 CkAssert(entry->env->TN > 0);
774                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
775         
776                 DEBUG_MEM(CmiMemoryCheck());
777
778                 // adding the determinant to _localDets
779                 addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
780 */
781                 // sends the local message
782                 _skipCldEnqueue(CmiMyPe(),entry->env,entry->_infoIdx);  
783
784                 DEBUG_MEM(CmiMemoryCheck());
785         }else{
786                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
787         }
788 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
789 };
790
791 /****
792         The handler functions
793 *****/
794
795 /*** CAUSAL PROTOCOL HANDLERS ***/
796
797 /**
798  * @brief Removes the determinants after a particular index in the _localDets array.
799  */
800 void _removeDeterminantsHandler(char *buffer){
801         RemoveDeterminantsHeader *header;
802         int index, phase;
803
804         // getting the header from the message
805         header = (RemoveDeterminantsHeader *)buffer;
806         index = header->index;
807         phase = header->phase;
808
809         // fprintf(stderr,"ACK index=%d\n",index);
810
811         // updating the number of buffered determinants
812         if(phase == _phaseBufferedDets){
813                 if(index > _indexBufferedDets)
814                         CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
815                 CmiAssert(index <= _indexBufferedDets);
816                 _numBufferedDets = _indexBufferedDets - index;
817         }
818
819         // releasing memory
820         CmiFree(buffer);
821
822 }
823
824 /**
825  * @brief Stores the determinants coming from other processor.
826  */
827 void _storeDeterminantsHandler(char *buffer){
828         StoreDeterminantsHeader *header;
829         RemoveDeterminantsHeader removeHeader;
830         Determinant *detPtr, *det;
831         int i, n, index, phase, destPE;
832         CkVec<Determinant> *vec;
833
834
835         // getting the header from the message and pointing to the first determinant
836         header = (StoreDeterminantsHeader *)buffer;
837         n = header->number;
838         index = header->index;
839         phase = header->phase;
840         destPE = header->PE;
841         detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
842
843         DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
844         DEBUG_MEM(CmiMemoryCheck());
845
846         // going through all determinants and storing them into _remoteDets
847         for(i = 0; i < n; i++){
848                 det = new Determinant();
849                 det->sender = detPtr->sender;
850                 det->receiver = detPtr->receiver;
851                 det->SN = detPtr->SN;
852                 det->TN = detPtr->TN;
853
854                 // getting the determinant array
855                 vec = CpvAccess(_remoteDets)->get(det->receiver);
856                 if(vec == NULL){
857                         vec = new CkVec<Determinant>();
858                         CpvAccess(_remoteDets)->put(det->receiver) = vec;
859                 }
860 #if COLLECT_STATS_DETS
861 #if COLLECT_STATS_DETS_DUP
862                 for(int j=0; j<vec->size(); j++){
863                         if(isSameDet(&(*vec)[j],det)){
864                                 numDupDets++;
865                                 break;
866                         }
867                 }
868 #endif
869 #endif
870 #if COLLECT_STATS_MEMORY
871                 storedDetsSize++;
872 #endif
873                 vec->push_back(*det);
874                 detPtr = detPtr++;
875         }
876
877         DEBUG_MEM(CmiMemoryCheck());
878
879         // freeing buffer
880         CmiFree(buffer);
881
882         // sending the ACK back to the sender
883         removeHeader.index = index;
884         removeHeader.phase = phase;
885         CmiSetHandler(&removeHeader,_removeDeterminantsHandlerIdx);
886         CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)&removeHeader);
887         
888 }
889
890 /**
891  *  If there are any delayed requests, process them first before 
892  *  processing this request
893  * */
894 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
895         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
896         double  _startTime = CkWallTimer();
897         CmiFree(ticketRequest);
898 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
899 }
900
901
902 /**
903  * @brief Gets a ticket for a recently received message.
904  * @pre env->recver has to be on this processor.
905  * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
906  */
907 inline bool _getTicket(envelope *env, int *flag){
908         DEBUG_MEM(CmiMemoryCheck());
909         DEBUG(char recverName[100]);
910         DEBUG(char senderString[100]);
911         Ticket ticket;
912         
913         // getting information from request
914         CkObjID sender = env->sender;
915         CkObjID recver = env->recver;
916         MCount SN = env->SN;
917         MCount TN = env->TN;
918         Chare *recverObj = (Chare *)recver.getObject();
919         
920         DEBUG(recver.toString(recverName);)
921
922         // verifying whether we are recovering from a failure or not
923         if(recverObj->mlogData->restartFlag)
924                 return false;
925
926         // checking ticket table to see if this message already received a ticket
927         ticket = recverObj->mlogData->getTicket(sender, SN);
928         TN = ticket.TN;
929         
930         // checking if the message is team local and if it has a ticket already assigned
931         if(teamSize > 1 && TN != 0){
932                 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
933                 recverObj->mlogData->verifyTicket(sender,SN,TN);
934         }
935
936         //check if a ticket for this has been already handed out to an object that used to be local but 
937         // is no longer so.. need for parallel restart
938         if(TN == 0){
939                 ticket = recverObj->mlogData->next_ticket(sender,SN);
940                 *flag = NEW_TICKET;
941         } else {
942                 *flag = OLD_TICKET;
943         }
944         if(ticket.TN > recverObj->mlogData->tProcessed){
945                 ticket.state = NEW_TICKET;
946         } else {
947                 ticket.state = OLD_TICKET;
948         }
949         // @todo: check for the case when an invalid ticket is returned
950         if(ticket.TN == 0){
951                 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
952                 return false;
953         }
954                 
955         // setting the ticket number in the envelope
956         env->TN = ticket.TN;
957         DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
958         return true;
959
960 };
961
962 bool fault_aware(CkObjID &recver){
963         switch(recver.type){
964                 case TypeChare:
965                         return false;
966                 case TypeMainChare:
967                         return false;
968                 case TypeGroup:
969                 case TypeNodeGroup:
970                 case TypeArray:
971                         return true;
972                 default:
973                         return false;
974         }
975 };
976
977 /* Preprocesses a received message */
978 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
979         DEBUG_NOW(char recverString[100]);
980         DEBUG_NOW(char senderString[100]);
981         DEBUG_MEM(CmiMemoryCheck());
982         int flag;
983         bool ticketSuccess;
984
985         // getting the receiver object
986         CkObjID recver = env->recver;
987         if(!fault_aware(recver))
988                 return 1;
989
990         Chare *obj = (Chare *)recver.getObject();
991         *objPointer = obj;
992         if(obj == NULL){
993                 int possiblePE = recver.guessPE();
994                 if(possiblePE != CkMyPe()){
995                         int totalSize = env->getTotalsize();
996                         CmiSyncSend(possiblePE,totalSize,(char *)env);
997                         
998                         DEBUG(printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
999                 }else{
1000                         // this is the case where a message is received and the object has not been initialized
1001                         // we delayed the delivery of the message
1002                         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1003                         
1004                         DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
1005                 }
1006                 return 0;
1007         }
1008
1009         // checking if message comes from an old incarnation
1010         // message must be discarded
1011         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
1012                 CmiFree(env);
1013                 return 0;
1014         }
1015
1016         DEBUG_MEM(CmiMemoryCheck());
1017         DEBUG(printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1018
1019         // getting a ticket for this message
1020         ticketSuccess = _getTicket(env,&flag);
1021
1022         // we might be during recovery when this message arrives
1023         if(!ticketSuccess){
1024                 
1025                 // adding the message to a delay queue
1026                 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
1027                 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
1028
1029                 return 0;
1030         }
1031         
1032         //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
1033         //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
1034         
1035         if(flag == NEW_TICKET){
1036                 // storing determinant of message in data structure
1037                 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
1038         }
1039
1040         DEBUG_MEM(CmiMemoryCheck());
1041
1042         double _startTime = CkWallTimer();
1043 //env->sender.updatePosition(env->getSrcPe());
1044         if(env->TN == obj->mlogData->tProcessed+1){
1045                 //the message that needs to be processed now
1046                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1047                 // once we find a message that we can process we put back all the messages in the out of order queue
1048                 // back into the main scheduler queue. 
1049         DEBUG_MEM(CmiMemoryCheck());
1050                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1051                         void *qMsgPtr;
1052                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1053                         envelope *qEnv = (envelope *)qMsgPtr;
1054                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
1055         DEBUG_MEM(CmiMemoryCheck());
1056                 }
1057 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1058                 //TODO: this might be a problem.. change made for leanMD
1059 //              CpvAccess(_currentObj) = obj;
1060         DEBUG_MEM(CmiMemoryCheck());
1061                 return 1;
1062         }
1063
1064         // checking if message has already been processed
1065         // message must be discarded
1066         if(env->TN <= obj->mlogData->tProcessed){
1067                 DEBUG(printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
1068                 
1069                 CmiFree(env);
1070                 return 0;
1071         }
1072         //message that needs to be processed in the future
1073
1074         DEBUG(printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1075         //the message cant be processed now put it back in the out of order message Q, 
1076         //It will be transferred to the main queue later
1077         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1078 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1079         DEBUG_MEM(CmiMemoryCheck());
1080         
1081         return 0;
1082 }
1083
1084 /**
1085  * @brief Updates a few variables once a message has been processed.
1086  */
1087 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
1088         DEBUG(char senderString[100]);
1089         if(obj){
1090                 if(sender.guessPE() == CkMyPe()){
1091                         if(entry != NULL){
1092                                 entry->env = NULL;
1093                         }
1094                 }
1095                 obj->mlogData->tProcessed++;
1096 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1097                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1098 //              CpvAccess(_currentObj)= NULL;
1099         }
1100         DEBUG_MEM(CmiMemoryCheck());
1101 }
1102
1103 /***
1104         Helpers for the handlers and message logging methods
1105 ***/
1106
1107 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
1108 //      double _startTime = CkWallTimer();
1109         if(env->recver.type != TypeNodeGroup){
1110         //This repeats a step performed in skipCldEnq for messages sent to
1111         //other processors. I do this here so that messages to local processors
1112         //undergo the same transformation.. It lets the resend be uniform for 
1113         //all messages
1114 //              CmiSetXHandler(env,CmiGetHandler(env));
1115                 _skipCldEnqueue(destPE,env,_infoIdx);
1116         }else{
1117                 _noCldNodeEnqueue(destPE,env);
1118         }
1119 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1120 }
//extern "C" int CmiGetNonLocalLength();
/** Counter associated with retrying ticket requests that had been queued up earlier.
 * NOTE(review): no retry method is defined here and nothing in this part of the
 * file reads or writes this counter; it may be a leftover from the pessimistic
 * protocol. Confirm before relying on it.
 * */

int calledRetryTicketRequest=0;
1127
1128 void _pingHandler(CkPingMsg *msg){
1129         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1130         CmiFree(msg);
1131 }
1132
1133
1134 /*****************************************************************************
1135         Checkpointing methods..
1136                 Pack all the data on a processor and send it to the buddy periodically
1137                 Also used to throw away message logs
1138 *****************************************************************************/
1139 CkVec<TProcessedLog> processedTicketLog;
1140 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1141 void clearUpMigratedRetainedLists(int PE);
1142
1143 void checkpointAlarm(void *_dummy,double curWallTime){
1144         double diff = curWallTime-lastCompletedAlarm;
1145         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1146 /*      if(CkWallTimer()-lastRestart < 50){
1147                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1148                 return;
1149         }*/
1150         if(diff < ((chkptPeriod) - 2)){
1151                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1152                 return;
1153         }
1154         CheckpointRequest request;
1155         request.PE = CkMyPe();
1156         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1157         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1158 };
1159
1160 void _checkpointRequestHandler(CheckpointRequest *request){
1161         startMlogCheckpoint(NULL,CmiWallTimer());
1162 }
1163
1164 /**
1165  * @brief Starts the checkpoint phase after migration.
1166  */
1167 void startMlogCheckpoint(void *_dummy, double curWallTime){
1168         double _startTime = CkWallTimer();
1169
1170         // increasing the checkpoint counter
1171         checkpointCount++;
1172         
1173 #if DEBUG_CHECKPOINT
1174         if(CmiMyPe() == 0){
1175                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1176         }
1177 #endif
1178
1179         DEBUG_MEM(CmiMemoryCheck());
1180
1181         PUP::sizer psizer;
1182         psizer | checkpointCount;
1183         for(int i=0; i<CmiNumPes(); i++){
1184                 psizer | CpvAccess(_incarnation)[i];
1185         }
1186         CkPupROData(psizer);
1187         DEBUG_MEM(CmiMemoryCheck());
1188         CkPupGroupData(psizer,CmiTrue);
1189         DEBUG_MEM(CmiMemoryCheck());
1190         CkPupNodeGroupData(psizer,CmiTrue);
1191         DEBUG_MEM(CmiMemoryCheck());
1192         pupArrayElementsSkip(psizer,CmiTrue,NULL);
1193         DEBUG_MEM(CmiMemoryCheck());
1194
1195         int dataSize = psizer.size();
1196         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1197         char *msg = (char *)CmiAlloc(totalSize);
1198         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1199         chkMsg->PE = CkMyPe();
1200         chkMsg->dataSize = dataSize;
1201         
1202         char *buf = &msg[sizeof(CheckPointDataMsg)];
1203         PUP::toMem pBuf(buf);
1204
1205         pBuf | checkpointCount;
1206         for(int i=0; i<CmiNumPes(); i++){
1207                 pBuf | CpvAccess(_incarnation)[i];
1208         }
1209         CkPupROData(pBuf);
1210         CkPupGroupData(pBuf,CmiTrue);
1211         CkPupNodeGroupData(pBuf,CmiTrue);
1212         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1213
1214         unAckedCheckpoint=1;
1215         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1216         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1217         
1218         /*
1219                 Store the highest Ticket number processed for each chare on this processor
1220         */
1221         processedTicketLog.removeAll();
1222         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1223
1224 #if DEBUG_CHECKPOINT
1225         if(CmiMyPe() == 0){
1226                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1227         }
1228 #endif
1229
1230 #if COLLECT_STATS_MEMORY
1231         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
1232         msgLogSize = 0;
1233         bufferedDetsSize = 0;
1234         storedDetsSize = 0;
1235 #endif
1236
1237         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1238                 lastCompletedAlarm = curWallTime;
1239                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1240         }
1241         traceUserBracketEvent(28,_startTime,CkWallTimer());
1242 };
1243
1244 /**
1245  * @brief A chare adds the latest ticket number processed.
1246  */
1247 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
1248         DEBUG(char objString[100]);
1249
1250         CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
1251         TProcessedLog logEntry;
1252         logEntry.recver = mlogData->objID;
1253         logEntry.tProcessed = mlogData->tProcessed;
1254         log->push_back(logEntry);
1255
1256         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1257 }
1258
1259 class ElementPacker : public CkLocIterator {
1260 private:
1261         CkLocMgr *locMgr;
1262         PUP::er &p;
1263 public:
1264         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1265         void addLocation(CkLocation &loc) {
1266                 CkArrayIndexMax idx=loc.getIndex();
1267                 CkGroupID gID = locMgr->ckGetGroupID();
1268                 p|gID;      // store loc mgr's GID as well for easier restore
1269                 p|idx;
1270                 p|loc;
1271         }
1272 };
1273
1274 /**
1275  * Pups all the array elements in this processor.
1276  */
1277 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
1278         int numElements,i;
1279         int numGroups = CkpvAccess(_groupIDTable)->size();      
1280         if(!p.isUnpacking()){
1281                 numElements = CkCountArrayElements();
1282         }       
1283         p | numElements;
1284         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1285         if(!p.isUnpacking()){
1286                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1287         }else{
1288                 //Flush all recs of all LocMgrs before putting in new elements
1289 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1290                 for(int j=0;j<listsize;j++){
1291                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1292                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1293                                 listToSkip[j].idx.print();
1294                         }
1295                 }
1296                 
1297                 printf("numElements = %d\n",numElements);
1298         
1299                 for (int i=0; i<numElements; i++) {
1300                         CkGroupID gID;
1301                         CkArrayIndexMax idx;
1302                         p|gID;
1303                 p|idx;
1304                         int flag=0;
1305                         int matchedIdx=0;
1306                         for(int j=0;j<listsize;j++){
1307                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1308                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1309                                                 matchedIdx = j;
1310                                                 flag = 1;
1311                                                 break;
1312                                         }
1313                                 }
1314                         }
1315                         if(flag == 1){
1316                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1317                         }else{
1318                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1319                         }
1320                                 
1321                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1322                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
1323                         mgr->resume(idx,p,create,flag);
1324                         if(flag == 1){
1325                                 int homePE = mgr->homePe(idx);
1326                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1327                         }
1328                 }
1329         }
1330 };
1331
1332
1333 void writeCheckpointToDisk(int size,char *chkpt){
1334         char fNameTemp[100];
1335         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1336         int fd = creat(fNameTemp,S_IRWXU);
1337         int ret = write(fd,chkpt,size);
1338         CkAssert(ret == size);
1339         close(fd);
1340         
1341         char fName[100];
1342         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1343         unlink(fName);
1344
1345         rename(fNameTemp,fName);
1346 }
1347
1348 //handler that receives the checkpoint from a processor
1349 //it stores it and acks it
1350 void _storeCheckpointHandler(char *msg){
1351         double _startTime=CkWallTimer();
1352                 
1353         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1354         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1355         
1356         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1357         
1358         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1359         if(oldChkpt != NULL){
1360                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1361                 CmiFree(oldmsg);
1362         }
1363         //turning off storing checkpoints
1364         
1365         int sendingPE = chkMsg->PE;
1366         
1367         CpvAccess(_storedCheckpointData)->buf = chkpt;
1368         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1369         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1370
1371         int count=0;
1372         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1373                 if(migratedNoticeList[j].fromPE == sendingPE){
1374                         migratedNoticeList[j].ackFrom = 1;
1375                 }else{
1376                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1377                 }
1378                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1379                         migratedNoticeList.remove(j);
1380                         count++;
1381                 }
1382                 
1383         }
1384         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1385         
1386         CheckPointAck ackMsg;
1387         ackMsg.PE = CkMyPe();
1388         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1389         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1390         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1391         
1392         traceUserBracketEvent(29,_startTime,CkWallTimer());
1393 };
1394
1395
1396 /**
1397  * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.       
1398  * @note The remove log request message looks like
1399                 |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
1400  */
1401 void sendRemoveLogRequests(){
1402 #if SYNCHRONIZED_CHECKPOINT
1403         CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
1404 #endif
1405         double _startTime = CkWallTimer();      
1406
1407         // computing total message size
1408         int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
1409         char *requestMsg = (char *)CmiAlloc(totalSize);
1410
1411         // filling up the message
1412         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1413         request->PE = CkMyPe();
1414         request->numberObjects = processedTicketLog.size();
1415         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1416         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1417         char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
1418         int *numDeterminants = (int *)listDeterminants;
1419         numDeterminants[0] = 0; 
1420         listDeterminants = (char *)&numDeterminants[1];
1421
1422         // setting the handler in the message
1423         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1424         
1425         DEBUG_MEM(CmiMemoryCheck());
1426         for(int i=0;i<CkNumPes();i++){
1427                 CmiSyncSend(i,totalSize,requestMsg);
1428         }
1429         CmiFree(requestMsg);
1430
1431         clearUpMigratedRetainedLists(CmiMyPe());
1432         //TODO: clear ticketTable
1433         
1434         traceUserBracketEvent(30,_startTime,CkWallTimer());
1435
1436         DEBUG_MEM(CmiMemoryCheck());
1437 }
1438
1439
1440 void _checkpointAckHandler(CheckPointAck *ackMsg){
1441         DEBUG_MEM(CmiMemoryCheck());
1442         unAckedCheckpoint=0;
1443         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1444         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1445         if(onGoingLoadBalancing){
1446                 onGoingLoadBalancing = 0;
1447                 finishedCheckpointLoadBalancing();
1448         }else{
1449                 sendRemoveLogRequests();
1450         }
1451         CmiFree(ackMsg);
1452         
1453 };
1454
1455
1456 /**
1457  * @brief Inserts all the determinants into a hash table.
1458  */
1459 inline void populateDeterminantTable(char *data){
1460         DEBUG(char recverString[100]);
1461         DEBUG(char senderString[100]);
1462         int numDets, *numDetsPtr;
1463         Determinant *detList;
1464         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1465         SNToTicket *tickets;
1466         Ticket ticket;
1467
1468         RemoveLogRequest *request = (RemoveLogRequest *)data;
1469         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1470         
1471         numDetsPtr = (int *)&list[request->numberObjects];
1472         numDets = numDetsPtr[0];
1473         detList = (Determinant *)&numDetsPtr[1];
1474
1475         // inserting determinants into hashtable
1476         for(int i=0; i<numDets; i++){
1477                 table = detTable.get(detList[i].sender);
1478                 if(table == NULL){
1479                         table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
1480                         detTable.put(detList[i].sender) = table;
1481                 }
1482                 tickets = table->get(detList[i].receiver);
1483                 if(tickets == NULL){
1484                         tickets = new SNToTicket();
1485                         table->put(detList[i].receiver) = tickets; 
1486                 }
1487                 ticket.TN = detList[i].TN;
1488                 tickets->put(detList[i].SN) = ticket;
1489         }
1490
1491         DEBUG_MEM(CmiMemoryCheck());    
1492
1493 }
1494
1495 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1496         int total;
1497         DEBUG(char nameString[100]);
1498         DEBUG_MEM(CmiMemoryCheck());
1499         CmiMemoryCheck();
1500         char *data = (char *)_data;
1501         RemoveLogRequest *request = (RemoveLogRequest *)data;
1502         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1503         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1504         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1505         SNToTicket *tickets;
1506         MCount TN;
1507
1508         int count=0;
1509         total = mlog->length();
1510         for(int i=0; i<total; i++){
1511                 MlogEntry *logEntry = mlog->deq();
1512
1513                 // looking message in determinant table
1514                 table = detTable.get(logEntry->env->sender);
1515                 if(table != NULL){
1516                         tickets = table->get(logEntry->env->recver);
1517                         if(tickets != NULL){
1518                                 TN = tickets->get(logEntry->env->SN).TN;
1519                                 if(TN != 0){
1520                                         logEntry->env->TN = TN;
1521                                 }
1522                         }
1523                 }
1524         
1525                 int match=0;
1526                 for(int j=0;j<request->numberObjects;j++){
1527                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
1528                                 //this log Entry should be removed
1529                                 match = 1;
1530                                 break;
1531                         }
1532                 }
1533                 char senderString[100],recverString[100];
1534 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1535                 if(match){
1536                         count++;
1537                         delete logEntry;
1538                 }else{
1539                         mlog->enq(logEntry);
1540                 }
1541         }
1542         if(count > 0){
1543                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1544         }
1545         DEBUG_MEM(CmiMemoryCheck());
1546         CmiMemoryCheck();
1547 }
1548
1549 /**
1550  * @brief Removes messages in the log according to the received ticket numbers
1551  */
1552 void _removeProcessedLogHandler(char *requestMsg){
1553         double start = CkWallTimer();
1554
1555         // building map for determinants
1556         populateDeterminantTable(requestMsg);
1557
1558         // removing processed messages from the message log
1559         forAllCharesDo(removeProcessedLogs,requestMsg);
1560
1561         // @todo: clean determinant table 
1562         
1563         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1564         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1565         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1566         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1567         /*
1568                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1569         */
1570         DEBUG_MEM(CmiMemoryCheck());
1571         clearUpMigratedRetainedLists(request->PE);
1572         
1573         traceUserBracketEvent(20,start,CkWallTimer());
1574         CmiFree(requestMsg);    
1575 };
1576
1577
1578 void clearUpMigratedRetainedLists(int PE){
1579         int count=0;
1580         CmiMemoryCheck();
1581         
1582         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1583                 if(migratedNoticeList[j].toPE == PE){
1584                         migratedNoticeList[j].ackTo = 1;
1585                 }
1586                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1587                         migratedNoticeList.remove(j);
1588                         count++;
1589                 }
1590         }
1591         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1592         
1593         for(int j=retainedObjectList.size()-1;j>=0;j--){
1594                 if(retainedObjectList[j]->migRecord.toPE == PE){
1595                         RetainedMigratedObject *obj = retainedObjectList[j];
1596                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1597                         retainedObjectList.remove(j);
1598                         if(obj->msg != NULL){
1599                                 CmiMemoryCheck();
1600                                 CmiFree(obj->msg);
1601                         }
1602                         delete obj;
1603                 }
1604         }
1605 }
1606
1607 /***************************************************************
1608         Restart Methods and handlers
1609 ***************************************************************/        
1610
1611 /**
1612  * Function for restarting the crashed processor.
1613  * It sets the restart flag and contacts the buddy
1614  * processor to get the latest checkpoint.
1615  */
1616 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1617         RestartRequest msg;
1618
1619         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1620
1621         // setting the restart flag
1622         _restartFlag = 1;
1623         _numRestartResponses = 0;
1624
1625         // if we are using team-based message logging, all members of the group have to be restarted
1626         if(teamSize > 1){
1627                 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
1628                         if(i != CkMyPe() && i < CkNumPes()){
1629                                 // sending a message to the team member
1630                                 msg.PE = CkMyPe();
1631                             CmiSetHandler(&msg,_restartHandlerIdx);
1632                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1633                         }
1634                 }
1635         }
1636
1637         // requesting the latest checkpoint from its buddy
1638         msg.PE = CkMyPe();
1639         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1640         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1641 };
1642
1643 /**
1644  * Function to restart this processor.
1645  * The handler is invoked by a member of its same team in message logging.
1646  */
1647 void _restartHandler(RestartRequest *restartMsg){
1648         int i;
1649         int numGroups = CkpvAccess(_groupIDTable)->size();
1650         RestartRequest msg;
1651         
1652         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1653
1654     // setting the restart flag
1655         _restartFlag = 1;
1656         _numRestartResponses = 0;
1657
1658         // flushing all buffers
1659         //TEST END
1660 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1661         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1662         for(int i=0;i<numGroups;i++){
1663         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1664                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1665                 obj->flushStates();
1666                 obj->ckJustMigrated();
1667         }*/
1668
1669     // requesting the latest checkpoint from its buddy
1670         msg.PE = CkMyPe();
1671         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1672         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1673 }
1674
1675
1676 /**
1677  * Gets the stored checkpoint but calls another function in the sender.
1678  */
1679 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1680
1681         // retrieving the stored checkpoint
1682         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1683
1684         // making sure it is its buddy who is requesting the checkpoint
1685         CkAssert(restartMsg->PE == storedChkpt->PE);
1686
1687         storedRequest = restartMsg;
1688         verifyAckTotal = 0;
1689
1690         for(int i=0;i<migratedNoticeList.size();i++){
1691                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1692 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1693                         if(migratedNoticeList[i].ackFrom == 0){
1694                                 //need to verify if the object actually exists .. it might not
1695                                 //have been acked but it might exist on it
1696                                 VerifyAckMsg msg;
1697                                 msg.migRecord = migratedNoticeList[i];
1698                                 msg.index = i;
1699                                 msg.fromPE = CmiMyPe();
1700                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1701                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1702                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1703                                 verifyAckTotal++;
1704                         }
1705                 }
1706         }
1707
1708         // sending the checkpoint back to its buddy     
1709         if(verifyAckTotal == 0){
1710                 sendCheckpointData(MLOG_RESTARTED);
1711         }
1712         verifyAckCount = 0;
1713 }
1714
1715 /**
1716  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1717  */
1718 void _recvRestartCheckpointHandler(char *_restartData){
1719         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1720         MigrationRecord *migratedAwayElements;
1721
1722         globalLBID = restartData->lbGroupID;
1723         
1724         restartData->restartWallTime *= 1000;
1725         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1726         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1727         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1728
1729         
1730         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1731         char *buf = &_restartData[sizeof(RestartProcessorData)];
1732         
1733         if(restartData->numMigratedAwayElements != 0){
1734                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1735                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1736                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1737                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1738         }
1739
1740         // turning on the team recovery flag
1741         forAllCharesDo(setTeamRecovery,NULL);
1742         
1743         PUP::fromMem pBuf(buf);
1744         pBuf | checkpointCount;
1745         for(int i=0; i<CmiNumPes(); i++){
1746                 pBuf | CpvAccess(_incarnation)[i];
1747         }
1748         CkPupROData(pBuf);
1749         CkPupGroupData(pBuf,CmiFalse);
1750         CkPupNodeGroupData(pBuf,CmiFalse);
1751         pupArrayElementsSkip(pBuf,CmiFalse,NULL);
1752         CkAssert(pBuf.size() == restartData->checkPointSize);
1753         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1754
1755         // increasing the incarnation number of all the team members
1756         for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
1757                 CpvAccess(_incarnation)[i]++;
1758         }
1759         
1760         // turning off the team recovery flag
1761         forAllCharesDo(unsetTeamRecovery,NULL);
1762
1763         // initializing a few variables for handling local messages
1764         forAllCharesDo(initializeRestart,NULL);
1765         
1766         CmiFree(_restartData);  
1767
1768         /*HERE _initDone();
1769
1770         getGlobalStep(globalLBID);
1771         
1772         countUpdateHomeAcks = 0;
1773         RestartRequest updateHomeRequest;
1774         updateHomeRequest.PE = CmiMyPe();
1775         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1776         for(int i=0;i<CmiNumPes();i++){
1777                 if(i != CmiMyPe()){
1778                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
1779                 }
1780         }
1781 */
1782
1783
1784         // Send out the request to resend logged messages to all other processors
1785         CkVec<TProcessedLog> objectVec;
1786         forAllCharesDo(createObjIDList, (void *)&objectVec);
1787         int numberObjects = objectVec.size();
1788         
1789         /*
1790                 resendMsg layout |ResendRequest|Array of TProcessedLog|
1791         */
1792         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
1793         char *resendMsg = (char *)CmiAlloc(totalSize);  
1794
1795         ResendRequest *resendReq = (ResendRequest *)resendMsg;
1796         resendReq->PE =CkMyPe(); 
1797         resendReq->numberObjects = numberObjects;
1798         char *objList = &resendMsg[sizeof(ResendRequest)];
1799         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
1800         
1801
1802         /* test for parallel restart migrate away object**/
1803 //      if(fastRecovery){
1804 //              distributeRestartedObjects();
1805 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
1806 //      }
1807         
1808         /*      To make restart work for load balancing.. should only
1809         be used when checkpoint happens along with load balancing
1810         **/
1811 //      forAllCharesDo(resumeFromSyncRestart,NULL);
1812
1813         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
1814         CpvAccess(_currentObj) = lb;
1815         lb->ReceiveDummyMigration(restartDecisionNumber);
1816
1817         sleep(10);
1818         
1819         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
1820         for(int i=0;i<CkNumPes();i++){
1821                 if(i != CkMyPe()){
1822                         CmiSyncSend(i,totalSize,resendMsg);
1823                 }       
1824         }
1825         _resendMessagesHandler(resendMsg);
1826
1827 }
1828
1829
1830 void CkMlogRestartDouble(void *,double){
1831         CkMlogRestart(NULL,NULL);
1832 };
1833
1834 //TML: restarting from local (group) failure
1835 void CkMlogRestartLocal(){
1836     CkMlogRestart(NULL,NULL);
1837 };
1838
1839 /**
1840  * Gets the stored checkpoint for its buddy processor.
1841  */
1842 void _getCheckpointHandler(RestartRequest *restartMsg){
1843
1844         // retrieving the stored checkpoint
1845         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1846
1847         // making sure it is its buddy who is requesting the checkpoint
1848         CkAssert(restartMsg->PE == storedChkpt->PE);
1849
1850         storedRequest = restartMsg;
1851         verifyAckTotal = 0;
1852
1853         for(int i=0;i<migratedNoticeList.size();i++){
1854                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1855 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1856                         if(migratedNoticeList[i].ackFrom == 0){
1857                                 //need to verify if the object actually exists .. it might not
1858                                 //have been acked but it might exist on it
1859                                 VerifyAckMsg msg;
1860                                 msg.migRecord = migratedNoticeList[i];
1861                                 msg.index = i;
1862                                 msg.fromPE = CmiMyPe();
1863                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1864                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1865                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1866                                 verifyAckTotal++;
1867                         }
1868                 }
1869         }
1870
1871         // sending the checkpoint back to its buddy     
1872         if(verifyAckTotal == 0){
1873                 sendCheckpointData(MLOG_CRASHED);
1874         }
1875         verifyAckCount = 0;
1876 }
1877
1878
1879 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1880         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1881         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1882         if(rec != NULL && rec->type() == CkLocRec::local){
1883                         //this location exists on this processor
1884                         //and needs to be removed       
1885                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1886                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1887                         
1888                         int localIdx = localRec->getLocalIndex();
1889                         LBDatabase *lbdb = localRec->getLBDB();
1890                         LDObjHandle ldHandle = localRec->getLdHandle();
1891                                 
1892                         locMgr->setDuringMigration(true);
1893                         
1894                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1895                         lbdb->UnregisterObj(ldHandle);
1896                         
1897                         locMgr->setDuringMigration(false);
1898                         
1899                         verifyAckedRequests++;
1900
1901         }
1902         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1903         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1904 };
1905
1906
1907 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1908         int index =     verifyReply->index;
1909         migratedNoticeList[index] = verifyReply->migRecord;
1910         verifyAckCount++;
1911         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1912         if(verifyAckCount == verifyAckTotal){
1913                 sendCheckpointData(MLOG_CRASHED);
1914         }
1915 }
1916
1917
1918 /**
1919  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
1920  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
1921  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
1922  */
1923 void sendCheckpointData(int mode){      
1924         RestartRequest *restartMsg = storedRequest;
1925         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1926         int numMigratedAwayElements = migratedNoticeList.size();
1927         if(migratedNoticeList.size() != 0){
1928                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1929 //                      CkAssert(migratedNoticeList.size() == 0);
1930         }
1931         
1932         
1933         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1934         
1935         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1936         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
1937         
1938         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1939         
1940         char *msg = (char *)CmiAlloc(totalSize);
1941         
1942         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1943         dataMsg->PE = CkMyPe();
1944         dataMsg->restartWallTime = CmiTimer();
1945         dataMsg->checkPointSize = storedChkpt->bufSize;
1946         
1947         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1948 //      dataMsg->numMigratedAwayElements = 0;
1949         
1950         dataMsg->numMigratedInElements = 0;
1951         dataMsg->migratedElementSize = 0;
1952         dataMsg->lbGroupID = globalLBID;
1953         /*msg layout 
1954                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1955                 Local MessageLog|
1956         */
1957         //store checkpoint data
1958         char *buf = &msg[sizeof(RestartProcessorData)];
1959
1960         if(dataMsg->numMigratedAwayElements != 0){
1961                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1962                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1963         }
1964         
1965
1966         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1967         buf = &buf[storedChkpt->bufSize];
1968
1969         if(mode == MLOG_RESTARTED){
1970                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
1971                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1972                 CmiFree(restartMsg);
1973         }else{
1974                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
1975                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1976                 CmiFree(restartMsg);
1977         }
1978 };
1979
1980
1981 // this list is used to create a vector of the object ids of all
1982 //the chares on this processor currently and the highest TN processed by them 
1983 //the first argument is actually a CkVec<TProcessedLog> *
1984 void createObjIDList(void *data,ChareMlogData *mlogData){
1985         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
1986         TProcessedLog entry;
1987         entry.recver = mlogData->objID;
1988         entry.tProcessed = mlogData->tProcessed;
1989         list->push_back(entry);
1990         DEBUG_TEAM(char objString[100]);
1991         DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
1992         DEBUG_RECOVERY(printLog(&entry));
1993 }
1994
1995
1996 /**
1997  * Receives the checkpoint data from its buddy, restores the state of all the objects
1998  * and asks everyone else to update its home.
1999  */
2000 void _recvCheckpointHandler(char *_restartData){
2001         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2002         MigrationRecord *migratedAwayElements;
2003
2004         globalLBID = restartData->lbGroupID;
2005         
2006         restartData->restartWallTime *= 1000;
2007         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2008         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2009         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2010
2011         
2012         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2013         char *buf = &_restartData[sizeof(RestartProcessorData)];
2014         
2015         if(restartData->numMigratedAwayElements != 0){
2016                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2017                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2018                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2019                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2020         }
2021         
2022         PUP::fromMem pBuf(buf);
2023
2024         pBuf | checkpointCount;
2025         for(int i=0; i<CmiNumPes(); i++){
2026                 pBuf | CpvAccess(_incarnation)[i];
2027         }
2028         CkPupROData(pBuf);
2029         CkPupGroupData(pBuf,CmiTrue);
2030         CkPupNodeGroupData(pBuf,CmiTrue);
2031         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
2032         CkAssert(pBuf.size() == restartData->checkPointSize);
2033         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2034
2035         // increases the incarnation number
2036         CpvAccess(_incarnation)[CmiMyPe()]++;
2037         
2038         forAllCharesDo(initializeRestart,NULL);
2039         
2040         CmiFree(_restartData);
2041         
2042         _initDone();
2043
2044         getGlobalStep(globalLBID);
2045         
2046         countUpdateHomeAcks = 0;
2047         RestartRequest updateHomeRequest;
2048         updateHomeRequest.PE = CmiMyPe();
2049         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2050         for(int i=0;i<CmiNumPes();i++){
2051                 if(i != CmiMyPe()){
2052                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2053                 }
2054         }
2055
2056 }
2057
2058 /**
2059  * Receives the updateHome ACKs from all other processors. Once everybody
2060  * has replied, it sends a request to resend the logged messages.
2061  */
2062 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2063         countUpdateHomeAcks++;
2064         CmiFree(updateHomeAck);
2065         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2066         if(countUpdateHomeAcks != CmiNumPes()){
2067                 return;
2068         }
2069
2070         // Send out the request to resend logged messages to all other processors
2071         CkVec<TProcessedLog> objectVec;
2072         forAllCharesDo(createObjIDList, (void *)&objectVec);
2073         int numberObjects = objectVec.size();
2074         
2075         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2076         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2077         char *resendMsg = (char *)CmiAlloc(totalSize);  
2078
2079         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2080         resendReq->PE =CkMyPe(); 
2081         resendReq->numberObjects = numberObjects;
2082         char *objList = &resendMsg[sizeof(ResendRequest)];
2083         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2084
2085         /* test for parallel restart migrate away object**/
2086         if(fastRecovery){
2087                 distributeRestartedObjects();
2088                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2089         }
2090         
2091         /*      To make restart work for load balancing.. should only
2092         be used when checkpoint happens along with load balancing
2093         **/
2094 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2095
2096         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2097         CpvAccess(_currentObj) = lb;
2098         lb->ReceiveDummyMigration(restartDecisionNumber);
2099
2100 //HERE  sleep(10);
2101         
2102         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2103         for(int i=0;i<CkNumPes();i++){
2104                 if(i != CkMyPe()){
2105                         CmiSyncSend(i,totalSize,resendMsg);
2106                 }
2107         }
2108         _resendMessagesHandler(resendMsg);
2109         CmiFree(resendMsg);
2110
2111 };
2112
2113 /**
2114  * @brief Initializes variables and flags for restarting procedure.
2115  */
2116 void initializeRestart(void *data, ChareMlogData *mlogData){
2117         mlogData->resendReplyRecvd = 0;
2118         mlogData->receivedTNs = new CkVec<MCount>;
2119         mlogData->restartFlag = 1;
2120 };
2121
2122 /**
2123  * Updates the homePe of chare array elements.
2124  */
2125 void updateHomePE(void *data,ChareMlogData *mlogData){
2126         RestartRequest *updateRequest = (RestartRequest *)data;
2127         int PE = updateRequest->PE; //restarted PE
2128         //if this object is an array Element and its home is the restarted processor
2129         // the home processor needs to know its current location
2130         if(mlogData->objID.type == TypeArray){
2131                 //it is an array element
2132                 CkGroupID myGID = mlogData->objID.data.array.id;
2133                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
2134                 CkArrayID aid(mlogData->objID.data.array.id);           
2135                 //check if the restarted processor is the home processor for this object
2136                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2137                 if(locMgr->homePe(myIdx) == PE){
2138                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2139                         DEBUG_RESTART(myIdx.print());
2140                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2141                 }
2142         }
2143 };
2144
2145
2146 /**
2147  * Updates the homePe for all chares in this processor.
2148  */
2149 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2150         int sender = updateRequest->PE;
2151         
2152         forAllCharesDo(updateHomePE,updateRequest);
2153         
2154         updateRequest->PE = CmiMyPe();
2155         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2156         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2157         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2158                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2159                 checkpointCount--;
2160                 startMlogCheckpoint(NULL,0);
2161         }
2162         if(sender == getCheckPointPE()){
2163                 for(int i=0;i<retainedObjectList.size();i++){
2164                         if(retainedObjectList[i]->acked == 0){
2165                                 MigrationNotice migMsg;
2166                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2167                                 migMsg.record = retainedObjectList[i];
2168                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2169                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2170                         }
2171                 }
2172         }
2173 }
2174
2175 /**
2176  * @brief Fills up the ticket vector for each chare.
2177  */
2178 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2179         DEBUG(char name[100]);
2180         ResendData *resendData = (ResendData *)data;
2181         int PE = resendData->PE; //restarted PE
2182         int count=0;
2183         CkHashtableIterator *iterator;
2184         void *objp;
2185         void *objkey;
2186         CkObjID *objID;
2187         SNToTicket *snToTicket;
2188         Ticket ticket;
2189         
2190         // traversing the team table looking up for the maximum TN received     
2191         iterator = mlogData->teamTable.iterator();
2192         while( (objp = iterator->next(&objkey)) != NULL ){
2193                 objID = (CkObjID *)objkey;
2194         
2195                 // traversing the resendData structure to add ticket numbers
2196                 for(int j=0;j<resendData->numberObjects;j++){
2197                         if((*objID) == (resendData->listObjects)[j].recver){
2198                                 snToTicket = *(SNToTicket **)objp;
2199 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2200                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2201                                         ticket = snToTicket->get(snIndex);      
2202                                 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2203                                                 //store the TNs that have been since the recver last checkpointed
2204                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2205                                         }
2206                                 }
2207                         }
2208                 }
2209         }
2210
2211         //releasing the memory for the iterator
2212         delete iterator;
2213 }
2214
2215
2216 /**
2217  * @brief Turns on the flag for team recovery that selectively restores
2218  * particular metadata information.
2219  */
2220 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2221         char name[100];
2222         mlogData->teamRecoveryFlag = 1; 
2223 }
2224
2225 /**
2226  * @brief Turns off the flag for team recovery.
2227  */
2228 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2229         mlogData->teamRecoveryFlag = 0;
2230 }
2231
2232 /**
2233  * Prints a processed log.
2234  */
2235 void printLog(TProcessedLog *log){
2236         char recverString[100];
2237         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
2238 }
2239
2240 /**
2241  * Prints information about a message.
2242  */
2243 void printMsg(envelope *env, const char* par){
2244         char senderString[100];
2245         char recverString[100];
2246         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
2247 }
2248
2249 /**
2250  * Prints information about a determinant.
2251  */
2252 void printDet(Determinant *det, const char* par){
2253         char senderString[100];
2254         char recverString[100];
2255         CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
2256 }
2257
2258 /**
2259  * @brief Resends all the logged messages to a particular chare list.
2260  * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
2261  * @param mlogData a particular chare living in this processor.
2262  */
2263 void resendMessageForChare(void *data, ChareMlogData *mlogData){
2264         DEBUG_RESTART(char nameString[100]);
2265         DEBUG_RESTART(char recverString[100]);
2266         DEBUG_RESTART(char senderString[100]);
2267
2268         ResendData *resendData = (ResendData *)data;
2269         int PE = resendData->PE; //restarted PE
2270         int count=0;
2271         int ticketRequests=0;
2272         CkQ<MlogEntry *> *log = mlogData->getMlog();
2273
2274         DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2275
2276         // traversing the message log to see if we must resend a message        
2277         for(int i=0;i<log->length();i++){
2278                 MlogEntry *logEntry = (*log)[i];
2279                 
2280                 // if we sent out the logs of a local message to buddy and it crashed
2281                 //before acknowledging 
2282                 envelope *env = logEntry->env;
2283                 if(env == NULL){
2284                         continue;
2285                 }
2286         
2287                 // resend if type is not invalid        
2288                 if(env->recver.type != TypeInvalid){
2289                         for(int j=0;j<resendData->numberObjects;j++){
2290                                 if(env->recver == (resendData->listObjects)[j].recver){
2291                                         if(PE != CkMyPe()){
2292                                                 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
2293                                                 if(env->recver.type == TypeNodeGroup){
2294                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2295                                                 }else{
2296                                                         CmiSetHandler(env,CmiGetXHandler(env));
2297                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2298                                                 }
2299                                         }else{
2300                                                 envelope *copyEnv = copyEnvelope(env);
2301                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2302                                         }
2303                                         DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2304                                         count++;
2305                                 }
2306                         }//end of for loop of objects
2307                         
2308                 }       
2309         }
2310         DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
2311 }
2312
/**
 * Resends messages since last checkpoint to the list of objects included in the 
 * request. It also sends stored remote determinants to the particular failed PE.
 *
 * Request layout: |ResendRequest|TProcessedLog x numberObjects|
 *
 * Steps:
 *  1. Each retained object whose migRecord.toPE is the requesting PE is checked
 *     against the array objects in the request. If none matches, the object is
 *     recreated on this PE. Its home is informed, a dummy migration is sent to
 *     the requesting PE, and its elements may be resumed.
 *  2. If the requester is team-local and not this PE, tickets are gathered
 *     with fillTicketForChare. Otherwise, logged messages are resent with
 *     resendMessageForChare.
 *  3. Stored remote determinants with TN > tProcessed are collected per object.
 *     Their TNs are also added to the ticket vectors.
 *  4. A reply is sent to the requester:
 *     |ResendRequest|CkObjID x n|per object: int count + MCount TNs|per object: int count + Determinants|
 *
 * msg is freed here only when the request came from another PE.
 * NOTE(review): presumably the local caller keeps ownership otherwise; confirm.
 */
void _resendMessagesHandler(char *msg){
	ResendData d;
	CkVec<Determinant> *detVec;
	ResendRequest *resendReq = (ResendRequest *)msg;

	// building the reply message
	// d.listObjects points into msg, so msg must stay alive until the end
	char *listObjects = &msg[sizeof(ResendRequest)];
	d.numberObjects = resendReq->numberObjects;
	d.PE = resendReq->PE;
	d.listObjects = (TProcessedLog *)listObjects;
	d.ticketVecs = new CkVec<MCount>[d.numberObjects];
	detVec = new CkVec<Determinant>[d.numberObjects];

	//Check if any of the retained objects need to be recreated
	//If they have not been recreated on the restarted processor
	//they need to be recreated on this processor
	int count=0;
	for(int i=0;i<retainedObjectList.size();i++){
		if(retainedObjectList[i]->migRecord.toPE == d.PE){
			count++;
			// recreate unless the requester lists this array element (same loc mgr and index)
			int recreate=1;
			for(int j=0;j<d.numberObjects;j++){
				if(d.listObjects[j].recver.type != TypeArray ){
					continue;
				}
				CkArrayID aid(d.listObjects[j].recver.data.array.id);		
				CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
				if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
					if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asChild()){
						recreate = 0;
						break;
					}
				}
			}
			CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
			if(recreate){
				// rebuild the element from its retained location message without
				// counting it as a migration
				donotCountMigration=1;
				_receiveMlogLocationHandler(retainedObjectList[i]->msg);
				donotCountMigration=0;
				CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
				int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
				informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
				sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
				CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
				CmiAssert(rec->type() == CkLocRec::local);
				CkVec<CkMigratable *> eltList;
				locMgr->migratableList((CkLocRec_local *)rec,eltList);
				// resume elements that were waiting for a resume they have not yet received
				for(int j=0;j<eltList.size();j++){
					if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
						CpvAccess(_currentObj) = eltList[j];
						eltList[j]->ResumeFromSync();
					}
				}
				// the retained location message has been consumed
				retainedObjectList[i]->msg=NULL;	
			}
		}
	}
	
	if(count > 0){
//		CmiAbort("retainedObjectList for restarted processor not empty");
	}
	
	DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));


	//TML: examines the origin processor to determine if it belongs to the same group.
	// In that case, it only returns the maximum ticket received for each object in the list.
	if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
		forAllCharesDo(fillTicketForChare,&d);
	else
		forAllCharesDo(resendMessageForChare,&d);

	// adding the stored determinants to the resendReplyMsg
	// traversing all the stored determinants
	CkVec<Determinant> *vec;
	for(int i=0; i<d.numberObjects; i++){
		vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
		if(vec != NULL){
			for(int j=0; j<vec->size(); j++){

				// only relevant determinants are to be sent
				if((*vec)[j].TN > d.listObjects[i].tProcessed){

					// adding the ticket in the ticket vector
					d.ticketVecs[i].push_back((*vec)[j].TN);

					// adding the determinant in the determinant vector
					detVec[i].push_back((*vec)[j]);

					DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
				}
			}
		}
	}

	int totalDetStored = 0;
	for(int i=0;i<d.numberObjects;i++){
		totalDetStored += detVec[i].size();
	}

	//send back the maximum ticket number for a message sent to each object on the 
	//restarted processor
	//Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
	
	int totalTNStored=0;
	for(int i=0;i<d.numberObjects;i++){
		totalTNStored += d.ticketVecs[i].size();
	}
	
	// per object: one CkObjID plus two int counts (TN list and determinant list)
	int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
	char *resendReplyMsg = (char *)CmiAlloc(totalSize);
	
	ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
	resendReply->PE = CkMyPe();
	resendReply->numberObjects = d.numberObjects;
	
	char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
	CkObjID *replyObjects = (CkObjID *)replyListObjects;
	for(int i=0;i<d.numberObjects;i++){
		replyObjects[i] = d.listObjects[i].recver;
	}
	
	// ticketList is a write cursor that advances through the rest of the reply
	char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
	for(int i=0;i<d.numberObjects;i++){
		int vecsize = d.ticketVecs[i].size();
		memcpy(ticketList,&vecsize,sizeof(int));
		ticketList = &ticketList[sizeof(int)];
		memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
		ticketList = &ticketList[sizeof(MCount)*vecsize];
	}

	// adding the stored remote determinants to the message
	for(int i=0;i<d.numberObjects;i++){
		int vecsize = detVec[i].size();
		memcpy(ticketList,&vecsize,sizeof(int));
		ticketList = &ticketList[sizeof(int)];
		memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
		ticketList = &ticketList[sizeof(Determinant)*vecsize];
	}

	CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
	CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
	
/*	
	if(verifyAckRequestsUnacked){
		CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
		for(int i=0;i<verifyAckRequestsUnacked;i++){
			CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
			LDObjHandle h;
			lb->Migrated(h,1);
		}
	}
	
	verifyAckRequestsUnacked=0;*/
	
	delete [] d.ticketVecs;
	delete [] detVec;

	DEBUG_MEM(CmiMemoryCheck());

	if(resendReq->PE != CkMyPe()){
		CmiFree(msg);
	}	
//	CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
	lastRestart = CmiWallTimer();
}
2483
// Sorts a TN vector in ascending order and removes duplicate TNs (defined below).
void sortVec(CkVec<MCount> *TNvec);
// Searches a sorted TN vector for searchTN; returns its index or -1 (defined below).
int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2486
2487 /**
2488  * @brief Processes the messages in the delayed remote message queue
2489  */
2490 void processDelayedRemoteMsgQueue(){
2491         DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
2492         
2493         while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
2494                 void *qMsgPtr;
2495                 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
2496                 envelope *qEnv = (envelope *)qMsgPtr;
2497                 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
2498                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
2499                 DEBUG_MEM(CmiMemoryCheck());
2500         }
2501
2502 }
2503
2504 /**
2505  * @brief Receives determinants stored on remote nodes.
2506  * Message format: |Header|ObjID list|TN list|Determinant list|
2507  * TN list = |number of TNs|list of TNs|...|
2508  */
2509 void _resendReplyHandler(char *msg){
2510         ResendRequest *resendReply = (ResendRequest *)msg;
2511         CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
2512         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2513         
2514 //      DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2515         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2516         for(int i =0; i< resendReply->numberObjects;i++){
2517                 Chare *obj = (Chare *)listObjects[i].getObject();
2518                 
2519                 int vecsize;
2520                 memcpy(&vecsize,listTickets,sizeof(int));
2521                 listTickets = &listTickets[sizeof(int)];
2522                 MCount *listTNs = (MCount *)listTickets;
2523                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2524         
2525                 if(obj != NULL){
2526                         //the object was restarted on the processor on which it existed
2527                         processReceivedTN(obj,vecsize,listTNs);
2528                 }else{
2529                         //pack up objID vecsize and listTNs and send it to the correct processor
2530                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2531                         char *TNMsg = (char *)CmiAlloc(totalSize);
2532                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2533                         receivedTNData->recver = listObjects[i];
2534                         receivedTNData->numTNs = vecsize;
2535                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2536                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2537                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2538                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2539                 }
2540
2541         }
2542
2543         // traversing all the retrieved determinants
2544         for(int i = 0; i < resendReply->numberObjects; i++){
2545                 Chare *obj = (Chare *)listObjects[i].getObject();
2546                 
2547                 int vecsize;
2548                 memcpy(&vecsize,listTickets,sizeof(int));
2549                 listTickets = &listTickets[sizeof(int)];
2550                 Determinant *listDets = (Determinant *)listTickets;     
2551                 listTickets = &listTickets[vecsize*sizeof(Determinant)];
2552         
2553                 if(obj != NULL){
2554                         //the object was restarted on the processor on which it existed
2555                         processReceivedDet(obj,vecsize,listDets);
2556                 } else {
2557                         // pack the determinants and ship them to the other processor
2558                         // pack up objID vecsize and listDets and send it to the correct processor
2559                         int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
2560                         char *detMsg = (char *)CmiAlloc(totalSize);
2561                         ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
2562                         receivedDetData->recver = listObjects[i];
2563                         receivedDetData->numDets = vecsize;
2564                         char *detList = &detMsg[sizeof(ReceivedDetData)];
2565                         memcpy(detList,listDets,sizeof(Determinant)*vecsize);
2566                         CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
2567                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
2568                 }
2569
2570         }
2571
2572         // checking if the restart is over
2573         _numRestartResponses++;
2574         if(_numRestartResponses == CkNumPes()){
2575                 _numRestartResponses = 0;
2576                 processDelayedRemoteMsgQueue();
2577         }
2578
2579 };
2580
2581 /**
2582  * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
2583  */
2584 void _receivedDetDataHandler(ReceivedDetData *msg){
2585         DEBUG_NOW(char objName[100]);
2586         Chare *obj = (Chare *) msg->recver.getObject();
2587         if(obj){                
2588                 char *_msg = (char *)msg;
2589                 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2590                 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
2591                 processReceivedDet(obj,msg->numDets,listDets);
2592                 CmiFree(msg);
2593         }else{
2594                 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
2595                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2596         }
2597 }
2598
2599 /**
2600  * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
2601  */
2602 void _receivedTNDataHandler(ReceivedTNData *msg){
2603         DEBUG_NOW(char objName[100]);
2604         Chare *obj = (Chare *) msg->recver.getObject();
2605         if(obj){                
2606                 char *_msg = (char *)msg;
2607                 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2608                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2609                 processReceivedTN(obj,msg->numTNs,listTNs);
2610                 CmiFree(msg);
2611         }else{
2612                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2613                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2614         }
2615 };
2616
2617 /**
2618  * @brief Processes the received list of determinants from a particular PE.
2619  */
2620 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
2621         Determinant *det;
2622
2623         // traversing the whole list of determinants
2624         for(int i=0; i<listSize; i++){
2625                 det = &listDets[i];
2626                 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
2627                 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
2628         }
2629         
2630         DEBUG_MEM(CmiMemoryCheck());
2631 }
2632         
2633 /**
2634  * @brief Processes the received list of tickets from a particular PE.
2635  */
2636 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2637         // increases the number of resendReply received
2638         obj->mlogData->resendReplyRecvd++;
2639
2640         DEBUG(char objName[100]);
2641         DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
2642         //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
2643         //if(obj->mlogData->receivedTNs == NULL)
2644         //      CkPrintf("NULL\n");     
2645         //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
2646
2647         // includes the tickets into the receivedTN structure
2648         for(int j=0;j<listSize;j++){
2649                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2650         }
2651         
2652         //if this object has received all the replies find the ticket numbers
2653         //that senders know about. Those less than the ticket number processed 
2654         //by the receiver can be thrown away. The rest need not be consecutive
2655         // ie there can be holes in the list of ticket numbers seen by senders
2656         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2657                 obj->mlogData->resendReplyRecvd = 0;
2658                 //sort the received TNS
2659                 sortVec(obj->mlogData->receivedTNs);
2660         
2661                 //after all the received tickets are in we need to sort them and then 
2662                 // calculate the holes  
2663                 if(obj->mlogData->receivedTNs->size() > 0){
2664                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2665                         int vecsize = obj->mlogData->receivedTNs->size();
2666                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2667                         
2668                         // updating tCount with the highest ticket handed out
2669                         if(teamSize > 1){
2670                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2671                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2672                         }else{
2673                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2674                         }
2675                         
2676                         if(numberHoles == 0){
2677                         }else{
2678                                 char objName[100];                                      
2679                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2680                                 obj->mlogData->numberHoles = numberHoles;
2681                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2682                                 int countHoles=0;
2683                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2684                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2685                                                 //the TNs are not consecutive at this point
2686                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2687                                                         DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
2688                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2689                                                         countHoles++;
2690                                                 }       
2691                                         }
2692                                 }
2693                                 //Holes have been given new TN
2694                                 if(countHoles != numberHoles){
2695                                         char str[100];
2696                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2697                                 }
2698                                 CkAssert(countHoles == numberHoles);                                    
2699                                 obj->mlogData->currentHoles = numberHoles;
2700                         }
2701                 }
2702         
2703                 // cleaning up structures and getting ready to continue execution       
2704                 delete obj->mlogData->receivedTNs;
2705                 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
2706                 obj->mlogData->receivedTNs = NULL;
2707                 obj->mlogData->restartFlag = 0;
2708
2709                 processDelayedRemoteMsgQueue();
2710
2711                 DEBUG_RESTART(char objString[100]);
2712                 DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2713         }
2714
2715 }
2716
2717
2718 void sortVec(CkVec<MCount> *TNvec){
2719         //sort it ->its bloddy bubble sort
2720         //TODO: use quicksort
2721         for(int i=0;i<TNvec->size();i++){
2722                 for(int j=i+1;j<TNvec->size();j++){
2723                         if((*TNvec)[j] < (*TNvec)[i]){
2724                                 MCount temp;
2725                                 temp = (*TNvec)[i];
2726                                 (*TNvec)[i] = (*TNvec)[j];
2727                                 (*TNvec)[j] = temp;
2728                         }
2729                 }
2730         }
2731         //make it unique .. since its sorted all equal units will be consecutive
2732         MCount *tempArray = new MCount[TNvec->size()];
2733         int     uniqueCount=-1;
2734         for(int i=0;i<TNvec->size();i++){
2735                 tempArray[i] = 0;
2736                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2737                         uniqueCount++;
2738                         tempArray[uniqueCount] = (*TNvec)[i];
2739                 }
2740         }
2741         uniqueCount++;
2742         TNvec->removeAll();
2743         for(int i=0;i<uniqueCount;i++){
2744                 TNvec->push_back(tempArray[i]);
2745         }
2746         delete [] tempArray;
2747 }       
2748
2749 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2750         if(TNVec->size() == 0){
2751                 return -1; //not found in an empty vec
2752         }
2753         //binary search to find 
2754         int left=0;
2755         int right = TNVec->size();
2756         int mid = (left +right)/2;
2757         while(searchTN != (*TNVec)[mid] && left < right){
2758                 if((*TNVec)[mid] > searchTN){
2759                         right = mid-1;
2760                 }else{
2761                         left = mid+1;
2762                 }
2763                 mid = (left + right)/2;
2764         }
2765         if(left < right){
2766                 //mid is the element to be returned
2767                 return mid;
2768         }else{
2769                 if(mid < TNVec->size() && mid >=0){
2770                         if((*TNVec)[mid] == searchTN){
2771                                 return mid;
2772                         }else{
2773                                 return -1;
2774                         }
2775                 }else{
2776                         return -1;
2777                 }
2778         }
2779 };
2780
2781
/*
        Parallel restart: distribute some of the restarted array elements to other processors.
        Charm++ entry methods cannot be used to perform this migration, because they would
        get stuck in the same message-logging protocol that is being restarted.
        Note: to avoid interference between the objects being recovered, the current PE
        will NOT keep any object; it is devoted to forwarding messages to the recovering
        objects. Otherwise the current PE would have to both recover objects and forward
        messages, and the two activities would interfere with each other.
*/
2790
2791 class ElementDistributor: public CkLocIterator{
2792         CkLocMgr *locMgr;
2793         int *targetPE;
2794
2795         void pupLocation(CkLocation &loc,PUP::er &p){
2796                 CkArrayIndexMax idx=loc.getIndex();
2797                 CkGroupID gID = locMgr->ckGetGroupID();
2798                 p|gID;      // store loc mgr's GID as well for easier restore
2799                 p|idx;
2800                 p|loc;
2801         };
2802 public:
2803         ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2804
2805         void addLocation(CkLocation &loc){
2806
2807                 // leaving object on this PE
2808                 if(*targetPE == CkMyPe()){
2809                         *targetPE = (*targetPE +1)%CkNumPes();
2810                         return;
2811                 }
2812                         
2813                 CkArrayIndexMax idx = loc.getIndex();
2814                 CkLocRec_local *rec = loc.getLocalRecord();
2815                         
2816                 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2817                 idx.print();
2818                         
2819                 //TODO: an element that is being moved should leave some trace behind so that
2820                 // the arraybroadcaster can forward messages to it
2821                         
2822                 //pack up this location and send it across
2823                 PUP::sizer psizer;
2824                 pupLocation(loc,psizer);
2825                 int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
2826                 char *msg = (char *)CmiAlloc(totalSize);
2827                 char *buf = &msg[CmiMsgHeaderSizeBytes];
2828                 PUP::toMem pmem(buf);
2829                 pmem.becomeDeleting();
2830                 pupLocation(loc,pmem);
2831                         
2832                 locMgr->setDuringMigration(CmiTrue);
2833                 delete rec;
2834                 locMgr->setDuringMigration(CmiFalse);
2835                 locMgr->inform(idx,*targetPE);
2836
2837                 CmiSetHandler(msg,_distributedLocationHandlerIdx);
2838                 CmiSyncSendAndFree(*targetPE,totalSize,msg);
2839
2840                 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2841
2842                 //decide on the target processor for the next object
2843                 *targetPE = *targetPE + 1;
2844                 if(*targetPE > (CkMyPe() + parallelRecovery)){
2845                         *targetPE = CkMyPe() + 1;
2846                 }
2847         }
2848
2849 };
2850
2851 /**
2852  * Distributes objects to accelerate recovery after a failure.
2853  */
2854 void distributeRestartedObjects(){
2855         int numGroups = CkpvAccess(_groupIDTable)->size();      
2856         int i;
2857         int targetPE=CkMyPe()+1;
2858         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2859 };
2860
2861 /**
2862  * Handler to update information about an object just received.
2863  */
2864 void _distributedLocationHandler(char *receivedMsg){
2865         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2866         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2867         PUP::fromMem pmem(buf);
2868         CkGroupID gID;
2869         CkArrayIndexMax idx;
2870         pmem |gID;
2871         pmem |idx;
2872         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2873         donotCountMigration=1;
2874         mgr->resume(idx,pmem,CmiTrue);
2875         donotCountMigration=0;
2876         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2877         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2878         idx.print();
2879
2880         CkLocRec *rec = mgr->elementRec(idx);
2881         CmiAssert(rec->type() == CkLocRec::local);
2882         
2883         CkVec<CkMigratable *> eltList;
2884         mgr->migratableList((CkLocRec_local *)rec,eltList);
2885         for(int i=0;i<eltList.size();i++){
2886                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2887                         CpvAccess(_currentObj) = eltList[i];
2888                         eltList[i]->ResumeFromSync();
2889                 }
2890         }
2891 }
2892
2893
2894 /** this method is used to send messages to a restarted processor to tell
2895  * it that a particular expected object is not going to get to it */
2896 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2897         DummyMigrationMsg buf;
2898         buf.flag = MLOG_OBJECT;
2899         buf.lbID = lbID;
2900         buf.mgrID = locMgrID;
2901         buf.idx = idx;
2902         buf.locationPE = locationPE;
2903         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2904         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
2905 };
2906
2907
2908 /**this method is used by a restarted processor to tell other processors
2909  * that they are not going to receive these many objects.. just the count
2910  * not the objects themselves ***/
2911
2912 void sendDummyMigrationCounts(int *dummyCounts){
2913         DummyMigrationMsg buf;
2914         buf.flag = MLOG_COUNT;
2915         buf.lbID = globalLBID;
2916         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2917         for(int i=0;i<CmiNumPes();i++){
2918                 if(i != CmiMyPe() && dummyCounts[i] != 0){
2919                         buf.count = dummyCounts[i];
2920                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
2921                 }
2922         }
2923 }
2924
2925
2926 /** this handler is used to process a dummy migration msg.
2927  * it looks up the load balancer and calls migrated for it */
2928
2929 void _dummyMigrationHandler(DummyMigrationMsg *msg){
2930         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2931         if(msg->flag == MLOG_OBJECT){
2932                 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
2933                 LDObjHandle h;
2934                 lb->Migrated(h,1);
2935         }
2936         if(msg->flag == MLOG_COUNT){
2937                 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
2938                 msg->count -= verifyAckedRequests;
2939                 for(int i=0;i<msg->count;i++){
2940                         LDObjHandle h;
2941                         lb->Migrated(h,1);
2942                 }
2943         }
2944         verifyAckedRequests=0;
2945         CmiFree(msg);
2946 };
2947
/*****************************************************
        Machinery to apply a function to the ChareMlogData
        of every chare that currently lives on this processor.
******************************************************/
2953
2954
2955 class ElementCaller :  public CkLocIterator {
2956 private:
2957         CkLocMgr *locMgr;
2958         MlogFn fnPointer;
2959         void *data;
2960 public:
2961         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
2962                 locMgr = _locMgr;
2963                 fnPointer = _fnPointer;
2964                 data = _data;
2965         };
2966         void addLocation(CkLocation &loc){
2967                 CkVec<CkMigratable *> list;
2968                 CkLocRec_local *local = loc.getLocalRecord();
2969                 locMgr->migratableList (local,list);
2970                 for(int i=0;i<list.size();i++){
2971                         CkMigratable *migratableElement = list[i];
2972                         fnPointer(data,migratableElement->mlogData);
2973                 }
2974         }
2975 };
2976
2977 /**
2978  * Map function pointed by fnPointer over all the chares living in this processor.
2979  */
2980 void forAllCharesDo(MlogFn fnPointer, void *data){
2981         int numGroups = CkpvAccess(_groupIDTable)->size();
2982         for(int i=0;i<numGroups;i++){
2983                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2984                 fnPointer(data,obj->mlogData);
2985         }
2986         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2987         for(int i=0;i<numNodeGroups;i++){
2988                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
2989                 fnPointer(data,obj->mlogData);
2990         }
2991         int i;
2992         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
2993 };
2994
2995
2996 /******************************************************************
2997  Load Balancing
2998 ******************************************************************/
2999
3000 /**
3001  * This is the first time Converse is called after AtSync method has been called by every local object.
3002  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
3003  * message logging, we can take advantage of this situation and garbage collect at this point.
3004  */
3005 void initMlogLBStep(CkGroupID gid){
3006         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3007         countLBMigratedAway = 0;
3008         countLBToMigrate=0;
3009         onGoingLoadBalancing=1;
3010         migrationDoneCalled=0;
3011         checkpointBarrierCount=0;
3012         if(globalLBID.idx != 0){
3013                 CmiAssert(globalLBID.idx == gid.idx);
3014         }
3015         globalLBID = gid;
3016 #if SYNCHRONIZED_CHECKPOINT
3017         garbageCollectMlog();
3018 #endif
3019 }
3020
3021 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3022         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3023         DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3024         
3025         resumeLbFnPtr = _fnPtr;
3026         centralLb = _centralLb;
3027         migrationDoneCalled = 1;
3028         if(countLBToMigrate == countLBMigratedAway){
3029                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3030                 startMlogCheckpoint(NULL,CmiWallTimer());       
3031         }
3032 };
3033
3034 void finishedCheckpointLoadBalancing(){
3035         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3036         CheckpointBarrierMsg msg;
3037         msg.fromPE = CmiMyPe();
3038         msg.checkpointCount = checkpointCount;
3039
3040         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3041         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3042         
3043 };
3044
3045
3046 void sendMlogLocation(int targetPE, envelope *env){
3047 #if !SYNCHRONIZED_CHECKPOINT
3048         void *_msg = EnvToUsr(env);
3049         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3050
3051         int existing = 0;
3052         //if this object is already in the retainedobjectlust destined for this
3053         //processor it should not be sent
3054         
3055         for(int i=0;i<retainedObjectList.size();i++){
3056                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3057                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3058                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3059                         existing = 1;
3060                         break;
3061                 }
3062         }
3063
3064         if(existing){
3065                 return;
3066         }
3067         
3068         countLBToMigrate++;
3069         
3070         MigrationNotice migMsg;
3071         migMsg.migRecord.gID = msg->gid;
3072         migMsg.migRecord.idx = msg->idx;
3073         migMsg.migRecord.fromPE = CkMyPe();
3074         migMsg.migRecord.toPE =  targetPE;
3075         
3076         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3077         
3078         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3079         retainedObject->migRecord = migMsg.migRecord;
3080         retainedObject->acked  = 0;
3081         
3082         CkPackMessage(&env);
3083         
3084         migMsg.record = retainedObject;
3085         retainedObject->msg = env;
3086         int size = retainedObject->size = env->getTotalsize();
3087         
3088         retainedObjectList.push_back(retainedObject);
3089         
3090         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3091         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3092         
3093         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3094 #endif
3095 }
3096
3097 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3098         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3099         migratedNoticeList.push_back(msg->migRecord);
3100
3101         MigrationNoticeAck buf;
3102         buf.record = msg->record;
3103         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3104         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3105 }
3106
3107 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3108         
3109         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3110         retainedObject->acked = 1;
3111
3112         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3113         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3114
3115         //inform home about the new location of this object
3116         CkGroupID gID = retainedObject->migRecord.gID ;
3117         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3118         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3119         
3120         countLBMigratedAway++;
3121         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3122                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3123                 startMlogCheckpoint(NULL,CmiWallTimer());
3124         }
3125 };
3126
3127 void _receiveMlogLocationHandler(void *buf){
3128         envelope *env = (envelope *)buf;
3129         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3130         CkUnpackMessage(&env);
3131         void *_msg = EnvToUsr(env);
3132         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3133         CkGroupID gID= msg->gid;
3134         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
3135         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3136         CpvAccess(_currentObj)=mgr;
3137         mgr->immigrate(msg);
3138 };
3139
3140
3141 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
3142 /*      if(mlogData->objID.type == TypeArray){
3143                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
3144         //      TODO: make sure later that atSync has been called and it needs 
3145         //      to be resumed from sync
3146         //
3147                 CpvAccess(_currentObj) = elt;
3148                 elt->ResumeFromSync();
3149         }*/
3150 }
3151
3152 /**
3153  * @brief Processor 0 sends a broadcast to every other processor after checkpoint barrier.
3154  */
3155 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
3156         if(checkpointBarrierCount == CmiNumPes()){
3157                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
3158                 for(int i=0;i<CmiNumPes();i++){
3159                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
3160                 }
3161         }
3162 }
3163
3164 /**
3165  * @brief Processor 0 receives a contribution from every other processor after checkpoint.
3166  */ 
3167 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
3168         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
3169         if(msg->checkpointCount == checkpointCount){
3170                 checkpointBarrierCount++;
3171                 checkAndSendCheckpointBarrierAcks(msg);
3172         }else{
3173                 if(msg->checkpointCount-1 == checkpointCount){
3174                         checkpointBarrierCount++;
3175                         checkAndSendCheckpointBarrierAcks(msg);
3176                 }else{
3177                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
3178                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
3179                 }
3180         }
3181
3182         // deleting the received message
3183         CmiFree(msg);
3184 }
3185
3186 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
3187         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
3188         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
3189
3190 #if !SYNCHRONIZED_CHECKPOINT
3191         // sending a notice to all senders to remove message logs
3192         sendRemoveLogRequests();
3193 #endif
3194
3195         // resuming LB function pointer
3196         (*resumeLbFnPtr)(centralLb);
3197
3198         // deleting message
3199         CmiFree(msg);
3200 }
3201
3202 /**
3203  * @brief Function to remove all messages in the message log of a particular chare.
3204  */
3205 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
3206         int total;
3207         MlogEntry *logEntry;
3208         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
3209
3210         // traversing the whole mes