Implementation of recovery for the team-based message-logging approach.
[charm.git] / src / ck-core / ckmessagelogging.C
1 /**
2  * Message Logging Fault Tolerance Protocol
3  * It includes the main functions for the basic and team-based schemes.
4  */
5
6 #include "charm.h"
7 #include "ck.h"
8 #include "ckmessagelogging.h"
9 #include "queueing.h"
10 #include <sys/types.h>
11 #include <signal.h>
12 #include "CentralLB.h"
13
14 #ifdef _FAULT_MLOG_
15
// Debug-trace switches. Every macro below except DEBUG_TEAM discards its
// argument (its expansion is commented out); DEBUG_TEAM expands to its
// argument, so code wrapped in DEBUG_TEAM is always compiled in.
16 //#define DEBUG(x)  if(_restartFlag) {x;}
17 #define DEBUG_MEM(x) // x
18 #define DEBUG(x)  //x
19 #define DEBUGRESTART(x)  //x
20 #define DEBUGLB(x) // x
21 #define DEBUG_TEAM(x)  x
22
// Buffering switches: BUFFERED_LOCAL batches the local-message metadata copies
// produced by sendLocalMessageCopy; BUFFERED_REMOTE batches the ticket
// requests produced by sendTicketRequest, one buffer per destination PE.
23 #define BUFFERED_LOCAL
24 #define BUFFERED_REMOTE 
25
26 extern const char *idx2str(const CkArrayIndex &ind);
27 extern const char *idx2str(const ArrayElement *el);
28 const char *idx2str(const CkArrayIndexMax &ind){
29         return idx2str((const CkArrayIndex &)ind);
30 };
31
// Prototypes for functions referenced before (or outside) their definitions.
32 void getGlobalStep(CkGroupID gID);
33
34 bool fault_aware(CkObjID &recver);
35 void sendCheckpointData(int mode);
36 void createObjIDList(void *data,ChareMlogData *mlogData);
37 inline bool isLocal(int destPE);
38 inline bool isTeamLocal(int destPE);
39
40 int _restartFlag=0;
41 //ERASE int restarted=0; // it's not being used anywhere
42
43 //TML: variables for measuring savings with teams in message logging
// sendTicketRequest adds one message and its envelope size for every remote
// message it logs (i.e. every message leaving this PE's team).
44 float MLOGFT_totalLogSize = 0.0;
45 float MLOGFT_totalMessages = 0.0;
46 float MLOGFT_totalObjects = 0.0;
47
48 //TODO: remove for perf runs
49 int countHashRefs=0; //count the number of gets
50 int countHashCollisions=0;
51
52 //#define CHECKPOINT_DISK
// NOTE(review): a string literal is bound to a non-const char*; this is
// deprecated in C++03 and ill-formed since C++11. Changing it to const char*
// must be coordinated with any extern declaration of this variable.
53 char *checkpointDirectory=".";
54 int unAckedCheckpoint=0;
55
// Counters: countLocal = local-message metadata records produced by
// sendLocalMessageCopy; countBuffered = flushes done by
// sendBufferedLocalMessageCopy; countClearBufferedLocalCalls = flush timers
// armed by sendLocalMessageCopy (it stops arming them after 10).
56 int countLocal=0,countBuffered=0;
57 int countPiggy=0;
58 int countClearBufferedLocalCalls=0;
59
60 int countUpdateHomeAcks=0;
61
62 extern int chkptPeriod;
63 extern bool parallelRestart;
64
// Kill-file support (testing): readKillFile parses killFile and, for the
// entry matching this PE, sets killTime and schedules killLocal. killFile is
// NULL unless it is assigned elsewhere.
65 char *killFile;
66 int killFlag=0;
67 int restartingMlogFlag=0;
68 void readKillFile();
69 double killTime=0.0;
70 int checkpointCount=0;
71
72
// Processor-private (Cpv) state of the protocol; each variable is set up in
// _messageLoggingInit.
// _currentObj: object recorded as the sender of outgoing messages
// (see generateCommonTicketRequest).
73 CpvDeclare(Chare *,_currentObj);
74 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
75 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
76 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
// local messages that could not obtain a ticket yet (ticketLogLocalMessage)
77 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
78 CpvDeclare(Queue, _outOfOrderMessageQueue);
// local-message metadata waiting to be flushed to the buddy (sendLocalMessageCopy)
79 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
80 //CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
// One raw buffer per destination PE: a BufferedTicketRequestHeader followed by
// up to _maxBufferedTicketRequests TicketRequest records (sendTicketRequest);
// _numBufferedTicketRequests holds how many records each buffer currently has.
81 CpvDeclare(char **,_bufferedTicketRequests);
82 CpvDeclare(int *,_numBufferedTicketRequests);
// allocated with room for a header plus _maxBufferedTicketRequests TicketReply records
83 CpvDeclare(char *,_bufferTicketReply);
84
85
86
87 static double adjustChkptPeriod=0.0; //in ms
88 static double nextCheckpointTime=0.0;//in seconds
89
// wall time at which sendLocalMessageCopy last armed a buffered-copy flush timer
90 double lastBufferedLocalMessageCopyTime;
91
// Flush thresholds for the local-log and ticket-request buffers. They are not
// initialized here; presumably set during startup before _messageLoggingInit
// runs — TODO confirm.
92 int _maxBufferedMessages;
93 int _maxBufferedTicketRequests;
// delay passed to CcdCallFnAfter before buffered data is flushed
94 int BUFFER_TIME=2; // in ms
95
96
// Converse handler indices. Each is assigned by CkRegisterHandler in
// _messageLoggingInit and used with CmiSetHandler when building messages.
97 int _ticketRequestHandlerIdx;
98 int _ticketHandlerIdx;
99 int _localMessageCopyHandlerIdx;
100 int _localMessageAckHandlerIdx;
101 int _pingHandlerIdx;
102 int _bufferedLocalMessageCopyHandlerIdx;
103 int _bufferedLocalMessageAckHandlerIdx;
104 int _bufferedTicketRequestHandlerIdx;
105 int _bufferedTicketHandlerIdx;
106
107
108 char objString[100];
// checkpoint-related handler indices
109 int _checkpointRequestHandlerIdx;
110 int _storeCheckpointHandlerIdx;
111 int _checkpointAckHandlerIdx;
112 int _getCheckpointHandlerIdx;
113 int _recvCheckpointHandlerIdx;
114 int _removeProcessedLogHandlerIdx;
115
116 int _verifyAckRequestHandlerIdx;
117 int _verifyAckHandlerIdx;
118 int _dummyMigrationHandlerIdx;
119
120
121 int     _getGlobalStepHandlerIdx;
122 int     _recvGlobalStepHandlerIdx;
123
// restart-related handler indices
124 int _updateHomeRequestHandlerIdx;
125 int _updateHomeAckHandlerIdx;
126 int _resendMessagesHandlerIdx;
127 int _resendReplyHandlerIdx;
128 int _receivedTNDataHandlerIdx;
129 int _distributedLocationHandlerIdx;
130
131 //TML: handler indices (registered at startup) for team-based message logging
132 int _restartHandlerIdx;
133 int _getRestartCheckpointHandlerIdx;
134 int _recvRestartCheckpointHandlerIdx;
135 void setTeamRecovery(void *data, ChareMlogData *mlogData);
136 void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
137
138 int verifyAckTotal;
139 int verifyAckCount;
140
141 int verifyAckedRequests=0;
142
143 RestartRequest *storedRequest;
144
145
146
147 int _falseRestart =0; /**
	For testing on clusters we may carry out a restart on a processor
	without actually restarting it:
	1 -> false restart (the processor did not really crash)
	0 -> restart after an actual crash
*/
153
154 //lock for the ticketRequestHandler and ticketLogLocalMessage methods;
// ticketLogLocalMessage resets it to 0 when it finishes, except when the
// local ticket request has to be delayed.
155 int _lockNewTicket=0;
156
157
158 //Load balancing globals
159 int onGoingLoadBalancing=0;
160 void *centralLb;
161 void (*resumeLbFnPtr)(void *);
// the *HandlerIdx entries below are assigned in _messageLoggingInit
162 int _receiveMlogLocationHandlerIdx;
163 int _receiveMigrationNoticeHandlerIdx;
164 int _receiveMigrationNoticeAckHandlerIdx;
165 int _checkpointBarrierHandlerIdx;
166 int _checkpointBarrierAckHandlerIdx;
167
168 CkVec<MigrationRecord> migratedNoticeList;
169 CkVec<RetainedMigratedObject *> retainedObjectList;
170 int donotCountMigration=0;
171 int countLBMigratedAway=0;
172 int countLBToMigrate=0;
173 int migrationDoneCalled=0;
174 int checkpointBarrierCount=0;
175 int globalResumeCount=0;
176 CkGroupID globalLBID;
177 int restartDecisionNumber=-1;
178
179
// both set to CmiWallTimer() in _messageLoggingInit
180 double lastCompletedAlarm=0;
181 double lastRestart=0;
182
183
184 //update location globals
185 int _receiveLocationHandlerIdx;
186
187
188
189 // initialize message logging datastructures and register handlers
190 void _messageLoggingInit(){
191         //current object
192         CpvInitialize(Chare *,_currentObj);
193         
194         //registering handlers for message logging
195         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
196         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
197         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
198         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
199         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
200         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
201         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
202         _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
203         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
204
205                 
206         //handlers for checkpointing
207         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
208         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
209         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
210         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
211
212
213         //handlers for restart
214         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
215         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
216         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
217         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
218         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
219         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
220         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
221         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
222         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
223         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
224         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
225
226         //TML: handlers for team-based message logging
227         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
228         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
229         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
230
231         
232         //handlers for load balancing
233         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
234         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
235         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
236         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
237         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
238         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
239         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
240         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
241         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
242
243         
244         //handlers for updating locations
245         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
246         
247         //Cpv variables for message logging
248         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
249         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
250         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
251         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
252         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
253         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
254         CpvInitialize(Queue, _outOfOrderMessageQueue);
255         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
256         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
257         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
258         
259         CpvInitialize(char **,_bufferedTicketRequests);
260         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
261         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
262         for(int i=0;i<CkNumPes();i++){
263                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
264                 CpvAccess(_numBufferedTicketRequests)[i]=0;
265         }
266   CpvInitialize(char *,_bufferTicketReply);
267         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
268         
269 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
270         CcdCallFnAfter(retryTicketRequest,NULL,100);    
271         
272         
273         //Cpv variables for checkpoint
274         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
275         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
276         
277 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
278 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
279 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
280         if(CkMyPe() == 0){
281 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
282 #ifdef  BUFFERED_LOCAL
283                 if(CmiMyPe() == 0){
284                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
285                 }
286 #endif
287         }
288 #ifdef  BUFFERED_REMOTE
289         if(CmiMyPe() == 0){
290                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
291         }
292 #endif
293
294         traceRegisterUserEvent("Remove Logs", 20);
295         traceRegisterUserEvent("Ticket Request Handler", 21);
296         traceRegisterUserEvent("Ticket Handler", 22);
297         traceRegisterUserEvent("Local Message Copy Handler", 23);
298         traceRegisterUserEvent("Local Message Ack Handler", 24);        
299         traceRegisterUserEvent("Preprocess current message",25);
300         traceRegisterUserEvent("Preprocess past message",26);
301         traceRegisterUserEvent("Preprocess future message",27);
302         traceRegisterUserEvent("Checkpoint",28);
303         traceRegisterUserEvent("Checkpoint Store",29);
304         traceRegisterUserEvent("Checkpoint Ack",30);
305         
306         traceRegisterUserEvent("Send Ticket Request",31);
307         traceRegisterUserEvent("Generalticketrequest1",32);
308         traceRegisterUserEvent("TicketLogLocal",33);
309         traceRegisterUserEvent("next_ticket and SN",34);
310         traceRegisterUserEvent("Timeout for buffered remote messages",35);
311         traceRegisterUserEvent("Timeout for buffered local messages",36);
312         traceRegisterUserEvent("Inform Location Home",37);
313         traceRegisterUserEvent("Receive Location Handler",38);
314         
315         lastCompletedAlarm=CmiWallTimer();
316         lastRestart = CmiWallTimer();
317 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
318         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
319 }
320
321 void killLocal(void *_dummy,double curWallTime);        
322
323 void readKillFile(){
324         FILE *fp=fopen(killFile,"r");
325         if(!fp){
326                 return;
327         }
328         int proc;
329         double sec;
330         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
331                 if(proc == CkMyPe()){
332                         killTime = CmiWallTimer()+sec;
333                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
334                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
335                 }
336         }
337         fclose(fp);
338 }
339
340 void killLocal(void *_dummy,double curWallTime){
341         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
342         if(CmiWallTimer()<killTime-1){
343                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
344         }else{  
345                 kill(getpid(),SIGKILL);
346         }
347 }
348
349
350
351 /************************ Message logging methods ****************/
352
353 // send a ticket request to a group on a processor
354 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
355         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
356                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
357                 void *origMsg = EnvToUsr(env);
358                 for(int i=0;i<CmiNumPes();i++){
359                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
360                                 void *copyMsg = CkCopyMsg(&origMsg);
361                                 envelope *copyEnv = UsrToEnv(copyMsg);
362                                 copyEnv->SN=0;
363                                 copyEnv->TN=0;
364                                 copyEnv->sender.type = TypeInvalid;
365                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
366                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
367                         }
368                 }
369                 return;
370         }
371         CkObjID recver;
372         recver.type = TypeGroup;
373         recver.data.group.id = env->getGroupNum();
374         recver.data.group.onPE = destPE;
375 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
376                 CmiPrintStackTrace(0);
377         }*/
378         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
379 }
380
381 //send a ticket request to a nodegroup
382 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
383         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
384                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
385                 void *origMsg = EnvToUsr(env);
386                 for(int i=0;i<CmiNumNodes();i++){
387                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
388                                 void *copyMsg = CkCopyMsg(&origMsg);
389                                 envelope *copyEnv = UsrToEnv(copyMsg);
390                                 copyEnv->SN=0;
391                                 copyEnv->TN=0;
392                                 copyEnv->sender.type = TypeInvalid;
393                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
394                         }
395                 }
396                 return;
397         }
398         CkObjID recver;
399         recver.type = TypeNodeGroup;
400         recver.data.group.id = env->getGroupNum();
401         recver.data.group.onPE = destNode;
402         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
403 }
404
405 //send a ticket request to an array element
406 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
407         CkObjID recver;
408         recver.type = TypeArray;
409         recver.data.array.id = env->getsetArrayMgr();
410         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
411
412         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
413                 char recverString[100],senderString[100];
414                 
415                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
416         }
417
418         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
419 };
420
421 /**
422  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
423  */
424 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
425         envelope *env = _env;
426         MCount ticketNumber = 0;
427         int resend=0; //is it a resend
428         char recverName[100];
429         double _startTime=CkWallTimer();
430         
431         if(CpvAccess(_currentObj) == NULL){
432 //              CkAssert(0);
433                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
434                 generalCldEnqueue(destPE,env,_infoIdx);
435                 return;
436         }
437         
438         if(env->sender.type == TypeInvalid){
439                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
440                 //Set message logging data in the envelope
441         }else{
442                 envelope *copyEnv = copyEnvelope(env);
443                 env = copyEnv;
444                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
445                 env->SN = 0;
446         }
447         
448         CkObjID &sender = env->sender;
449         env->recver = recver;
450
451         Chare *obj = (Chare *)env->sender.getObject();
452           
453         if(env->SN == 0){
454                 env->SN = obj->mlogData->nextSN(recver);
455         }else{
456                 resend = 1;
457         }
458         
459         char senderString[100];
460 //      if(env->SN != 1){
461                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
462         //      CmiPrintStackTrace(0);
463 /*      }else{
464                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
465         }*/
466                 
467         MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
468 //      CkPackMessage(&(mEntry->env));
469 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
470         
471         _startTime = CkWallTimer();
472
473         // uses the proper ticketing mechanism for local, group and general messages
474         if(isLocal(destPE)){
475                 ticketLogLocalMessage(mEntry);
476         }else{
477                 if((TEAM_SIZE_MLOG > 1) && isTeamLocal(destPE)){
478
479                         // look to see if this message already has a ticket in the team-table
480                         Chare *senderObj = (Chare *)sender.getObject();
481                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
482                         if(ticketRow != NULL){
483                                 Ticket ticket = ticketRow->get(env->SN);
484                                 if(ticket.TN != 0){
485                                         ticketNumber = ticket.TN;
486                                         CkPrintf("[%d] Found a team preticketed message\n",CkMyPe());
487                                 }
488                         }
489                 }
490                 
491                 // sending the ticket request
492                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
493                 
494         }
495 }
496
497 /**
498  * Determines if the message is local or not. A message is local if:
499  * 1) Both the destination and origin are the same PE.
500  */
501 inline bool isLocal(int destPE){
502         // both the destination and the origin are the same PE
503         if(destPE == CkMyPe())
504                 return true;
505
506         return false;
507 }
508
509 /**
510  * Determines if the message is group local or not. A message is group local if:
511  * 1) They belong to the same group in the group-based message logging.
512  */
513 inline bool isTeamLocal(int destPE){
514
515         // they belong to the same group
516         if(TEAM_SIZE_MLOG > 1 && destPE/TEAM_SIZE_MLOG == CkMyPe()/TEAM_SIZE_MLOG)
517                 return true;
518
519         return false;
520 }
521
522
523
524 /**
525  * Method that does the actual send by creating a ticket request filling it up and sending it.
526  */
527 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
528         char recverString[100],senderString[100];
529         envelope *env = entry->env;
530         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
531 /*      envelope *env = entry->env;
532         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
533
534         Chare *obj = (Chare *)entry->env->sender.getObject();
535         if(!resend){
536                 //TML: only stores message if either it goes to this processor or to a processor in a different group
537                 if(!isTeamLocal(entry->destPE)){
538                         obj->mlogData->addLogEntry(entry);
539                         MLOGFT_totalMessages += 1.0;
540                         MLOGFT_totalLogSize += entry->env->getTotalsize();
541                 }else{
542                         // the message has to be deleted after it has been sent
543                         entry->env->freeMsg = true;
544                 }
545         }
546
547 #ifdef BUFFERED_REMOTE
548         //buffer the ticket request 
549         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
550                 //first message to this processor, buffer needs to be created
551                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
552                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
553                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
554         }
555         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
556         //Buffer the ticketrequests
557         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
558         ticketRequest->sender = sender;
559         ticketRequest->recver = recver;
560         ticketRequest->logEntry = entry;
561         ticketRequest->SN = SN;
562         ticketRequest->TN = TN;
563         ticketRequest->senderPE = CkMyPe();
564
565         CpvAccess(_numBufferedTicketRequests)[destPE]++;
566         
567         
568         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
569                 sendBufferedTicketRequests(destPE);
570         }else{
571                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
572                         int *checkPE = new int;
573                         *checkPE = destPE;
574                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
575                 }
576         }
577 #else
578
579         TicketRequest ticketRequest;
580         ticketRequest.sender = sender;
581         ticketRequest.recver = recver;
582         ticketRequest.logEntry = entry;
583         ticketRequest.SN = SN;
584         ticketRequest.TN = TN;
585         ticketRequest.senderPE = CkMyPe();
586         
587         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
588 //      CmiBecomeImmediate(&ticketRequest);
589         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
590 #endif
591         DEBUG_MEM(CmiMemoryCheck());
592 };
593
594 /**
595  * Send the ticket requests buffered for processor PE
596  **/
597 void sendBufferedTicketRequests(int destPE){
598         DEBUG_MEM(CmiMemoryCheck());
599         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
600         if(numberRequests == 0){
601                 return;
602         }
603         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
604         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
605         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
606         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
607         header->numberLogs = numberRequests;
608         
609         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
610         CmiSyncSend(destPE,totalSize,(char *)buf);
611         
612         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
613         DEBUG_MEM(CmiMemoryCheck());
614 };
615
616 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
617         int destPE = *(int *)_destPE;
618   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
619                 sendBufferedTicketRequests(destPE);
620 //              traceUserEvent(35);
621         }
622         delete (int *)_destPE;
623         DEBUG_MEM(CmiMemoryCheck());
624 };
625
626 /**
627  * Gets a ticket for a local message and then sends a copy to the buddy.
628  * This method is always in the main thread(not interrupt).. so it should 
629  * never find itself locked out of a newTicket.
630  */
631 void ticketLogLocalMessage(MlogEntry *entry){
632         double _startTime=CkWallTimer();
633         DEBUG_MEM(CmiMemoryCheck());
634
635         Chare *recverObj = (Chare *)entry->env->recver.getObject();
636         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
637         if(recverObj){
638                 //Consider the case, after a restart when this message has already been allotted a ticket number
639                 // and should get the same one as the old one.
640                 Ticket ticket;
641                 if(recverObj->mlogData->mapTable.numObjects() > 0){
642                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
643                 }else{
644                         ticket.TN = 0;
645                 }
646                 
647                 char senderString[100], recverString[100] ;
648                 
649                 if(ticket.TN == 0){
650                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
651         
652                         if(ticket.TN == 0){
653                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
654                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
655                                 
656         //              _lockNewTicket = 0;
657 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
658                                 return;
659                         }
660                 }       
661                 //TODO: check for the case when an invalid ticket is returned
662                 //TODO: check for OLD or RECEIVED TICKETS
663                 entry->env->TN = ticket.TN;
664                 CkAssert(entry->env->TN > 0);
665                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
666         
667                 // sends a copy of the metadata to the buddy    
668                 sendLocalMessageCopy(entry);
669                 
670                 DEBUG_MEM(CmiMemoryCheck());
671
672                 // sets the unackedLocal flag and stores the message in the log
673                 entry->unackedLocal = 1;
674                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
675
676                 DEBUG_MEM(CmiMemoryCheck());
677         }else{
678                 CkPrintf("[%d] Local message in team-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
679                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
680         }
681         _lockNewTicket=0;
682 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
683 };
684
685 /**
686  * Sends the metadata of a local message to its buddy.
687  */
688 void sendLocalMessageCopy(MlogEntry *entry){
689         LocalMessageLog msgLog;
690         msgLog.sender = entry->env->sender;
691         msgLog.recver = entry->env->recver;
692         msgLog.SN = entry->env->SN;
693         msgLog.TN = entry->env->TN;
694         msgLog.entry = entry;
695         msgLog.senderPE = CkMyPe();
696         
697         char recvString[100];
698         char senderString[100];
699         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
700
701 #ifdef BUFFERED_LOCAL
702         countLocal++;
703         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
704         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
705                 sendBufferedLocalMessageCopy();
706         }else{
707                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
708                         lastBufferedLocalMessageCopyTime = CkWallTimer();
709                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
710                         countClearBufferedLocalCalls++;
711                 }       
712         }
713 #else   
714         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
715         
716         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
717 #endif
718         DEBUG_MEM(CmiMemoryCheck());
719 };
720
721
722 void sendBufferedLocalMessageCopy(){
723         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
724         if(numberLogs == 0){
725                 return;
726         }
727         countBuffered++;
728         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
729         void *buf=CmiAlloc(totalSize);
730         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
731         header->numberLogs=numberLogs;
732
733         DEBUG_MEM(CmiMemoryCheck());
734         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
735         
736         char *ptr = (char *)buf;
737         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
738         
739         for(int i=0;i<numberLogs;i++){
740                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
741                 memcpy(ptr,&log,sizeof(LocalMessageLog));
742                 ptr = &ptr[sizeof(LocalMessageLog)];
743         }
744
745         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
746
747         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
748         DEBUG_MEM(CmiMemoryCheck());
749 };
750
751 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
752         countClearBufferedLocalCalls--;
753         if(countClearBufferedLocalCalls > 10){
754                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
755         }
756         DEBUG_MEM(CmiMemoryCheck());
757         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
758         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
759                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
760                         sendBufferedLocalMessageCopy();
761 //                      traceUserEvent(36);
762                 }
763         }
764         DEBUG_MEM(CmiMemoryCheck());
765 }
766
767 /****
768         The handler functions
769 *****/
770
771
772 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
773 /**
774  *  If there are any delayed requests, process them first before 
775  *  processing this request
776  * */
777 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
778         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
779         double  _startTime = CkWallTimer();
780         if(CpvAccess(_delayedTicketRequests)->length() > 0){
781                 retryTicketRequest(NULL,_startTime);
782         }
783         _processTicketRequest(ticketRequest);
784         CmiFree(ticketRequest);
785 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
786 }
787 /** Handler used for dealing with a bunch of ticket requests
788  * from one processor. The replies are also bunched together
789  * Does not use _ticketRequestHandler
790  * */
791 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
792         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
793         DEBUG_MEM(CmiMemoryCheck());
794         double _startTime = CkWallTimer();
795         if(CpvAccess(_delayedTicketRequests)->length() > 0){
796                 retryTicketRequest(NULL,_startTime);
797         }
798         DEBUG_MEM(CmiMemoryCheck());
799   int numRequests = recvdHeader->numberLogs;
800         char *msg = (char *)recvdHeader;
801         msg = &msg[sizeof(BufferedTicketRequestHeader)];
802         int senderPE=((TicketRequest *)msg)->senderPE;
803
804         
805         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
806         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
807         
808         char *ptr = (char *)buf;
809         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
810         header->numberLogs = 0;
811
812         DEBUG_MEM(CmiMemoryCheck());
813         
814         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
815         
816         for(int i=0;i<numRequests;i++){
817                 TicketRequest *request = (TicketRequest *)msg;
818                 msg = &msg[sizeof(TicketRequest)];
819                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
820
821                 if(replied){
822                         //the ticket request has been processed and 
823                         //the reply will be stored in the ptr
824                         header->numberLogs++;
825                         ptr = &ptr[sizeof(TicketReply)];
826                 }
827         }
828 /*      if(header->numberLogs == 0){
829                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
830         }*/
831
832         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
833         CmiSyncSend(senderPE,totalSize,(char *)buf);
834         CmiFree(recvdHeader);
835 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
836         DEBUG_MEM(CmiMemoryCheck());
837 };
838
839 /**Process the ticket request. 
840  * If it is processed and a reply is being sent 
841  * by this processor return true
842  * else return false.
843  * If a reply buffer is specified put the reply into that
844  * else send the reply
845  * */
846 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
847
848 /*      if(_lockNewTicket){
849                 printf("ddeded %d\n",CkMyPe());
850                 if(CmiIsImmediate(ticketRequest)){
851                         CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
852                 }
853                 CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
854                 
855         }else{
856                 _lockNewTicket = 1;
857         }*/
858
859         DEBUG_MEM(CmiMemoryCheck());
860
861         // getting information from request
862         CkObjID sender = ticketRequest->sender;
863         CkObjID recver = ticketRequest->recver;
864         MCount SN = ticketRequest->SN;
865         MCount TN = ticketRequest->TN;
866         Chare *recverObj = (Chare *)recver.getObject();
867         
868         DEBUG(char recverName[100]);
869         DEBUG(recver.toString(recverName);)
870
871         if(recverObj == NULL){
872                 int estPE = recver.guessPE();
873                 if(estPE == CkMyPe() || estPE == -1){           
874                         //try to fulfill the request after some time
875                         char senderString[100];
876                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
877                         if(estPE == CkMyPe() && recver.type == TypeArray){
878                                 CkArrayID aid(recver.data.array.id);            
879                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
880                                 DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));
881                         }
882                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
883                         *delayed = *ticketRequest;
884                         CpvAccess(_delayedTicketRequests)->enq(delayed);
885                         
886                 }else{
887                         DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
888                         TicketRequest forward = *ticketRequest;
889                         CmiSetHandler(&forward,_ticketRequestHandlerIdx);
890                         CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);
891                 }
892         DEBUG_MEM(CmiMemoryCheck());
893                 return false; // if the receverObj does not exist the ticket request cannot have been 
894                               // processed successfully
895         }else{
896                 char senderString[100];
897                 
898                 Ticket ticket;
899
900                 // checking if the message is team local and if it has a ticket already assigned
901                 if(TEAM_SIZE_MLOG > 1 && TN != 0){
902                         CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe());
903                         ticket.TN = TN;
904                         recverObj->mlogData->verifyTicket(sender,SN,TN);
905                 }
906
907                 //check if a ticket for this has been already handed out to an object that used to be local but 
908                 // is no longer so.. need for parallel restart
909                 if(recverObj->mlogData->mapTable.numObjects() > 0){
910                         
911                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
912                 }
913                 
914                 if(ticket.TN == 0){
915                         ticket = recverObj->mlogData->next_ticket(sender,SN);
916                 }
917                 if(ticket.TN > recverObj->mlogData->tProcessed){
918                         ticket.state = NEW_TICKET;
919                 }else{
920                         ticket.state = OLD_TICKET;
921                 }
922                 //TODO: check for the case when an invalid ticket is returned
923                 if(ticket.TN == 0){
924                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
925                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
926                         *delayed = *ticketRequest;
927                         CpvAccess(_delayedTicketRequests)->enq(delayed);
928                         return false;
929                 }
930 /*              if(ticket.TN < SN){ //error state this really should not happen
931                         recver.toString(recverName);
932                   printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
933                 }*/
934 //              CkAssert(ticket.TN >= SN);
935                 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
936 //              TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
937     if(reply == NULL){ 
938                         //There is no reply buffer and the ticketreply is going to be 
939                         //sent immediately
940                         TicketReply ticketReply;
941                         ticketReply.request = *ticketRequest;
942                         ticketReply.ticket = ticket;
943                         ticketReply.recverPE = CkMyPe();
944                         CmiSetHandler(&ticketReply,_ticketHandlerIdx);
945 //              CmiBecomeImmediate(&ticketReply);
946                         CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
947          }else{ // Store ticket reply in the buffer provided
948                  reply->request = *ticketRequest;
949                  reply->ticket = ticket;
950                  reply->recverPE = CkMyPe();
951                  CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
952                                                          // in case the ticket needs to be forwarded or something
953          }
954                 DEBUG_MEM(CmiMemoryCheck());
955                 return true;
956         }
957 //      _lockNewTicket=0;
958 };
959
960
961 /**
962  * @brief This function handles the ticket received after a request.
963  */
964 inline void _ticketHandler(TicketReply *ticketReply){
965
966         double _startTime = CkWallTimer();
967         DEBUG_MEM(CmiMemoryCheck());    
968         
969         char senderString[100];
970         CkObjID sender = ticketReply->request.sender;
971         CkObjID recver = ticketReply->request.recver;
972         
973         if(sender.guessPE() != CkMyPe()){
974                 DEBUG(CkAssert(sender.guessPE()>= 0));
975                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
976         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
977                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
978                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
979 #ifdef BUFFERED_REMOTE
980                 //will be freed by the buffered ticket handler most of the time
981                 //this might lead to a leak just after migration
982                 //when the ticketHandler is directly used without going through the buffered handler
983                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
984 #else
985                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
986 #endif  
987         }else{
988                 char recverName[100];
989                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
990                 MlogEntry *logEntry=NULL;
991                 if(ticketReply->ticket.state & FORWARDED_TICKET){
992                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
993                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
994                         Chare *senderObj = (Chare *)sender.getObject();
995                         if(senderObj){
996                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
997                                 for(int i=0;i<mlog->length();i++){
998                                         MlogEntry *tempEntry = (*mlog)[i];
999                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
1000                                                 logEntry = tempEntry;
1001                                                 break;
1002                                         }
1003                                 }
1004                                 if(logEntry == NULL){
1005 #ifdef BUFFERED_REMOTE
1006 #else
1007                                         CmiFree(ticketReply);
1008 #endif                                  
1009                                         return;
1010                                 }
1011                         }else{
1012                                 CmiAbort("This processor thinks it should have the sender\n");
1013                         }
1014                         ticketReply->ticket.state ^= FORWARDED_TICKET;
1015                 }else{
1016                         logEntry = ticketReply->request.logEntry;
1017                 }
1018                 if(logEntry->env->TN <= 0){
1019                         //This logEntry has not received a TN earlier
1020                         char recverString[100];
1021                         logEntry->env->TN = ticketReply->ticket.TN;
1022                         logEntry->env->setSrcPe(CkMyPe());
1023                         if(ticketReply->ticket.state == NEW_TICKET){
1024
1025                                 // if message is group local, we store its metadata in teamTable
1026                                 if(isTeamLocal(ticketReply->recverPE)){
1027                                         //DEBUG_TEAM(CkPrintf("[%d] Storing meta data for intragroup message %u\n",CkMyPe(),ticketReply->request.SN);)
1028                                         Chare *senderObj = (Chare *)sender.getObject();
1029                                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
1030                                         if(ticketRow == NULL){
1031                                                 ticketRow = new SNToTicket();
1032                                                 senderObj->mlogData->teamTable.put(recver) = ticketRow; 
1033                                         }
1034                                         ticketRow->put(ticketReply->request.SN) = ticketReply->ticket;
1035                                 }
1036
1037                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
1038                                 if(ticketReply->recverPE != CkMyPe()){
1039                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
1040                                 }else{
1041                                         //It is now a local message use the local message protocol
1042                                         sendLocalMessageCopy(logEntry);
1043                                 }       
1044                         }
1045                 }else{
1046                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
1047                 }
1048                 recver.updatePosition(ticketReply->recverPE);
1049 #ifdef BUFFERED_REMOTE
1050 #else
1051                 CmiFree(ticketReply);
1052 #endif
1053         }
1054         CmiMemoryCheck();
1055
1056 //      traceUserBracketEvent(22,_startTime,CkWallTimer());     
1057 };
1058
1059 /**
1060  * Message to handle the bunch of tickets 
1061  * that we get from one processor. We send 
1062  * the tickets to be handled one at a time
1063  * */
1064
1065 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
1066         double _startTime=CmiWallTimer();
1067         int numTickets = recvdHeader->numberLogs;
1068         char *msg = (char *)recvdHeader;
1069         msg = &msg[sizeof(BufferedTicketRequestHeader)];
1070         DEBUG_MEM(CmiMemoryCheck());
1071         
1072         TicketReply *_reply = (TicketReply *)msg;
1073
1074         
1075         for(int i=0;i<numTickets;i++){
1076                 TicketReply *reply = (TicketReply *)msg;
1077                 _ticketHandler(reply);
1078                 
1079                 msg = &msg[sizeof(TicketReply)];
1080         }
1081         
1082         CmiFree(recvdHeader);
1083 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1084         DEBUG_MEM(CmiMemoryCheck());
1085 };
1086
1087 /**
1088  * Stores the metadata of a local message from its buddy.
1089  */
1090 void _localMessageCopyHandler(LocalMessageLog *msgLog){
1091         double _startTime = CkWallTimer();
1092         
1093         char senderString[100],recverString[100];
1094         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1095 /*      if(!fault_aware(msgLog->recver)){
1096                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1097         }*/
1098         CpvAccess(_localMessageLog)->enq(*msgLog);
1099         
1100         LocalMessageLogAck ack;
1101         ack.entry = msgLog->entry;
1102         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1103         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1104         CmiSyncSend(msgLog->senderPE,sizeof(LocalMessageLogAck),(char *)&ack);
1105         
1106 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1107 };
1108
1109 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1110         double _startTime = CkWallTimer();
1111         DEBUG_MEM(CmiMemoryCheck());
1112         
1113         int numLogs = recvdHeader->numberLogs;
1114         char *msg = (char *)recvdHeader;
1115
1116         //piggy back the logs already stored on this processor
1117         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1118         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1119 /*      if(numPiggyLogs > 0){
1120                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1121                         CmiAssert(0);
1122                 }
1123         }*/
1124         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1125         
1126         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1127         void *buf = CmiAlloc(totalSize);
1128         char *ptr = (char *)buf;
1129         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1130         
1131         msg = &msg[sizeof(BufferedLocalLogHeader)];
1132         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1133
1134         DEBUG_MEM(CmiMemoryCheck());
1135         int PE;
1136         for(int i=0;i<numLogs;i++){
1137                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1138                 CpvAccess(_localMessageLog)->enq(*msgLog);
1139                 PE = msgLog->senderPE;
1140                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1141
1142                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1143                 ack->entry = msgLog->entry;
1144                 
1145                 msg = &msg[sizeof(LocalMessageLog)];
1146                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1147         }
1148         DEBUG_MEM(CmiMemoryCheck());
1149
1150         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1151         piggyHeader->numberLogs = numPiggyLogs;
1152         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1153         if(numPiggyLogs > 0){
1154                 countPiggy++;
1155         }
1156
1157         for(int i=0;i<numPiggyLogs;i++){
1158                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1159                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1160                 ptr = &ptr[sizeof(LocalMessageLog)];
1161         }
1162         DEBUG_MEM(CmiMemoryCheck());
1163         
1164         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1165         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1166                 
1167 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1168                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1169                         if(!fault_aware(localLogEntry.recver)){
1170                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1171                         }
1172         }*/
1173         if(freeHeader){
1174                 CmiFree(recvdHeader);
1175         }
1176         DEBUG_MEM(CmiMemoryCheck());
1177 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1178 }
1179
1180
1181 void _localMessageAckHandler(LocalMessageLogAck *ack){
1182         
1183         double _startTime = CkWallTimer();
1184         
1185         MlogEntry *entry = ack->entry;
1186         if(entry == NULL){
1187                 CkExit();
1188         }
1189         entry->unackedLocal = 0;
1190         envelope *env = entry->env;
1191         char recverName[100];
1192         char senderString[100];
1193         DEBUG_MEM(CmiMemoryCheck());
1194         
1195         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1196         if(env == NULL)
1197                 return;
1198         CkAssert(env->SN > 0);
1199         CkAssert(env->TN > 0);
1200         env->sender.toString(senderString);
1201         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1202         env->recver.toString(recverName);
1203
1204         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1205         
1206 /*      
1207         void *origMsg = EnvToUsr(env);
1208         void *copyMsg = CkCopyMsg(&origMsg);
1209         envelope *copyEnv = UsrToEnv(copyMsg);
1210         entry->env = UsrToEnv(origMsg);*/
1211
1212 //      envelope *copyEnv = copyEnvelope(env);
1213
1214         envelope *copyEnv = env;
1215         copyEnv->localMlogEntry = entry;
1216
1217         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1218         if(CmiMyPe() != entry->destPE){
1219                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1220         }
1221 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1222         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1223         
1224         
1225 #ifdef BUFFERED_LOCAL
1226 #else
1227         CmiFree(ack);
1228 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1229 #endif
1230         
1231         DEBUG_MEM(CmiMemoryCheck());
1232         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1233 }
1234
1235
1236 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1237
1238         double _startTime=CkWallTimer();
1239         DEBUG_MEM(CmiMemoryCheck());
1240
1241         int numLogs = recvdHeader->numberLogs;
1242         char *msg = (char *)recvdHeader;
1243         msg = &msg[sizeof(BufferedLocalLogHeader)];
1244
1245         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1246         
1247         for(int i=0;i<numLogs;i++){
1248                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1249                 _localMessageAckHandler(ack);
1250                 
1251                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1252         }
1253
1254         //deal with piggy backed local logs
1255         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1256         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1257         if(piggyHeader->numberLogs > 0){
1258                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1259         }
1260         
1261         CmiFree(recvdHeader);
1262         DEBUG_MEM(CmiMemoryCheck());
1263 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1264 }
1265
1266 bool fault_aware(CkObjID &recver){
1267         switch(recver.type){
1268                 case TypeChare:
1269                         return false;
1270                 case TypeMainChare:
1271                         return false;
1272                 case TypeGroup:
1273                 case TypeNodeGroup:
1274                 case TypeArray:
1275                         return true;
1276                 default:
1277                         return false;
1278         }
1279 };
1280
1281 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
1282         char recverString[100];
1283         char senderString[100];
1284         
1285         DEBUG_MEM(CmiMemoryCheck());
1286         CkObjID recver = env->recver;
1287         if(!fault_aware(recver))
1288                 return 1;
1289
1290
1291         Chare *obj = (Chare *)recver.getObject();
1292         *objPointer = obj;
1293         if(obj == NULL){
1294                 int possiblePE = recver.guessPE();
1295                 if(possiblePE != CkMyPe()){
1296                         int totalSize = env->getTotalsize();                    
1297                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1298                 }
1299                 return 0;
1300         }
1301
1302
1303         double _startTime = CkWallTimer();
1304 //env->sender.updatePosition(env->getSrcPe());
1305         if(env->TN == obj->mlogData->tProcessed+1){
1306                 //the message that needs to be processed now
1307                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1308                 // once we find a message that we can process we put back all the messages in the out of order queue
1309                 // back into the main scheduler queue. 
1310                 if(env->sender.guessPE() == CkMyPe()){
1311                         *logEntryPointer = env->localMlogEntry;
1312                 }
1313         DEBUG_MEM(CmiMemoryCheck());
1314                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1315                         void *qMsgPtr;
1316                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1317                         envelope *qEnv = (envelope *)qMsgPtr;
1318                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
1319         DEBUG_MEM(CmiMemoryCheck());
1320                 }
1321 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1322                 //TODO: this might be a problem.. change made for leanMD
1323 //              CpvAccess(_currentObj) = obj;
1324         DEBUG_MEM(CmiMemoryCheck());
1325                 return 1;
1326         }
1327         if(env->TN <= obj->mlogData->tProcessed){
1328                 //message already processed
1329                 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
1330 //              traceUserBracketEvent(26,_startTime,CkWallTimer());
1331         DEBUG_MEM(CmiMemoryCheck());
1332                 return 0;
1333         }
1334         //message that needs to be processed in the future
1335
1336 //      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1337         //the message cant be processed now put it back in the out of order message Q, 
1338         //It will be transferred to the main queue later
1339         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1340 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1341         DEBUG_MEM(CmiMemoryCheck());
1342         
1343         return 0;
1344 }
1345
1346 /**
1347  * @brief Updates a few variables once a message has been processed.
1348  */
1349 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1350         DEBUG(char senderString[100]);
1351         if(obj){
1352                 if(sender.guessPE() == CkMyPe()){
1353                         if(entry != NULL){
1354                                 entry->env = NULL;
1355                         }
1356                 }
1357                 obj->mlogData->tProcessed++;
1358 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1359                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1360 //              CpvAccess(_currentObj)= NULL;
1361         }
1362         DEBUG_MEM(CmiMemoryCheck());
1363 }
1364
1365 /***
1366         Helpers for the handlers and message logging methods
1367 ***/
1368
1369 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1370 //      double _startTime = CkWallTimer();
1371         if(env->recver.type != TypeNodeGroup){
1372         //This repeats a step performed in skipCldEnq for messages sent to
1373         //other processors. I do this here so that messages to local processors
1374         //undergo the same transformation.. It lets the resend be uniform for 
1375         //all messages
1376 //              CmiSetXHandler(env,CmiGetHandler(env));
1377                 _skipCldEnqueue(destPE,env,_infoIdx);
1378         }else{
1379                 _noCldNodeEnqueue(destPE,env);
1380         }
1381 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1382 }
1383 //extern "C" int CmiGetNonLocalLength();
1384 /** This method is used to retry the ticket requests
1385  * that had been queued up earlier
1386  * */
1387
1388 int calledRetryTicketRequest=0;
1389
1390 void retryTicketRequestTimer(void *_dummy,double _time){
1391                 calledRetryTicketRequest=0;
1392                 retryTicketRequest(_dummy,_time);
1393 }
1394
1395 void retryTicketRequest(void *_dummy,double curWallTime){       
1396         double start = CkWallTimer();
1397         DEBUG_MEM(CmiMemoryCheck());
1398         int length = CpvAccess(_delayedTicketRequests)->length();
1399         for(int i=0;i<length;i++){
1400                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1401                 if(ticketRequest){
1402                         char senderString[100],recverString[100];
1403                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1404                         DEBUG_MEM(CmiMemoryCheck());
1405                         _processTicketRequest(ticketRequest);
1406                   CmiFree(ticketRequest);
1407                         DEBUG_MEM(CmiMemoryCheck());
1408                 }       
1409         }       
1410         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1411                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1412                 ticketLogLocalMessage(entry);
1413         }
1414         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1415 //      int converse_qLength = CmiGetNonLocalLength();
1416         
1417 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1418
1419 /*      PingMsg pingMsg;
1420         pingMsg.PE = CkMyPe();
1421         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1422         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1423                 for(int i=0;i<CkNumPes();i++){
1424                         if(i != CkMyPe()){
1425                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1426                         }
1427                 }
1428         }*/     
1429         //TODO: change this back to 100
1430         if(calledRetryTicketRequest == 0){
1431                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1432                 calledRetryTicketRequest =1;
1433         }
1434         DEBUG_MEM(CmiMemoryCheck());
1435 }
1436
1437 void _pingHandler(CkPingMsg *msg){
1438         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1439         CmiFree(msg);
1440 }
1441
1442
1443 /*****************************************************************************
1444         Checkpointing methods..
1445                 Pack all the data on a processor and send it to the buddy periodically
1446                 Also used to throw away message logs
1447 *****************************************************************************/
1448 CkVec<TProcessedLog> processedTicketLog;
1449 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1450 void clearUpMigratedRetainedLists(int PE);
1451
1452 void checkpointAlarm(void *_dummy,double curWallTime){
1453         double diff = curWallTime-lastCompletedAlarm;
1454         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1455 /*      if(CkWallTimer()-lastRestart < 50){
1456                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1457                 return;
1458         }*/
1459         if(diff < ((chkptPeriod) - 2)){
1460                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1461                 return;
1462         }
1463         CheckpointRequest request;
1464         request.PE = CkMyPe();
1465         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1466         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1467 };
1468
1469 void _checkpointRequestHandler(CheckpointRequest *request){
1470         startMlogCheckpoint(NULL,CmiWallTimer());       
1471 }
1472
1473 void startMlogCheckpoint(void *_dummy,double curWallTime){
1474         double _startTime = CkWallTimer();
1475         checkpointCount++;
1476 /*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
1477                 kill(getpid(),SIGKILL);
1478         }*/
1479         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1480                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1481         }
1482         PUP::sizer psizer;
1483         DEBUG_MEM(CmiMemoryCheck());
1484
1485         psizer | checkpointCount;
1486         
1487         CkPupROData(psizer);
1488         DEBUG_MEM(CmiMemoryCheck());
1489         CkPupGroupData(psizer);
1490         DEBUG_MEM(CmiMemoryCheck());
1491         CkPupNodeGroupData(psizer);
1492         DEBUG_MEM(CmiMemoryCheck());
1493         pupArrayElementsSkip(psizer,NULL);
1494         DEBUG_MEM(CmiMemoryCheck());
1495
1496         int dataSize = psizer.size();
1497         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1498         char *msg = (char *)CmiAlloc(totalSize);
1499         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1500         chkMsg->PE = CkMyPe();
1501         chkMsg->dataSize = dataSize;
1502
1503         
1504         char *buf = &msg[sizeof(CheckPointDataMsg)];
1505         PUP::toMem pBuf(buf);   
1506
1507         pBuf | checkpointCount;
1508         
1509         CkPupROData(pBuf);
1510         CkPupGroupData(pBuf);
1511         CkPupNodeGroupData(pBuf);
1512         pupArrayElementsSkip(pBuf,NULL);
1513
1514         unAckedCheckpoint=1;
1515         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1516         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1517         
1518         /*
1519                 Store the highest Ticket number processed for each chare on this processor
1520         */
1521         processedTicketLog.removeAll();
1522         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1523         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1524                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1525         }
1526
1527         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1528                 lastCompletedAlarm = curWallTime;
1529                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1530         }
1531         traceUserBracketEvent(28,_startTime,CkWallTimer());
1532 };
1533
1534 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1535         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1536         TProcessedLog logEntry;
1537         logEntry.recver = mlogData->objID;
1538         logEntry.tProcessed = mlogData->tProcessed;
1539         log->push_back(logEntry);
1540         char objString[100];
1541         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1542 }
1543
1544 class ElementPacker : public CkLocIterator {
1545 private:
1546         CkLocMgr *locMgr;
1547         PUP::er &p;
1548 public:
1549                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1550                 void addLocation(CkLocation &loc) {
1551                         CkArrayIndexMax idx=loc.getIndex();
1552                         CkGroupID gID = locMgr->ckGetGroupID();
1553                         p|gID;      // store loc mgr's GID as well for easier restore
1554                         p|idx;
1555                         p|loc;
1556     }
1557 };
1558
1559
1560 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
1561         int numElements,i;
1562   int numGroups = CkpvAccess(_groupIDTable)->size();    
1563         if(!p.isUnpacking()){
1564                 numElements = CkCountArrayElements();
1565         }       
1566         p | numElements;
1567         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1568         if(!p.isUnpacking()){
1569                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1570         }else{
1571                 //Flush all recs of all LocMgrs before putting in new elements
1572 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1573                 for(int j=0;j<listsize;j++){
1574                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1575                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1576                                 listToSkip[j].idx.print();
1577                         }
1578                 }
1579                 
1580  printf("numElements = %d\n",numElements);
1581         
1582           for (int i=0; i<numElements; i++) {
1583                         CkGroupID gID;
1584                         CkArrayIndexMax idx;
1585                         p|gID;
1586             p|idx;
1587                         int flag=0;
1588                         int matchedIdx=0;
1589                         for(int j=0;j<listsize;j++){
1590                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1591                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1592                                                 matchedIdx = j;
1593                                                 flag = 1;
1594                                                 break;
1595                                         }
1596                                 }
1597                         }
1598                         if(flag == 1){
1599                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1600                         }else{
1601                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1602                         }
1603                                 
1604                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1605                         mgr->resume(idx,p,flag);
1606                         if(flag == 1){
1607                                 int homePE = mgr->homePe(idx);
1608                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1609                         }
1610           }
1611         }
1612 };
1613
1614
1615 void writeCheckpointToDisk(int size,char *chkpt){
1616         char fNameTemp[100];
1617         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1618         int fd = creat(fNameTemp,S_IRWXU);
1619         int ret = write(fd,chkpt,size);
1620         CkAssert(ret == size);
1621         close(fd);
1622         
1623         char fName[100];
1624         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1625         unlink(fName);
1626
1627         rename(fNameTemp,fName);
1628         
1629 }
1630
1631 //handler that receives the checkpoint from a processor
1632 //it stores it and acks it
1633 void _storeCheckpointHandler(char *msg){
1634         
1635         double _startTime=CkWallTimer();
1636                 
1637         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1638         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1639         
1640         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1641         
1642         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1643         if(oldChkpt != NULL){
1644                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1645                 CmiFree(oldmsg);
1646         }
1647         //turning off storing checkpoints
1648         
1649         int sendingPE = chkMsg->PE;
1650         
1651         CpvAccess(_storedCheckpointData)->buf = chkpt;
1652         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1653         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1654
1655 #ifdef CHECKPOINT_DISK
1656         //store the checkpoint on disk
1657         writeCheckpointToDisk(chkMsg->dataSize,chkpt);
1658         CpvAccess(_storedCheckpointData)->buf = NULL;
1659         CmiFree(msg);
1660 #endif
1661
1662         int count=0;
1663         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1664                 if(migratedNoticeList[j].fromPE == sendingPE){
1665                         migratedNoticeList[j].ackFrom = 1;
1666                 }else{
1667                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1668                 }
1669                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1670                         migratedNoticeList.remove(j);
1671                         count++;
1672                 }
1673                 
1674         }
1675         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1676         
1677         CheckPointAck ackMsg;
1678         ackMsg.PE = CkMyPe();
1679         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1680         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1681         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1682         
1683         
1684         
1685         traceUserBracketEvent(29,_startTime,CkWallTimer());
1686 };
1687
1688
1689 void sendRemoveLogRequests(){
1690         double _startTime = CkWallTimer();      
1691         //send out the messages asking senders to throw away message logs below a certain ticket number
1692         /*
1693                 The remove log request message looks like
1694                 |RemoveLogRequest||List of TProcessedLog|
1695         */
1696         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1697         char *requestMsg = (char *)CmiAlloc(totalSize);
1698         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1699         request->PE = CkMyPe();
1700         request->numberObjects = processedTicketLog.size();
1701         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1702         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1703         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1704         
1705         DEBUG_MEM(CmiMemoryCheck());
1706         for(int i=0;i<CkNumPes();i++){
1707                 CmiSyncSend(i,totalSize,requestMsg);
1708         }
1709         CmiFree(requestMsg);
1710
1711         clearUpMigratedRetainedLists(CmiMyPe());
1712         //TODO: clear ticketTable
1713         
1714         traceUserBracketEvent(30,_startTime,CkWallTimer());
1715         DEBUG_MEM(CmiMemoryCheck());
1716 }
1717
1718
1719 void _checkpointAckHandler(CheckPointAck *ackMsg){
1720         DEBUG_MEM(CmiMemoryCheck());
1721         unAckedCheckpoint=0;
1722         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1723         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1724         if(onGoingLoadBalancing){
1725                 onGoingLoadBalancing = 0;
1726                 finishedCheckpointLoadBalancing();
1727         }else{
1728                 sendRemoveLogRequests();
1729         }
1730         CmiFree(ackMsg);
1731         
1732 };
1733
1734 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1735         DEBUG_MEM(CmiMemoryCheck());
1736         CmiMemoryCheck();
1737         char *data = (char *)_data;
1738         RemoveLogRequest *request = (RemoveLogRequest *)data;
1739         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1740         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1741
1742         int count=0;
1743         for(int i=0;i<mlog->length();i++){
1744                 MlogEntry *logEntry = mlog->deq();
1745                 int match=0;
1746                 for(int j=0;j<request->numberObjects;j++){
1747                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1748                                 //this log Entry should be removed
1749                                 match = 1;
1750                                 break;
1751                         }
1752                 }
1753                 char senderString[100],recverString[100];
1754 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1755                 if(match){
1756                         count++;
1757                         delete logEntry;
1758                 }else{
1759                         mlog->enq(logEntry);
1760                 }
1761         }
1762         if(count > 0){
1763                 char nameString[100];
1764                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1765         }
1766         DEBUG_MEM(CmiMemoryCheck());
1767         CmiMemoryCheck();
1768 }
1769
1770 void _removeProcessedLogHandler(char *requestMsg){
1771         double start = CkWallTimer();
1772         forAllCharesDo(removeProcessedLogs,requestMsg);
1773         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1774         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1775         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1776         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1777         if(request->PE == getCheckPointPE()){
1778                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1779                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1780                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1781                 int count=0;
1782 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1783                 DEBUG(char nameString[100];)
1784                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1785                 DEBUG(})*/
1786                 for(int i=0;i<localQ->length();i++){
1787                         LocalMessageLog localLogEntry = (*localQ)[i];
1788                         if(!fault_aware(localLogEntry.recver)){
1789                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1790                         }
1791                         bool keep = true;
1792                         for(int j=0;j<request->numberObjects;j++){                              
1793                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1794                                         keep = false;
1795                                         break;
1796                                 }
1797                         }       
1798                         if(keep){
1799                                 tempQ->enq(localLogEntry);
1800                         }else{
1801                                 count++;
1802                         }
1803                 }
1804                 delete localQ;
1805                 CpvAccess(_localMessageLog) = tempQ;
1806                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1807         }
1808
1809         /*
1810                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1811         */
1812         CmiMemoryCheck();
1813         clearUpMigratedRetainedLists(request->PE);
1814         
1815         traceUserBracketEvent(20,start,CkWallTimer());
1816         CmiFree(requestMsg);    
1817 };
1818
1819
1820 void clearUpMigratedRetainedLists(int PE){
1821         int count=0;
1822         CmiMemoryCheck();
1823         
1824         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1825                 if(migratedNoticeList[j].toPE == PE){
1826                         migratedNoticeList[j].ackTo = 1;
1827                 }
1828                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1829                         migratedNoticeList.remove(j);
1830                         count++;
1831                 }
1832         }
1833         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1834         
1835         for(int j=retainedObjectList.size()-1;j>=0;j--){
1836                 if(retainedObjectList[j]->migRecord.toPE == PE){
1837                         RetainedMigratedObject *obj = retainedObjectList[j];
1838                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1839                         retainedObjectList.remove(j);
1840                         if(obj->msg != NULL){
1841                                 CmiMemoryCheck();
1842                                 CmiFree(obj->msg);
1843                         }
1844                         delete obj;
1845                 }
1846         }
1847 }
1848
1849 /***************************************************************
1850         Restart Methods and handlers
1851 ***************************************************************/        
1852
1853 /**
1854  * Function for restarting the crashed processor.
1855  * It sets the restart flag and contacts the buddy
1856  * processor to get the latest checkpoint.
1857  */
1858 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1859         RestartRequest msg;
1860
1861         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1862
1863         // setting the restart flag
1864         _restartFlag = 1;
1865
1866         // if we are using group-based message logging, all members of the group have to be restarted
1867         if(TEAM_SIZE_MLOG > 1){
1868                 for(int i=(CkMyPe()/TEAM_SIZE_MLOG)*TEAM_SIZE_MLOG; i<((CkMyPe()/TEAM_SIZE_MLOG)+1)*TEAM_SIZE_MLOG; i++){
1869                         if(i != CkMyPe() && i < CkNumPes()){
1870                                 // sending a message to the group member
1871                                 msg.PE = CkMyPe();
1872                             CmiSetHandler(&msg,_restartHandlerIdx);
1873                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1874                         }
1875                 }
1876         }
1877
1878         // requesting the latest checkpoint from its buddy
1879         msg.PE = CkMyPe();
1880         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1881         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1882 };
1883
1884 /**
1885  * Function to restart this processor.
1886  * The handler is invoked by a member of its same team in message logging.
1887  */
1888 void _restartHandler(RestartRequest *restartMsg){
1889         int i;
1890         int numGroups = CkpvAccess(_groupIDTable)->size();
1891         RestartRequest msg;
1892         
1893         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1894
1895     // setting the restart flag
1896         _restartFlag = 1;
1897
1898         // flushing all buffers 
1899 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1900         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1901         for(int i=0;i<numGroups;i++){
1902         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1903                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1904                 obj->flushStates();
1905                 obj->ckJustMigrated();
1906         }*/
1907
1908     // requesting the latest checkpoint from its buddy
1909         msg.PE = CkMyPe();
1910         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1911         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1912 }
1913
1914
1915 /**
1916  * Gets the stored checkpoint but calls another function in the sender.
1917  */
1918 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1919
1920         // retrieving the stored checkpoint
1921         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1922
1923         // making sure it is its buddy who is requesting the checkpoint
1924         CkAssert(restartMsg->PE == storedChkpt->PE);
1925
1926         storedRequest = restartMsg;
1927         verifyAckTotal = 0;
1928
1929         for(int i=0;i<migratedNoticeList.size();i++){
1930                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1931 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1932                         if(migratedNoticeList[i].ackFrom == 0){
1933                                 //need to verify if the object actually exists .. it might not
1934                                 //have been acked but it might exist on it
1935                                 VerifyAckMsg msg;
1936                                 msg.migRecord = migratedNoticeList[i];
1937                                 msg.index = i;
1938                                 msg.fromPE = CmiMyPe();
1939                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1940                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1941                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1942                                 verifyAckTotal++;
1943                         }
1944                 }
1945         }
1946
1947         // sending the checkpoint back to its buddy     
1948         if(verifyAckTotal == 0){
1949                 sendCheckpointData(MLOG_RESTARTED);
1950         }
1951         verifyAckCount = 0;
1952 }
1953
1954 /**
1955  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1956  */
1957 void _recvRestartCheckpointHandler(char *_restartData){
1958         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1959         MigrationRecord *migratedAwayElements;
1960
1961         globalLBID = restartData->lbGroupID;
1962         
1963         restartData->restartWallTime *= 1000;
1964         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1965         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1966         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1967
1968         
1969         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1970         char *buf = &_restartData[sizeof(RestartProcessorData)];
1971         
1972         if(restartData->numMigratedAwayElements != 0){
1973                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1974                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1975                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1976                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1977         }
1978
1979         // turning on the team recovery flag
1980         forAllCharesDo(setTeamRecovery,NULL);
1981         
1982         PUP::fromMem pBuf(buf);
1983         pBuf | checkpointCount;
1984         CkPupROData(pBuf);
1985         CkPupGroupData(pBuf);
1986         CkPupNodeGroupData(pBuf);
1987 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
1988         pupArrayElementsSkip(pBuf,NULL);
1989         CkAssert(pBuf.size() == restartData->checkPointSize);
1990         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1991         
1992         // turning off the team recovery flag
1993         forAllCharesDo(unsetTeamRecovery,NULL);
1994
1995         // initializing a few variables for handling local messages
1996         forAllCharesDo(initializeRestart,NULL);
1997         
1998         //store the restored local message log in a vector
1999         buf = &buf[restartData->checkPointSize];        
2000         for(int i=0;i<restartData->numLocalMessages;i++){
2001                 LocalMessageLog logEntry;
2002                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
2003                 
2004                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
2005                 if(recverObj!=NULL){
2006                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
2007                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
2008                         char senderString[100];
2009                         char recverString[100];
2010                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
2011                 }else{
2012 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
2013                 }
2014                 buf = &buf[sizeof(LocalMessageLog)];
2015         }
2016
2017         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2018         CmiFree(_restartData);  
2019
2020         /*HERE _initDone();
2021
2022         getGlobalStep(globalLBID);
2023         
2024         countUpdateHomeAcks = 0;
2025         RestartRequest updateHomeRequest;
2026         updateHomeRequest.PE = CmiMyPe();
2027         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2028         for(int i=0;i<CmiNumPes();i++){
2029                 if(i != CmiMyPe()){
2030                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2031                 }
2032         }
2033 */
2034
2035
2036         // Send out the request to resend logged messages to all other processors
2037         CkVec<TProcessedLog> objectVec;
2038         forAllCharesDo(createObjIDList, (void *)&objectVec);
2039         int numberObjects = objectVec.size();
2040         
2041         /*
2042                 resendMsg layout |ResendRequest|Array of TProcessedLog|
2043         */
2044         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2045         char *resendMsg = (char *)CmiAlloc(totalSize);  
2046
2047         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2048         resendReq->PE =CkMyPe(); 
2049         resendReq->numberObjects = numberObjects;
2050         char *objList = &resendMsg[sizeof(ResendRequest)];
2051         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
2052         
2053
2054         /* test for parallel restart migrate away object**/
2055 //      if(parallelRestart){
2056 //              distributeRestartedObjects();
2057 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2058 //      }
2059         
2060         /*      To make restart work for load balancing.. should only
2061         be used when checkpoint happens along with load balancing
2062         **/
2063 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2064
2065         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2066         CpvAccess(_currentObj) = lb;
2067         lb->ReceiveDummyMigration(restartDecisionNumber);
2068
2069         sleep(10);
2070         
2071         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2072         for(int i=0;i<CkNumPes();i++){
2073                 if(i != CkMyPe()){
2074                         CmiSyncSend(i,totalSize,resendMsg);
2075                 }       
2076         }
2077         _resendMessagesHandler(resendMsg);
2078
2079 }
2080
2081
2082 void CkMlogRestartDouble(void *,double){
2083         CkMlogRestart(NULL,NULL);
2084 };
2085
2086 //TML: restarting from local (group) failure
2087 void CkMlogRestartLocal(){
2088     CkMlogRestart(NULL,NULL);
2089 };
2090
2091
2092 void readCheckpointFromDisk(int size,char *buf){
2093         char fName[100];
2094         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
2095
2096         int fd = open(fName,O_RDONLY);
2097         int count=0;
2098         while(count < size){
2099                 count += read(fd,&buf[count],size-count);
2100         }
2101         close(fd);
2102         
2103 };
2104
2105
2106
2107 /**
2108  * Gets the stored checkpoint for its buddy processor.
2109  */
2110 void _getCheckpointHandler(RestartRequest *restartMsg){
2111
2112         // retrieving the stored checkpoint
2113         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
2114
2115         // making sure it is its buddy who is requesting the checkpoint
2116         CkAssert(restartMsg->PE == storedChkpt->PE);
2117
2118         storedRequest = restartMsg;
2119         verifyAckTotal = 0;
2120
2121         for(int i=0;i<migratedNoticeList.size();i++){
2122                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
2123 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
2124                         if(migratedNoticeList[i].ackFrom == 0){
2125                                 //need to verify if the object actually exists .. it might not
2126                                 //have been acked but it might exist on it
2127                                 VerifyAckMsg msg;
2128                                 msg.migRecord = migratedNoticeList[i];
2129                                 msg.index = i;
2130                                 msg.fromPE = CmiMyPe();
2131                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
2132                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
2133                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
2134                                 verifyAckTotal++;
2135                         }
2136                 }
2137         }
2138
2139         // sending the checkpoint back to its buddy     
2140         if(verifyAckTotal == 0){
2141                 sendCheckpointData(MLOG_CRASHED);
2142         }
2143         verifyAckCount = 0;
2144 }
2145
2146
2147 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
2148         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
2149         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
2150         if(rec != NULL && rec->type() == CkLocRec::local){
2151                         //this location exists on this processor
2152                         //and needs to be removed       
2153                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
2154                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
2155                         
2156                         int localIdx = localRec->getLocalIndex();
2157                         LBDatabase *lbdb = localRec->getLBDB();
2158                         LDObjHandle ldHandle = localRec->getLdHandle();
2159                                 
2160                         locMgr->setDuringMigration(true);
2161                         
2162                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
2163                         lbdb->UnregisterObj(ldHandle);
2164                         
2165                         locMgr->setDuringMigration(false);
2166                         
2167                         verifyAckedRequests++;
2168
2169         }
2170         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
2171         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
2172 };
2173
2174
2175 void _verifyAckHandler(VerifyAckMsg *verifyReply){
2176         int index =     verifyReply->index;
2177         migratedNoticeList[index] = verifyReply->migRecord;
2178         verifyAckCount++;
2179         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
2180         if(verifyAckCount == verifyAckTotal){
2181                 sendCheckpointData(MLOG_CRASHED);
2182         }
2183 }
2184
2185
2186 /**
2187  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
2188  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
2189  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
2190  */
2191 void sendCheckpointData(int mode){      
2192         RestartRequest *restartMsg = storedRequest;
2193         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
2194         int numMigratedAwayElements = migratedNoticeList.size();
2195         if(migratedNoticeList.size() != 0){
2196                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
2197 //                      CkAssert(migratedNoticeList.size() == 0);
2198         }
2199         
2200         
2201         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
2202         
2203         DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
2204         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
2205         
2206         CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
2207         totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
2208         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
2209         
2210         char *msg = (char *)CmiAlloc(totalSize);
2211         
2212         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
2213         dataMsg->PE = CkMyPe();
2214         dataMsg->restartWallTime = CmiTimer();
2215         dataMsg->checkPointSize = storedChkpt->bufSize;
2216         
2217         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
2218 //      dataMsg->numMigratedAwayElements = 0;
2219         
2220         dataMsg->numMigratedInElements = 0;
2221         dataMsg->migratedElementSize = 0;
2222         dataMsg->lbGroupID = globalLBID;
2223         /*msg layout 
2224                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
2225                 Local MessageLog|
2226         */
2227         //store checkpoint data
2228         char *buf = &msg[sizeof(RestartProcessorData)];
2229
2230         if(dataMsg->numMigratedAwayElements != 0){
2231                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
2232                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
2233         }
2234         
2235
2236 #ifdef CHECKPOINT_DISK
2237         readCheckpointFromDisk(storedChkpt->bufSize,buf);
2238 #else   
2239         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
2240 #endif
2241         buf = &buf[storedChkpt->bufSize];
2242
2243
2244         //store localmessage Log
2245         dataMsg->numLocalMessages = localMsgQ->length();
2246         for(int i=0;i<localMsgQ->length();i++){
2247                 if(!fault_aware(((*localMsgQ)[i]).recver )){
2248                         CmiAbort("Non fault aware localMsgQ");
2249                 }
2250                 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
2251                 buf = &buf[sizeof(LocalMessageLog)];
2252         }
2253         
2254         if(mode == MLOG_RESTARTED){
2255                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
2256                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2257                 CmiFree(restartMsg);
2258         }else{
2259                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2260                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2261                 CmiFree(restartMsg);
2262         }
2263 };
2264
2265
2266 // this list is used to create a vector of the object ids of all
2267 //the chares on this processor currently and the highest TN processed by them 
2268 //the first argument is actually a CkVec<TProcessedLog> *
2269 void createObjIDList(void *data,ChareMlogData *mlogData){
2270         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2271         TProcessedLog entry;
2272         entry.recver = mlogData->objID;
2273         entry.tProcessed = mlogData->tProcessed;
2274         list->push_back(entry);
2275         char objString[100];
2276         DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2277 }
2278
2279
2280 /**
2281  * Receives the checkpoint data from its buddy, restores the state of all the objects
2282  * and asks everyone else to update its home.
2283  */
2284 void _recvCheckpointHandler(char *_restartData){
2285         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2286         MigrationRecord *migratedAwayElements;
2287
2288         globalLBID = restartData->lbGroupID;
2289         
2290         restartData->restartWallTime *= 1000;
2291         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2292         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2293         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2294
2295         
2296         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2297         char *buf = &_restartData[sizeof(RestartProcessorData)];
2298         
2299         if(restartData->numMigratedAwayElements != 0){
2300                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2301                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2302                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2303                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2304         }
2305         
2306         PUP::fromMem pBuf(buf);
2307
2308         pBuf | checkpointCount;
2309
2310         CkPupROData(pBuf);
2311         CkPupGroupData(pBuf);
2312         CkPupNodeGroupData(pBuf);
2313 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
2314         pupArrayElementsSkip(pBuf,NULL);
2315         CkAssert(pBuf.size() == restartData->checkPointSize);
2316         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2317         
2318         forAllCharesDo(initializeRestart,NULL);
2319         
2320         //store the restored local message log in a vector
2321         buf = &buf[restartData->checkPointSize];        
2322         for(int i=0;i<restartData->numLocalMessages;i++){
2323                 LocalMessageLog logEntry;
2324                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
2325                 
2326                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
2327                 if(recverObj!=NULL){
2328                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
2329                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
2330                         char senderString[100];
2331                         char recverString[100];
2332                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
2333                 }else{
2334 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
2335                 }
2336                 buf = &buf[sizeof(LocalMessageLog)];
2337         }
2338
2339         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2340
2341         CmiFree(_restartData);
2342         
2343         
2344         _initDone();
2345
2346         getGlobalStep(globalLBID);
2347         
2348         countUpdateHomeAcks = 0;
2349         RestartRequest updateHomeRequest;
2350         updateHomeRequest.PE = CmiMyPe();
2351         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2352         for(int i=0;i<CmiNumPes();i++){
2353                 if(i != CmiMyPe()){
2354                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2355                 }
2356         }
2357
2358 }
2359
2360 /**
2361  * Receives the updateHome ACKs from all other processors. Once everybody
2362  * has replied, it sends a request to resend the logged messages.
2363  */
2364 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2365         countUpdateHomeAcks++;
2366         CmiFree(updateHomeAck);
2367         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2368         if(countUpdateHomeAcks != CmiNumPes()){
2369                 return;
2370         }
2371
2372         // Send out the request to resend logged messages to all other processors
2373         CkVec<TProcessedLog> objectVec;
2374         forAllCharesDo(createObjIDList, (void *)&objectVec);
2375         int numberObjects = objectVec.size();
2376         
2377         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2378         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2379         char *resendMsg = (char *)CmiAlloc(totalSize);  
2380
2381         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2382         resendReq->PE =CkMyPe(); 
2383         resendReq->numberObjects = numberObjects;
2384         char *objList = &resendMsg[sizeof(ResendRequest)];
2385         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2386
2387         /* test for parallel restart migrate away object**/
2388         if(parallelRestart){
2389                 distributeRestartedObjects();
2390                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2391         }
2392         
2393         /*      To make restart work for load balancing.. should only
2394         be used when checkpoint happens along with load balancing
2395         **/
2396 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2397
2398         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2399         CpvAccess(_currentObj) = lb;
2400         lb->ReceiveDummyMigration(restartDecisionNumber);
2401
2402         sleep(10);
2403         
2404         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2405         for(int i=0;i<CkNumPes();i++){
2406                 if(i != CkMyPe()){
2407                         CmiSyncSend(i,totalSize,resendMsg);
2408                 }       
2409         }
2410         _resendMessagesHandler(resendMsg);
2411         CmiFree(resendMsg);
2412 };
2413
2414 /**
2415  * @brief Initializes variables and flags for restarting procedure.
2416  */
2417 void initializeRestart(void *data, ChareMlogData *mlogData){
2418         mlogData->resendReplyRecvd = 0;
2419         mlogData->receivedTNs = new CkVec<MCount>;
2420         mlogData->restartFlag = 1;
2421         mlogData->restoredLocalMsgLog.removeAll();
2422         mlogData->mapTable.empty();
2423 };
2424
2425 /**
2426  * Updates the homePe of chare array elements.
2427  */
2428 void updateHomePE(void *data,ChareMlogData *mlogData){
2429         RestartRequest *updateRequest = (RestartRequest *)data;
2430         int PE = updateRequest->PE; //restarted PE
2431         //if this object is an array Element and its home is the restarted processor
2432         // the home processor needs to know its current location
2433         if(mlogData->objID.type == TypeArray){
2434                 //it is an array element
2435                 CkGroupID myGID = mlogData->objID.data.array.id;
2436                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2437                 CkArrayID aid(mlogData->objID.data.array.id);           
2438                 //check if the restarted processor is the home processor for this object
2439                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2440                 if(locMgr->homePe(myIdx) == PE){
2441                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2442                         DEBUGRESTART(myIdx.print());
2443                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2444                 }
2445         }
2446 };
2447
2448
2449 /**
2450  * Updates the homePe for all chares in this processor.
2451  */
2452 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2453         int sender = updateRequest->PE;
2454         
2455         forAllCharesDo(updateHomePE,updateRequest);
2456         
2457         updateRequest->PE = CmiMyPe();
2458         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2459         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2460         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2461                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2462                 checkpointCount--;
2463                 startMlogCheckpoint(NULL,0);
2464         }
2465         if(sender == getCheckPointPE()){
2466                 for(int i=0;i<retainedObjectList.size();i++){
2467                         if(retainedObjectList[i]->acked == 0){
2468                                 MigrationNotice migMsg;
2469                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2470                                 migMsg.record = retainedObjectList[i];
2471                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2472                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2473                         }
2474                 }
2475         }
2476 }
2477
2478 /**
2479  * @brief Fills up the ticket vector for each chare.
2480  */
2481 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2482         ResendData *resendData = (ResendData *)data;
2483         int PE = resendData->PE; //restarted PE
2484         int count=0;
2485         CkHashtableIterator *iterator;
2486         void *objp;
2487         void *objkey;
2488         CkObjID *objID;
2489         SNToTicket *snToTicket;
2490         Ticket ticket;
2491         
2492         // traversing the team table looking up for the maximum TN received     
2493         iterator = mlogData->teamTable.iterator();
2494         while( (objp = iterator->next(&objkey)) != NULL ){
2495                 objID = (CkObjID *)objkey;
2496         
2497                 // traversing the resendData structure to add ticket numbers
2498                 for(int j=0;j<resendData->numberObjects;j++){
2499                         if((*objID) == (resendData->listObjects)[j].recver){
2500 char name[100];
2501                                 snToTicket = *(SNToTicket **)objp;
2502 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2503                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2504                                         ticket = snToTicket->get(snIndex);      
2505                                         if(ticket.TN > resendData->maxTickets[j]){
2506                                                 resendData->maxTickets[j] = ticket.TN;
2507                                         }
2508                                         if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2509                                                 //store the TNs that have been since the recver last checkpointed
2510                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2511                                         }
2512                                 }
2513                         }
2514                 }
2515         }
2516
2517         //releasing the memory for the iterator
2518         delete iterator;
2519 }
2520
2521
2522 /**
2523  * @brief Turns on the flag for team recovery that selectively restores
2524  * particular metadata information.
2525  */
2526 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2527         mlogData->teamRecoveryFlag = 1; 
2528 }
2529
2530 /**
2531  * @brief Turns off the flag for team recovery.
2532  */
2533 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2534         mlogData->teamRecoveryFlag = 0;
2535 }
2536
2537 //the data argument is of type ResendData which contains the 
2538 //array of objects on  the restartedProcessor
2539 //this method resends the messages stored in this chare's message log 
2540 //to the restarted processor. It also accumulates the maximum TN
2541 //for all the objects on the restarted processor
2542 void resendMessageForChare(void *data,ChareMlogData *mlogData){
2543         char nameString[100];
2544         ResendData *resendData = (ResendData *)data;
2545         int PE = resendData->PE; //restarted PE
2546         DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2547         int count=0;
2548         int ticketRequests=0;
2549         CkQ<MlogEntry *> *log = mlogData->getMlog();
2550         
2551         for(int i=0;i<log->length();i++){
2552                 MlogEntry *logEntry = (*log)[i];
2553                 
2554                 // if we sent out the logs of a local message to buddy and he crashed
2555                 //before acking
2556                 envelope *env = logEntry->env;
2557                 if(env == NULL){
2558                         continue;
2559                 }
2560                 if(logEntry->unackedLocal){
2561                         char recverString[100];
2562                         DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
2563                         sendLocalMessageCopy(logEntry);
2564                 }
2565                 //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
2566                 if(env->TN <= 0){
2567                         //ticket not yet replied send it out again
2568                         sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,0,1);
2569                 }
2570                 
2571                 if(env->recver.type != TypeInvalid){
2572                         int flag = 0;//marks if any of the restarted objects matched this log entry
2573                         for(int j=0;j<resendData->numberObjects;j++){
2574                                 if(env->recver == (resendData->listObjects)[j].recver){
2575                                         flag = 1;
2576                                         //message has a valid TN
2577                                         if(env->TN > 0){
2578                                                 //store maxTicket
2579                                                 if(env->TN > resendData->maxTickets[j]){
2580                                                         resendData->maxTickets[j] = env->TN;
2581                                                 }
2582                                                 //if the TN for this entry is more than the TN processed, send the message out
2583                                                 if(env->TN >= (resendData->listObjects)[j].tProcessed){
2584                                                         //store the TNs that have been since the recver last checkpointed
2585                                                         resendData->ticketVecs[j].push_back(env->TN);
2586                                                         
2587                                                         if(PE != CkMyPe()){
2588                                                                 if(env->recver.type == TypeNodeGroup){
2589                                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2590                                                                 }else{
2591                                                                         CmiSetHandler(env,CmiGetXHandler(env));
2592                                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2593                                                                 }
2594                                                         }else{
2595                                                                 envelope *copyEnv = copyEnvelope(env);
2596                                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2597                                                         }
2598                                                         char senderString[100];
2599                                                         DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2600                                                         count++;
2601                                                 }       
2602                                         }else{
2603 /*                                      //the message didnt get a ticket the last time and needs to start with a ticket request
2604                                                 DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
2605                                                 //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
2606                                                 CkAssert(logEntry->destPE != CkMyPe());
2607                                                 
2608                                                 sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
2609                                                 
2610                                                 ticketRequests++;*/
2611                                         }
2612                                 }
2613                         }//end of for loop of objects
2614                         
2615                 }       
2616         }
2617         DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
2618 }
2619
2620 /**
2621  * Resends the messages since the last checkpoint to the list of objects included in the 
2622  * request.
2623  */
2624 void _resendMessagesHandler(char *msg){
2625         ResendData d;
2626         ResendRequest *resendReq = (ResendRequest *)msg;
2627
2628         // building the reply message
2629         char *listObjects = &msg[sizeof(ResendRequest)];
2630         d.numberObjects = resendReq->numberObjects;
2631         d.PE = resendReq->PE;
2632         d.listObjects = (TProcessedLog *)listObjects;
2633         d.maxTickets = new MCount[d.numberObjects];
2634         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2635         for(int i=0;i<d.numberObjects;i++){
2636                 d.maxTickets[i] = 0;
2637         }
2638
2639         //Check if any of the retained objects need to be recreated
2640         //If they have not been recreated on the restarted processor
2641         //they need to be recreated on this processor
2642         int count=0;
2643         for(int i=0;i<retainedObjectList.size();i++){
2644                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2645                         count++;
2646                         int recreate=1;
2647                         for(int j=0;j<d.numberObjects;j++){
2648                                 if(d.listObjects[j].recver.type != TypeArray ){
2649                                         continue;
2650                                 }
2651                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2652                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2653                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2654                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
2655                                                 recreate = 0;
2656                                                 break;
2657                                         }
2658                                 }
2659                         }
2660                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2661                         if(recreate){
2662                                 donotCountMigration=1;
2663                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2664                                 donotCountMigration=0;
2665                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2666                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2667                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2668                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2669                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2670                                 CmiAssert(rec->type() == CkLocRec::local);
2671                                 CkVec<CkMigratable *> eltList;
2672                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2673                                 for(int j=0;j<eltList.size();j++){
2674                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2675                                                 CpvAccess(_currentObj) = eltList[j];
2676                                                 eltList[j]->ResumeFromSync();
2677                                         }
2678                                 }
2679                                 retainedObjectList[i]->msg=NULL;        
2680                         }
2681                 }
2682         }
2683         
2684         if(count > 0){
2685 //              CmiAbort("retainedObjectList for restarted processor not empty");
2686         }
2687         
2688         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2689
2690
2691         //TML: examines the origin processor to determine if it belongs to the same group.
2692         // In that case, it only returns the maximum ticket received for each object in the list.
2693         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2694                 forAllCharesDo(fillTicketForChare,&d);
2695         else
2696                 forAllCharesDo(resendMessageForChare,&d);
2697
2698         //send back the maximum ticket number for a message sent to each object on the 
2699         //restarted processor
2700         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2701         
2702         int totalTNStored=0;
2703         for(int i=0;i<d.numberObjects;i++){
2704                 totalTNStored += d.ticketVecs[i].size();
2705         }
2706         
2707         int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
2708         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2709         
2710         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2711         resendReply->PE = CkMyPe();
2712         resendReply->numberObjects = d.numberObjects;
2713         
2714         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2715         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2716         for(int i=0;i<d.numberObjects;i++){
2717                 replyObjects[i] = d.listObjects[i].recver;
2718         }
2719         
2720         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2721         for(int i=0;i<d.numberObjects;i++){
2722                 int vecsize = d.ticketVecs[i].size();
2723                 memcpy(ticketList,&vecsize,sizeof(int));
2724                 ticketList = &ticketList[sizeof(int)];
2725                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2726                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2727         }       
2728
2729         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2730         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2731         
2732 /*      
2733         if(verifyAckRequestsUnacked){
2734                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2735                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2736                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2737                         LDObjHandle h;
2738                         lb->Migrated(h,1);
2739                 }
2740         }
2741         
2742         verifyAckRequestsUnacked=0;*/
2743         
2744         delete [] d.maxTickets;
2745         delete [] d.ticketVecs;
2746         if(resendReq->PE != CkMyPe()){
2747                 CmiFree(msg);
2748         }       
2749 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2750         lastRestart = CmiWallTimer();
2751 }
2752
2753 void sortVec(CkVec<MCount> *TNvec);
2754 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2755
2756 /**
2757  * @brief Receives the tickets assigned to message to other objects.
2758  */
2759 void _resendReplyHandler(char *msg){    
2760         /**
2761                 need to rewrite this method to deal with parallel restart
2762         */
2763         ResendRequest *resendReply = (ResendRequest *)msg;
2764         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2765
2766         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2767         
2768 //      DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2769         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2770         for(int i =0; i< resendReply->numberObjects;i++){       
2771                 Chare *obj = (Chare *)listObjects[i].getObject();
2772                 
2773                 int vecsize;
2774                 memcpy(&vecsize,listTickets,sizeof(int));
2775                 listTickets = &listTickets[sizeof(int)];
2776                 MCount *listTNs = (MCount *)listTickets;        
2777                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2778                 
2779                 if(obj != NULL){
2780                         //the object was restarted on the processor on which it existed
2781                         processReceivedTN(obj,vecsize,listTNs);
2782                 }else{
2783                 //pack up objID vecsize and listTNs and send it to the correct processor
2784                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2785                         char *TNMsg = (char *)CmiAlloc(totalSize);
2786                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2787                         receivedTNData->recver = listObjects[i];
2788                         receivedTNData->numTNs = vecsize;
2789                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2790                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2791
2792                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2793                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2794                 }       
2795         }
2796 };
2797
2798 void _receivedTNDataHandler(ReceivedTNData *msg){
2799         char objName[100];
2800         Chare *obj = (Chare *) msg->recver.getObject();
2801         if(obj){                
2802                 char *_msg = (char *)msg;
2803                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2804                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2805                 processReceivedTN(obj,msg->numTNs,listTNs);
2806         }else{
2807                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2808                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2809         }
2810 };
2811
2812 /**
2813  * @brief Processes the received list of tickets from a particular PE.
2814  */
2815 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2816         // increases the number of resendReply received
2817         obj->mlogData->resendReplyRecvd++;
2818
2819         CkPrintf("[%d] processReceivedTN with %d listSize",CkMyPe(),listSize);  
2820         // includes the tickets into the receivedTN structure
2821         for(int j=0;j<listSize;j++){
2822                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2823         }
2824         
2825         //if this object has received all the replies find the ticket numbers
2826         //that senders know about. Those less than the ticket number processed 
2827         //by the receiver can be thrown away. The rest need not be consecutive
2828         // ie there can be holes in the list of ticket numbers seen by senders
2829         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2830                 obj->mlogData->resendReplyRecvd = 0;
2831                 //sort the received TNS
2832                 sortVec(obj->mlogData->receivedTNs);
2833         
2834                 //after all the received tickets are in we need to sort them and then 
2835                 // calculate the holes  
2836                 if(obj->mlogData->receivedTNs->size() > 0){
2837                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2838                         int vecsize = obj->mlogData->receivedTNs->size();
2839                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2840                         
2841                         // updating tCount with the highest ticket handed out
2842                         if(TEAM_SIZE_MLOG > 1){
2843                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2844                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2845                         }else{
2846                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2847                         }
2848                         
2849                         if(numberHoles == 0){
2850                         }else{
2851                                 char objName[100];                                      
2852                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2853                                 obj->mlogData->numberHoles = numberHoles;
2854                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2855                                 int countHoles=0;
2856                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2857                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2858                                                 //the TNs are not consecutive at this point
2859                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2860                                                         printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
2861                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2862                                                         countHoles++;
2863                                                 }       
2864                                         }
2865                                 }
2866                                 //Holes have been given new TN
2867                                 if(countHoles != numberHoles){
2868                                         char str[100];
2869                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2870                                 }
2871                                 CkAssert(countHoles == numberHoles);                                    
2872                                 obj->mlogData->currentHoles = numberHoles;
2873                         }
2874                 }
2875         
2876                 // cleaning up structures and getting ready to continue execution       
2877                 delete obj->mlogData->receivedTNs;
2878                 obj->mlogData->receivedTNs = NULL;
2879                 obj->mlogData->restartFlag = 0;
2880
2881                 DEBUGRESTART(char objString[100]);
2882                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2883         }
2884
2885 }
2886
2887
2888 void sortVec(CkVec<MCount> *TNvec){
2889         //sort it ->its bloddy bubble sort
2890         //TODO: use quicksort
2891         for(int i=0;i<TNvec->size();i++){
2892                 for(int j=i+1;j<TNvec->size();j++){
2893                         if((*TNvec)[j] < (*TNvec)[i]){
2894                                 MCount temp;
2895                                 temp = (*TNvec)[i];
2896                                 (*TNvec)[i] = (*TNvec)[j];
2897                                 (*TNvec)[j] = temp;
2898                         }
2899                 }
2900         }
2901         //make it unique .. since its sorted all equal units will be consecutive
2902         MCount *tempArray = new MCount[TNvec->size()];
2903         int     uniqueCount=-1;
2904         for(int i=0;i<TNvec->size();i++){
2905                 tempArray[i] = 0;
2906                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2907                         uniqueCount++;
2908                         tempArray[uniqueCount] = (*TNvec)[i];
2909                 }
2910         }
2911         uniqueCount++;
2912         TNvec->removeAll();
2913         for(int i=0;i<uniqueCount;i++){
2914                 TNvec->push_back(tempArray[i]);
2915         }
2916         delete [] tempArray;
2917 }       
2918
2919 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2920         if(TNVec->size() == 0){
2921                 return -1; //not found in an empty vec
2922         }
2923         //binary search to find 
2924         int left=0;
2925         int right = TNVec->size();
2926         int mid = (left +right)/2;
2927         while(searchTN != (*TNVec)[mid] && left < right){
2928                 if((*TNVec)[mid] > searchTN){
2929                         right = mid-1;
2930                 }else{
2931                         left = mid+1;
2932                 }
2933                 mid = (left + right)/2;
2934         }
2935         if(left < right){
2936                 //mid is the element to be returned
2937                 return mid;
2938         }else{
2939                 if(mid < TNVec->size() && mid >=0){
2940                         if((*TNVec)[mid] == searchTN){
2941                                 return mid;
2942                         }else{
2943                                 return -1;
2944                         }
2945                 }else{
2946                         return -1;
2947                 }
2948         }
2949 };
2950
2951
2952 /*
2953         Method to do parallel restart. Distribute some of the array elements to other processors.
2954         The problem is that we cant use to charm entry methods to do migration as it will get
2955         stuck in the protocol that is going to restart
2956 */
2957
2958 class ElementDistributor: public CkLocIterator{
2959         CkLocMgr *locMgr;
2960         int *targetPE;
2961         void pupLocation(CkLocation &loc,PUP::er &p){
2962                 CkArrayIndexMax idx=loc.getIndex();
2963                 CkGroupID gID = locMgr->ckGetGroupID();
2964                 p|gID;      // store loc mgr's GID as well for easier restore
2965                 p|idx;
2966                 p|loc;
2967         };
2968         public:
2969                 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2970                 void addLocation(CkLocation &loc){
2971                         if(*targetPE == CkMyPe()){
2972                                 *targetPE = (*targetPE +1)%CkNumPes();                          
2973                                 return;
2974                         }
2975                         
2976                         CkArrayIndexMax idx=loc.getIndex();
2977                         CkLocRec_local *rec = loc.getLocalRecord();
2978                         
2979                         CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2980                         idx.print();
2981                         
2982
2983                         //TODO: an element that is being moved should leave some trace behind so that
2984                         // the arraybroadcaster can forward messages to it
2985                         
2986                         //pack up this location and send it across
2987                         PUP::sizer psizer;
2988                         pupLocation(loc,psizer);
2989                         int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
2990                         char *msg = (char *)CmiAlloc(totalSize);
2991                         char *buf = &msg[CmiMsgHeaderSizeBytes];
2992                         PUP::toMem pmem(buf);
2993                         pmem.becomeDeleting();
2994                         pupLocation(loc,pmem);
2995                         
2996                         locMgr->setDuringMigration(CmiTrue);                    
2997                         delete rec;
2998                         locMgr->setDuringMigration(CmiFalse);                   
2999                         locMgr->inform(idx,*targetPE);
3000
3001                         CmiSetHandler(msg,_distributedLocationHandlerIdx);
3002                         CmiSyncSendAndFree(*targetPE,totalSize,msg);
3003
3004                         CmiAssert(locMgr->lastKnown(idx) == *targetPE);
3005                         //decide on the target processor for the next object
3006                         *targetPE = (*targetPE +1)%CkNumPes();
3007                 }
3008                 
3009 };
3010
3011 void distributeRestartedObjects(){
3012         int numGroups = CkpvAccess(_groupIDTable)->size();      
3013         int i;
3014         int targetPE=CkMyPe();
3015         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
3016 };
3017
3018 void _distributedLocationHandler(char *receivedMsg){
3019         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
3020         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
3021         PUP::fromMem pmem(buf);
3022         CkGroupID gID;
3023         CkArrayIndexMax idx;
3024         pmem |gID;
3025         pmem |idx;
3026         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3027         donotCountMigration=1;
3028         mgr->resume(idx,pmem);
3029         donotCountMigration=0;
3030         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
3031         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
3032         idx.print();
3033
3034         CkLocRec *rec = mgr->elementRec(idx);
3035         CmiAssert(rec->type() == CkLocRec::local);
3036         
3037         CkVec<CkMigratable *> eltList;
3038         mgr->migratableList((CkLocRec_local *)rec,eltList);
3039         for(int i=0;i<eltList.size();i++){
3040                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
3041                         CpvAccess(_currentObj) = eltList[i];
3042                         eltList[i]->ResumeFromSync();
3043                 }
3044         }
3045         
3046         
3047 }
3048
3049
3050 /** this method is used to send messages to a restarted processor to tell
3051  * it that a particular expected object is not going to get to it */
3052 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
3053         DummyMigrationMsg buf;
3054         buf.flag = MLOG_OBJECT;
3055         buf.lbID = lbID;
3056         buf.mgrID = locMgrID;
3057         buf.idx = idx;
3058         buf.locationPE = locationPE;
3059         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3060         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
3061 };
3062
3063
3064 /**this method is used by a restarted processor to tell other processors
3065  * that they are not going to receive these many objects.. just the count
3066  * not the objects themselves ***/
3067
3068 void sendDummyMigrationCounts(int *dummyCounts){
3069         DummyMigrationMsg buf;
3070         buf.flag = MLOG_COUNT;
3071         buf.lbID = globalLBID;
3072         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3073         for(int i=0;i<CmiNumPes();i++){
3074                 if(i != CmiMyPe() && dummyCounts[i] != 0){
3075                         buf.count = dummyCounts[i];
3076                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
3077                 }
3078         }
3079 }
3080
3081
3082 /** this handler is used to process a dummy migration msg.
3083  * it looks up the load balancer and calls migrated for it */
3084
3085 void _dummyMigrationHandler(DummyMigrationMsg *msg){
3086         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3087         if(msg->flag == MLOG_OBJECT){
3088                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
3089                 LDObjHandle h;
3090                 lb->Migrated(h,1);
3091         }
3092         if(msg->flag == MLOG_COUNT){
3093                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
3094                 msg->count -= verifyAckedRequests;
3095                 for(int i=0;i<msg->count;i++){
3096                         LDObjHandle h;
3097                         lb->Migrated(h,1);
3098                 }
3099         }
3100         verifyAckedRequests=0;
3101         CmiFree(msg);
3102 };
3103
3104
3105
3106
3107
3108
3109
3110 /*****************************************************
3111         Implementation of a method that can be used to call
3112         any method on the ChareMlogData of all the chares on
3113         a processor currently
3114 ******************************************************/
3115
3116
3117 class ElementCaller :  public CkLocIterator {
3118 private:
3119         CkLocMgr *locMgr;
3120         MlogFn fnPointer;
3121         void *data;
3122 public:
3123         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3124                 locMgr = _locMgr;
3125                 fnPointer = _fnPointer;
3126                 data = _data;
3127         };
3128         void addLocation(CkLocation &loc){
3129                 CkVec<CkMigratable *> list;
3130                 CkLocRec_local *local = loc.getLocalRecord();
3131                 locMgr->migratableList (local,list);
3132                 for(int i=0;i<list.size();i++){
3133                         CkMigratable *migratableElement = list[i];
3134                         fnPointer(data,migratableElement->mlogData);
3135                 }
3136         }
3137 };
3138
3139 /**
3140  * Map function pointed by fnPointer over all the chares living in this processor.
3141  */
3142 void forAllCharesDo(MlogFn fnPointer,void *data){
3143         int numGroups = CkpvAccess(_groupIDTable)->size();
3144         for(int i=0;i<numGroups;i++){
3145                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3146                 fnPointer(data,obj->mlogData);
3147         }
3148         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3149         for(int i=0;i<numNodeGroups;i++){
3150                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3151                 fnPointer(data,obj->mlogData);
3152         }
3153         int i;
3154         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3155         
3156         
3157 };
3158
3159
3160 /******************************************************************
3161  Load Balancing
3162 ******************************************************************/
3163
3164 void initMlogLBStep(CkGroupID gid){
3165         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3166         countLBMigratedAway = 0;
3167         countLBToMigrate=0;
3168         onGoingLoadBalancing=1;
3169         migrationDoneCalled=0;
3170         checkpointBarrierCount=0;
3171         if(globalLBID.idx != 0){
3172                 CmiAssert(globalLBID.idx == gid.idx);
3173         }
3174         globalLBID = gid;
3175 }
3176
3177 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3178         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3179         
3180         resumeLbFnPtr = _fnPtr;
3181         centralLb = _centralLb;
3182         migrationDoneCalled = 1;
3183         if(countLBToMigrate == countLBMigratedAway){
3184                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3185                 startMlogCheckpoint(NULL,CmiWallTimer());       
3186         }
3187 };
3188
3189 void finishedCheckpointLoadBalancing(){
3190         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3191         CheckpointBarrierMsg msg;
3192         msg.fromPE = CmiMyPe();
3193         msg.checkpointCount = checkpointCount;
3194
3195         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3196         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3197         
3198 };
3199
3200
3201 void sendMlogLocation(int targetPE,envelope *env){
3202         void *_msg = EnvToUsr(env);
3203         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3204
3205
3206         int existing = 0;
3207         //if this object is already in the retainedobjectlust destined for this
3208         //processor it should not be sent
3209         
3210         for(int i=0;i<retainedObjectList.size();i++){
3211                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3212                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3213                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3214                         existing = 1;
3215                         break;
3216                 }
3217         }
3218
3219         if(existing){
3220                 return;
3221         }
3222         
3223         
3224         countLBToMigrate++;
3225         
3226         MigrationNotice migMsg;
3227         migMsg.migRecord.gID = msg->gid;
3228         migMsg.migRecord.idx = msg->idx;
3229         migMsg.migRecord.fromPE = CkMyPe();
3230         migMsg.migRecord.toPE =  targetPE;
3231         
3232         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3233         
3234         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3235         retainedObject->migRecord = migMsg.migRecord;
3236         retainedObject->acked  = 0;
3237         
3238         CkPackMessage(&env);
3239         
3240         migMsg.record = retainedObject;
3241         retainedObject->msg = env;
3242         int size = retainedObject->size = env->getTotalsize();
3243         
3244         retainedObjectList.push_back(retainedObject);
3245         
3246         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3247         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3248         
3249         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3250
3251 }
3252
3253 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3254         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3255         migratedNoticeList.push_back(msg->migRecord);
3256
3257         MigrationNoticeAck buf;
3258         buf.record = msg->record;
3259         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3260         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3261 }
3262
3263 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3264         
3265         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3266         retainedObject->acked = 1;
3267
3268         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3269         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3270
3271         //inform home about the new location of this object
3272         CkGroupID gID = retainedObject->migRecord.gID ;
3273         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3274         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3275         
3276         countLBMigratedAway++;
3277         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3278                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3279                 startMlogCheckpoint(NULL,CmiWallTimer());
3280         }
3281 };
3282
3283 void _receiveMlogLocationHandler(void *buf){
3284         envelope *env = (envelope *)buf;
3285         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3286         CkUnpackMessage(&env);
3287         void *_msg = EnvToUsr(env);
3288         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3289         CkGroupID gID= msg->gid;
3290         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
3291         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3292         CpvAccess(_currentObj)=mgr;
3293         mgr->immigrate(msg);
3294 };
3295
3296
3297 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
3298 /*      if(mlogData->objID.type == TypeArray){
3299                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
3300         //      TODO: make sure later that atSync has been called and it needs 
3301         //      to be resumed from sync
3302         //
3303                 CpvAccess(_currentObj) = elt;
3304                 elt->ResumeFromSync();
3305         }*/
3306 }
3307
3308 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
3309         if(checkpointBarrierCount == CmiNumPes()){
3310                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
3311                 for(int i=0;i<CmiNumPes();i++){
3312                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
3313                 }
3314         }
3315 }
3316
3317 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
3318         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
3319         if(msg->checkpointCount == checkpointCount){
3320                 checkpointBarrierCount++;
3321                 checkAndSendCheckpointBarrierAcks(msg);
3322         }else{
3323                 if(msg->checkpointCount-1 == checkpointCount){
3324                         checkpointBarrierCount++;
3325                         checkAndSendCheckpointBarrierAcks(msg);
3326                 }else{
3327                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
3328                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
3329                 }
3330         }
3331         CmiFree(msg);
3332 }
3333
3334 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
3335         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
3336         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
3337         sendRemoveLogRequests();
3338         (*resumeLbFnPtr)(centralLb);
3339         CmiFree(msg);
3340 }
3341
3342 /**
3343         method that informs an array elements home processor of its current location
3344         It is a converse method to bypass the charm++ message logging framework
3345 */
3346
3347 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
3348         double _startTime = CmiWallTimer();
3349         CurrentLocationMsg msg;
3350         msg.mgrID = locMgrID;
3351         msg.idx = idx;
3352         msg.locationPE = currentPE;
3353         msg.fromPE = CkMyPe();
3354
3355         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
3356         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
3357         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
3358         traceUserBracketEvent(37,_startTime,CmiWallTimer());
3359 }
3360
3361
3362 void _receiveLocationHandler(CurrentLocationMsg *data){
3363         double _startTime = CmiWallTimer();
3364         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
3365         if(mgr == NULL){
3366                 CmiFree(data);
3367                 return;
3368         }
3369         CkLocRec *rec = mgr->elementNrec(data->idx);
3370         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
3371         if(rec != NULL){
3372                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
3373                         if(data->fromPE == data->locationPE){
3374                                 CmiAbort("Another processor has the same object");
3375                         }
3376                 }
3377         }
3378         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3379                 int targetPE = data->fromPE;
3380                 data->fromPE = CmiMyPe();
3381                 data->locationPE = CmiMyPe();
3382                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3383                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3384         }else{
3385                 mgr->inform(data->idx,data->locationPE);
3386         }
3387         CmiFree(data);
3388         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3389 }
3390
3391
3392
3393 void getGlobalStep(CkGroupID gID){
3394         LBStepMsg msg;
3395         int destPE = 0;
3396         msg.lbID = gID;
3397         msg.fromPE = CmiMyPe();
3398         msg.step = -1;
3399         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3400         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3401 };
3402
3403 void _getGlobalStepHandler(LBStepMsg *msg){
3404         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3405         msg->step = lb->step();
3406         CmiAssert(msg->fromPE != CmiMyPe());
3407         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3408         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3409         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3410 };
3411
3412 void _recvGlobalStepHandler(LBStepMsg *msg){
3413         
3414         restartDecisionNumber=msg->step;
3415         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
3416         _updateHomeAckHandler(dummyAck);
3417 };
3418
3419 /**
3420  * @brief Function to wrap up performance information.
3421  */
3422 void _messageLoggingExit(){
3423 /*      if(CkMyPe() == 0){
3424                 if(countBuffered != 0){
3425                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3426                 }
3427
3428 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3429         }
3430         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3431         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3432
3433         //TML: printing some statistics for group approach
3434         //if(TEAM_SIZE_MLOG > 1)
3435                 CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
3436
3437 }
3438
3439 /**
3440         The method for returning the actual object pointed to by an id
3441         If the object doesnot exist on the processor it returns NULL
3442 **/
3443
3444 void* CkObjID::getObject(){
3445         
3446                 switch(type){
3447                         case TypeChare: 
3448                                 return CkLocalChare(&data.chare.id);
3449                         case TypeMainChare:
3450                                 return CkLocalChare(&data.chare.id);
3451                         case TypeGroup:
3452         
3453                                 CkAssert(data.group.onPE == CkMyPe());
3454                                 return CkLocalBranch(data.group.id);
3455                         case TypeNodeGroup:
3456                                 CkAssert(data.group.onPE == CkMyNode());
3457                                 //CkLocalNodeBranch(data.group.id);
3458                                 {
3459                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
3460                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
3461                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
3462         
3463                                         return retval;
3464                                 }       
3465                         case TypeArray:
3466                                 {
3467         
3468         
3469                                         CkArrayID aid(data.array.id);
3470         
3471                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
3472         
3473                                         CProxyElement_ArrayBase aProxy(aid,data.array.idx.asMax());
3474         
3475                                         return aProxy.ckLocal();
3476                                 }
3477                         default:
3478                                 CkAssert(0);
3479                 }
3480 }
3481
3482
3483 int CkObjID::guessPE(){
3484                 switch(type){
3485                         case TypeChare:
3486                         case TypeMainChare:
3487                                 return data.chare.id.onPE;
3488                         case TypeGroup:
3489                         case TypeNodeGroup:
3490                                 return data.group.onPE;
3491                         case TypeArray:
3492                                 {
3493                                         CkArrayID aid(data.array.id);
3494                                         if(aid.ckLocalBranch() == NULL){
3495                                                 return -1;
3496                                         }
3497                                         return aid.ckLocalBranch()->lastKnown(data.array.idx.asMax());
3498                                 }
3499                         default:
3500                                 CkAssert(0);
3501                 }
3502 };
3503
3504 char *CkObjID::toString(char *buf) const {
3505         
3506         switch(type){
3507                 case TypeChare:
3508                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
3509                         break;
3510                 case TypeMainChare:
3511                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);       
3512                         break;
3513                 case TypeGroup:
3514                         sprintf(buf,"Group %d   PE %d \0",data.group.id.idx,data.group.onPE);