Added check to make sure message is not NULL before processing it.
[charm.git] / src / ck-core / ckmessagelogging.C
1 #include "charm.h"
2 #include "ck.h"
3 #include "ckmessagelogging.h"
4 #include "queueing.h"
5 #include <sys/types.h>
6 #include <signal.h>
7 #include "CentralLB.h"
8
9 #ifdef _FAULT_MLOG_
10
// Debug tracing switches. As defined here each macro's replacement list is only a
// comment, so DEBUG(...), DEBUGRESTART(...) and DEBUGLB(...) expand to nothing and
// their arguments are never evaluated. The commented-out DEBUG variant would trace
// only while _restartFlag is set.
11 //#define DEBUG(x)  if(_restartFlag) {x;}
12 #define DEBUG(x)  //x
13 #define DEBUGRESTART(x)  //x
14 #define DEBUGLB(x) // x
15
// Batching of message-logging traffic: BUFFERED_LOCAL queues local message-log
// records for the checkpoint PE (sendLocalMessageCopy), BUFFERED_REMOTE packs
// ticket requests per destination PE (sendTicketRequest). Without them each record
// is sent on its own.
16 #define BUFFERED_LOCAL
17 #define BUFFERED_REMOTE
18
// idx2str overloads implemented elsewhere; presumably they render array indices /
// elements as printable strings (used for debugging output).
19 extern const char *idx2str(const CkArrayIndex &ind);
20 extern const char *idx2str(const ArrayElement *el);
21 const char *idx2str(const CkArrayIndexMax &ind){
22         return idx2str((const CkArrayIndex &)ind);
23 };
24
25 void getGlobalStep(CkGroupID gID);
26
27 bool fault_aware(CkObjID &recver);
28
29 int _restartFlag=0;
30 int restarted=0;
31
32 //GML: variables for measuring savings with groups in message logging
// MLOGFT_totalMessages / MLOGFT_totalLogSize are accumulated by sendTicketRequest for
// every message it stores in a sender's log (message count and envelope bytes).
33 float MLOGFT_totalLogSize = 0.0;
34 float MLOGFT_totalMessages = 0.0;
35 float MLOGFT_totalObjects = 0.0;
36
37 //TODO: remove for perf runs
38 int countHashRefs=0; //count the number of gets
39 int countHashCollisions=0;
40
41
42 //#define CHECKPOINT_DISK
// NOTE(review): binding a string literal to a non-const char* is deprecated in C++03
// and ill-formed since C++11; consider const char* once every extern declaration of
// checkpointDirectory is updated to match.
43 char *checkpointDirectory=".";
44 int unAckedCheckpoint=0;
45
// Statistics: countLocal = local message logs queued by sendLocalMessageCopy,
// countBuffered = batched local-log messages sent by sendBufferedLocalMessageCopy.
46 int countLocal=0,countBuffered=0;
47 int countPiggy=0;
// Outstanding checkBufferedLocalMessageCopy timers: incremented when
// sendLocalMessageCopy arms one, decremented when one fires.
48 int countClearBufferedLocalCalls=0;
49
50 int countUpdateHomeAcks=0;
51
52 extern int chkptPeriod;
53 extern bool parallelRestart;
54
// Fault injection: path of the kill file parsed by readKillFile() (NULL until set).
55 char *killFile;
56 int killFlag=0;
57 int restartingMlogFlag=0;
58 void readKillFile();
// Absolute CmiWallTimer() time (seconds) at which killLocal() sends SIGKILL.
59 double killTime=0.0;
60 int checkpointCount=0;
61
62
// Per-processor (Cpv) state:
//  _currentObj                  object whose id is stamped as sender on outgoing messages
//  _delayedTicketRequests       remote ticket requests waiting to be retried
//  _delayedLocalTicketRequests  local messages whose receiver handed out TN 0
//  _bufferedLocalMessageLogs    local message logs not yet sent to the checkpoint PE
//  _bufferedTicketRequests      per-destination raw buffers: header + TicketRequest records
//  _numBufferedTicketRequests   number of requests currently held in each of those buffers
63 CpvDeclare(Chare *,_currentObj);
64 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
65 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
66 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
67 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
68 CpvDeclare(Queue, _outOfOrderMessageQueue);
69 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
70 //CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
71 CpvDeclare(char **,_bufferedTicketRequests);
72 CpvDeclare(int *,_numBufferedTicketRequests);
73 CpvDeclare(char *,_bufferTicketReply);
74
75
76
77 static double adjustChkptPeriod=0.0; //in ms
78 static double nextCheckpointTime=0.0;//in seconds
79
// Wall time at which the most recent buffered-local flush timer was armed.
80 double lastBufferedLocalMessageCopyTime;
81
// Batch-size limits for BUFFERED_LOCAL / BUFFERED_REMOTE. NOTE(review): not
// initialized here; presumably configured before _messageLoggingInit() — confirm.
82 int _maxBufferedMessages;
83 int _maxBufferedTicketRequests;
84 int BUFFER_TIME=2; // in ms
85
86
// Converse handler indices, assigned by CkRegisterHandler in _messageLoggingInit().
87 int _ticketRequestHandlerIdx;
88 int _ticketHandlerIdx;
89 int _localMessageCopyHandlerIdx;
90 int _localMessageAckHandlerIdx;
91 int _pingHandlerIdx;
92 int _bufferedLocalMessageCopyHandlerIdx;
93 int _bufferedLocalMessageAckHandlerIdx;
94 int _bufferedTicketRequestHandlerIdx;
95 int _bufferedTicketHandlerIdx;
96
97
// NOTE(review): objString is not referenced in this part of the file.
98 char objString[100];
99 int _checkpointRequestHandlerIdx;
100 int _storeCheckpointHandlerIdx;
101 int _checkpointAckHandlerIdx;
102 int _getCheckpointHandlerIdx;
103 int _recvCheckpointHandlerIdx;
104 int _removeProcessedLogHandlerIdx;
105
106 int _verifyAckRequestHandlerIdx;
107 int _verifyAckHandlerIdx;
108 int _dummyMigrationHandlerIdx;
109
110
111 int     _getGlobalStepHandlerIdx;
112 int     _recvGlobalStepHandlerIdx;
113
114 int _updateHomeRequestHandlerIdx;
115 int _updateHomeAckHandlerIdx;
116 int _resendMessagesHandlerIdx;
117 int _resendReplyHandlerIdx;
118 int _receivedTNDataHandlerIdx;
119 int _distributedLocationHandlerIdx;
120
121
122 int verifyAckTotal;
123 int verifyAckCount;
124
125 int verifyAckedRequests=0;
126
127 RestartRequest *storedRequest;
128
129
130
131 int _falseRestart =0; /**
                           For testing on clusters we might carry out restarts on
                           a processor without actually starting it
                           1 -> false restart
                           0 -> restart after an actual crash
                         */
137
138 //lock for the ticketRequestHandler and ticketLogLocalMessage methods;
139 int _lockNewTicket=0;
140
141
142 //Load balancing globals
143 int onGoingLoadBalancing=0;
144 void *centralLb;
145 void (*resumeLbFnPtr)(void *);
146 int _receiveMlogLocationHandlerIdx;
147 int _receiveMigrationNoticeHandlerIdx;
148 int _receiveMigrationNoticeAckHandlerIdx;
149 int _checkpointBarrierHandlerIdx;
150 int _checkpointBarrierAckHandlerIdx;
151
152 CkVec<MigrationRecord> migratedNoticeList;
153 CkVec<RetainedMigratedObject *> retainedObjectList;
154 int donotCountMigration=0;
155 int countLBMigratedAway=0;
156 int countLBToMigrate=0;
157 int migrationDoneCalled=0;
158 int checkpointBarrierCount=0;
159 int globalResumeCount=0;
160 CkGroupID globalLBID;
161 int restartDecisionNumber=-1;
162
163
// Wall-clock timestamps; both are set to CmiWallTimer() in _messageLoggingInit().
164 double lastCompletedAlarm=0;
165 double lastRestart=0;
166
167
168 //update location globals
169 int _receiveLocationHandlerIdx;
170
171
172
173 // initialize message logging datastructures and register handlers
174 void _messageLoggingInit(){
175         //current object
176         CpvInitialize(Chare *,_currentObj);
177         
178         //registering handlers for message logging
179         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
180         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
181         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
182         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
183         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
184         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
185         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
186   _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
187         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
188
189                 
190         //handlers for checkpointing
191         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
192         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
193         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
194         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
195
196
197         //handlers for restart
198         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
199         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
200         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
201         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
202         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
203         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
204         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
205         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
206         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
207         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
208         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
209
210         
211         //handlers for load balancing
212         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
213         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
214         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
215         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
216         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
217         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
218         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
219         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
220         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
221
222         
223         //handlers for updating locations
224         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
225         
226         //Cpv variables for message logging
227         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
228         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
229         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
230         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
231         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
232         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
233         CpvInitialize(Queue, _outOfOrderMessageQueue);
234         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
235         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
236         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
237         
238         CpvInitialize(char **,_bufferedTicketRequests);
239         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
240         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
241         for(int i=0;i<CkNumPes();i++){
242                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
243                 CpvAccess(_numBufferedTicketRequests)[i]=0;
244         }
245   CpvInitialize(char *,_bufferTicketReply);
246         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
247         
248 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
249         CcdCallFnAfter(retryTicketRequest,NULL,100);    
250         
251         
252         //Cpv variables for checkpoint
253         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
254         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
255         
256 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
257 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
258 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
259         if(CkMyPe() == 0){
260 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
261 #ifdef  BUFFERED_LOCAL
262                 if(CmiMyPe() == 0){
263                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
264                 }
265 #endif
266         }
267 #ifdef  BUFFERED_REMOTE
268         if(CmiMyPe() == 0){
269                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
270         }
271 #endif
272
273         traceRegisterUserEvent("Remove Logs", 20);
274         traceRegisterUserEvent("Ticket Request Handler", 21);
275         traceRegisterUserEvent("Ticket Handler", 22);
276         traceRegisterUserEvent("Local Message Copy Handler", 23);
277         traceRegisterUserEvent("Local Message Ack Handler", 24);        
278         traceRegisterUserEvent("Preprocess current message",25);
279         traceRegisterUserEvent("Preprocess past message",26);
280         traceRegisterUserEvent("Preprocess future message",27);
281         traceRegisterUserEvent("Checkpoint",28);
282         traceRegisterUserEvent("Checkpoint Store",29);
283         traceRegisterUserEvent("Checkpoint Ack",30);
284         
285         traceRegisterUserEvent("Send Ticket Request",31);
286         traceRegisterUserEvent("Generalticketrequest1",32);
287         traceRegisterUserEvent("TicketLogLocal",33);
288         traceRegisterUserEvent("next_ticket and SN",34);
289         traceRegisterUserEvent("Timeout for buffered remote messages",35);
290         traceRegisterUserEvent("Timeout for buffered local messages",36);
291         traceRegisterUserEvent("Inform Location Home",37);
292         traceRegisterUserEvent("Receive Location Handler",38);
293         
294         lastCompletedAlarm=CmiWallTimer();
295         lastRestart = CmiWallTimer();
296 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
297         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
298 //      printf("[%d] killFlag %d\n",CkMyPe(),killFlag);
299 //      if(killFlag){
300 //              readKillFile();
301 //      }
302 }
303
304 void killLocal(void *_dummy,double curWallTime);        
305
306 void readKillFile(){
307         FILE *fp=fopen(killFile,"r");
308         if(!fp){
309                 return;
310         }
311         int proc;
312         double sec;
313         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
314                 if(proc == CkMyPe()){
315                         killTime = CmiWallTimer()+sec;
316                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
317                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
318                 }
319         }
320         fclose(fp);
321 }
322
323 void killLocal(void *_dummy,double curWallTime){
324         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
325         if(CmiWallTimer()<killTime-1){
326                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
327         }else{  
328                 kill(getpid(),SIGKILL);
329         }
330 }
331
332
333
334 /************************ Message logging methods ****************/
335
336 // send a ticket request to a group on a processor
337 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
338         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
339                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
340                 void *origMsg = EnvToUsr(env);
341                 for(int i=0;i<CmiNumPes();i++){
342                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
343                                 void *copyMsg = CkCopyMsg(&origMsg);
344                                 envelope *copyEnv = UsrToEnv(copyMsg);
345                                 copyEnv->SN=0;
346                                 copyEnv->TN=0;
347                                 copyEnv->sender.type = TypeInvalid;
348                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
349                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
350                         }
351                 }
352                 return;
353         }
354         CkObjID recver;
355         recver.type = TypeGroup;
356         recver.data.group.id = env->getGroupNum();
357         recver.data.group.onPE = destPE;
358 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
359                 CmiPrintStackTrace(0);
360         }*/
361         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
362 }
363
364 //send a ticket request to a nodegroup
365 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
366         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
367                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
368                 void *origMsg = EnvToUsr(env);
369                 for(int i=0;i<CmiNumNodes();i++){
370                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
371                                 void *copyMsg = CkCopyMsg(&origMsg);
372                                 envelope *copyEnv = UsrToEnv(copyMsg);
373                                 copyEnv->SN=0;
374                                 copyEnv->TN=0;
375                                 copyEnv->sender.type = TypeInvalid;
376                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
377                         }
378                 }
379                 return;
380         }
381         CkObjID recver;
382         recver.type = TypeNodeGroup;
383         recver.data.group.id = env->getGroupNum();
384         recver.data.group.onPE = destNode;
385         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
386 }
387
388 //send a ticket request to an array element
389 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
390         CkObjID recver;
391         recver.type = TypeArray;
392         recver.data.array.id = env->getsetArrayMgr();
393         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
394
395         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
396                 char recverString[100],senderString[100];
397                 
398                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
399         }
400
401         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
402 };
403
404 //a method to generate the actual ticket requests for groups,nodegroups or arrays
405 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
406         envelope *env = _env;
407         int resend=0; //is it a resend
408         char recverName[100];
409         double _startTime=CkWallTimer();
410         
411         if(CpvAccess(_currentObj) == NULL){
412 //              CkAssert(0);
413                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
414                 generalCldEnqueue(destPE,env,_infoIdx);
415                 return;
416         }
417         
418         if(env->sender.type == TypeInvalid){
419                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
420                 //Set message logging data in the envelope
421         }else{
422                 envelope *copyEnv = copyEnvelope(env);
423                 env = copyEnv;
424                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
425                 env->SN = 0;
426         }
427         
428         
429         CkObjID &sender = env->sender;
430         env->recver = recver;
431
432         Chare *obj = (Chare *)env->sender.getObject();
433           
434         if(env->SN == 0){
435                 env->SN = obj->mlogData->nextSN(recver);
436         }else{
437                 resend = 1;
438         }
439         
440         
441         char senderString[100];
442 //      if(env->SN != 1){
443                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
444         //      CmiPrintStackTrace(0);
445 /*      }else{
446                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
447         }*/
448                 
449         MlogEntry * mEntry = new MlogEntry(env,destPE,_infoIdx);
450 //      CkPackMessage(&(mEntry->env));
451 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
452         
453         
454
455         _startTime = CkWallTimer();
456         if(destPE == CkMyPe()){
457                 ticketLogLocalMessage(mEntry);
458         }else{
459                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
460 //              traceUserBracketEvent(31,_startTime,CkWallTimer());                     
461         }
462 }
463
464 //method that does the actual send by creating a ticketrequest filling it up and sending it
465 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
466         char recverString[100],senderString[100];
467         envelope *env = entry->env;
468         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
469 /*      envelope *env = entry->env;
470         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
471
472         Chare *obj = (Chare *)entry->env->sender.getObject();
473         if(!resend){
474                 //GML: only stores message if either it goes to this processor or to a processor in a different group
475                 if(CkMyPe() != destPE && CkMyPe()/GROUP_SIZE_MLOG != destPE/GROUP_SIZE_MLOG){
476                         obj->mlogData->addLogEntry(entry);
477                         MLOGFT_totalMessages += 1.0;
478                         MLOGFT_totalLogSize += entry->env->getTotalsize();
479                 }
480         }
481         
482
483 #ifdef BUFFERED_REMOTE
484         //buffer the ticket request 
485         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
486                 //first message to this processor, buffer needs to be created
487                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
488                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
489                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
490         }
491         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
492         //Buffer the ticketrequests
493         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
494         ticketRequest->sender = sender;
495         ticketRequest->recver = recver;
496         ticketRequest->logEntry = entry;
497         ticketRequest->SN = SN;
498         ticketRequest->senderPE = CkMyPe();
499
500         CpvAccess(_numBufferedTicketRequests)[destPE]++;
501         
502         
503         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
504                 sendBufferedTicketRequests(destPE);
505         }else{
506                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
507                         int *checkPE = new int;
508                         *checkPE = destPE;
509                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
510                 }
511         }
512 #else
513
514         TicketRequest ticketRequest;
515         ticketRequest.sender = sender;
516         ticketRequest.recver = recver;
517         ticketRequest.logEntry = entry;
518         ticketRequest.SN = SN;
519         ticketRequest.senderPE = CkMyPe();
520         
521         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
522 //      CmiBecomeImmediate(&ticketRequest);
523         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
524 #endif
525         DEBUG(CmiMemoryCheck());
526 };
527
528 /**
529  * Send the ticket requests buffered for processor PE
530  **/
531 void sendBufferedTicketRequests(int destPE){
532         DEBUG(CmiMemoryCheck());
533         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
534         if(numberRequests == 0){
535                 return;
536         }
537         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
538         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
539         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
540         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
541         header->numberLogs = numberRequests;
542         
543         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
544         CmiSyncSend(destPE,totalSize,(char *)buf);
545         
546         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
547         DEBUG(CmiMemoryCheck());
548 };
549
550 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
551         int destPE = *(int *)_destPE;
552   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
553                 sendBufferedTicketRequests(destPE);
554 //              traceUserEvent(35);
555         }
556         delete (int *)_destPE;
557         DEBUG(CmiMemoryCheck());
558 };
559
560
561
562
563 //get a ticket for a local message and then send a copy to the buddy
564 void ticketLogLocalMessage(MlogEntry *entry){
565         //this method is always in the main thread(not interrupt).. so it should 
566         //never find itself locked out of a newTicket
567 //      CkAssert(_lockNewTicket==0);
568         double _startTime=CkWallTimer();
569         
570 //      _lockNewTicket=1;
571         
572                 DEBUG(CmiMemoryCheck());
573         
574
575         Chare *recverObj = (Chare *)entry->env->recver.getObject();
576         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
577         if(recverObj){
578                 //Consider the case, after a restart when this message has already been allotted a ticket number
579                 // and should get the same one as the old one.
580                 Ticket ticket;
581                 if(recverObj->mlogData->mapTable.numObjects() > 0){
582                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
583                 }else{
584                         ticket.TN = 0;
585                 }
586                 
587                 char senderString[100], recverString[100] ;
588                 
589                 if(ticket.TN == 0){
590                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
591         
592                         if(ticket.TN == 0){
593                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
594                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
595                                 
596         //              _lockNewTicket = 0;
597 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
598                                 return;
599                         }
600                 }       
601                 //TODO: check for the case when an invalid ticket is returned
602                 //TODO: check for OLD or RECEIVED TICKETS
603                 entry->env->TN = ticket.TN;
604                 CkAssert(entry->env->TN > 0);
605                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
606                 
607
608                 sendLocalMessageCopy(entry);
609                 
610                 
611                 DEBUG(CmiMemoryCheck());
612                 entry->unackedLocal = 1;
613                 DEBUG(CmiMemoryCheck());
614                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
615                 DEBUG(CmiMemoryCheck());
616         }else{
617                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
618         }
619         _lockNewTicket=0;
620 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
621 };
622
623
624 void sendLocalMessageCopy(MlogEntry *entry){
625         LocalMessageLog msgLog;
626         msgLog.sender = entry->env->sender;
627         msgLog.recver = entry->env->recver;
628         msgLog.SN = entry->env->SN;
629         msgLog.TN = entry->env->TN;
630         msgLog.entry = entry;
631         msgLog.PE = CkMyPe();
632         
633         char recvString[100];
634         char senderString[100];
635         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
636
637 #ifdef BUFFERED_LOCAL
638         countLocal++;
639         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
640         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
641                 sendBufferedLocalMessageCopy();
642         }else{
643                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
644                         lastBufferedLocalMessageCopyTime = CkWallTimer();
645                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
646                         countClearBufferedLocalCalls++;
647                 }       
648         }
649 #else   
650         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
651         
652         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
653 #endif
654         DEBUG(CmiMemoryCheck());
655 };
656
657
658 void sendBufferedLocalMessageCopy(){
659         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
660         if(numberLogs == 0){
661                 return;
662         }
663         countBuffered++;
664         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
665         void *buf=CmiAlloc(totalSize);
666         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
667         header->numberLogs=numberLogs;
668
669         DEBUG(CmiMemoryCheck());
670         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
671         
672         char *ptr = (char *)buf;
673         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
674         
675         for(int i=0;i<numberLogs;i++){
676                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
677                 memcpy(ptr,&log,sizeof(LocalMessageLog));
678                 ptr = &ptr[sizeof(LocalMessageLog)];
679         }
680
681         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
682
683         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
684         DEBUG(CmiMemoryCheck());
685 };
686
687 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
688         countClearBufferedLocalCalls--;
689         if(countClearBufferedLocalCalls > 10){
690                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
691         }
692         DEBUG(CmiMemoryCheck());
693         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
694         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
695                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
696                         sendBufferedLocalMessageCopy();
697 //                      traceUserEvent(36);
698                 }
699         }
700         DEBUG(CmiMemoryCheck());
701 }
702
703 /****
704         The handler functions
705 *****/
706
707
708 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
709 /**
710  *  If there are any delayed requests, process them first before 
711  *  processing this request
712  * */
713 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
714         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
715         double  _startTime = CkWallTimer();
716         if(CpvAccess(_delayedTicketRequests)->length() > 0){
717                 retryTicketRequest(NULL,_startTime);
718         }
719         _processTicketRequest(ticketRequest);
720         CmiFree(ticketRequest);
721 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
722 }
723 /** Handler used for dealing with a bunch of ticket requests
724  * from one processor. The replies are also bunched together
725  * Does not use _ticketRequestHandler
726  * */
727 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
728         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
729         DEBUG(CmiMemoryCheck());
730         double _startTime = CkWallTimer();
731         if(CpvAccess(_delayedTicketRequests)->length() > 0){
732                 retryTicketRequest(NULL,_startTime);
733         }
734         DEBUG(CmiMemoryCheck());
735   int numRequests = recvdHeader->numberLogs;
736         char *msg = (char *)recvdHeader;
737         msg = &msg[sizeof(BufferedTicketRequestHeader)];
738         int senderPE=((TicketRequest *)msg)->senderPE;
739
740         
741         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
742         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
743         
744         char *ptr = (char *)buf;
745         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
746         header->numberLogs = 0;
747
748         DEBUG(CmiMemoryCheck());
749         
750         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
751         
752         for(int i=0;i<numRequests;i++){
753                 TicketRequest *request = (TicketRequest *)msg;
754                 msg = &msg[sizeof(TicketRequest)];
755                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
756
757                 if(replied){
758                         //the ticket request has been processed and 
759                         //the reply will be stored in the ptr
760                         header->numberLogs++;
761                         ptr = &ptr[sizeof(TicketReply)];
762                 }
763         }
764 /*      if(header->numberLogs == 0){
765                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
766         }*/
767
768         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
769         CmiSyncSend(senderPE,totalSize,(char *)buf);
770         CmiFree(recvdHeader);
771 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
772         DEBUG(CmiMemoryCheck());
773 };
774
/**
 * Process one ticket request: a sender asks the receiver's processor for the
 * ticket (TN) of its message with sequence number SN.
 *
 * Returns true only when this processor produced a reply:
 *   - reply == NULL: a TicketReply is sent right away to
 *     ticketRequest->senderPE (handler _ticketHandlerIdx);
 *   - reply != NULL: the TicketReply is written into *reply and the caller
 *     sends it (as _bufferedTicketRequestHandler does).
 *
 * Returns false when no reply was produced:
 *   - the receiver object is not here and its guessed PE is this PE or -1:
 *     a CmiAlloc'd copy of the request is queued on _delayedTicketRequests
 *     (retried later by retryTicketRequest);
 *   - the receiver object is not here and is guessed to be on another PE:
 *     a copy of the request is forwarded there (_ticketRequestHandlerIdx);
 *   - the receiver exists but no ticket is available yet (TN == 0): a copy of
 *     the request is queued on _delayedTicketRequests.
 *
 * The caller keeps ownership of ticketRequest; only copies are stored or sent.
 * */
inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){

/*      if(_lockNewTicket){
                printf("ddeded %d\n",CkMyPe());
                if(CmiIsImmediate(ticketRequest)){
                        CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
                }
                CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
                
        }else{
                _lockNewTicket = 1;
        }*/

        DEBUG(CmiMemoryCheck());
        CkObjID sender = ticketRequest->sender;
        CkObjID recver = ticketRequest->recver;
        MCount SN = ticketRequest->SN;
        
        
        Chare *recverObj = (Chare *)recver.getObject();
        
        
        // recverName is only filled (and only read) inside DEBUG statements.
        char recverName[100];
        DEBUG(recver.toString(recverName);)
        if(recverObj == NULL){
                // Receiver not on this PE: either wait for it here or forward the request.
                int estPE = recver.guessPE();
                if(estPE == CkMyPe() || estPE == -1){           
                        //try to fulfill the request after some time
                        char senderString[100];
                        DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
                        // This lookup only feeds the DEBUG print below.
                        if(estPE == CkMyPe() && recver.type == TypeArray){
                                CkArrayID aid(recver.data.array.id);            
                                CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
                                DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));

                        }
                        // Store a heap copy: the caller frees ticketRequest after we return.
                        TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
                        *delayed = *ticketRequest;
                        CpvAccess(_delayedTicketRequests)->enq(delayed);
                        
                }else{
                        DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
                        // CmiSyncSend copies the message, so a stack copy suffices.
                        TicketRequest forward = *ticketRequest;
                        CmiSetHandler(&forward,_ticketRequestHandlerIdx);
                        CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);                      
                        
                        
                }
        DEBUG(CmiMemoryCheck());
                return false; // if the receverObj does not exist the ticket request cannot have been 
                              // processed successfully
        }else{
                char senderString[100];
                // NOTE(review): the TN == 0 checks below rely on Ticket's default
                // constructor zeroing TN when searchRestoredLocalQ is not called -- confirm
                // in ckmessagelogging.h.
                Ticket ticket;
                //check if a ticket for this has been already handed out to an object that used to be local but 
                // is no longer so.. need for parallel restart
                
                if(recverObj->mlogData->mapTable.numObjects() > 0){
                        ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
                }
                
                // No restored ticket found: ask the receiver's log for one.
                if(ticket.TN == 0){
                        ticket = recverObj->mlogData->next_ticket(sender,SN);
                }
                // A TN beyond the last processed ticket is one the receiver has not seen yet.
                if(ticket.TN > recverObj->mlogData->tProcessed){
                        ticket.state = NEW_TICKET;
                }else{
                        ticket.state = OLD_TICKET;
                }
                //TODO: check for the case when an invalid ticket is returned
                // TN == 0 means no ticket could be assigned now: retry later.
                if(ticket.TN == 0){
                        DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
                        TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
                        *delayed = *ticketRequest;
                        CpvAccess(_delayedTicketRequests)->enq(delayed);
                        return false;
                }
/*              if(ticket.TN < SN){ //error state this really should not happen
                        recver.toString(recverName);
                  printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
                }*/
//              CkAssert(ticket.TN >= SN);
                DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));

//              TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
    if(reply == NULL){ 
                        //There is no reply buffer and the ticketreply is going to be 
                        //sent immediately
                        TicketReply ticketReply;
                        ticketReply.request = *ticketRequest;
                        ticketReply.ticket = ticket;
                        ticketReply.recverPE = CkMyPe();
                
                        CmiSetHandler(&ticketReply,_ticketHandlerIdx);
//              CmiBecomeImmediate(&ticketReply);
                        CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
         }else{ // Store ticket reply in the buffer provided
                 reply->request = *ticketRequest;
                 reply->ticket = ticket;
                 reply->recverPE = CkMyPe();
                 CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
                                                         // in case the ticket needs to be forwarded or something

         }
                DEBUG(CmiMemoryCheck());
                return true;
        }
//      _lockNewTicket=0;
};
891
892 inline void _ticketHandler(TicketReply *ticketReply){
893
894         double _startTime = CkWallTimer();
895         DEBUG(CmiMemoryCheck());
896         
897         
898         char senderString[100];
899         CkObjID sender = ticketReply->request.sender;
900         CkObjID recver = ticketReply->request.recver;
901         
902         if(sender.guessPE() != CkMyPe()){               
903                 DEBUG(CkAssert(sender.guessPE()>= 0));
904                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
905         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
906                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
907                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
908 #ifdef BUFFERED_REMOTE
909                 //will be freed by the buffered ticket handler most of the time
910                 //this might lead to a leak just after migration
911                 //when the ticketHandler is directly used without going through the buffered handler
912                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
913 #else
914                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
915 #endif  
916         }else{
917                 char recverName[100];
918                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
919                 MlogEntry *logEntry=NULL;
920                 if(ticketReply->ticket.state & FORWARDED_TICKET){
921                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
922                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
923                         Chare *senderObj = (Chare *)sender.getObject();
924                         if(senderObj){
925                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
926                                 for(int i=0;i<mlog->length();i++){
927                                         MlogEntry *tempEntry = (*mlog)[i];
928                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
929                                                 logEntry = tempEntry;
930                                                 break;
931                                         }
932                                 }
933                                 if(logEntry == NULL){
934 #ifdef BUFFERED_REMOTE
935 #else
936                                         CmiFree(ticketReply);
937 #endif                                  
938                                         return;
939                                 }
940                         }else{
941                                 CmiAbort("This processor thinks it should have the sender\n");
942                         }
943                         ticketReply->ticket.state ^= FORWARDED_TICKET;
944                 }else{
945                         logEntry = ticketReply->request.logEntry;
946                 }
947                 if(logEntry->env->TN <= 0){
948                         //This logEntry has not received a TN earlier
949                         char recverString[100];
950                         logEntry->env->TN = ticketReply->ticket.TN;
951                         logEntry->env->setSrcPe(CkMyPe());
952                         if(ticketReply->ticket.state == NEW_TICKET){
953                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
954                                 if(ticketReply->recverPE != CkMyPe()){
955                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
956                                 }else{
957                                         //It is now a local message use the local message protocol
958                                         sendLocalMessageCopy(logEntry);
959                                 }               
960                         }
961                 }else{
962                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
963                 }
964                 recver.updatePosition(ticketReply->recverPE);
965 #ifdef BUFFERED_REMOTE
966 #else
967                 CmiFree(ticketReply);
968 #endif
969         }
970         CmiMemoryCheck();
971 #ifdef BUFFERED_REMOTE
972 #else
973 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
974 #endif
975         
976 };
977
978 /**
979  * Message to handle the bunch of tickets 
980  * that we get from one processor. We send 
981  * the tickets to be handled one at a time
982  * */
983
984 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
985         double _startTime=CmiWallTimer();
986         int numTickets = recvdHeader->numberLogs;
987         char *msg = (char *)recvdHeader;
988         msg = &msg[sizeof(BufferedTicketRequestHeader)];
989         DEBUG(CmiMemoryCheck());
990         
991         TicketReply *_reply = (TicketReply *)msg;
992
993         
994         for(int i=0;i<numTickets;i++){
995                 TicketReply *reply = (TicketReply *)msg;
996                 _ticketHandler(reply);
997                 
998                 msg = &msg[sizeof(TicketReply)];
999         }
1000         
1001         CmiFree(recvdHeader);
1002 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1003         DEBUG(CmiMemoryCheck());
1004 };
1005
1006
1007 void _localMessageCopyHandler(LocalMessageLog *msgLog){
1008         double _startTime = CkWallTimer();
1009         
1010         char senderString[100],recverString[100];
1011         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1012 /*      if(!fault_aware(msgLog->recver)){
1013                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1014         }*/
1015         CpvAccess(_localMessageLog)->enq(*msgLog);
1016         
1017         LocalMessageLogAck ack;
1018         ack.entry = msgLog->entry;
1019         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1020         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1021         CmiSyncSend(msgLog->PE,sizeof(LocalMessageLogAck),(char *)&ack);
1022         
1023 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1024 };
1025
1026 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1027         double _startTime = CkWallTimer();
1028         DEBUG(CmiMemoryCheck());
1029         
1030         int numLogs = recvdHeader->numberLogs;
1031         char *msg = (char *)recvdHeader;
1032
1033         //piggy back the logs already stored on this processor
1034         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1035         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1036 /*      if(numPiggyLogs > 0){
1037                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1038                         CmiAssert(0);
1039                 }
1040         }*/
1041         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1042         
1043         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1044         void *buf = CmiAlloc(totalSize);
1045         char *ptr = (char *)buf;
1046         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1047         
1048         msg = &msg[sizeof(BufferedLocalLogHeader)];
1049         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1050
1051         DEBUG(CmiMemoryCheck());
1052         int PE;
1053         for(int i=0;i<numLogs;i++){
1054                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1055                 CpvAccess(_localMessageLog)->enq(*msgLog);
1056                 PE = msgLog->PE;
1057                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1058
1059                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1060                 ack->entry = msgLog->entry;
1061                 
1062                 msg = &msg[sizeof(LocalMessageLog)];
1063                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1064         }
1065         DEBUG(CmiMemoryCheck());
1066
1067         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1068         piggyHeader->numberLogs = numPiggyLogs;
1069         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1070         if(numPiggyLogs > 0){
1071                 countPiggy++;
1072         }
1073
1074         for(int i=0;i<numPiggyLogs;i++){
1075                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1076                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1077                 ptr = &ptr[sizeof(LocalMessageLog)];
1078         }
1079         DEBUG(CmiMemoryCheck());
1080         
1081         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1082         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1083                 
1084 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1085                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1086                         if(!fault_aware(localLogEntry.recver)){
1087                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1088                         }
1089         }*/
1090         if(freeHeader){
1091                 CmiFree(recvdHeader);
1092         }
1093         DEBUG(CmiMemoryCheck());
1094 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1095 }
1096
1097
1098 void _localMessageAckHandler(LocalMessageLogAck *ack){
1099         
1100         double _startTime = CkWallTimer();
1101         
1102         MlogEntry *entry = ack->entry;
1103         if(entry == NULL){
1104                 CkExit();
1105         }
1106         entry->unackedLocal = 0;
1107         envelope *env = entry->env;
1108         char recverName[100];
1109         char senderString[100];
1110         DEBUG(CmiMemoryCheck());
1111         
1112         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1113         if(env == NULL)
1114                 return;
1115         CkAssert(env->SN > 0);
1116         CkAssert(env->TN > 0);
1117         env->sender.toString(senderString);
1118         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1119         env->recver.toString(recverName);
1120
1121         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1122         
1123 /*      
1124         void *origMsg = EnvToUsr(env);
1125         void *copyMsg = CkCopyMsg(&origMsg);
1126         envelope *copyEnv = UsrToEnv(copyMsg);
1127         entry->env = UsrToEnv(origMsg);*/
1128
1129 //      envelope *copyEnv = copyEnvelope(env);
1130
1131         envelope *copyEnv = env;
1132         copyEnv->localMlogEntry = entry;
1133
1134         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1135         if(CmiMyPe() != entry->destPE){
1136                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1137         }
1138 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1139         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1140         
1141         
1142 #ifdef BUFFERED_LOCAL
1143 #else
1144         CmiFree(ack);
1145 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1146 #endif
1147         
1148         DEBUG(CmiMemoryCheck());
1149         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1150 }
1151
1152
1153 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1154
1155         double _startTime=CkWallTimer();
1156         DEBUG(CmiMemoryCheck());
1157
1158         int numLogs = recvdHeader->numberLogs;
1159         char *msg = (char *)recvdHeader;
1160         msg = &msg[sizeof(BufferedLocalLogHeader)];
1161
1162         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1163         
1164         for(int i=0;i<numLogs;i++){
1165                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1166                 _localMessageAckHandler(ack);
1167                 
1168                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1169         }
1170
1171         //deal with piggy backed local logs
1172         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1173         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1174         if(piggyHeader->numberLogs > 0){
1175                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1176         }
1177         
1178         CmiFree(recvdHeader);
1179         DEBUG(CmiMemoryCheck());
1180 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1181 }
1182
1183
1184
1185
1186
1187
1188 bool fault_aware(CkObjID &recver){
1189         switch(recver.type){
1190                 case TypeChare:
1191                         return false;
1192                 case TypeGroup:
1193                 case TypeNodeGroup:
1194                 case TypeArray:
1195                         return true;
1196                 default:
1197                         return false;
1198         }
1199 };
1200
/**
 * Decide whether a received message may be delivered now, enforcing the
 * ticket (TN) order at the receiver.
 *
 * @param env              received envelope
 * @param objPointer       out: the receiver object (NULL if not on this PE);
 *                         left untouched for receivers that are not fault aware
 * @param logEntryPointer  out: set to env->localMlogEntry only when the sender
 *                         is on this PE; otherwise left untouched, so the
 *                         caller must initialize it
 * @return 1 if the message should be delivered now, 0 otherwise:
 *   - not fault aware: 1;
 *   - receiver not on this PE: a copy is sent to its guessed PE (unless that
 *     is this PE) and 0 is returned;
 *   - TN == tProcessed+1: 1, and every message parked in the out-of-order
 *     queue is moved back to the scheduler queue for another attempt;
 *   - TN <= tProcessed (already processed): 0;
 *   - TN in the future: env is parked in _outOfOrderMessageQueue, 0.
 */
int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
        char recverString[100];
        char senderString[100];
        
        DEBUG(CmiMemoryCheck());
        CkObjID recver = env->recver;
        if(!fault_aware(recver))
                return 1;


        Chare *obj = (Chare *)recver.getObject();
        *objPointer = obj;
        if(obj == NULL){
                int possiblePE = recver.guessPE();
                // NOTE(review): guessPE() can return -1 (see _processTicketRequest),
                // which this branch would pass to CmiSyncSend -- confirm that cannot happen here.
                if(possiblePE != CkMyPe()){
                        // CmiSyncSend copies env; the original stays with the caller.
                        int totalSize = env->getTotalsize();                    
                        CmiSyncSend(possiblePE,totalSize,(char *)env);
                }
                return 0;
        }


        double _startTime = CkWallTimer();
//env->sender.updatePosition(env->getSrcPe());
        if(env->TN == obj->mlogData->tProcessed+1){
                //the message that needs to be processed now
                DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
                // once we find a message that we can process we put back all the messages in the out of order queue
                // back into the main scheduler queue. 
                if(env->sender.guessPE() == CkMyPe()){
                        *logEntryPointer = env->localMlogEntry;
                }
        DEBUG(CmiMemoryCheck());
                while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
                        void *qMsgPtr;
                        CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
                        if(qMsgPtr != NULL){
                          envelope *qEnv = (envelope *)qMsgPtr;
                          CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());               
                        }
        DEBUG(CmiMemoryCheck());
                }
//              traceUserBracketEvent(25,_startTime,CkWallTimer());
                //TODO: this might be a problem.. change made for leanMD
//              CpvAccess(_currentObj) = obj;
        DEBUG(CmiMemoryCheck());
                return 1;
        }
        if(env->TN <= obj->mlogData->tProcessed){
                //message already processed
                DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
//              traceUserBracketEvent(26,_startTime,CkWallTimer());
        DEBUG(CmiMemoryCheck());
                return 0;
        }
        //message that needs to be processed in the future

//      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
        //the message cant be processed now put it back in the out of order message Q, 
        //It will be transferred to the main queue later
        CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
//              traceUserBracketEvent(27,_startTime,CkWallTimer());
        DEBUG(CmiMemoryCheck());
        
        return 0;
}
1267
1268 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1269         char senderString[100];
1270         if(obj){
1271                 if(sender.guessPE() == CkMyPe()){
1272                         if(entry != NULL){
1273                                 entry->env = NULL;
1274                         }
1275                 }
1276                 obj->mlogData->tProcessed++;
1277 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1278                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1279 //              CpvAccess(_currentObj)= NULL;
1280         }
1281         DEBUG(CmiMemoryCheck());
1282 }
1283
1284 /***
1285         Helpers for the handlers and message logging methods
1286 ***/
1287
1288 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1289 //      double _startTime = CkWallTimer();
1290         if(env->recver.type != TypeNodeGroup){
1291         //This repeats a step performed in skipCldEnq for messages sent to
1292         //other processors. I do this here so that messages to local processors
1293         //undergo the same transformation.. It lets the resend be uniform for 
1294         //all messages
1295 //              CmiSetXHandler(env,CmiGetHandler(env));
1296                 _skipCldEnqueue(destPE,env,_infoIdx);
1297         }else{
1298                 _noCldNodeEnqueue(destPE,env);
1299         }
1300 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1301 }
1302 //extern "C" int CmiGetNonLocalLength();
1303 /** This method is used to retry the ticket requests
1304  * that had been queued up earlier
1305  * */
1306
1307 int calledRetryTicketRequest=0;
1308
1309 void retryTicketRequestTimer(void *_dummy,double _time){
1310                 calledRetryTicketRequest=0;
1311                 retryTicketRequest(_dummy,_time);
1312 }
1313
1314 void retryTicketRequest(void *_dummy,double curWallTime){       
1315         double start = CkWallTimer();
1316         DEBUG(CmiMemoryCheck());
1317         int length = CpvAccess(_delayedTicketRequests)->length();
1318         for(int i=0;i<length;i++){
1319                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1320                 if(ticketRequest){
1321                         char senderString[100],recverString[100];
1322                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1323                         DEBUG(CmiMemoryCheck());
1324                         _processTicketRequest(ticketRequest);
1325                   CmiFree(ticketRequest);
1326                         DEBUG(CmiMemoryCheck());
1327                 }       
1328         }       
1329         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1330                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1331                 ticketLogLocalMessage(entry);
1332         }
1333         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1334 //      int converse_qLength = CmiGetNonLocalLength();
1335         
1336 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1337
1338 /*      PingMsg pingMsg;
1339         pingMsg.PE = CkMyPe();
1340         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1341         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1342                 for(int i=0;i<CkNumPes();i++){
1343                         if(i != CkMyPe()){
1344                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1345                         }
1346                 }
1347         }*/     
1348         //TODO: change this back to 100
1349         if(calledRetryTicketRequest == 0){
1350                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1351                 calledRetryTicketRequest =1;
1352         }
1353         DEBUG(CmiMemoryCheck());
1354 }
1355
1356 void _pingHandler(CkPingMsg *msg){
1357         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1358         CmiFree(msg);
1359 }
1360
1361
1362 /*****************************************************************************
1363         Checkpointing methods..
1364                 Pack all the data on a processor and send it to the buddy periodically
1365                 Also used to throw away message logs
1366 *****************************************************************************/
1367 CkVec<TProcessedLog> processedTicketLog;
1368 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1369 void clearUpMigratedRetainedLists(int PE);
1370
1371 void checkpointAlarm(void *_dummy,double curWallTime){
1372         double diff = curWallTime-lastCompletedAlarm;
1373         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1374 /*      if(CkWallTimer()-lastRestart < 50){
1375                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1376                 return;
1377         }*/
1378         if(diff < ((chkptPeriod) - 2)){
1379                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1380                 return;
1381         }
1382         CheckpointRequest request;
1383         request.PE = CkMyPe();
1384         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1385         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1386 };
1387
1388 void _checkpointRequestHandler(CheckpointRequest *request){
1389         startMlogCheckpoint(NULL,CmiWallTimer());       
1390 }
1391
1392 void startMlogCheckpoint(void *_dummy,double curWallTime){
1393         double _startTime = CkWallTimer();
1394         checkpointCount++;
1395 /*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
1396                 kill(getpid(),SIGKILL);
1397         }*/
1398         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1399                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1400         }
1401         PUP::sizer psizer;
1402         DEBUG(CmiMemoryCheck());
1403
1404         psizer | checkpointCount;
1405         
1406         CkPupROData(psizer);
1407         DEBUG(CmiMemoryCheck());
1408         CkPupGroupData(psizer);
1409         DEBUG(CmiMemoryCheck());
1410         CkPupNodeGroupData(psizer);
1411         DEBUG(CmiMemoryCheck());
1412         pupArrayElementsSkip(psizer,NULL);
1413         DEBUG(CmiMemoryCheck());
1414
1415         int dataSize = psizer.size();
1416         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1417         char *msg = (char *)CmiAlloc(totalSize);
1418         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1419         chkMsg->PE = CkMyPe();
1420         chkMsg->dataSize = dataSize;
1421
1422         
1423         char *buf = &msg[sizeof(CheckPointDataMsg)];
1424         PUP::toMem pBuf(buf);   
1425
1426         pBuf | checkpointCount;
1427         
1428         CkPupROData(pBuf);
1429         CkPupGroupData(pBuf);
1430         CkPupNodeGroupData(pBuf);
1431         pupArrayElementsSkip(pBuf,NULL);
1432
1433         unAckedCheckpoint=1;
1434         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1435         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1436         
1437         /*
1438                 Store the highest Ticket number processed for each chare on this processor
1439         */
1440         processedTicketLog.removeAll();
1441         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1442         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1443                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1444         }
1445
1446         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1447                 lastCompletedAlarm = curWallTime;
1448                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1449         }
1450         traceUserBracketEvent(28,_startTime,CkWallTimer());
1451 };
1452
1453 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1454         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1455         TProcessedLog logEntry;
1456         logEntry.recver = mlogData->objID;
1457         logEntry.tProcessed = mlogData->tProcessed;
1458         log->push_back(logEntry);
1459         char objString[100];
1460         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1461 }
1462
1463 class ElementPacker : public CkLocIterator {
1464 private:
1465         CkLocMgr *locMgr;
1466         PUP::er &p;
1467 public:
1468                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1469                 void addLocation(CkLocation &loc) {
1470                         CkArrayIndexMax idx=loc.getIndex();
1471                         CkGroupID gID = locMgr->ckGetGroupID();
1472                         p|gID;      // store loc mgr's GID as well for easier restore
1473                         p|idx;
1474                         p|loc;
1475     }
1476 };
1477
1478
1479 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
1480         int numElements,i;
1481   int numGroups = CkpvAccess(_groupIDTable)->size();    
1482         if(!p.isUnpacking()){
1483                 numElements = CkCountArrayElements();
1484         }       
1485         p | numElements;
1486         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1487         if(!p.isUnpacking()){
1488                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1489         }else{
1490                 //Flush all recs of all LocMgrs before putting in new elements
1491 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1492                 for(int j=0;j<listsize;j++){
1493                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1494                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1495                                 listToSkip[j].idx.print();
1496                         }
1497                 }
1498                 
1499  printf("numElements = %d\n",numElements);
1500         
1501           for (int i=0; i<numElements; i++) {
1502                         CkGroupID gID;
1503                         CkArrayIndexMax idx;
1504                         p|gID;
1505             p|idx;
1506                         int flag=0;
1507                         int matchedIdx=0;
1508                         for(int j=0;j<listsize;j++){
1509                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1510                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1511                                                 matchedIdx = j;
1512                                                 flag = 1;
1513                                                 break;
1514                                         }
1515                                 }
1516                         }
1517                         if(flag == 1){
1518                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1519                         }else{
1520                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1521                         }
1522                                 
1523                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1524                         mgr->resume(idx,p,flag);
1525                         if(flag == 1){
1526                                 int homePE = mgr->homePe(idx);
1527                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1528                         }
1529           }
1530         }
1531 };
1532
1533
1534 void writeCheckpointToDisk(int size,char *chkpt){
1535         char fNameTemp[100];
1536         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1537         int fd = creat(fNameTemp,S_IRWXU);
1538         int ret = write(fd,chkpt,size);
1539         CkAssert(ret == size);
1540         close(fd);
1541         
1542         char fName[100];
1543         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1544         unlink(fName);
1545
1546         rename(fNameTemp,fName);
1547         
1548 }
1549
1550 //handler that receives the checkpoint from a processor
1551 //it stores it and acks it
1552 void _storeCheckpointHandler(char *msg){
1553         
1554         double _startTime=CkWallTimer();
1555                 
1556         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1557         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1558         
1559         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1560         
1561         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1562         if(oldChkpt != NULL){
1563                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1564                 CmiFree(oldmsg);
1565         }
1566         //turning off storing checkpoints
1567         
1568         int sendingPE = chkMsg->PE;
1569         
1570         CpvAccess(_storedCheckpointData)->buf = chkpt;
1571         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1572         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1573
1574 #ifdef CHECKPOINT_DISK
1575         //store the checkpoint on disk
1576         writeCheckpointToDisk(chkMsg->dataSize,chkpt);
1577         CpvAccess(_storedCheckpointData)->buf = NULL;
1578         CmiFree(msg);
1579 #endif
1580
1581         int count=0;
1582         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1583                 if(migratedNoticeList[j].fromPE == sendingPE){
1584                         migratedNoticeList[j].ackFrom = 1;
1585                 }else{
1586                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1587                 }
1588                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1589                         migratedNoticeList.remove(j);
1590                         count++;
1591                 }
1592                 
1593         }
1594         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1595         
1596         CheckPointAck ackMsg;
1597         ackMsg.PE = CkMyPe();
1598         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1599         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1600         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1601         
1602         
1603         
1604         traceUserBracketEvent(29,_startTime,CkWallTimer());
1605 };
1606
1607
1608 void sendRemoveLogRequests(){
1609         double _startTime = CkWallTimer();      
1610         //send out the messages asking senders to throw away message logs below a certain ticket number
1611         /*
1612                 The remove log request message looks like
1613                 |RemoveLogRequest||List of TProcessedLog|
1614         */
1615         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1616         char *requestMsg = (char *)CmiAlloc(totalSize);
1617         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1618         request->PE = CkMyPe();
1619         request->numberObjects = processedTicketLog.size();
1620         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1621         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1622         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1623         
1624         DEBUG(CmiMemoryCheck());
1625         for(int i=0;i<CkNumPes();i++){
1626                 CmiSyncSend(i,totalSize,requestMsg);
1627         }
1628         CmiFree(requestMsg);
1629
1630         clearUpMigratedRetainedLists(CmiMyPe());
1631         //TODO: clear ticketTable
1632         
1633         traceUserBracketEvent(30,_startTime,CkWallTimer());
1634         DEBUG(CmiMemoryCheck());
1635 }
1636
1637
1638 void _checkpointAckHandler(CheckPointAck *ackMsg){
1639         DEBUG(CmiMemoryCheck());
1640         unAckedCheckpoint=0;
1641         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1642         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1643         if(onGoingLoadBalancing){
1644                 onGoingLoadBalancing = 0;
1645                 finishedCheckpointLoadBalancing();
1646         }else{
1647                 sendRemoveLogRequests();
1648         }
1649         CmiFree(ackMsg);
1650         
1651 };
1652
1653 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1654         DEBUG(CmiMemoryCheck());
1655         CmiMemoryCheck();
1656         char *data = (char *)_data;
1657         RemoveLogRequest *request = (RemoveLogRequest *)data;
1658         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1659         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1660
1661         int count=0;
1662         for(int i=0;i<mlog->length();i++){
1663                 MlogEntry *logEntry = mlog->deq();
1664                 int match=0;
1665                 for(int j=0;j<request->numberObjects;j++){
1666                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1667                                 //this log Entry should be removed
1668                                 match = 1;
1669                                 break;
1670                         }
1671                 }
1672                 char senderString[100],recverString[100];
1673 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1674                 if(match){
1675                         count++;
1676                         delete logEntry;
1677                 }else{
1678                         mlog->enq(logEntry);
1679                 }
1680         }
1681         if(count > 0){
1682                 char nameString[100];
1683                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1684         }
1685         DEBUG(CmiMemoryCheck());
1686         CmiMemoryCheck();
1687 };
1688
1689 void _removeProcessedLogHandler(char *requestMsg){
1690         double start = CkWallTimer();
1691         forAllCharesDo(removeProcessedLogs,requestMsg);
1692         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1693         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1694         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1695         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1696         if(request->PE == getCheckPointPE()){
1697                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1698                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1699                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1700                 int count=0;
1701 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1702                 DEBUG(char nameString[100];)
1703                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1704                 DEBUG(})*/
1705                 for(int i=0;i<localQ->length();i++){
1706                         LocalMessageLog localLogEntry = (*localQ)[i];
1707                         if(!fault_aware(localLogEntry.recver)){
1708                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1709                         }
1710                         bool keep = true;
1711                         for(int j=0;j<request->numberObjects;j++){                              
1712                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1713                                         keep = false;
1714                                         break;
1715                                 }
1716                         }       
1717                         if(keep){
1718                                 tempQ->enq(localLogEntry);
1719                         }else{
1720                                 count++;
1721                         }
1722                 }
1723                 delete localQ;
1724                 CpvAccess(_localMessageLog) = tempQ;
1725                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1726         }
1727
1728         /*
1729                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1730         */
1731         CmiMemoryCheck();
1732         clearUpMigratedRetainedLists(request->PE);
1733         
1734         traceUserBracketEvent(20,start,CkWallTimer());
1735         CmiFree(requestMsg);    
1736 };
1737
1738
1739 void clearUpMigratedRetainedLists(int PE){
1740         int count=0;
1741         CmiMemoryCheck();
1742         
1743         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1744                 if(migratedNoticeList[j].toPE == PE){
1745                         migratedNoticeList[j].ackTo = 1;
1746                 }
1747                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1748                         migratedNoticeList.remove(j);
1749                         count++;
1750                 }
1751         }
1752         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1753         
1754         for(int j=retainedObjectList.size()-1;j>=0;j--){
1755                 if(retainedObjectList[j]->migRecord.toPE == PE){
1756                         RetainedMigratedObject *obj = retainedObjectList[j];
1757                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1758                         retainedObjectList.remove(j);
1759                         if(obj->msg != NULL){
1760                                 CmiMemoryCheck();
1761                                 CmiFree(obj->msg);
1762                         }
1763                         delete obj;
1764                 }
1765         }
1766 }
1767
1768 /***************************************************************
1769         Restart Methods and handlers
1770 ***************************************************************/        
1771
1772 /**
1773  * Function for restarting an object with message logging
1774  */
1775 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1776         printf("[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1777         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1778         _restartFlag = 1;
1779         RestartRequest msg;
1780         msg.PE = CkMyPe();
1781         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1782         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1783 };
1784
1785 void CkMlogRestartDouble(void *,double){
1786         CkMlogRestart(NULL,NULL);
1787 };
1788
1789 //GML: restarting from local (group) failure
1790 void CkMlogRestartLocal(){
1791     CkMlogRestart(NULL,NULL);
1792 };
1793
1794
1795 void readCheckpointFromDisk(int size,char *buf){
1796         char fName[100];
1797         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1798
1799         int fd = open(fName,O_RDONLY);
1800         int count=0;
1801         while(count < size){
1802                 count += read(fd,&buf[count],size-count);
1803         }
1804         close(fd);
1805         
1806 };
1807
1808
1809 void sendCheckpointData();
1810
1811 void _getCheckpointHandler(RestartRequest *restartMsg){
1812         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1813         CkAssert(restartMsg->PE == storedChkpt->PE);
1814         storedRequest = restartMsg;
1815         
1816         verifyAckTotal = 0;
1817         for(int i=0;i<migratedNoticeList.size();i++){
1818                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1819 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1820                         if(migratedNoticeList[i].ackFrom == 0){
1821                                 //need to verify if the object actually exists .. it might not
1822                                 //have been acked but it might exist on it
1823                                 VerifyAckMsg msg;
1824                                 msg.migRecord = migratedNoticeList[i];
1825                                 msg.index = i;
1826                                 msg.fromPE = CmiMyPe();
1827                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1828                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1829                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1830                                 verifyAckTotal++;
1831                         }
1832                 }
1833         }
1834         
1835         if(verifyAckTotal == 0){
1836                 sendCheckpointData();
1837         }
1838         verifyAckCount = 0;
1839 }
1840
1841
1842 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1843         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1844         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1845         if(rec != NULL && rec->type() == CkLocRec::local){
1846                         //this location exists on this processor
1847                         //and needs to be removed       
1848                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1849                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1850                         
1851                         int localIdx = localRec->getLocalIndex();
1852                         LBDatabase *lbdb = localRec->getLBDB();
1853                         LDObjHandle ldHandle = localRec->getLdHandle();
1854                                 
1855                         locMgr->setDuringMigration(true);
1856                         
1857                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1858                         lbdb->UnregisterObj(ldHandle);
1859                         
1860                         locMgr->setDuringMigration(false);
1861                         
1862                         verifyAckedRequests++;
1863
1864         }
1865         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1866         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1867 };
1868
1869
1870 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1871         int index =     verifyReply->index;
1872         migratedNoticeList[index] = verifyReply->migRecord;
1873         verifyAckCount++;
1874         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1875         if(verifyAckCount == verifyAckTotal){
1876                 sendCheckpointData();
1877         }
1878 }
1879
1880
1881
1882 void sendCheckpointData(){      
1883         RestartRequest *restartMsg = storedRequest;
1884         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1885         int numMigratedAwayElements = migratedNoticeList.size();
1886         if(migratedNoticeList.size() != 0){
1887                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1888 //                      CkAssert(migratedNoticeList.size() == 0);
1889         }
1890         
1891         
1892         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1893         
1894         DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1895         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
1896         
1897         CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
1898         totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
1899         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1900         
1901         char *msg = (char *)CmiAlloc(totalSize);
1902         
1903         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1904         dataMsg->PE = CkMyPe();
1905         dataMsg->restartWallTime = CmiTimer();
1906         dataMsg->checkPointSize = storedChkpt->bufSize;
1907         
1908         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1909 //      dataMsg->numMigratedAwayElements = 0;
1910         
1911         dataMsg->numMigratedInElements = 0;
1912         dataMsg->migratedElementSize = 0;
1913         dataMsg->lbGroupID = globalLBID;
1914         /*msg layout 
1915                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1916                 Local MessageLog|
1917         */
1918         //store checkpoint data
1919         char *buf = &msg[sizeof(RestartProcessorData)];
1920
1921         if(dataMsg->numMigratedAwayElements != 0){
1922                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1923                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1924         }
1925         
1926
1927 #ifdef CHECKPOINT_DISK
1928         readCheckpointFromDisk(storedChkpt->bufSize,buf);
1929 #else   
1930         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1931 #endif
1932         buf = &buf[storedChkpt->bufSize];
1933
1934
1935         //store localmessage Log
1936         dataMsg->numLocalMessages = localMsgQ->length();
1937         for(int i=0;i<localMsgQ->length();i++){
1938                 if(!fault_aware(((*localMsgQ)[i]).recver )){
1939                         CmiAbort("Non fault aware localMsgQ");
1940                 }
1941                 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
1942                 buf = &buf[sizeof(LocalMessageLog)];
1943         }
1944         
1945         CmiSetHandler(msg,_recvCheckpointHandlerIdx);
1946         CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1947         CmiFree(restartMsg);
1948 };
1949
1950
1951 // this list is used to create a vector of the object ids of all
1952 //the chares on this processor currently and the highest TN processed by them 
1953 //the first argument is actually a CkVec<TProcessedLog> *
1954 void createObjIDList(void *data,ChareMlogData *mlogData){
1955         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
1956         TProcessedLog entry;
1957         entry.recver = mlogData->objID;
1958         entry.tProcessed = mlogData->tProcessed;
1959         list->push_back(entry);
1960         char objString[100];
1961         DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
1962 }
1963
1964
1965 /**
1966  * Receives the checkpoint data from its buddy, restores the state of all the objects
1967  * and asks everyone else to update its home.
1968  */
1969 void _recvCheckpointHandler(char *_restartData){
1970         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1971         MigrationRecord *migratedAwayElements;
1972
1973         globalLBID = restartData->lbGroupID;
1974         
1975         restartData->restartWallTime *= 1000;
1976         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1977         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1978         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1979
1980         
1981         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1982         char *buf = &_restartData[sizeof(RestartProcessorData)];
1983         
1984         if(restartData->numMigratedAwayElements != 0){
1985                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1986                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1987                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1988                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1989         }
1990         
1991         PUP::fromMem pBuf(buf);
1992
1993         pBuf | checkpointCount;
1994
1995         CkPupROData(pBuf);
1996         CkPupGroupData(pBuf);
1997         CkPupNodeGroupData(pBuf);
1998 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
1999         pupArrayElementsSkip(pBuf,NULL);
2000         CkAssert(pBuf.size() == restartData->checkPointSize);
2001         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2002         
2003         forAllCharesDo(initializeRestart,NULL);
2004         
2005         //store the restored local message log in a vector
2006         buf = &buf[restartData->checkPointSize];        
2007         for(int i=0;i<restartData->numLocalMessages;i++){
2008                 LocalMessageLog logEntry;
2009                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
2010                 
2011                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
2012                 if(recverObj!=NULL){
2013                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
2014                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
2015                         char senderString[100];
2016                         char recverString[100];
2017                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
2018                 }else{
2019 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
2020                 }
2021                 buf = &buf[sizeof(LocalMessageLog)];
2022         }
2023
2024         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2025
2026         CmiFree(_restartData);
2027         
2028         
2029         _initDone();
2030
2031         getGlobalStep(globalLBID);
2032         
2033         countUpdateHomeAcks = 0;
2034         RestartRequest updateHomeRequest;
2035         updateHomeRequest.PE = CmiMyPe();
2036         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2037         for(int i=0;i<CmiNumPes();i++){
2038                 if(i != CmiMyPe()){
2039                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2040                 }
2041         }
2042
2043 }
2044
2045 /**
2046  * Receives the updateHome ACKs from all other processors. Once everybody
2047  * has replied, it sends a request to resed the logged messages.
2048  */
2049 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2050
2051         CkPrintf("[%d] Updating Home Ack Handler\n",CkMyPe());
2052
2053         countUpdateHomeAcks++;
2054         CmiFree(updateHomeAck);
2055         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2056         if(countUpdateHomeAcks != CmiNumPes()){
2057                 return;
2058         }
2059
2060         // Send out the request to resend logged messages to all other processors
2061         CkVec<TProcessedLog> objectVec;
2062         forAllCharesDo(createObjIDList, (void *)&objectVec);
2063         int numberObjects = objectVec.size();
2064         
2065         /*
2066                 resendMsg layout |ResendRequest|Array of TProcessedLog|
2067         */
2068         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2069         char *resendMsg = (char *)CmiAlloc(totalSize);
2070         
2071
2072         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2073         resendReq->PE =CkMyPe(); 
2074         resendReq->numberObjects = numberObjects;
2075         char *objList = &resendMsg[sizeof(ResendRequest)];
2076         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
2077         
2078
2079         
2080
2081         /* test for parallel restart migrate away object**/
2082         if(parallelRestart){
2083                 distributeRestartedObjects();
2084                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2085         }
2086         
2087         /*      To make restart work for load balancing.. should only
2088         be used when checkpoint happens along with load balancing
2089         **/
2090 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2091
2092         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2093         CpvAccess(_currentObj) = lb;
2094         lb->ReceiveDummyMigration(restartDecisionNumber);
2095
2096         sleep(10);
2097         
2098         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2099         for(int i=0;i<CkNumPes();i++){
2100                 if(i != CkMyPe()){
2101                         CmiSyncSend(i,totalSize,resendMsg);
2102                 }       
2103         }
2104         _resendMessagesHandler(resendMsg);
2105         CmiFree(resendMsg);
2106 };
2107
2108 void initializeRestart(void *data,ChareMlogData *mlogData){
2109         mlogData->resendReplyRecvd = 0;
2110         mlogData->receivedTNs = new CkVec<MCount>;
2111         mlogData->restartFlag = 1;
2112         mlogData->restoredLocalMsgLog.removeAll();
2113         mlogData->mapTable.empty();
2114 };
2115
2116 /**
2117  * Updates the homePe of chare array elements.
2118  */
2119 void updateHomePE(void *data,ChareMlogData *mlogData){
2120         RestartRequest *updateRequest = (RestartRequest *)data;
2121         int PE = updateRequest->PE; //restarted PE
2122         //if this object is an array Element and its home is the restarted processor
2123         // the home processor needs to know its current location
2124         if(mlogData->objID.type == TypeArray){
2125                 //it is an array element
2126                 CkGroupID myGID = mlogData->objID.data.array.id;
2127                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2128                 CkArrayID aid(mlogData->objID.data.array.id);           
2129                 //check if the restarted processor is the home processor for this object
2130                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2131                 if(locMgr->homePe(myIdx) == PE){
2132                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2133                         DEBUGRESTART(myIdx.print());
2134                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2135                 }
2136         }
2137 };
2138
2139
2140 /**
2141  * Updates the homePe for all chares in this processor.
2142  */
2143 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2144
2145         CkPrintf("[%d] ---------------->HERE\n",CkMyPe());
2146         
2147         int sender = updateRequest->PE;
2148         
2149         forAllCharesDo(updateHomePE,updateRequest);
2150         
2151         updateRequest->PE = CmiMyPe();
2152         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2153         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2154         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2155                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2156                 checkpointCount--;
2157                 startMlogCheckpoint(NULL,0);
2158         }
2159         if(sender == getCheckPointPE()){
2160                 for(int i=0;i<retainedObjectList.size();i++){
2161                         if(retainedObjectList[i]->acked == 0){
2162                                 MigrationNotice migMsg;
2163                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2164                                 migMsg.record = retainedObjectList[i];
2165                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2166                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2167                         }
2168                 }
2169         }
2170 }
2171
2172
2173 //the data argument is of type ResendData which contains the 
2174 //array of objects on  the restartedProcessor
2175 //this method resends the messages stored in this chare's message log 
2176 //to the restarted processor. It also accumulates the maximum TN
2177 //for all the objects on the restarted processor
2178 void resendMessageForChare(void *data,ChareMlogData *mlogData){
2179         char nameString[100];
2180         ResendData *resendData = (ResendData *)data;
2181         int PE = resendData->PE; //restarted PE
2182         DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2183         int count=0;
2184         int ticketRequests=0;
2185         CkQ<MlogEntry *> *log = mlogData->getMlog();
2186
2187         
2188         
2189         
2190         for(int i=0;i<log->length();i++){
2191                 MlogEntry *logEntry = (*log)[i];
2192                 
2193                 // if we sent out the logs of a local message to buddy and he crashed
2194                 //before acking
2195                 envelope *env = logEntry->env;
2196                 if(env == NULL){
2197                         continue;
2198                 }
2199                 if(logEntry->unackedLocal){
2200                         char recverString[100];
2201                         DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
2202                         sendLocalMessageCopy(logEntry);
2203                 }
2204                 //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
2205                 if(env->TN <= 0){
2206                         //ticket not yet replied send it out again
2207                         sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,1);
2208                 }
2209                 
2210                 if(env->recver.type != TypeInvalid){
2211                         int flag = 0;//marks if any of the restarted objects matched this log entry
2212                         for(int j=0;j<resendData->numberObjects;j++){
2213                                 if(env->recver == (resendData->listObjects)[j].recver){
2214                                         flag = 1;
2215                                         //message has a valid TN
2216                                         if(env->TN > 0){
2217                                                 //store maxTicket
2218                                                 if(env->TN > resendData->maxTickets[j]){
2219                                                         resendData->maxTickets[j] = env->TN;
2220                                                 }
2221                                                 //if the TN for this entry is more than the TN processed, send the message out
2222                                                 if(env->TN >= (resendData->listObjects)[j].tProcessed){
2223                                                         //store the TNs that have been since the recver last checkpointed
2224                                                         resendData->ticketVecs[j].push_back(env->TN);
2225                                                         
2226                                                         if(PE != CkMyPe()){
2227                                                                 if(env->recver.type == TypeNodeGroup){
2228                                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2229                                                                 }else{
2230                                                                         CmiSetHandler(env,CmiGetXHandler(env));
2231                                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2232                                                                 }
2233                                                         }else{
2234                                                                 envelope *copyEnv = copyEnvelope(env);
2235                                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2236                                                         }
2237                                                         char senderString[100];
2238                                                         DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2239                                                         count++;
2240                                                 }       
2241                                         }else{
2242 /*                                      //the message didnt get a ticket the last time and needs to start with a ticket request
2243                                                 DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
2244                                                 //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
2245                                                 CkAssert(logEntry->destPE != CkMyPe());
2246                                                 
2247                                                 sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
2248                                                 
2249                                                 ticketRequests++;*/
2250                                         }
2251                                 }
2252                         }//end of for loop of objects
2253                         
2254                 }       
2255         }
2256         DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
2257 }
2258
2259 /**
2260  * Resends the messages since the last checkpoint to the list of objects included in the 
2261  * request.
2262  */
2263 void _resendMessagesHandler(char *msg){
2264         ResendRequest *resendReq = (ResendRequest *)msg;
2265
2266         //GML: examines the origin processor to determine if it belongs to the same group
2267         if(resendReq->PE != CkMyPe() && resendReq->PE/GROUP_SIZE_MLOG == CkMyPe()/GROUP_SIZE_MLOG){
2268                 //TODO: change the function call from restart to group-restart to avoid cyclic calls,
2269                 // for now it will work only with 1 failure
2270                 if(_restartFlag)
2271                         return;
2272                 CmiMemoryCheck();
2273                 CkPrintf("[%d] RESTART: same group\n",CkMyPe());
2274                 //HERE _resetNodeBocInitVec();
2275 /*      int numGroups = CkpvAccess(_groupIDTable)->size();
2276                 int i;
2277                 CKLOCMGR_LOOP(mgr->startInserting(););
2278                 CKLOCMGR_LOOP(mgr->flushAllRecs(););
2279 */
2280                 // rolls back to the previous checkpoint and sends a broadcast to resend messages to this processor
2281                 CkMlogRestartLocal();
2282                 return;
2283         }
2284
2285
2286         char *listObjects = &msg[sizeof(ResendRequest)];
2287         ResendData d;
2288         d.numberObjects = resendReq->numberObjects;
2289         d.PE = resendReq->PE;
2290         d.listObjects = (TProcessedLog *)listObjects;
2291         d.maxTickets = new MCount[d.numberObjects];
2292         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2293         for(int i=0;i<d.numberObjects;i++){
2294                 d.maxTickets[i] = 0;
2295         }
2296
2297         //Check if any of the retained objects need to be recreated
2298         //If they have not been recreated on the restarted processor
2299         //they need to be recreated on this processor
2300         int count=0;
2301         for(int i=0;i<retainedObjectList.size();i++){
2302                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2303                         count++;
2304                         int recreate=1;
2305                         for(int j=0;j<d.numberObjects;j++){
2306                                 if(d.listObjects[j].recver.type != TypeArray ){
2307                                         continue;
2308                                 }
2309                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2310                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2311                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2312                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
2313                                                 recreate = 0;
2314                                                 break;
2315                                         }
2316                                 }
2317                         }
2318                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2319                         if(recreate){
2320                                 donotCountMigration=1;
2321                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2322                                 donotCountMigration=0;
2323                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2324                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2325                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2326
2327                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2328                                 
2329                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2330                                 CmiAssert(rec->type() == CkLocRec::local);
2331                                 CkVec<CkMigratable *> eltList;
2332                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2333                                 for(int j=0;j<eltList.size();j++){
2334                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2335                                                 CpvAccess(_currentObj) = eltList[j];
2336                                                 eltList[j]->ResumeFromSync();
2337                                         }
2338                                 }
2339
2340
2341                                 
2342                                 retainedObjectList[i]->msg=NULL;
2343                                 
2344                                         
2345                         }
2346                 }
2347         }
2348         
2349         if(count > 0){
2350 //              CmiAbort("retainedObjectList for restarted processor not empty");
2351         }
2352
2353
2354         
2355         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2356         forAllCharesDo(resendMessageForChare,&d);
2357
2358         //send back the maximum ticket number for a message sent to each object on the 
2359         //restarted processor
2360         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2361         
2362         int totalTNStored=0;
2363         for(int i=0;i<d.numberObjects;i++){
2364                 totalTNStored += d.ticketVecs[i].size();
2365         }
2366         
2367         int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
2368         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2369         
2370         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2371         resendReply->PE = CkMyPe();
2372         resendReply->numberObjects = d.numberObjects;
2373         
2374         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2375         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2376         for(int i=0;i<d.numberObjects;i++){
2377                 replyObjects[i] = d.listObjects[i].recver;
2378         }
2379         
2380         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2381         for(int i=0;i<d.numberObjects;i++){
2382                 int vecsize = d.ticketVecs[i].size();
2383                 memcpy(ticketList,&vecsize,sizeof(int));
2384                 ticketList = &ticketList[sizeof(int)];
2385                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2386                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2387         }       
2388
2389         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2390         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2391         
2392 /*      
2393         if(verifyAckRequestsUnacked){
2394                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2395                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2396                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2397                         LDObjHandle h;
2398                         lb->Migrated(h,1);
2399                 }
2400         }
2401         
2402         verifyAckRequestsUnacked=0;*/
2403
2404
2405         
2406         delete [] d.maxTickets;
2407         delete [] d.ticketVecs;
2408         if(resendReq->PE != CkMyPe()){
2409                 CmiFree(msg);
2410         }       
2411 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2412         lastRestart = CmiWallTimer();
2413 }
2414
2415 void sortVec(CkVec<MCount> *TNvec);
2416 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2417
2418 void _resendReplyHandler(char *msg){    
2419         /**
2420                 need to rewrite this method to deal with parallel restart
2421         */
2422         ResendRequest *resendReply = (ResendRequest *)msg;
2423         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2424
2425         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2426         
2427         DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2428         for(int i =0; i< resendReply->numberObjects;i++){       
2429                 Chare *obj = (Chare *)listObjects[i].getObject();
2430                 
2431                 int vecsize;
2432                 memcpy(&vecsize,listTickets,sizeof(int));
2433                 listTickets = &listTickets[sizeof(int)];
2434                 MCount *listTNs = (MCount *)listTickets;
2435                 
2436                 
2437                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2438                 
2439                 if(obj != NULL){
2440                         //the object was restarted on the processor on which it existed
2441                         processReceivedTN(obj,vecsize,listTNs);
2442                 }else{
2443                 //pack up objID vecsize and listTNs and send it to the correct processor
2444                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2445                         char *TNMsg = (char *)CmiAlloc(totalSize);
2446                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2447                         receivedTNData->recver = listObjects[i];
2448                         receivedTNData->numTNs = vecsize;
2449                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2450                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2451
2452                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2453                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2454                 }
2455                 
2456         }
2457 };
2458
2459 void _receivedTNDataHandler(ReceivedTNData *msg){
2460         char objName[100];
2461         Chare *obj = (Chare *) msg->recver.getObject();
2462         if(obj){                
2463                 char *_msg = (char *)msg;
2464                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2465                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2466                 processReceivedTN(obj,msg->numTNs,listTNs);
2467         }else{
2468                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2469                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2470         }
2471 };
2472
2473
2474 void processReceivedTN(Chare *obj,int listSize,MCount *listTNs){
2475         obj->mlogData->resendReplyRecvd++;
2476
2477         
2478         for(int j=0;j<listSize;j++){
2479                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2480         }
2481         
2482         //if this object has received all the replies find the ticket numbers
2483         //that senders know about. Those less than the ticket number processed 
2484         //by the receiver can be thrown away. The rest need not be consecutive
2485         // ie there can be holes in the list of ticket numbers seen by senders
2486         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2487                 obj->mlogData->resendReplyRecvd = 0;
2488                 //sort the received TNS
2489                 sortVec(obj->mlogData->receivedTNs);
2490         
2491                 //after all the received tickets are in we need to sort them and then 
2492                 // calculate the holes
2493                 
2494                 if(obj->mlogData->receivedTNs->size() > 0){
2495                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2496                         int vecsize = obj->mlogData->receivedTNs->size();
2497                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2498                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2499                         if(numberHoles == 0){
2500                         }else{
2501                                 char objName[100];                                      
2502                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2503                                 obj->mlogData->numberHoles = numberHoles;
2504                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2505                                 int countHoles=0;
2506                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2507                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2508                                                 //the TNs are not consecutive at this point
2509                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2510                                                         printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
2511                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2512                                                         countHoles++;
2513                                                 }       
2514                                         }
2515                                 }
2516                                 //Holes have been given new TN
2517                                 if(countHoles != numberHoles){
2518                                         char str[100];
2519                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2520                                 }
2521                                 CkAssert(countHoles == numberHoles);                                    
2522                                 obj->mlogData->currentHoles = numberHoles;
2523                         }
2524                 }       
2525                 
2526                 delete obj->mlogData->receivedTNs;
2527                 obj->mlogData->receivedTNs = NULL;
2528                 obj->mlogData->restartFlag = 0;
2529                 char objString[100];
2530                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2531         }
2532
2533 }
2534
2535
2536 void sortVec(CkVec<MCount> *TNvec){
2537         //sort it ->its bloddy bubble sort
2538         //TODO: use quicksort
2539         for(int i=0;i<TNvec->size();i++){
2540                 for(int j=i+1;j<TNvec->size();j++){
2541                         if((*TNvec)[j] < (*TNvec)[i]){
2542                                 MCount temp;
2543                                 temp = (*TNvec)[i];
2544                                 (*TNvec)[i] = (*TNvec)[j];
2545                                 (*TNvec)[j] = temp;
2546                         }
2547                 }
2548         }
2549         //make it unique .. since its sorted all equal units will be consecutive
2550         MCount *tempArray = new MCount[TNvec->size()];
2551         int     uniqueCount=-1;
2552         for(int i=0;i<TNvec->size();i++){
2553                 tempArray[i] = 0;
2554                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2555                         uniqueCount++;
2556                         tempArray[uniqueCount] = (*TNvec)[i];
2557                 }
2558         }
2559         uniqueCount++;
2560         TNvec->removeAll();
2561         for(int i=0;i<uniqueCount;i++){
2562                 TNvec->push_back(tempArray[i]);
2563         }
2564         delete [] tempArray;
2565 }       
2566
2567 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2568         if(TNVec->size() == 0){
2569                 return -1; //not found in an empty vec
2570         }
2571         //binary search to find 
2572         int left=0;
2573         int right = TNVec->size();
2574         int mid = (left +right)/2;
2575         while(searchTN != (*TNVec)[mid] && left < right){
2576                 if((*TNVec)[mid] > searchTN){
2577                         right = mid-1;
2578                 }else{
2579                         left = mid+1;
2580                 }
2581                 mid = (left + right)/2;
2582         }
2583         if(left < right){
2584                 //mid is the element to be returned
2585                 return mid;
2586         }else{
2587                 if(mid < TNVec->size() && mid >=0){
2588                         if((*TNVec)[mid] == searchTN){
2589                                 return mid;
2590                         }else{
2591                                 return -1;
2592                         }
2593                 }else{
2594                         return -1;
2595                 }
2596         }
2597 };
2598
2599
/*
	Parallel restart support: distribute some of the restarted array elements to other processors.
	Regular Charm++ entry methods cannot be used to perform these migrations, because they would
	get stuck in the very message-logging protocol that is in the middle of restarting.
*/
2605
2606 class ElementDistributor: public CkLocIterator{
2607         CkLocMgr *locMgr;
2608         int *targetPE;
2609         void pupLocation(CkLocation &loc,PUP::er &p){
2610                 CkArrayIndexMax idx=loc.getIndex();
2611                 CkGroupID gID = locMgr->ckGetGroupID();
2612                 p|gID;      // store loc mgr's GID as well for easier restore
2613                 p|idx;
2614                 p|loc;
2615         };
2616         public:
2617                 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2618                 void addLocation(CkLocation &loc){
2619                         if(*targetPE == CkMyPe()){
2620                                 *targetPE = (*targetPE +1)%CkNumPes();                          
2621                                 return;
2622                         }
2623                         
2624                         CkArrayIndexMax idx=loc.getIndex();
2625                         CkLocRec_local *rec = loc.getLocalRecord();
2626                         
2627                         CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2628                         idx.print();
2629                         
2630
2631                         //TODO: an element that is being moved should leave some trace behind so that
2632                         // the arraybroadcaster can forward messages to it
2633                         
2634                         //pack up this location and send it across
2635                         PUP::sizer psizer;
2636                         pupLocation(loc,psizer);
2637                         int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
2638                         char *msg = (char *)CmiAlloc(totalSize);
2639                         char *buf = &msg[CmiMsgHeaderSizeBytes];
2640                         PUP::toMem pmem(buf);
2641                         pmem.becomeDeleting();
2642                         pupLocation(loc,pmem);
2643                         
2644                         locMgr->setDuringMigration(CmiTrue);                    
2645                         delete rec;
2646                         locMgr->setDuringMigration(CmiFalse);                   
2647                         locMgr->inform(idx,*targetPE);
2648
2649                         CmiSetHandler(msg,_distributedLocationHandlerIdx);
2650                         CmiSyncSendAndFree(*targetPE,totalSize,msg);
2651
2652                         CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2653                         //decide on the target processor for the next object
2654                         *targetPE = (*targetPE +1)%CkNumPes();
2655                 }
2656                 
2657 };
2658
2659 void distributeRestartedObjects(){
2660         int numGroups = CkpvAccess(_groupIDTable)->size();      
2661         int i;
2662         int targetPE=CkMyPe();
2663         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2664 };
2665
2666 void _distributedLocationHandler(char *receivedMsg){
2667         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2668         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2669         PUP::fromMem pmem(buf);
2670         CkGroupID gID;
2671         CkArrayIndexMax idx;
2672         pmem |gID;
2673         pmem |idx;
2674         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2675         donotCountMigration=1;
2676         mgr->resume(idx,pmem);
2677         donotCountMigration=0;
2678         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2679         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2680         idx.print();
2681
2682         CkLocRec *rec = mgr->elementRec(idx);
2683         CmiAssert(rec->type() == CkLocRec::local);
2684         
2685         CkVec<CkMigratable *> eltList;
2686         mgr->migratableList((CkLocRec_local *)rec,eltList);
2687         for(int i=0;i<eltList.size();i++){
2688                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2689                         CpvAccess(_currentObj) = eltList[i];
2690                         eltList[i]->ResumeFromSync();
2691                 }
2692         }
2693         
2694         
2695 }
2696
2697
2698 /** this method is used to send messages to a restarted processor to tell
2699  * it that a particular expected object is not going to get to it */
2700 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2701         DummyMigrationMsg buf;
2702         buf.flag = MLOG_OBJECT;
2703         buf.lbID = lbID;
2704         buf.mgrID = locMgrID;
2705         buf.idx = idx;
2706         buf.locationPE = locationPE;
2707         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2708         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
2709 };
2710
2711
2712 /**this method is used by a restarted processor to tell other processors
2713  * that they are not going to receive these many objects.. just the count
2714  * not the objects themselves ***/
2715
2716 void sendDummyMigrationCounts(int *dummyCounts){
2717         DummyMigrationMsg buf;
2718         buf.flag = MLOG_COUNT;
2719         buf.lbID = globalLBID;
2720         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2721         for(int i=0;i<CmiNumPes();i++){
2722                 if(i != CmiMyPe() && dummyCounts[i] != 0){
2723                         buf.count = dummyCounts[i];
2724                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
2725                 }
2726         }
2727 }
2728
2729
2730 /** this handler is used to process a dummy migration msg.
2731  * it looks up the load balancer and calls migrated for it */
2732
2733 void _dummyMigrationHandler(DummyMigrationMsg *msg){
2734         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2735         if(msg->flag == MLOG_OBJECT){
2736                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
2737                 LDObjHandle h;
2738                 lb->Migrated(h,1);
2739         }
2740         if(msg->flag == MLOG_COUNT){
2741                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
2742                 msg->count -= verifyAckedRequests;
2743                 for(int i=0;i<msg->count;i++){
2744                         LDObjHandle h;
2745                         lb->Migrated(h,1);
2746                 }
2747         }
2748         verifyAckedRequests=0;
2749         CmiFree(msg);
2750 };
2751
2752
2753
2754
2755
2756
2757
2758 /*****************************************************
2759         Implementation of a method that can be used to call
2760         any method on the ChareMlogData of all the chares on
2761         a processor currently
2762 ******************************************************/
2763
2764
2765 class ElementCaller :  public CkLocIterator {
2766 private:
2767         CkLocMgr *locMgr;
2768         MlogFn fnPointer;
2769         void *data;
2770 public:
2771         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
2772                 locMgr = _locMgr;
2773                 fnPointer = _fnPointer;
2774                 data = _data;
2775         };
2776         void addLocation(CkLocation &loc){
2777                 CkVec<CkMigratable *> list;
2778                 CkLocRec_local *local = loc.getLocalRecord();
2779                 locMgr->migratableList (local,list);
2780                 for(int i=0;i<list.size();i++){
2781                         CkMigratable *migratableElement = list[i];
2782                         fnPointer(data,migratableElement->mlogData);
2783                 }
2784         }
2785 };
2786
2787 void forAllCharesDo(MlogFn fnPointer,void *data){
2788         int numGroups = CkpvAccess(_groupIDTable)->size();
2789         for(int i=0;i<numGroups;i++){
2790                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2791                 fnPointer(data,obj->mlogData);
2792         }
2793         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2794         for(int i=0;i<numNodeGroups;i++){
2795                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
2796                 fnPointer(data,obj->mlogData);
2797         }
2798         int i;
2799         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
2800         
2801         
2802 };
2803
2804
2805 /******************************************************************
2806  Load Balancing
2807 ******************************************************************/
2808
2809 void initMlogLBStep(CkGroupID gid){
2810         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
2811         countLBMigratedAway = 0;
2812         countLBToMigrate=0;
2813         onGoingLoadBalancing=1;
2814         migrationDoneCalled=0;
2815         checkpointBarrierCount=0;
2816         if(globalLBID.idx != 0){
2817                 CmiAssert(globalLBID.idx == gid.idx);
2818         }
2819         globalLBID = gid;
2820 }
2821
2822 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
2823         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
2824         
2825         resumeLbFnPtr = _fnPtr;
2826         centralLb = _centralLb;
2827         migrationDoneCalled = 1;
2828         if(countLBToMigrate == countLBMigratedAway){
2829                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2830                 startMlogCheckpoint(NULL,CmiWallTimer());       
2831         }
2832 };
2833
2834 void finishedCheckpointLoadBalancing(){
2835         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
2836         CheckpointBarrierMsg msg;
2837         msg.fromPE = CmiMyPe();
2838         msg.checkpointCount = checkpointCount;
2839
2840         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
2841         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
2842         
2843 };
2844
2845
2846 void sendMlogLocation(int targetPE,envelope *env){
2847         void *_msg = EnvToUsr(env);
2848         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2849
2850
2851         int existing = 0;
2852         //if this object is already in the retainedobjectlust destined for this
2853         //processor it should not be sent
2854         
2855         for(int i=0;i<retainedObjectList.size();i++){
2856                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
2857                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
2858                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
2859                         existing = 1;
2860                         break;
2861                 }
2862         }
2863
2864         if(existing){
2865                 return;
2866         }
2867         
2868         
2869         countLBToMigrate++;
2870         
2871         MigrationNotice migMsg;
2872         migMsg.migRecord.gID = msg->gid;
2873         migMsg.migRecord.idx = msg->idx;
2874         migMsg.migRecord.fromPE = CkMyPe();
2875         migMsg.migRecord.toPE =  targetPE;
2876         
2877         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
2878         
2879         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
2880         retainedObject->migRecord = migMsg.migRecord;
2881         retainedObject->acked  = 0;
2882         
2883         CkPackMessage(&env);
2884         
2885         migMsg.record = retainedObject;
2886         retainedObject->msg = env;
2887         int size = retainedObject->size = env->getTotalsize();
2888         
2889         retainedObjectList.push_back(retainedObject);
2890         
2891         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2892         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2893         
2894         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
2895
2896 }
2897
2898 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
2899         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
2900         migratedNoticeList.push_back(msg->migRecord);
2901
2902         MigrationNoticeAck buf;
2903         buf.record = msg->record;
2904         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
2905         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
2906 }
2907
2908 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
2909         
2910         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
2911         retainedObject->acked = 1;
2912
2913         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
2914         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
2915
2916         //inform home about the new location of this object
2917         CkGroupID gID = retainedObject->migRecord.gID ;
2918         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2919         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
2920         
2921         countLBMigratedAway++;
2922         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
2923                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2924                 startMlogCheckpoint(NULL,CmiWallTimer());
2925         }
2926 };
2927
2928 void _receiveMlogLocationHandler(void *buf){
2929         envelope *env = (envelope *)buf;
2930         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
2931         CkUnpackMessage(&env);
2932         void *_msg = EnvToUsr(env);
2933         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2934         CkGroupID gID= msg->gid;
2935         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
2936         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2937         CpvAccess(_currentObj)=mgr;
2938         mgr->immigrate(msg);
2939 };
2940
2941
2942 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
2943 /*      if(mlogData->objID.type == TypeArray){
2944                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
2945         //      TODO: make sure later that atSync has been called and it needs 
2946         //      to be resumed from sync
2947         //
2948                 CpvAccess(_currentObj) = elt;
2949                 elt->ResumeFromSync();
2950         }*/
2951 }
2952
2953 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
2954         if(checkpointBarrierCount == CmiNumPes()){
2955                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
2956                 for(int i=0;i<CmiNumPes();i++){
2957                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
2958                 }
2959         }
2960 }
2961
2962 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
2963         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
2964         if(msg->checkpointCount == checkpointCount){
2965                 checkpointBarrierCount++;
2966                 checkAndSendCheckpointBarrierAcks(msg);
2967         }else{
2968                 if(msg->checkpointCount-1 == checkpointCount){
2969                         checkpointBarrierCount++;
2970                         checkAndSendCheckpointBarrierAcks(msg);
2971                 }else{
2972                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
2973                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
2974                 }
2975         }
2976         CmiFree(msg);
2977 }
2978
2979 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
2980         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
2981         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
2982         sendRemoveLogRequests();
2983         (*resumeLbFnPtr)(centralLb);
2984         CmiFree(msg);
2985 }
2986
2987 /**
2988         method that informs an array elements home processor of its current location
2989         It is a converse method to bypass the charm++ message logging framework
2990 */
2991
2992 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
2993         double _startTime = CmiWallTimer();
2994         CurrentLocationMsg msg;
2995         msg.mgrID = locMgrID;
2996         msg.idx = idx;
2997         msg.locationPE = currentPE;
2998         msg.fromPE = CkMyPe();
2999
3000         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
3001         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
3002         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
3003         traceUserBracketEvent(37,_startTime,CmiWallTimer());
3004 }
3005
3006
3007 void _receiveLocationHandler(CurrentLocationMsg *data){
3008         double _startTime = CmiWallTimer();
3009         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
3010         if(mgr == NULL){
3011                 CmiFree(data);
3012                 return;
3013         }
3014         CkLocRec *rec = mgr->elementNrec(data->idx);
3015         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
3016         if(rec != NULL){
3017                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
3018                         if(data->fromPE == data->locationPE){
3019                                 CmiAbort("Another processor has the same object");
3020                         }
3021                 }
3022         }
3023         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3024                 int targetPE = data->fromPE;
3025                 data->fromPE = CmiMyPe();
3026                 data->locationPE = CmiMyPe();
3027                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3028                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3029         }else{
3030                 mgr->inform(data->idx,data->locationPE);
3031         }
3032         CmiFree(data);
3033         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3034 }
3035
3036
3037
3038 void getGlobalStep(CkGroupID gID){
3039         LBStepMsg msg;
3040         int destPE = 0;
3041         msg.lbID = gID;
3042         msg.fromPE = CmiMyPe();
3043         msg.step = -1;
3044         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3045         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3046 };
3047
3048 void _getGlobalStepHandler(LBStepMsg *msg){
3049         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3050         msg->step = lb->step();
3051         CmiAssert(msg->fromPE != CmiMyPe());
3052         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3053         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3054         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3055 };
3056
3057 void _recvGlobalStepHandler(LBStepMsg *msg){
3058         
3059         restartDecisionNumber=msg->step;
3060         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
3061         _updateHomeAckHandler(dummyAck);
3062 };
3063
3064
3065
3066
3067
3068
3069
3070
3071 void _messageLoggingExit(){
3072 /*      if(CkMyPe() == 0){
3073                 if(countBuffered != 0){
3074                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3075                 }
3076
3077 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3078         }
3079         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3080         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3081
3082         //GML: printing some statistics for group approach
3083         if(GROUP_SIZE_MLOG > 1)
3084                 CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
3085
3086 }