491b3cc76483fdfa383299c4bdd5e97c27b1c4db
[charm.git] / src / ck-core / ckmessagelogging.C
1 #include "charm.h"
2 #include "ck.h"
3 #include "ckmessagelogging.h"
4 #include "queueing.h"
5 #include <sys/types.h>
6 #include <signal.h>
7 #include "CentralLB.h"
8
9 #ifdef _FAULT_MLOG_
10
11 //#define DEBUG(x)  if(_restartFlag) {x;}
12 #define DEBUG(x)  //x
13 #define DEBUGRESTART(x)  //x
14 #define DEBUGLB(x) // x
15
16 #define BUFFERED_LOCAL
17 #define BUFFERED_REMOTE
18
19 extern const char *idx2str(const CkArrayIndex &ind);
20 extern const char *idx2str(const ArrayElement *el);
21 const char *idx2str(const CkArrayIndexMax &ind){
22         return idx2str((const CkArrayIndex &)ind);
23 };
24
25 void getGlobalStep(CkGroupID gID);
26
27 bool fault_aware(CkObjID &recver);
28
29 int _restartFlag=0;
30 int restarted=0;
31
32 //GML: variables for measuring savings with groups in message logging
33 float MLOGFT_totalLogSize = 0.0;
34 float MLOGFT_totalMessages = 0.0;
35 float MLOGFT_totalObjects = 0.0;
36
37 //TODO: remove for perf runs
38 int countHashRefs=0; //count the number of gets
39 int countHashCollisions=0;
40
41
42 //#define CHECKPOINT_DISK
43 char *checkpointDirectory=".";
44 int unAckedCheckpoint=0;
45
46 int countLocal=0,countBuffered=0;
47 int countPiggy=0;
48 int countClearBufferedLocalCalls=0;
49
50 int countUpdateHomeAcks=0;
51
52 extern int chkptPeriod;
53 extern bool parallelRestart;
54
55 char *killFile;
56 int killFlag=0;
57 int restartingMlogFlag=0;
58 void readKillFile();
59 double killTime=0.0;
60 int checkpointCount=0;
61
62
63 CpvDeclare(Chare *,_currentObj);
64 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
65 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
66 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
67 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
68 CpvDeclare(Queue, _outOfOrderMessageQueue);
69 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
70 //CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
71 CpvDeclare(char **,_bufferedTicketRequests);
72 CpvDeclare(int *,_numBufferedTicketRequests);
73 CpvDeclare(char *,_bufferTicketReply);
74
75
76
77 static double adjustChkptPeriod=0.0; //in ms
78 static double nextCheckpointTime=0.0;//in seconds
79
80 double lastBufferedLocalMessageCopyTime;
81
82 int _maxBufferedMessages;
83 int _maxBufferedTicketRequests;
84 int BUFFER_TIME=2; // in ms
85
86
87 int _ticketRequestHandlerIdx;
88 int _ticketHandlerIdx;
89 int _localMessageCopyHandlerIdx;
90 int _localMessageAckHandlerIdx;
91 int _pingHandlerIdx;
92 int _bufferedLocalMessageCopyHandlerIdx;
93 int _bufferedLocalMessageAckHandlerIdx;
94 int _bufferedTicketRequestHandlerIdx;
95 int _bufferedTicketHandlerIdx;
96
97
98 char objString[100];
99 int _checkpointRequestHandlerIdx;
100 int _storeCheckpointHandlerIdx;
101 int _checkpointAckHandlerIdx;
102 int _getCheckpointHandlerIdx;
103 int _recvCheckpointHandlerIdx;
104 int _removeProcessedLogHandlerIdx;
105
106 int _verifyAckRequestHandlerIdx;
107 int _verifyAckHandlerIdx;
108 int _dummyMigrationHandlerIdx;
109
110
111 int     _getGlobalStepHandlerIdx;
112 int     _recvGlobalStepHandlerIdx;
113
114 int _updateHomeRequestHandlerIdx;
115 int _updateHomeAckHandlerIdx;
116 int _resendMessagesHandlerIdx;
117 int _resendReplyHandlerIdx;
118 int _receivedTNDataHandlerIdx;
119 int _distributedLocationHandlerIdx;
120
121
122 int verifyAckTotal;
123 int verifyAckCount;
124
125 int verifyAckedRequests=0;
126
127 RestartRequest *storedRequest;
128
129
130
int _falseRestart =0; /**
    For testing on clusters we might carry out restarts on
    a processor without actually restarting it.
    1 -> false restart
    0 -> restart after an actual crash
*/
137
138 //lock for the ticketRequestHandler and ticketLogLocalMessage methods;
139 int _lockNewTicket=0;
140
141
142 //Load balancing globals
143 int onGoingLoadBalancing=0;
144 void *centralLb;
145 void (*resumeLbFnPtr)(void *);
146 int _receiveMlogLocationHandlerIdx;
147 int _receiveMigrationNoticeHandlerIdx;
148 int _receiveMigrationNoticeAckHandlerIdx;
149 int _checkpointBarrierHandlerIdx;
150 int _checkpointBarrierAckHandlerIdx;
151
152 CkVec<MigrationRecord> migratedNoticeList;
153 CkVec<RetainedMigratedObject *> retainedObjectList;
154 int donotCountMigration=0;
155 int countLBMigratedAway=0;
156 int countLBToMigrate=0;
157 int migrationDoneCalled=0;
158 int checkpointBarrierCount=0;
159 int globalResumeCount=0;
160 CkGroupID globalLBID;
161 int restartDecisionNumber=-1;
162
163
164 double lastCompletedAlarm=0;
165 double lastRestart=0;
166
167
168 //update location globals
169 int _receiveLocationHandlerIdx;
170
171
172
173 // initialize message logging datastructures and register handlers
174 void _messageLoggingInit(){
175         //current object
176         CpvInitialize(Chare *,_currentObj);
177         
178         //registering handlers for message logging
179         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
180         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
181         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
182         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
183         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
184         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
185         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
186   _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
187         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
188
189                 
190         //handlers for checkpointing
191         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
192         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
193         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
194         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
195
196
197         //handlers for restart
198         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
199         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
200         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
201         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
202         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
203         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
204         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
205         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
206         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
207         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
208         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
209
210         
211         //handlers for load balancing
212         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
213         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
214         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
215         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
216         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
217         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
218         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
219         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
220         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
221
222         
223         //handlers for updating locations
224         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
225         
226         //Cpv variables for message logging
227         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
228         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
229         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
230         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
231         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
232         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
233         CpvInitialize(Queue, _outOfOrderMessageQueue);
234         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
235         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
236         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
237         
238         CpvInitialize(char **,_bufferedTicketRequests);
239         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
240         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
241         for(int i=0;i<CkNumPes();i++){
242                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
243                 CpvAccess(_numBufferedTicketRequests)[i]=0;
244         }
245   CpvInitialize(char *,_bufferTicketReply);
246         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
247         
248 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
249         CcdCallFnAfter(retryTicketRequest,NULL,100);    
250         
251         
252         //Cpv variables for checkpoint
253         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
254         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
255         
256 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
257 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
258 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
259         if(CkMyPe() == 0){
260 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
261 #ifdef  BUFFERED_LOCAL
262                 if(CmiMyPe() == 0){
263                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
264                 }
265 #endif
266         }
267 #ifdef  BUFFERED_REMOTE
268         if(CmiMyPe() == 0){
269                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
270         }
271 #endif
272
273         traceRegisterUserEvent("Remove Logs", 20);
274         traceRegisterUserEvent("Ticket Request Handler", 21);
275         traceRegisterUserEvent("Ticket Handler", 22);
276         traceRegisterUserEvent("Local Message Copy Handler", 23);
277         traceRegisterUserEvent("Local Message Ack Handler", 24);        
278         traceRegisterUserEvent("Preprocess current message",25);
279         traceRegisterUserEvent("Preprocess past message",26);
280         traceRegisterUserEvent("Preprocess future message",27);
281         traceRegisterUserEvent("Checkpoint",28);
282         traceRegisterUserEvent("Checkpoint Store",29);
283         traceRegisterUserEvent("Checkpoint Ack",30);
284         
285         traceRegisterUserEvent("Send Ticket Request",31);
286         traceRegisterUserEvent("Generalticketrequest1",32);
287         traceRegisterUserEvent("TicketLogLocal",33);
288         traceRegisterUserEvent("next_ticket and SN",34);
289         traceRegisterUserEvent("Timeout for buffered remote messages",35);
290         traceRegisterUserEvent("Timeout for buffered local messages",36);
291         traceRegisterUserEvent("Inform Location Home",37);
292         traceRegisterUserEvent("Receive Location Handler",38);
293         
294         lastCompletedAlarm=CmiWallTimer();
295         lastRestart = CmiWallTimer();
296 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
297         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
298 //      printf("[%d] killFlag %d\n",CkMyPe(),killFlag);
299 //      if(killFlag){
300 //              readKillFile();
301 //      }
302 }
303
304 void killLocal(void *_dummy,double curWallTime);        
305
306 void readKillFile(){
307         FILE *fp=fopen(killFile,"r");
308         if(!fp){
309                 return;
310         }
311         int proc;
312         double sec;
313         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
314                 if(proc == CkMyPe()){
315                         killTime = CmiWallTimer()+sec;
316                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
317                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
318                 }
319         }
320         fclose(fp);
321 }
322
323 void killLocal(void *_dummy,double curWallTime){
324         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
325         if(CmiWallTimer()<killTime-1){
326                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
327         }else{  
328                 kill(getpid(),SIGKILL);
329         }
330 }
331
332
333
334 /************************ Message logging methods ****************/
335
336 // send a ticket request to a group on a processor
337 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
338         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
339                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
340                 void *origMsg = EnvToUsr(env);
341                 for(int i=0;i<CmiNumPes();i++){
342                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
343                                 void *copyMsg = CkCopyMsg(&origMsg);
344                                 envelope *copyEnv = UsrToEnv(copyMsg);
345                                 copyEnv->SN=0;
346                                 copyEnv->TN=0;
347                                 copyEnv->sender.type = TypeInvalid;
348                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
349                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
350                         }
351                 }
352                 return;
353         }
354         CkObjID recver;
355         recver.type = TypeGroup;
356         recver.data.group.id = env->getGroupNum();
357         recver.data.group.onPE = destPE;
358 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
359                 CmiPrintStackTrace(0);
360         }*/
361         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
362 }
363
364 //send a ticket request to a nodegroup
365 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
366         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
367                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
368                 void *origMsg = EnvToUsr(env);
369                 for(int i=0;i<CmiNumNodes();i++){
370                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
371                                 void *copyMsg = CkCopyMsg(&origMsg);
372                                 envelope *copyEnv = UsrToEnv(copyMsg);
373                                 copyEnv->SN=0;
374                                 copyEnv->TN=0;
375                                 copyEnv->sender.type = TypeInvalid;
376                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
377                         }
378                 }
379                 return;
380         }
381         CkObjID recver;
382         recver.type = TypeNodeGroup;
383         recver.data.group.id = env->getGroupNum();
384         recver.data.group.onPE = destNode;
385         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
386 }
387
388 //send a ticket request to an array element
389 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
390         CkObjID recver;
391         recver.type = TypeArray;
392         recver.data.array.id = env->getsetArrayMgr();
393         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
394
395         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
396                 char recverString[100],senderString[100];
397                 
398                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
399         }
400
401         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
402 };
403
404 //a method to generate the actual ticket requests for groups,nodegroups or arrays
405 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
406         envelope *env = _env;
407         int resend=0; //is it a resend
408         char recverName[100];
409         double _startTime=CkWallTimer();
410         
411         if(CpvAccess(_currentObj) == NULL){
412 //              CkAssert(0);
413                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
414                 generalCldEnqueue(destPE,env,_infoIdx);
415                 return;
416         }
417         
418         if(env->sender.type == TypeInvalid){
419                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
420                 //Set message logging data in the envelope
421         }else{
422                 envelope *copyEnv = copyEnvelope(env);
423                 env = copyEnv;
424                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
425                 env->SN = 0;
426         }
427         
428         
429         CkObjID &sender = env->sender;
430         env->recver = recver;
431
432         Chare *obj = (Chare *)env->sender.getObject();
433           
434         if(env->SN == 0){
435                 env->SN = obj->mlogData->nextSN(recver);
436         }else{
437                 resend = 1;
438         }
439         
440         
441         char senderString[100];
442 //      if(env->SN != 1){
443                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
444         //      CmiPrintStackTrace(0);
445 /*      }else{
446                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
447         }*/
448                 
449         MlogEntry * mEntry = new MlogEntry(env,destPE,_infoIdx);
450 //      CkPackMessage(&(mEntry->env));
451 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
452         
453         
454
455         _startTime = CkWallTimer();
456         if(destPE == CkMyPe()){
457                 ticketLogLocalMessage(mEntry);
458         }else{
459                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
460 //              traceUserBracketEvent(31,_startTime,CkWallTimer());                     
461         }
462 }
463
464 //method that does the actual send by creating a ticketrequest filling it up and sending it
465 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
466         char recverString[100],senderString[100];
467         envelope *env = entry->env;
468         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
469 /*      envelope *env = entry->env;
470         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
471
472         Chare *obj = (Chare *)entry->env->sender.getObject();
473         if(!resend){
474                 //GML: only stores message if either it goes to this processor or to a processor in a different group
475                 if(CkMyPe() != destPE && CkMyPe()/GROUP_SIZE_MLOG != destPE/GROUP_SIZE_MLOG){
476                         obj->mlogData->addLogEntry(entry);
477                         MLOGFT_totalMessages += 1.0;
478                         MLOGFT_totalLogSize += entry->env->getTotalsize();
479                 }
480         }
481         
482
483 #ifdef BUFFERED_REMOTE
484         //buffer the ticket request 
485         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
486                 //first message to this processor, buffer needs to be created
487                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
488                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
489                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
490         }
491         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
492         //Buffer the ticketrequests
493         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
494         ticketRequest->sender = sender;
495         ticketRequest->recver = recver;
496         ticketRequest->logEntry = entry;
497         ticketRequest->SN = SN;
498         ticketRequest->senderPE = CkMyPe();
499
500         CpvAccess(_numBufferedTicketRequests)[destPE]++;
501         
502         
503         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
504                 sendBufferedTicketRequests(destPE);
505         }else{
506                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
507                         int *checkPE = new int;
508                         *checkPE = destPE;
509                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
510                 }
511         }
512 #else
513
514         TicketRequest ticketRequest;
515         ticketRequest.sender = sender;
516         ticketRequest.recver = recver;
517         ticketRequest.logEntry = entry;
518         ticketRequest.SN = SN;
519         ticketRequest.senderPE = CkMyPe();
520         
521         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
522 //      CmiBecomeImmediate(&ticketRequest);
523         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
524 #endif
525         DEBUG(CmiMemoryCheck());
526 };
527
528 /**
529  * Send the ticket requests buffered for processor PE
530  **/
531 void sendBufferedTicketRequests(int destPE){
532         DEBUG(CmiMemoryCheck());
533         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
534         if(numberRequests == 0){
535                 return;
536         }
537         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
538         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
539         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
540         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
541         header->numberLogs = numberRequests;
542         
543         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
544         CmiSyncSend(destPE,totalSize,(char *)buf);
545         
546         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
547         DEBUG(CmiMemoryCheck());
548 };
549
550 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
551         int destPE = *(int *)_destPE;
552   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
553                 sendBufferedTicketRequests(destPE);
554 //              traceUserEvent(35);
555         }
556         delete (int *)_destPE;
557         DEBUG(CmiMemoryCheck());
558 };
559
560
561
562
563 //get a ticket for a local message and then send a copy to the buddy
564 void ticketLogLocalMessage(MlogEntry *entry){
565         //this method is always in the main thread(not interrupt).. so it should 
566         //never find itself locked out of a newTicket
567 //      CkAssert(_lockNewTicket==0);
568         double _startTime=CkWallTimer();
569         
570 //      _lockNewTicket=1;
571         
572                 DEBUG(CmiMemoryCheck());
573         
574
575         Chare *recverObj = (Chare *)entry->env->recver.getObject();
576         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
577         if(recverObj){
578                 //Consider the case, after a restart when this message has already been allotted a ticket number
579                 // and should get the same one as the old one.
580                 Ticket ticket;
581                 if(recverObj->mlogData->mapTable.numObjects() > 0){
582                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
583                 }else{
584                         ticket.TN = 0;
585                 }
586                 
587                 char senderString[100], recverString[100] ;
588                 
589                 if(ticket.TN == 0){
590                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
591         
592                         if(ticket.TN == 0){
593                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
594                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
595                                 
596         //              _lockNewTicket = 0;
597 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
598                                 return;
599                         }
600                 }       
601                 //TODO: check for the case when an invalid ticket is returned
602                 //TODO: check for OLD or RECEIVED TICKETS
603                 entry->env->TN = ticket.TN;
604                 CkAssert(entry->env->TN > 0);
605                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
606                 
607
608                 sendLocalMessageCopy(entry);
609                 
610                 
611                 DEBUG(CmiMemoryCheck());
612                 entry->unackedLocal = 1;
613                 DEBUG(CmiMemoryCheck());
614                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
615                 DEBUG(CmiMemoryCheck());
616         }else{
617                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
618         }
619         _lockNewTicket=0;
620 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
621 };
622
623
624 void sendLocalMessageCopy(MlogEntry *entry){
625         LocalMessageLog msgLog;
626         msgLog.sender = entry->env->sender;
627         msgLog.recver = entry->env->recver;
628         msgLog.SN = entry->env->SN;
629         msgLog.TN = entry->env->TN;
630         msgLog.entry = entry;
631         msgLog.PE = CkMyPe();
632         
633         char recvString[100];
634         char senderString[100];
635         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
636
637 #ifdef BUFFERED_LOCAL
638         countLocal++;
639         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
640         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
641                 sendBufferedLocalMessageCopy();
642         }else{
643                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
644                         lastBufferedLocalMessageCopyTime = CkWallTimer();
645                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
646                         countClearBufferedLocalCalls++;
647                 }       
648         }
649 #else   
650         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
651         
652         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
653 #endif
654         DEBUG(CmiMemoryCheck());
655 };
656
657
658 void sendBufferedLocalMessageCopy(){
659         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
660         if(numberLogs == 0){
661                 return;
662         }
663         countBuffered++;
664         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
665         void *buf=CmiAlloc(totalSize);
666         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
667         header->numberLogs=numberLogs;
668
669         DEBUG(CmiMemoryCheck());
670         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
671         
672         char *ptr = (char *)buf;
673         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
674         
675         for(int i=0;i<numberLogs;i++){
676                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
677                 memcpy(ptr,&log,sizeof(LocalMessageLog));
678                 ptr = &ptr[sizeof(LocalMessageLog)];
679         }
680
681         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
682
683         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
684         DEBUG(CmiMemoryCheck());
685 };
686
687 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
688         countClearBufferedLocalCalls--;
689         if(countClearBufferedLocalCalls > 10){
690                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
691         }
692         DEBUG(CmiMemoryCheck());
693         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
694         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
695                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
696                         sendBufferedLocalMessageCopy();
697 //                      traceUserEvent(36);
698                 }
699         }
700         DEBUG(CmiMemoryCheck());
701 }
702
703 /****
704         The handler functions
705 *****/
706
707
708 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
709 /**
710  *  If there are any delayed requests, process them first before 
711  *  processing this request
712  * */
713 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
714         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
715         double  _startTime = CkWallTimer();
716         if(CpvAccess(_delayedTicketRequests)->length() > 0){
717                 retryTicketRequest(NULL,_startTime);
718         }
719         _processTicketRequest(ticketRequest);
720         CmiFree(ticketRequest);
721 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
722 }
723 /** Handler used for dealing with a bunch of ticket requests
724  * from one processor. The replies are also bunched together
725  * Does not use _ticketRequestHandler
726  * */
727 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
728         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
729         DEBUG(CmiMemoryCheck());
730         double _startTime = CkWallTimer();
731         if(CpvAccess(_delayedTicketRequests)->length() > 0){
732                 retryTicketRequest(NULL,_startTime);
733         }
734         DEBUG(CmiMemoryCheck());
735   int numRequests = recvdHeader->numberLogs;
736         char *msg = (char *)recvdHeader;
737         msg = &msg[sizeof(BufferedTicketRequestHeader)];
738         int senderPE=((TicketRequest *)msg)->senderPE;
739
740         
741         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
742         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
743         
744         char *ptr = (char *)buf;
745         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
746         header->numberLogs = 0;
747
748         DEBUG(CmiMemoryCheck());
749         
750         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
751         
752         for(int i=0;i<numRequests;i++){
753                 TicketRequest *request = (TicketRequest *)msg;
754                 msg = &msg[sizeof(TicketRequest)];
755                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
756
757                 if(replied){
758                         //the ticket request has been processed and 
759                         //the reply will be stored in the ptr
760                         header->numberLogs++;
761                         ptr = &ptr[sizeof(TicketReply)];
762                 }
763         }
764 /*      if(header->numberLogs == 0){
765                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
766         }*/
767
768         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
769         CmiSyncSend(senderPE,totalSize,(char *)buf);
770         CmiFree(recvdHeader);
771 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
772         DEBUG(CmiMemoryCheck());
773 };
774
775 /**Process the ticket request. 
776  * If it is processed and a reply is being sent 
777  * by this processor return true
778  * else return false.
779  * If a reply buffer is specified put the reply into that
780  * else send the reply
781  * */
782 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
783
784 /*      if(_lockNewTicket){
785                 printf("ddeded %d\n",CkMyPe());
786                 if(CmiIsImmediate(ticketRequest)){
787                         CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
788                 }
789                 CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
790                 
791         }else{
792                 _lockNewTicket = 1;
793         }*/
794
795         DEBUG(CmiMemoryCheck());
796         CkObjID sender = ticketRequest->sender;
797         CkObjID recver = ticketRequest->recver;
798         MCount SN = ticketRequest->SN;
799         
800         
801         Chare *recverObj = (Chare *)recver.getObject();
802         
803         
804         char recverName[100];
805         DEBUG(recver.toString(recverName);)
806         if(recverObj == NULL){
807                 int estPE = recver.guessPE();
808                 if(estPE == CkMyPe() || estPE == -1){           
809                         //try to fulfill the request after some time
810                         char senderString[100];
811                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
812                         if(estPE == CkMyPe() && recver.type == TypeArray){
813                                 CkArrayID aid(recver.data.array.id);            
814                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
815                                 DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));
816
817                         }
818                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
819                         *delayed = *ticketRequest;
820                         CpvAccess(_delayedTicketRequests)->enq(delayed);
821                         
822                 }else{
823                         DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
824                         TicketRequest forward = *ticketRequest;
825                         CmiSetHandler(&forward,_ticketRequestHandlerIdx);
826                         CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);                      
827                         
828                         
829                 }
830         DEBUG(CmiMemoryCheck());
831                 return false; // if the receverObj does not exist the ticket request cannot have been 
832                               // processed successfully
833         }else{
834                 char senderString[100];
835                 Ticket ticket;
836                 //check if a ticket for this has been already handed out to an object that used to be local but 
837                 // is no longer so.. need for parallel restart
838                 
839                 if(recverObj->mlogData->mapTable.numObjects() > 0){
840                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
841                 }
842                 
843                 if(ticket.TN == 0){
844                         ticket = recverObj->mlogData->next_ticket(sender,SN);
845                 }
846                 if(ticket.TN > recverObj->mlogData->tProcessed){
847                         ticket.state = NEW_TICKET;
848                 }else{
849                         ticket.state = OLD_TICKET;
850                 }
851                 //TODO: check for the case when an invalid ticket is returned
852                 if(ticket.TN == 0){
853                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
854                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
855                         *delayed = *ticketRequest;
856                         CpvAccess(_delayedTicketRequests)->enq(delayed);
857                         return false;
858                 }
859 /*              if(ticket.TN < SN){ //error state this really should not happen
860                         recver.toString(recverName);
861                   printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
862                 }*/
863 //              CkAssert(ticket.TN >= SN);
864                 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
865
866 //              TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
867     if(reply == NULL){ 
868                         //There is no reply buffer and the ticketreply is going to be 
869                         //sent immediately
870                         TicketReply ticketReply;
871                         ticketReply.request = *ticketRequest;
872                         ticketReply.ticket = ticket;
873                         ticketReply.recverPE = CkMyPe();
874                 
875                         CmiSetHandler(&ticketReply,_ticketHandlerIdx);
876 //              CmiBecomeImmediate(&ticketReply);
877                         CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
878          }else{ // Store ticket reply in the buffer provided
879                  reply->request = *ticketRequest;
880                  reply->ticket = ticket;
881                  reply->recverPE = CkMyPe();
882                  CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
883                                                          // in case the ticket needs to be forwarded or something
884
885          }
886                 DEBUG(CmiMemoryCheck());
887                 return true;
888         }
889 //      _lockNewTicket=0;
890 };
891
892 inline void _ticketHandler(TicketReply *ticketReply){
893
894         double _startTime = CkWallTimer();
895         DEBUG(CmiMemoryCheck());
896         
897         
898         char senderString[100];
899         CkObjID sender = ticketReply->request.sender;
900         CkObjID recver = ticketReply->request.recver;
901         
902         if(sender.guessPE() != CkMyPe()){               
903                 DEBUG(CkAssert(sender.guessPE()>= 0));
904                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
905         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
906                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
907                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
908 #ifdef BUFFERED_REMOTE
909                 //will be freed by the buffered ticket handler most of the time
910                 //this might lead to a leak just after migration
911                 //when the ticketHandler is directly used without going through the buffered handler
912                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
913 #else
914                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
915 #endif  
916         }else{
917                 char recverName[100];
918                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
919                 MlogEntry *logEntry=NULL;
920                 if(ticketReply->ticket.state & FORWARDED_TICKET){
921                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
922                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
923                         Chare *senderObj = (Chare *)sender.getObject();
924                         if(senderObj){
925                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
926                                 for(int i=0;i<mlog->length();i++){
927                                         MlogEntry *tempEntry = (*mlog)[i];
928                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
929                                                 logEntry = tempEntry;
930                                                 break;
931                                         }
932                                 }
933                                 if(logEntry == NULL){
934 #ifdef BUFFERED_REMOTE
935 #else
936                                         CmiFree(ticketReply);
937 #endif                                  
938                                         return;
939                                 }
940                         }else{
941                                 CmiAbort("This processor thinks it should have the sender\n");
942                         }
943                         ticketReply->ticket.state ^= FORWARDED_TICKET;
944                 }else{
945                         logEntry = ticketReply->request.logEntry;
946                 }
947                 if(logEntry->env->TN <= 0){
948                         //This logEntry has not received a TN earlier
949                         char recverString[100];
950                         logEntry->env->TN = ticketReply->ticket.TN;
951                         logEntry->env->setSrcPe(CkMyPe());
952                         if(ticketReply->ticket.state == NEW_TICKET){
953                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
954                                 if(ticketReply->recverPE != CkMyPe()){
955                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
956                                 }else{
957                                         //It is now a local message use the local message protocol
958                                         sendLocalMessageCopy(logEntry);
959                                 }               
960                         }
961                 }else{
962                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
963                 }
964                 recver.updatePosition(ticketReply->recverPE);
965 #ifdef BUFFERED_REMOTE
966 #else
967                 CmiFree(ticketReply);
968 #endif
969         }
970         CmiMemoryCheck();
971 #ifdef BUFFERED_REMOTE
972 #else
973 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
974 #endif
975         
976 };
977
978 /**
979  * Message to handle the bunch of tickets 
980  * that we get from one processor. We send 
981  * the tickets to be handled one at a time
982  * */
983
984 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
985         double _startTime=CmiWallTimer();
986         int numTickets = recvdHeader->numberLogs;
987         char *msg = (char *)recvdHeader;
988         msg = &msg[sizeof(BufferedTicketRequestHeader)];
989         DEBUG(CmiMemoryCheck());
990         
991         TicketReply *_reply = (TicketReply *)msg;
992
993         
994         for(int i=0;i<numTickets;i++){
995                 TicketReply *reply = (TicketReply *)msg;
996                 _ticketHandler(reply);
997                 
998                 msg = &msg[sizeof(TicketReply)];
999         }
1000         
1001         CmiFree(recvdHeader);
1002 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1003         DEBUG(CmiMemoryCheck());
1004 };
1005
1006
1007 void _localMessageCopyHandler(LocalMessageLog *msgLog){
1008         double _startTime = CkWallTimer();
1009         
1010         char senderString[100],recverString[100];
1011         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1012 /*      if(!fault_aware(msgLog->recver)){
1013                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1014         }*/
1015         CpvAccess(_localMessageLog)->enq(*msgLog);
1016         
1017         LocalMessageLogAck ack;
1018         ack.entry = msgLog->entry;
1019         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1020         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1021         CmiSyncSend(msgLog->PE,sizeof(LocalMessageLogAck),(char *)&ack);
1022         
1023 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1024 };
1025
1026 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1027         double _startTime = CkWallTimer();
1028         DEBUG(CmiMemoryCheck());
1029         
1030         int numLogs = recvdHeader->numberLogs;
1031         char *msg = (char *)recvdHeader;
1032
1033         //piggy back the logs already stored on this processor
1034         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1035         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1036 /*      if(numPiggyLogs > 0){
1037                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1038                         CmiAssert(0);
1039                 }
1040         }*/
1041         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1042         
1043         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1044         void *buf = CmiAlloc(totalSize);
1045         char *ptr = (char *)buf;
1046         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1047         
1048         msg = &msg[sizeof(BufferedLocalLogHeader)];
1049         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1050
1051         DEBUG(CmiMemoryCheck());
1052         int PE;
1053         for(int i=0;i<numLogs;i++){
1054                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1055                 CpvAccess(_localMessageLog)->enq(*msgLog);
1056                 PE = msgLog->PE;
1057                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1058
1059                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1060                 ack->entry = msgLog->entry;
1061                 
1062                 msg = &msg[sizeof(LocalMessageLog)];
1063                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1064         }
1065         DEBUG(CmiMemoryCheck());
1066
1067         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1068         piggyHeader->numberLogs = numPiggyLogs;
1069         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1070         if(numPiggyLogs > 0){
1071                 countPiggy++;
1072         }
1073
1074         for(int i=0;i<numPiggyLogs;i++){
1075                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1076                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1077                 ptr = &ptr[sizeof(LocalMessageLog)];
1078         }
1079         DEBUG(CmiMemoryCheck());
1080         
1081         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1082         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1083                 
1084 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1085                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1086                         if(!fault_aware(localLogEntry.recver)){
1087                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1088                         }
1089         }*/
1090         if(freeHeader){
1091                 CmiFree(recvdHeader);
1092         }
1093         DEBUG(CmiMemoryCheck());
1094 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1095 }
1096
1097
1098 void _localMessageAckHandler(LocalMessageLogAck *ack){
1099         
1100         double _startTime = CkWallTimer();
1101         
1102         MlogEntry *entry = ack->entry;
1103         if(entry == NULL){
1104                 CkExit();
1105         }
1106         entry->unackedLocal = 0;
1107         envelope *env = entry->env;
1108         char recverName[100];
1109         char senderString[100];
1110         DEBUG(CmiMemoryCheck());
1111         
1112         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1113         if(env == NULL)
1114                 return;
1115         CkAssert(env->SN > 0);
1116         CkAssert(env->TN > 0);
1117         env->sender.toString(senderString);
1118         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1119         env->recver.toString(recverName);
1120
1121         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1122         
1123 /*      
1124         void *origMsg = EnvToUsr(env);
1125         void *copyMsg = CkCopyMsg(&origMsg);
1126         envelope *copyEnv = UsrToEnv(copyMsg);
1127         entry->env = UsrToEnv(origMsg);*/
1128
1129 //      envelope *copyEnv = copyEnvelope(env);
1130
1131         envelope *copyEnv = env;
1132         copyEnv->localMlogEntry = entry;
1133
1134         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1135         if(CmiMyPe() != entry->destPE){
1136                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1137         }
1138 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1139         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1140         
1141         
1142 #ifdef BUFFERED_LOCAL
1143 #else
1144         CmiFree(ack);
1145 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1146 #endif
1147         
1148         DEBUG(CmiMemoryCheck());
1149         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1150 }
1151
1152
1153 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1154
1155         double _startTime=CkWallTimer();
1156         DEBUG(CmiMemoryCheck());
1157
1158         int numLogs = recvdHeader->numberLogs;
1159         char *msg = (char *)recvdHeader;
1160         msg = &msg[sizeof(BufferedLocalLogHeader)];
1161
1162         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1163         
1164         for(int i=0;i<numLogs;i++){
1165                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1166                 _localMessageAckHandler(ack);
1167                 
1168                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1169         }
1170
1171         //deal with piggy backed local logs
1172         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1173         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1174         if(piggyHeader->numberLogs > 0){
1175                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1176         }
1177         
1178         CmiFree(recvdHeader);
1179         DEBUG(CmiMemoryCheck());
1180 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1181 }
1182
1183
1184
1185
1186
1187
1188 bool fault_aware(CkObjID &recver){
1189         switch(recver.type){
1190                 case TypeChare:
1191                         return false;
1192                 case TypeGroup:
1193                 case TypeNodeGroup:
1194                 case TypeArray:
1195                         return true;
1196                 default:
1197                         return false;
1198         }
1199 };
1200
1201 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
1202         char recverString[100];
1203         char senderString[100];
1204         
1205         DEBUG(CmiMemoryCheck());
1206         CkObjID recver = env->recver;
1207         if(!fault_aware(recver))
1208                 return 1;
1209
1210
1211         Chare *obj = (Chare *)recver.getObject();
1212         *objPointer = obj;
1213         if(obj == NULL){
1214                 int possiblePE = recver.guessPE();
1215                 if(possiblePE != CkMyPe()){
1216                         int totalSize = env->getTotalsize();                    
1217                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1218                 }
1219                 return 0;
1220         }
1221
1222
1223         double _startTime = CkWallTimer();
1224 //env->sender.updatePosition(env->getSrcPe());
1225         if(env->TN == obj->mlogData->tProcessed+1){
1226                 //the message that needs to be processed now
1227                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1228                 // once we find a message that we can process we put back all the messages in the out of order queue
1229                 // back into the main scheduler queue. 
1230                 if(env->sender.guessPE() == CkMyPe()){
1231                         *logEntryPointer = env->localMlogEntry;
1232                 }
1233         DEBUG(CmiMemoryCheck());
1234                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1235                         void *qMsgPtr;
1236                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1237                         envelope *qEnv = (envelope *)qMsgPtr;
1238                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
1239         DEBUG(CmiMemoryCheck());
1240                 }
1241 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1242                 //TODO: this might be a problem.. change made for leanMD
1243 //              CpvAccess(_currentObj) = obj;
1244         DEBUG(CmiMemoryCheck());
1245                 return 1;
1246         }
1247         if(env->TN <= obj->mlogData->tProcessed){
1248                 //message already processed
1249                 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
1250 //              traceUserBracketEvent(26,_startTime,CkWallTimer());
1251         DEBUG(CmiMemoryCheck());
1252                 return 0;
1253         }
1254         //message that needs to be processed in the future
1255
1256 //      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1257         //the message cant be processed now put it back in the out of order message Q, 
1258         //It will be transferred to the main queue later
1259         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1260 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1261         DEBUG(CmiMemoryCheck());
1262         
1263         return 0;
1264 }
1265
1266 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1267         char senderString[100];
1268         if(obj){
1269                 if(sender.guessPE() == CkMyPe()){
1270                         if(entry != NULL){
1271                                 entry->env = NULL;
1272                         }
1273                 }
1274                 obj->mlogData->tProcessed++;
1275 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1276                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1277 //              CpvAccess(_currentObj)= NULL;
1278         }
1279         DEBUG(CmiMemoryCheck());
1280 }
1281
1282 /***
1283         Helpers for the handlers and message logging methods
1284 ***/
1285
1286 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1287 //      double _startTime = CkWallTimer();
1288         if(env->recver.type != TypeNodeGroup){
1289         //This repeats a step performed in skipCldEnq for messages sent to
1290         //other processors. I do this here so that messages to local processors
1291         //undergo the same transformation.. It lets the resend be uniform for 
1292         //all messages
1293 //              CmiSetXHandler(env,CmiGetHandler(env));
1294                 _skipCldEnqueue(destPE,env,_infoIdx);
1295         }else{
1296                 _noCldNodeEnqueue(destPE,env);
1297         }
1298 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1299 }
1300 //extern "C" int CmiGetNonLocalLength();
1301 /** This method is used to retry the ticket requests
1302  * that had been queued up earlier
1303  * */
1304
1305 int calledRetryTicketRequest=0;
1306
1307 void retryTicketRequestTimer(void *_dummy,double _time){
1308                 calledRetryTicketRequest=0;
1309                 retryTicketRequest(_dummy,_time);
1310 }
1311
1312 void retryTicketRequest(void *_dummy,double curWallTime){       
1313         double start = CkWallTimer();
1314         DEBUG(CmiMemoryCheck());
1315         int length = CpvAccess(_delayedTicketRequests)->length();
1316         for(int i=0;i<length;i++){
1317                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1318                 if(ticketRequest){
1319                         char senderString[100],recverString[100];
1320                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1321                         DEBUG(CmiMemoryCheck());
1322                         _processTicketRequest(ticketRequest);
1323                   CmiFree(ticketRequest);
1324                         DEBUG(CmiMemoryCheck());
1325                 }       
1326         }       
1327         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1328                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1329                 ticketLogLocalMessage(entry);
1330         }
1331         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1332 //      int converse_qLength = CmiGetNonLocalLength();
1333         
1334 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1335
1336 /*      PingMsg pingMsg;
1337         pingMsg.PE = CkMyPe();
1338         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1339         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1340                 for(int i=0;i<CkNumPes();i++){
1341                         if(i != CkMyPe()){
1342                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1343                         }
1344                 }
1345         }*/     
1346         //TODO: change this back to 100
1347         if(calledRetryTicketRequest == 0){
1348                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1349                 calledRetryTicketRequest =1;
1350         }
1351         DEBUG(CmiMemoryCheck());
1352 }
1353
1354 void _pingHandler(CkPingMsg *msg){
1355         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1356         CmiFree(msg);
1357 }
1358
1359
1360 /*****************************************************************************
1361         Checkpointing methods..
1362                 Pack all the data on a processor and send it to the buddy periodically
1363                 Also used to throw away message logs
1364 *****************************************************************************/
1365 CkVec<TProcessedLog> processedTicketLog;
1366 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1367 void clearUpMigratedRetainedLists(int PE);
1368
1369 void checkpointAlarm(void *_dummy,double curWallTime){
1370         double diff = curWallTime-lastCompletedAlarm;
1371         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1372 /*      if(CkWallTimer()-lastRestart < 50){
1373                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1374                 return;
1375         }*/
1376         if(diff < ((chkptPeriod) - 2)){
1377                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1378                 return;
1379         }
1380         CheckpointRequest request;
1381         request.PE = CkMyPe();
1382         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1383         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1384 };
1385
1386 void _checkpointRequestHandler(CheckpointRequest *request){
1387         startMlogCheckpoint(NULL,CmiWallTimer());       
1388 }
1389
1390 void startMlogCheckpoint(void *_dummy,double curWallTime){
1391         double _startTime = CkWallTimer();
1392         checkpointCount++;
1393 /*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
1394                 kill(getpid(),SIGKILL);
1395         }*/
1396         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1397                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1398         }
1399         PUP::sizer psizer;
1400         DEBUG(CmiMemoryCheck());
1401
1402         psizer | checkpointCount;
1403         
1404         CkPupROData(psizer);
1405         DEBUG(CmiMemoryCheck());
1406         CkPupGroupData(psizer);
1407         DEBUG(CmiMemoryCheck());
1408         CkPupNodeGroupData(psizer);
1409         DEBUG(CmiMemoryCheck());
1410         pupArrayElementsSkip(psizer,NULL);
1411         DEBUG(CmiMemoryCheck());
1412
1413         int dataSize = psizer.size();
1414         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1415         char *msg = (char *)CmiAlloc(totalSize);
1416         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1417         chkMsg->PE = CkMyPe();
1418         chkMsg->dataSize = dataSize;
1419
1420         
1421         char *buf = &msg[sizeof(CheckPointDataMsg)];
1422         PUP::toMem pBuf(buf);   
1423
1424         pBuf | checkpointCount;
1425         
1426         CkPupROData(pBuf);
1427         CkPupGroupData(pBuf);
1428         CkPupNodeGroupData(pBuf);
1429         pupArrayElementsSkip(pBuf,NULL);
1430
1431         unAckedCheckpoint=1;
1432         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1433         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1434         
1435         /*
1436                 Store the highest Ticket number processed for each chare on this processor
1437         */
1438         processedTicketLog.removeAll();
1439         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1440         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1441                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1442         }
1443
1444         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1445                 lastCompletedAlarm = curWallTime;
1446                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1447         }
1448         traceUserBracketEvent(28,_startTime,CkWallTimer());
1449 };
1450
1451 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1452         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1453         TProcessedLog logEntry;
1454         logEntry.recver = mlogData->objID;
1455         logEntry.tProcessed = mlogData->tProcessed;
1456         log->push_back(logEntry);
1457         char objString[100];
1458         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1459 }
1460
1461 class ElementPacker : public CkLocIterator {
1462 private:
1463         CkLocMgr *locMgr;
1464         PUP::er &p;
1465 public:
1466                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1467                 void addLocation(CkLocation &loc) {
1468                         CkArrayIndexMax idx=loc.getIndex();
1469                         CkGroupID gID = locMgr->ckGetGroupID();
1470                         p|gID;      // store loc mgr's GID as well for easier restore
1471                         p|idx;
1472                         p|loc;
1473     }
1474 };
1475
1476
1477 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
1478         int numElements,i;
1479   int numGroups = CkpvAccess(_groupIDTable)->size();    
1480         if(!p.isUnpacking()){
1481                 numElements = CkCountArrayElements();
1482         }       
1483         p | numElements;
1484         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1485         if(!p.isUnpacking()){
1486                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1487         }else{
1488                 //Flush all recs of all LocMgrs before putting in new elements
1489 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1490                 for(int j=0;j<listsize;j++){
1491                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1492                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1493                                 listToSkip[j].idx.print();
1494                         }
1495                 }
1496                 
1497  printf("numElements = %d\n",numElements);
1498         
1499           for (int i=0; i<numElements; i++) {
1500                         CkGroupID gID;
1501                         CkArrayIndexMax idx;
1502                         p|gID;
1503             p|idx;
1504                         int flag=0;
1505                         int matchedIdx=0;
1506                         for(int j=0;j<listsize;j++){
1507                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1508                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1509                                                 matchedIdx = j;
1510                                                 flag = 1;
1511                                                 break;
1512                                         }
1513                                 }
1514                         }
1515                         if(flag == 1){
1516                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1517                         }else{
1518                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1519                         }
1520                                 
1521                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1522                         mgr->resume(idx,p,flag);
1523                         if(flag == 1){
1524                                 int homePE = mgr->homePe(idx);
1525                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1526                         }
1527           }
1528         }
1529 };
1530
1531
1532 void writeCheckpointToDisk(int size,char *chkpt){
1533         char fNameTemp[100];
1534         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1535         int fd = creat(fNameTemp,S_IRWXU);
1536         int ret = write(fd,chkpt,size);
1537         CkAssert(ret == size);
1538         close(fd);
1539         
1540         char fName[100];
1541         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1542         unlink(fName);
1543
1544         rename(fNameTemp,fName);
1545         
1546 }
1547
1548 //handler that receives the checkpoint from a processor
1549 //it stores it and acks it
1550 void _storeCheckpointHandler(char *msg){
1551         
1552         double _startTime=CkWallTimer();
1553                 
1554         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1555         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1556         
1557         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1558         
1559         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1560         if(oldChkpt != NULL){
1561                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1562                 CmiFree(oldmsg);
1563         }
1564         //turning off storing checkpoints
1565         
1566         int sendingPE = chkMsg->PE;
1567         
1568         CpvAccess(_storedCheckpointData)->buf = chkpt;
1569         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1570         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1571
1572 #ifdef CHECKPOINT_DISK
1573         //store the checkpoint on disk
1574         writeCheckpointToDisk(chkMsg->dataSize,chkpt);
1575         CpvAccess(_storedCheckpointData)->buf = NULL;
1576         CmiFree(msg);
1577 #endif
1578
1579         int count=0;
1580         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1581                 if(migratedNoticeList[j].fromPE == sendingPE){
1582                         migratedNoticeList[j].ackFrom = 1;
1583                 }else{
1584                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1585                 }
1586                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1587                         migratedNoticeList.remove(j);
1588                         count++;
1589                 }
1590                 
1591         }
1592         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1593         
1594         CheckPointAck ackMsg;
1595         ackMsg.PE = CkMyPe();
1596         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1597         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1598         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1599         
1600         
1601         
1602         traceUserBracketEvent(29,_startTime,CkWallTimer());
1603 };
1604
1605
1606 void sendRemoveLogRequests(){
1607         double _startTime = CkWallTimer();      
1608         //send out the messages asking senders to throw away message logs below a certain ticket number
1609         /*
1610                 The remove log request message looks like
1611                 |RemoveLogRequest||List of TProcessedLog|
1612         */
1613         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1614         char *requestMsg = (char *)CmiAlloc(totalSize);
1615         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1616         request->PE = CkMyPe();
1617         request->numberObjects = processedTicketLog.size();
1618         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1619         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1620         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1621         
1622         DEBUG(CmiMemoryCheck());
1623         for(int i=0;i<CkNumPes();i++){
1624                 CmiSyncSend(i,totalSize,requestMsg);
1625         }
1626         CmiFree(requestMsg);
1627
1628         clearUpMigratedRetainedLists(CmiMyPe());
1629         //TODO: clear ticketTable
1630         
1631         traceUserBracketEvent(30,_startTime,CkWallTimer());
1632         DEBUG(CmiMemoryCheck());
1633 }
1634
1635
1636 void _checkpointAckHandler(CheckPointAck *ackMsg){
1637         DEBUG(CmiMemoryCheck());
1638         unAckedCheckpoint=0;
1639         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1640         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1641         if(onGoingLoadBalancing){
1642                 onGoingLoadBalancing = 0;
1643                 finishedCheckpointLoadBalancing();
1644         }else{
1645                 sendRemoveLogRequests();
1646         }
1647         CmiFree(ackMsg);
1648         
1649 };
1650
1651 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1652         DEBUG(CmiMemoryCheck());
1653         CmiMemoryCheck();
1654         char *data = (char *)_data;
1655         RemoveLogRequest *request = (RemoveLogRequest *)data;
1656         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1657         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1658
1659         int count=0;
1660         for(int i=0;i<mlog->length();i++){
1661                 MlogEntry *logEntry = mlog->deq();
1662                 int match=0;
1663                 for(int j=0;j<request->numberObjects;j++){
1664                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1665                                 //this log Entry should be removed
1666                                 match = 1;
1667                                 break;
1668                         }
1669                 }
1670                 char senderString[100],recverString[100];
1671 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1672                 if(match){
1673                         count++;
1674                         delete logEntry;
1675                 }else{
1676                         mlog->enq(logEntry);
1677                 }
1678         }
1679         if(count > 0){
1680                 char nameString[100];
1681                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1682         }
1683         DEBUG(CmiMemoryCheck());
1684         CmiMemoryCheck();
1685 };
1686
1687 void _removeProcessedLogHandler(char *requestMsg){
1688         double start = CkWallTimer();
1689         forAllCharesDo(removeProcessedLogs,requestMsg);
1690         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1691         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1692         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1693         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1694         if(request->PE == getCheckPointPE()){
1695                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1696                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1697                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1698                 int count=0;
1699 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1700                 DEBUG(char nameString[100];)
1701                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1702                 DEBUG(})*/
1703                 for(int i=0;i<localQ->length();i++){
1704                         LocalMessageLog localLogEntry = (*localQ)[i];
1705                         if(!fault_aware(localLogEntry.recver)){
1706                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1707                         }
1708                         bool keep = true;
1709                         for(int j=0;j<request->numberObjects;j++){                              
1710                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1711                                         keep = false;
1712                                         break;
1713                                 }
1714                         }       
1715                         if(keep){
1716                                 tempQ->enq(localLogEntry);
1717                         }else{
1718                                 count++;
1719                         }
1720                 }
1721                 delete localQ;
1722                 CpvAccess(_localMessageLog) = tempQ;
1723                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1724         }
1725
1726         /*
1727                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1728         */
1729         CmiMemoryCheck();
1730         clearUpMigratedRetainedLists(request->PE);
1731         
1732         traceUserBracketEvent(20,start,CkWallTimer());
1733         CmiFree(requestMsg);    
1734 };
1735
1736
1737 void clearUpMigratedRetainedLists(int PE){
1738         int count=0;
1739         CmiMemoryCheck();
1740         
1741         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1742                 if(migratedNoticeList[j].toPE == PE){
1743                         migratedNoticeList[j].ackTo = 1;
1744                 }
1745                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1746                         migratedNoticeList.remove(j);
1747                         count++;
1748                 }
1749         }
1750         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1751         
1752         for(int j=retainedObjectList.size()-1;j>=0;j--){
1753                 if(retainedObjectList[j]->migRecord.toPE == PE){
1754                         RetainedMigratedObject *obj = retainedObjectList[j];
1755                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1756                         retainedObjectList.remove(j);
1757                         if(obj->msg != NULL){
1758                                 CmiMemoryCheck();
1759                                 CmiFree(obj->msg);
1760                         }
1761                         delete obj;
1762                 }
1763         }
1764 }
1765
1766 /***************************************************************
1767         Restart Methods and handlers
1768 ***************************************************************/        
1769
1770 /**
1771  * Function for restarting an object with message logging
1772  */
1773 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1774         printf("[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1775         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1776         _restartFlag = 1;
1777         RestartRequest msg;
1778         msg.PE = CkMyPe();
1779         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1780         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1781 };
1782
1783 void CkMlogRestartDouble(void *,double){
1784         CkMlogRestart(NULL,NULL);
1785 };
1786
1787 //GML: restarting from local (group) failure
1788 void CkMlogRestartLocal(){
1789     CkMlogRestart(NULL,NULL);
1790 };
1791
1792
1793 void readCheckpointFromDisk(int size,char *buf){
1794         char fName[100];
1795         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1796
1797         int fd = open(fName,O_RDONLY);
1798         int count=0;
1799         while(count < size){
1800                 count += read(fd,&buf[count],size-count);
1801         }
1802         close(fd);
1803         
1804 };
1805
1806
1807 void sendCheckpointData();
1808
1809 void _getCheckpointHandler(RestartRequest *restartMsg){
1810         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1811         CkAssert(restartMsg->PE == storedChkpt->PE);
1812         storedRequest = restartMsg;
1813         
1814         verifyAckTotal = 0;
1815         for(int i=0;i<migratedNoticeList.size();i++){
1816                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1817 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1818                         if(migratedNoticeList[i].ackFrom == 0){
1819                                 //need to verify if the object actually exists .. it might not
1820                                 //have been acked but it might exist on it
1821                                 VerifyAckMsg msg;
1822                                 msg.migRecord = migratedNoticeList[i];
1823                                 msg.index = i;
1824                                 msg.fromPE = CmiMyPe();
1825                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1826                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1827                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1828                                 verifyAckTotal++;
1829                         }
1830                 }
1831         }
1832         
1833         if(verifyAckTotal == 0){
1834                 sendCheckpointData();
1835         }
1836         verifyAckCount = 0;
1837 }
1838
1839
1840 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1841         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1842         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1843         if(rec != NULL && rec->type() == CkLocRec::local){
1844                         //this location exists on this processor
1845                         //and needs to be removed       
1846                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1847                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1848                         
1849                         int localIdx = localRec->getLocalIndex();
1850                         LBDatabase *lbdb = localRec->getLBDB();
1851                         LDObjHandle ldHandle = localRec->getLdHandle();
1852                                 
1853                         locMgr->setDuringMigration(true);
1854                         
1855                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1856                         lbdb->UnregisterObj(ldHandle);
1857                         
1858                         locMgr->setDuringMigration(false);
1859                         
1860                         verifyAckedRequests++;
1861
1862         }
1863         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1864         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1865 };
1866
1867
1868 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1869         int index =     verifyReply->index;
1870         migratedNoticeList[index] = verifyReply->migRecord;
1871         verifyAckCount++;
1872         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1873         if(verifyAckCount == verifyAckTotal){
1874                 sendCheckpointData();
1875         }
1876 }
1877
1878
1879
1880 void sendCheckpointData(){      
1881         RestartRequest *restartMsg = storedRequest;
1882         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1883         int numMigratedAwayElements = migratedNoticeList.size();
1884         if(migratedNoticeList.size() != 0){
1885                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1886 //                      CkAssert(migratedNoticeList.size() == 0);
1887         }
1888         
1889         
1890         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1891         
1892         DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1893         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
1894         
1895         CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
1896         totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
1897         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1898         
1899         char *msg = (char *)CmiAlloc(totalSize);
1900         
1901         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1902         dataMsg->PE = CkMyPe();
1903         dataMsg->restartWallTime = CmiTimer();
1904         dataMsg->checkPointSize = storedChkpt->bufSize;
1905         
1906         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1907 //      dataMsg->numMigratedAwayElements = 0;
1908         
1909         dataMsg->numMigratedInElements = 0;
1910         dataMsg->migratedElementSize = 0;
1911         dataMsg->lbGroupID = globalLBID;
1912         /*msg layout 
1913                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1914                 Local MessageLog|
1915         */
1916         //store checkpoint data
1917         char *buf = &msg[sizeof(RestartProcessorData)];
1918
1919         if(dataMsg->numMigratedAwayElements != 0){
1920                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1921                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1922         }
1923         
1924
1925 #ifdef CHECKPOINT_DISK
1926         readCheckpointFromDisk(storedChkpt->bufSize,buf);
1927 #else   
1928         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1929 #endif
1930         buf = &buf[storedChkpt->bufSize];
1931
1932
1933         //store localmessage Log
1934         dataMsg->numLocalMessages = localMsgQ->length();
1935         for(int i=0;i<localMsgQ->length();i++){
1936                 if(!fault_aware(((*localMsgQ)[i]).recver )){
1937                         CmiAbort("Non fault aware localMsgQ");
1938                 }
1939                 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
1940                 buf = &buf[sizeof(LocalMessageLog)];
1941         }
1942         
1943         CmiSetHandler(msg,_recvCheckpointHandlerIdx);
1944         CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1945         CmiFree(restartMsg);
1946 };
1947
1948
1949 // this list is used to create a vector of the object ids of all
1950 //the chares on this processor currently and the highest TN processed by them 
1951 //the first argument is actually a CkVec<TProcessedLog> *
1952 void createObjIDList(void *data,ChareMlogData *mlogData){
1953         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
1954         TProcessedLog entry;
1955         entry.recver = mlogData->objID;
1956         entry.tProcessed = mlogData->tProcessed;
1957         list->push_back(entry);
1958         char objString[100];
1959         DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
1960 }
1961
1962
1963 /**
1964  * Receives the checkpoint data from its buddy, restores the state of all the objects
1965  * and asks everyone else to update its home.
1966  */
1967 void _recvCheckpointHandler(char *_restartData){
1968         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1969         MigrationRecord *migratedAwayElements;
1970
1971         globalLBID = restartData->lbGroupID;
1972         
1973         restartData->restartWallTime *= 1000;
1974         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1975         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1976         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1977
1978         
1979         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1980         char *buf = &_restartData[sizeof(RestartProcessorData)];
1981         
1982         if(restartData->numMigratedAwayElements != 0){
1983                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1984                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1985                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1986                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1987         }
1988         
1989         PUP::fromMem pBuf(buf);
1990
1991         pBuf | checkpointCount;
1992
1993         CkPupROData(pBuf);
1994         CkPupGroupData(pBuf);
1995         CkPupNodeGroupData(pBuf);
1996 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
1997         pupArrayElementsSkip(pBuf,NULL);
1998         CkAssert(pBuf.size() == restartData->checkPointSize);
1999         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2000         
2001         forAllCharesDo(initializeRestart,NULL);
2002         
2003         //store the restored local message log in a vector
2004         buf = &buf[restartData->checkPointSize];        
2005         for(int i=0;i<restartData->numLocalMessages;i++){
2006                 LocalMessageLog logEntry;
2007                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
2008                 
2009                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
2010                 if(recverObj!=NULL){
2011                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
2012                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
2013                         char senderString[100];
2014                         char recverString[100];
2015                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
2016                 }else{
2017 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
2018                 }
2019                 buf = &buf[sizeof(LocalMessageLog)];
2020         }
2021
2022         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2023
2024         CmiFree(_restartData);
2025         
2026         
2027         _initDone();
2028
2029         getGlobalStep(globalLBID);
2030         
2031         countUpdateHomeAcks = 0;
2032         RestartRequest updateHomeRequest;
2033         updateHomeRequest.PE = CmiMyPe();
2034         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2035         for(int i=0;i<CmiNumPes();i++){
2036                 if(i != CmiMyPe()){
2037                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2038                 }
2039         }
2040
2041 }
2042
2043 /**
2044  * Receives the updateHome ACKs from all other processors. Once everybody
2045  * has replied, it sends a request to resed the logged messages.
2046  */
2047 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2048
2049         CkPrintf("[%d] Updating Home Ack Handler\n",CkMyPe());
2050
2051         countUpdateHomeAcks++;
2052         CmiFree(updateHomeAck);
2053         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2054         if(countUpdateHomeAcks != CmiNumPes()){
2055                 return;
2056         }
2057
2058         // Send out the request to resend logged messages to all other processors
2059         CkVec<TProcessedLog> objectVec;
2060         forAllCharesDo(createObjIDList, (void *)&objectVec);
2061         int numberObjects = objectVec.size();
2062         
2063         /*
2064                 resendMsg layout |ResendRequest|Array of TProcessedLog|
2065         */
2066         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2067         char *resendMsg = (char *)CmiAlloc(totalSize);
2068         
2069
2070         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2071         resendReq->PE =CkMyPe(); 
2072         resendReq->numberObjects = numberObjects;
2073         char *objList = &resendMsg[sizeof(ResendRequest)];
2074         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
2075         
2076
2077         
2078
2079         /* test for parallel restart migrate away object**/
2080         if(parallelRestart){
2081                 distributeRestartedObjects();
2082                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2083         }
2084         
2085         /*      To make restart work for load balancing.. should only
2086         be used when checkpoint happens along with load balancing
2087         **/
2088 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2089
2090         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2091         CpvAccess(_currentObj) = lb;
2092         lb->ReceiveDummyMigration(restartDecisionNumber);
2093
2094         sleep(10);
2095         
2096         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2097         for(int i=0;i<CkNumPes();i++){
2098                 if(i != CkMyPe()){
2099                         CmiSyncSend(i,totalSize,resendMsg);
2100                 }       
2101         }
2102         _resendMessagesHandler(resendMsg);
2103         CmiFree(resendMsg);
2104 };
2105
2106 void initializeRestart(void *data,ChareMlogData *mlogData){
2107         mlogData->resendReplyRecvd = 0;
2108         mlogData->receivedTNs = new CkVec<MCount>;
2109         mlogData->restartFlag = 1;
2110         mlogData->restoredLocalMsgLog.removeAll();
2111         mlogData->mapTable.empty();
2112 };
2113
2114 /**
2115  * Updates the homePe of chare array elements.
2116  */
2117 void updateHomePE(void *data,ChareMlogData *mlogData){
2118         RestartRequest *updateRequest = (RestartRequest *)data;
2119         int PE = updateRequest->PE; //restarted PE
2120         //if this object is an array Element and its home is the restarted processor
2121         // the home processor needs to know its current location
2122         if(mlogData->objID.type == TypeArray){
2123                 //it is an array element
2124                 CkGroupID myGID = mlogData->objID.data.array.id;
2125                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2126                 CkArrayID aid(mlogData->objID.data.array.id);           
2127                 //check if the restarted processor is the home processor for this object
2128                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2129                 if(locMgr->homePe(myIdx) == PE){
2130                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2131                         DEBUGRESTART(myIdx.print());
2132                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2133                 }
2134         }
2135 };
2136
2137
2138 /**
2139  * Updates the homePe for all chares in this processor.
2140  */
2141 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2142
2143         CkPrintf("[%d] ---------------->HERE\n",CkMyPe());
2144         
2145         int sender = updateRequest->PE;
2146         
2147         forAllCharesDo(updateHomePE,updateRequest);
2148         
2149         updateRequest->PE = CmiMyPe();
2150         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2151         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2152         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2153                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2154                 checkpointCount--;
2155                 startMlogCheckpoint(NULL,0);
2156         }
2157         if(sender == getCheckPointPE()){
2158                 for(int i=0;i<retainedObjectList.size();i++){
2159                         if(retainedObjectList[i]->acked == 0){
2160                                 MigrationNotice migMsg;
2161                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2162                                 migMsg.record = retainedObjectList[i];
2163                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2164                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2165                         }
2166                 }
2167         }
2168 }
2169
2170
2171 //the data argument is of type ResendData which contains the 
2172 //array of objects on  the restartedProcessor
2173 //this method resends the messages stored in this chare's message log 
2174 //to the restarted processor. It also accumulates the maximum TN
2175 //for all the objects on the restarted processor
2176 void resendMessageForChare(void *data,ChareMlogData *mlogData){
2177         char nameString[100];
2178         ResendData *resendData = (ResendData *)data;
2179         int PE = resendData->PE; //restarted PE
2180         DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2181         int count=0;
2182         int ticketRequests=0;
2183         CkQ<MlogEntry *> *log = mlogData->getMlog();
2184
2185         
2186         
2187         
2188         for(int i=0;i<log->length();i++){
2189                 MlogEntry *logEntry = (*log)[i];
2190                 
2191                 // if we sent out the logs of a local message to buddy and he crashed
2192                 //before acking
2193                 envelope *env = logEntry->env;
2194                 if(env == NULL){
2195                         continue;
2196                 }
2197                 if(logEntry->unackedLocal){
2198                         char recverString[100];
2199                         DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
2200                         sendLocalMessageCopy(logEntry);
2201                 }
2202                 //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
2203                 if(env->TN <= 0){
2204                         //ticket not yet replied send it out again
2205                         sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,1);
2206                 }
2207                 
2208                 if(env->recver.type != TypeInvalid){
2209                         int flag = 0;//marks if any of the restarted objects matched this log entry
2210                         for(int j=0;j<resendData->numberObjects;j++){
2211                                 if(env->recver == (resendData->listObjects)[j].recver){
2212                                         flag = 1;
2213                                         //message has a valid TN
2214                                         if(env->TN > 0){
2215                                                 //store maxTicket
2216                                                 if(env->TN > resendData->maxTickets[j]){
2217                                                         resendData->maxTickets[j] = env->TN;
2218                                                 }
2219                                                 //if the TN for this entry is more than the TN processed, send the message out
2220                                                 if(env->TN >= (resendData->listObjects)[j].tProcessed){
2221                                                         //store the TNs that have been since the recver last checkpointed
2222                                                         resendData->ticketVecs[j].push_back(env->TN);
2223                                                         
2224                                                         if(PE != CkMyPe()){
2225                                                                 if(env->recver.type == TypeNodeGroup){
2226                                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2227                                                                 }else{
2228                                                                         CmiSetHandler(env,CmiGetXHandler(env));
2229                                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2230                                                                 }
2231                                                         }else{
2232                                                                 envelope *copyEnv = copyEnvelope(env);
2233                                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2234                                                         }
2235                                                         char senderString[100];
2236                                                         DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2237                                                         count++;
2238                                                 }       
2239                                         }else{
2240 /*                                      //the message didnt get a ticket the last time and needs to start with a ticket request
2241                                                 DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
2242                                                 //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
2243                                                 CkAssert(logEntry->destPE != CkMyPe());
2244                                                 
2245                                                 sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
2246                                                 
2247                                                 ticketRequests++;*/
2248                                         }
2249                                 }
2250                         }//end of for loop of objects
2251                         
2252                 }       
2253         }
2254         DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
2255 }
2256
2257 /**
2258  * Resends the messages since the last checkpoint to the list of objects included in the 
2259  * request.
2260  */
2261 void _resendMessagesHandler(char *msg){
2262         ResendRequest *resendReq = (ResendRequest *)msg;
2263
2264         //GML: examines the origin processor to determine if it belongs to the same group
2265         if(resendReq->PE != CkMyPe() && resendReq->PE/GROUP_SIZE_MLOG == CkMyPe()/GROUP_SIZE_MLOG){
2266                 //TODO: change the function call from restart to group-restart to avoid cyclic calls,
2267                 // for now it will work only with 1 failure
2268                 if(_restartFlag)
2269                         return;
2270                 CmiMemoryCheck();
2271                 CkPrintf("[%d] RESTART: same group\n",CkMyPe());
2272                 //HERE _resetNodeBocInitVec();
2273 /*      int numGroups = CkpvAccess(_groupIDTable)->size();
2274                 int i;
2275                 CKLOCMGR_LOOP(mgr->startInserting(););
2276                 CKLOCMGR_LOOP(mgr->flushAllRecs(););
2277 */
2278                 // rolls back to the previous checkpoint and sends a broadcast to resend messages to this processor
2279                 CkMlogRestartLocal();
2280                 return;
2281         }
2282
2283
2284         char *listObjects = &msg[sizeof(ResendRequest)];
2285         ResendData d;
2286         d.numberObjects = resendReq->numberObjects;
2287         d.PE = resendReq->PE;
2288         d.listObjects = (TProcessedLog *)listObjects;
2289         d.maxTickets = new MCount[d.numberObjects];
2290         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2291         for(int i=0;i<d.numberObjects;i++){
2292                 d.maxTickets[i] = 0;
2293         }
2294
2295         //Check if any of the retained objects need to be recreated
2296         //If they have not been recreated on the restarted processor
2297         //they need to be recreated on this processor
2298         int count=0;
2299         for(int i=0;i<retainedObjectList.size();i++){
2300                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2301                         count++;
2302                         int recreate=1;
2303                         for(int j=0;j<d.numberObjects;j++){
2304                                 if(d.listObjects[j].recver.type != TypeArray ){
2305                                         continue;
2306                                 }
2307                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2308                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2309                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2310                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
2311                                                 recreate = 0;
2312                                                 break;
2313                                         }
2314                                 }
2315                         }
2316                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2317                         if(recreate){
2318                                 donotCountMigration=1;
2319                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2320                                 donotCountMigration=0;
2321                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2322                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2323                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2324
2325                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2326                                 
2327                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2328                                 CmiAssert(rec->type() == CkLocRec::local);
2329                                 CkVec<CkMigratable *> eltList;
2330                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2331                                 for(int j=0;j<eltList.size();j++){
2332                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2333                                                 CpvAccess(_currentObj) = eltList[j];
2334                                                 eltList[j]->ResumeFromSync();
2335                                         }
2336                                 }
2337
2338
2339                                 
2340                                 retainedObjectList[i]->msg=NULL;
2341                                 
2342                                         
2343                         }
2344                 }
2345         }
2346         
2347         if(count > 0){
2348 //              CmiAbort("retainedObjectList for restarted processor not empty");
2349         }
2350
2351
2352         
2353         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2354         forAllCharesDo(resendMessageForChare,&d);
2355
2356         //send back the maximum ticket number for a message sent to each object on the 
2357         //restarted processor
2358         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2359         
2360         int totalTNStored=0;
2361         for(int i=0;i<d.numberObjects;i++){
2362                 totalTNStored += d.ticketVecs[i].size();
2363         }
2364         
2365         int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
2366         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2367         
2368         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2369         resendReply->PE = CkMyPe();
2370         resendReply->numberObjects = d.numberObjects;
2371         
2372         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2373         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2374         for(int i=0;i<d.numberObjects;i++){
2375                 replyObjects[i] = d.listObjects[i].recver;
2376         }
2377         
2378         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2379         for(int i=0;i<d.numberObjects;i++){
2380                 int vecsize = d.ticketVecs[i].size();
2381                 memcpy(ticketList,&vecsize,sizeof(int));
2382                 ticketList = &ticketList[sizeof(int)];
2383                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2384                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2385         }       
2386
2387         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2388         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2389         
2390 /*      
2391         if(verifyAckRequestsUnacked){
2392                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2393                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2394                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2395                         LDObjHandle h;
2396                         lb->Migrated(h,1);
2397                 }
2398         }
2399         
2400         verifyAckRequestsUnacked=0;*/
2401
2402
2403         
2404         delete [] d.maxTickets;
2405         delete [] d.ticketVecs;
2406         if(resendReq->PE != CkMyPe()){
2407                 CmiFree(msg);
2408         }       
2409 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2410         lastRestart = CmiWallTimer();
2411 }
2412
2413 void sortVec(CkVec<MCount> *TNvec);
2414 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2415
2416 void _resendReplyHandler(char *msg){    
2417         /**
2418                 need to rewrite this method to deal with parallel restart
2419         */
2420         ResendRequest *resendReply = (ResendRequest *)msg;
2421         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2422
2423         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2424         
2425         DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2426         for(int i =0; i< resendReply->numberObjects;i++){       
2427                 Chare *obj = (Chare *)listObjects[i].getObject();
2428                 
2429                 int vecsize;
2430                 memcpy(&vecsize,listTickets,sizeof(int));
2431                 listTickets = &listTickets[sizeof(int)];
2432                 MCount *listTNs = (MCount *)listTickets;
2433                 
2434                 
2435                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2436                 
2437                 if(obj != NULL){
2438                         //the object was restarted on the processor on which it existed
2439                         processReceivedTN(obj,vecsize,listTNs);
2440                 }else{
2441                 //pack up objID vecsize and listTNs and send it to the correct processor
2442                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2443                         char *TNMsg = (char *)CmiAlloc(totalSize);
2444                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2445                         receivedTNData->recver = listObjects[i];
2446                         receivedTNData->numTNs = vecsize;
2447                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2448                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2449
2450                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2451                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2452                 }
2453                 
2454         }
2455 };
2456
2457 void _receivedTNDataHandler(ReceivedTNData *msg){
2458         char objName[100];
2459         Chare *obj = (Chare *) msg->recver.getObject();
2460         if(obj){                
2461                 char *_msg = (char *)msg;
2462                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2463                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2464                 processReceivedTN(obj,msg->numTNs,listTNs);
2465         }else{
2466                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2467                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2468         }
2469 };
2470
2471
2472 void processReceivedTN(Chare *obj,int listSize,MCount *listTNs){
2473         obj->mlogData->resendReplyRecvd++;
2474
2475         
2476         for(int j=0;j<listSize;j++){
2477                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2478         }
2479         
2480         //if this object has received all the replies find the ticket numbers
2481         //that senders know about. Those less than the ticket number processed 
2482         //by the receiver can be thrown away. The rest need not be consecutive
2483         // ie there can be holes in the list of ticket numbers seen by senders
2484         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2485                 obj->mlogData->resendReplyRecvd = 0;
2486                 //sort the received TNS
2487                 sortVec(obj->mlogData->receivedTNs);
2488         
2489                 //after all the received tickets are in we need to sort them and then 
2490                 // calculate the holes
2491                 
2492                 if(obj->mlogData->receivedTNs->size() > 0){
2493                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2494                         int vecsize = obj->mlogData->receivedTNs->size();
2495                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2496                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2497                         if(numberHoles == 0){
2498                         }else{
2499                                 char objName[100];                                      
2500                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2501                                 obj->mlogData->numberHoles = numberHoles;
2502                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2503                                 int countHoles=0;
2504                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2505                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2506                                                 //the TNs are not consecutive at this point
2507                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2508                                                         printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
2509                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2510                                                         countHoles++;
2511                                                 }       
2512                                         }
2513                                 }
2514                                 //Holes have been given new TN
2515                                 if(countHoles != numberHoles){
2516                                         char str[100];
2517                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2518                                 }
2519                                 CkAssert(countHoles == numberHoles);                                    
2520                                 obj->mlogData->currentHoles = numberHoles;
2521                         }
2522                 }       
2523                 
2524                 delete obj->mlogData->receivedTNs;
2525                 obj->mlogData->receivedTNs = NULL;
2526                 obj->mlogData->restartFlag = 0;
2527                 char objString[100];
2528                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2529         }
2530
2531 }
2532
2533
2534 void sortVec(CkVec<MCount> *TNvec){
2535         //sort it ->its bloddy bubble sort
2536         //TODO: use quicksort
2537         for(int i=0;i<TNvec->size();i++){
2538                 for(int j=i+1;j<TNvec->size();j++){
2539                         if((*TNvec)[j] < (*TNvec)[i]){
2540                                 MCount temp;
2541                                 temp = (*TNvec)[i];
2542                                 (*TNvec)[i] = (*TNvec)[j];
2543                                 (*TNvec)[j] = temp;
2544                         }
2545                 }
2546         }
2547         //make it unique .. since its sorted all equal units will be consecutive
2548         MCount *tempArray = new MCount[TNvec->size()];
2549         int     uniqueCount=-1;
2550         for(int i=0;i<TNvec->size();i++){
2551                 tempArray[i] = 0;
2552                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2553                         uniqueCount++;
2554                         tempArray[uniqueCount] = (*TNvec)[i];
2555                 }
2556         }
2557         uniqueCount++;
2558         TNvec->removeAll();
2559         for(int i=0;i<uniqueCount;i++){
2560                 TNvec->push_back(tempArray[i]);
2561         }
2562         delete [] tempArray;
2563 }       
2564
2565 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2566         if(TNVec->size() == 0){
2567                 return -1; //not found in an empty vec
2568         }
2569         //binary search to find 
2570         int left=0;
2571         int right = TNVec->size();
2572         int mid = (left +right)/2;
2573         while(searchTN != (*TNVec)[mid] && left < right){
2574                 if((*TNVec)[mid] > searchTN){
2575                         right = mid-1;
2576                 }else{
2577                         left = mid+1;
2578                 }
2579                 mid = (left + right)/2;
2580         }
2581         if(left < right){
2582                 //mid is the element to be returned
2583                 return mid;
2584         }else{
2585                 if(mid < TNVec->size() && mid >=0){
2586                         if((*TNVec)[mid] == searchTN){
2587                                 return mid;
2588                         }else{
2589                                 return -1;
2590                         }
2591                 }else{
2592                         return -1;
2593                 }
2594         }
2595 };
2596
2597
/*
        Method to do parallel restart. Distribute some of the array elements to other processors.
        The problem is that we can't use Charm entry methods to do the migration, as they would get
        stuck in the protocol that is going to restart
*/
2603
2604 class ElementDistributor: public CkLocIterator{
2605         CkLocMgr *locMgr;
2606         int *targetPE;
2607         void pupLocation(CkLocation &loc,PUP::er &p){
2608                 CkArrayIndexMax idx=loc.getIndex();
2609                 CkGroupID gID = locMgr->ckGetGroupID();
2610                 p|gID;      // store loc mgr's GID as well for easier restore
2611                 p|idx;
2612                 p|loc;
2613         };
2614         public:
2615                 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2616                 void addLocation(CkLocation &loc){
2617                         if(*targetPE == CkMyPe()){
2618                                 *targetPE = (*targetPE +1)%CkNumPes();                          
2619                                 return;
2620                         }
2621                         
2622                         CkArrayIndexMax idx=loc.getIndex();
2623                         CkLocRec_local *rec = loc.getLocalRecord();
2624                         
2625                         CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2626                         idx.print();
2627                         
2628
2629                         //TODO: an element that is being moved should leave some trace behind so that
2630                         // the arraybroadcaster can forward messages to it
2631                         
2632                         //pack up this location and send it across
2633                         PUP::sizer psizer;
2634                         pupLocation(loc,psizer);
2635                         int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
2636                         char *msg = (char *)CmiAlloc(totalSize);
2637                         char *buf = &msg[CmiMsgHeaderSizeBytes];
2638                         PUP::toMem pmem(buf);
2639                         pmem.becomeDeleting();
2640                         pupLocation(loc,pmem);
2641                         
2642                         locMgr->setDuringMigration(CmiTrue);                    
2643                         delete rec;
2644                         locMgr->setDuringMigration(CmiFalse);                   
2645                         locMgr->inform(idx,*targetPE);
2646
2647                         CmiSetHandler(msg,_distributedLocationHandlerIdx);
2648                         CmiSyncSendAndFree(*targetPE,totalSize,msg);
2649
2650                         CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2651                         //decide on the target processor for the next object
2652                         *targetPE = (*targetPE +1)%CkNumPes();
2653                 }
2654                 
2655 };
2656
2657 void distributeRestartedObjects(){
2658         int numGroups = CkpvAccess(_groupIDTable)->size();      
2659         int i;
2660         int targetPE=CkMyPe();
2661         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2662 };
2663
2664 void _distributedLocationHandler(char *receivedMsg){
2665         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2666         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2667         PUP::fromMem pmem(buf);
2668         CkGroupID gID;
2669         CkArrayIndexMax idx;
2670         pmem |gID;
2671         pmem |idx;
2672         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2673         donotCountMigration=1;
2674         mgr->resume(idx,pmem);
2675         donotCountMigration=0;
2676         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2677         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2678         idx.print();
2679
2680         CkLocRec *rec = mgr->elementRec(idx);
2681         CmiAssert(rec->type() == CkLocRec::local);
2682         
2683         CkVec<CkMigratable *> eltList;
2684         mgr->migratableList((CkLocRec_local *)rec,eltList);
2685         for(int i=0;i<eltList.size();i++){
2686                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2687                         CpvAccess(_currentObj) = eltList[i];
2688                         eltList[i]->ResumeFromSync();
2689                 }
2690         }
2691         
2692         
2693 }
2694
2695
2696 /** this method is used to send messages to a restarted processor to tell
2697  * it that a particular expected object is not going to get to it */
2698 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2699         DummyMigrationMsg buf;
2700         buf.flag = MLOG_OBJECT;
2701         buf.lbID = lbID;
2702         buf.mgrID = locMgrID;
2703         buf.idx = idx;
2704         buf.locationPE = locationPE;
2705         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2706         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
2707 };
2708
2709
2710 /**this method is used by a restarted processor to tell other processors
2711  * that they are not going to receive these many objects.. just the count
2712  * not the objects themselves ***/
2713
2714 void sendDummyMigrationCounts(int *dummyCounts){
2715         DummyMigrationMsg buf;
2716         buf.flag = MLOG_COUNT;
2717         buf.lbID = globalLBID;
2718         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2719         for(int i=0;i<CmiNumPes();i++){
2720                 if(i != CmiMyPe() && dummyCounts[i] != 0){
2721                         buf.count = dummyCounts[i];
2722                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
2723                 }
2724         }
2725 }
2726
2727
2728 /** this handler is used to process a dummy migration msg.
2729  * it looks up the load balancer and calls migrated for it */
2730
2731 void _dummyMigrationHandler(DummyMigrationMsg *msg){
2732         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2733         if(msg->flag == MLOG_OBJECT){
2734                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
2735                 LDObjHandle h;
2736                 lb->Migrated(h,1);
2737         }
2738         if(msg->flag == MLOG_COUNT){
2739                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
2740                 msg->count -= verifyAckedRequests;
2741                 for(int i=0;i<msg->count;i++){
2742                         LDObjHandle h;
2743                         lb->Migrated(h,1);
2744                 }
2745         }
2746         verifyAckedRequests=0;
2747         CmiFree(msg);
2748 };
2749
2750
2751
2752
2753
2754
2755
/*****************************************************
	Implementation of a method that can be used to call
	any function on the ChareMlogData of every chare
	currently on this processor
******************************************************/
2761
2762
2763 class ElementCaller :  public CkLocIterator {
2764 private:
2765         CkLocMgr *locMgr;
2766         MlogFn fnPointer;
2767         void *data;
2768 public:
2769         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
2770                 locMgr = _locMgr;
2771                 fnPointer = _fnPointer;
2772                 data = _data;
2773         };
2774         void addLocation(CkLocation &loc){
2775                 CkVec<CkMigratable *> list;
2776                 CkLocRec_local *local = loc.getLocalRecord();
2777                 locMgr->migratableList (local,list);
2778                 for(int i=0;i<list.size();i++){
2779                         CkMigratable *migratableElement = list[i];
2780                         fnPointer(data,migratableElement->mlogData);
2781                 }
2782         }
2783 };
2784
2785 void forAllCharesDo(MlogFn fnPointer,void *data){
2786         int numGroups = CkpvAccess(_groupIDTable)->size();
2787         for(int i=0;i<numGroups;i++){
2788                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2789                 fnPointer(data,obj->mlogData);
2790         }
2791         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2792         for(int i=0;i<numNodeGroups;i++){
2793                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
2794                 fnPointer(data,obj->mlogData);
2795         }
2796         int i;
2797         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
2798         
2799         
2800 };
2801
2802
2803 /******************************************************************
2804  Load Balancing
2805 ******************************************************************/
2806
2807 void initMlogLBStep(CkGroupID gid){
2808         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
2809         countLBMigratedAway = 0;
2810         countLBToMigrate=0;
2811         onGoingLoadBalancing=1;
2812         migrationDoneCalled=0;
2813         checkpointBarrierCount=0;
2814         if(globalLBID.idx != 0){
2815                 CmiAssert(globalLBID.idx == gid.idx);
2816         }
2817         globalLBID = gid;
2818 }
2819
2820 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
2821         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
2822         
2823         resumeLbFnPtr = _fnPtr;
2824         centralLb = _centralLb;
2825         migrationDoneCalled = 1;
2826         if(countLBToMigrate == countLBMigratedAway){
2827                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2828                 startMlogCheckpoint(NULL,CmiWallTimer());       
2829         }
2830 };
2831
2832 void finishedCheckpointLoadBalancing(){
2833         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
2834         CheckpointBarrierMsg msg;
2835         msg.fromPE = CmiMyPe();
2836         msg.checkpointCount = checkpointCount;
2837
2838         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
2839         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
2840         
2841 };
2842
2843
2844 void sendMlogLocation(int targetPE,envelope *env){
2845         void *_msg = EnvToUsr(env);
2846         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2847
2848
2849         int existing = 0;
2850         //if this object is already in the retainedobjectlust destined for this
2851         //processor it should not be sent
2852         
2853         for(int i=0;i<retainedObjectList.size();i++){
2854                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
2855                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
2856                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
2857                         existing = 1;
2858                         break;
2859                 }
2860         }
2861
2862         if(existing){
2863                 return;
2864         }
2865         
2866         
2867         countLBToMigrate++;
2868         
2869         MigrationNotice migMsg;
2870         migMsg.migRecord.gID = msg->gid;
2871         migMsg.migRecord.idx = msg->idx;
2872         migMsg.migRecord.fromPE = CkMyPe();
2873         migMsg.migRecord.toPE =  targetPE;
2874         
2875         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
2876         
2877         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
2878         retainedObject->migRecord = migMsg.migRecord;
2879         retainedObject->acked  = 0;
2880         
2881         CkPackMessage(&env);
2882         
2883         migMsg.record = retainedObject;
2884         retainedObject->msg = env;
2885         int size = retainedObject->size = env->getTotalsize();
2886         
2887         retainedObjectList.push_back(retainedObject);
2888         
2889         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2890         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2891         
2892         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
2893
2894 }
2895
2896 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
2897         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
2898         migratedNoticeList.push_back(msg->migRecord);
2899
2900         MigrationNoticeAck buf;
2901         buf.record = msg->record;
2902         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
2903         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
2904 }
2905
2906 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
2907         
2908         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
2909         retainedObject->acked = 1;
2910
2911         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
2912         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
2913
2914         //inform home about the new location of this object
2915         CkGroupID gID = retainedObject->migRecord.gID ;
2916         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2917         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
2918         
2919         countLBMigratedAway++;
2920         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
2921                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2922                 startMlogCheckpoint(NULL,CmiWallTimer());
2923         }
2924 };
2925
2926 void _receiveMlogLocationHandler(void *buf){
2927         envelope *env = (envelope *)buf;
2928         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
2929         CkUnpackMessage(&env);
2930         void *_msg = EnvToUsr(env);
2931         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2932         CkGroupID gID= msg->gid;
2933         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
2934         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2935         CpvAccess(_currentObj)=mgr;
2936         mgr->immigrate(msg);
2937 };
2938
2939
2940 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
2941 /*      if(mlogData->objID.type == TypeArray){
2942                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
2943         //      TODO: make sure later that atSync has been called and it needs 
2944         //      to be resumed from sync
2945         //
2946                 CpvAccess(_currentObj) = elt;
2947                 elt->ResumeFromSync();
2948         }*/
2949 }
2950
2951 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
2952         if(checkpointBarrierCount == CmiNumPes()){
2953                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
2954                 for(int i=0;i<CmiNumPes();i++){
2955                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
2956                 }
2957         }
2958 }
2959
2960 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
2961         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
2962         if(msg->checkpointCount == checkpointCount){
2963                 checkpointBarrierCount++;
2964                 checkAndSendCheckpointBarrierAcks(msg);
2965         }else{
2966                 if(msg->checkpointCount-1 == checkpointCount){
2967                         checkpointBarrierCount++;
2968                         checkAndSendCheckpointBarrierAcks(msg);
2969                 }else{
2970                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
2971                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
2972                 }
2973         }
2974         CmiFree(msg);
2975 }
2976
2977 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
2978         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
2979         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
2980         sendRemoveLogRequests();
2981         (*resumeLbFnPtr)(centralLb);
2982         CmiFree(msg);
2983 }
2984
2985 /**
2986         method that informs an array elements home processor of its current location
2987         It is a converse method to bypass the charm++ message logging framework
2988 */
2989
2990 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
2991         double _startTime = CmiWallTimer();
2992         CurrentLocationMsg msg;
2993         msg.mgrID = locMgrID;
2994         msg.idx = idx;
2995         msg.locationPE = currentPE;
2996         msg.fromPE = CkMyPe();
2997
2998         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
2999         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
3000         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
3001         traceUserBracketEvent(37,_startTime,CmiWallTimer());
3002 }
3003
3004
3005 void _receiveLocationHandler(CurrentLocationMsg *data){
3006         double _startTime = CmiWallTimer();
3007         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
3008         if(mgr == NULL){
3009                 CmiFree(data);
3010                 return;
3011         }
3012         CkLocRec *rec = mgr->elementNrec(data->idx);
3013         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
3014         if(rec != NULL){
3015                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
3016                         if(data->fromPE == data->locationPE){
3017                                 CmiAbort("Another processor has the same object");
3018                         }
3019                 }
3020         }
3021         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3022                 int targetPE = data->fromPE;
3023                 data->fromPE = CmiMyPe();
3024                 data->locationPE = CmiMyPe();
3025                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3026                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3027         }else{
3028                 mgr->inform(data->idx,data->locationPE);
3029         }
3030         CmiFree(data);
3031         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3032 }
3033
3034
3035
3036 void getGlobalStep(CkGroupID gID){
3037         LBStepMsg msg;
3038         int destPE = 0;
3039         msg.lbID = gID;
3040         msg.fromPE = CmiMyPe();
3041         msg.step = -1;
3042         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3043         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3044 };
3045
3046 void _getGlobalStepHandler(LBStepMsg *msg){
3047         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3048         msg->step = lb->step();
3049         CmiAssert(msg->fromPE != CmiMyPe());
3050         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3051         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3052         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3053 };
3054
3055 void _recvGlobalStepHandler(LBStepMsg *msg){
3056         
3057         restartDecisionNumber=msg->step;
3058         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
3059         _updateHomeAckHandler(dummyAck);
3060 };
3061
3062
3063
3064
3065
3066
3067
3068
3069 void _messageLoggingExit(){
3070 /*      if(CkMyPe() == 0){
3071                 if(countBuffered != 0){
3072                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3073                 }
3074
3075 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3076         }
3077         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3078         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3079
3080         //GML: printing some statistics for group approach
3081         if(GROUP_SIZE_MLOG > 1)
3082                 CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
3083
3084 }
3085 /**********************************
3086         * The methods of the message logging
3087         * data structure stored in each chare
3088         ********************************/
3089
3090 MCount ChareMlogData::nextSN(const CkObjID &recver){